package com.sohu.jafka.consumer;

import com.sohu.jafka.common.ConsumerTimeoutException;
import com.sohu.jafka.message.MessageAndOffset;
import com.sohu.jafka.mx.ConsumerTopicStat;
import com.sohu.jafka.producer.serializer.Decoder;
import com.sohu.jafka.utils.IteratorTemplate;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sohu/jafka/consumer/ConsumerIterator.class */
public class ConsumerIterator<T> extends IteratorTemplate<T> {
    final String topic;
    final BlockingQueue<FetchedDataChunk> queue;
    final int consumerTimeoutMs;
    final Decoder<T> decoder;
    private final Logger logger = LoggerFactory.getLogger(ConsumerIterator.class);
    private AtomicReference<Iterator<MessageAndOffset>> current = new AtomicReference<>(null);
    private PartitionTopicInfo currentTopicInfo = null;
    private long consumedOffset = -1;

    public ConsumerIterator(String str, BlockingQueue<FetchedDataChunk> blockingQueue, int i, Decoder<T> decoder) {
        this.topic = str;
        this.queue = blockingQueue;
        this.consumerTimeoutMs = i;
        this.decoder = decoder;
    }

    @Override // com.sohu.jafka.utils.IteratorTemplate, java.util.Iterator
    public T next() {
        T t = (T) super.next();
        if (this.consumedOffset < 0) {
            throw new IllegalStateException("Offset returned by the message set is invalid " + this.consumedOffset);
        }
        this.currentTopicInfo.resetConsumeOffset(this.consumedOffset);
        ConsumerTopicStat.getConsumerTopicStat(this.topic).recordMessagesPerTopic(1);
        return t;
    }

    @Override // com.sohu.jafka.utils.IteratorTemplate
    protected T makeNext() {
        try {
            return makeNext0();
        } catch (InterruptedException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    protected T makeNext0() throws InterruptedException {
        FetchedDataChunk poll;
        MessageAndOffset messageAndOffset;
        Iterator<MessageAndOffset> it = this.current.get();
        if (it == null || !it.hasNext()) {
            if (this.consumerTimeoutMs < 0) {
                poll = this.queue.take();
            } else {
                poll = this.queue.poll(this.consumerTimeoutMs, TimeUnit.MILLISECONDS);
                if (poll == null) {
                    resetState();
                    throw new ConsumerTimeoutException("consumer timeout in " + this.consumerTimeoutMs + " ms");
                }
            }
            if (poll == ZookeeperConsumerConnector.SHUTDOWN_COMMAND) {
                this.logger.warn("Now closing the message stream");
                this.queue.offer(poll);
                return allDone();
            }
            this.currentTopicInfo = poll.topicInfo;
            if (this.currentTopicInfo.getConsumedOffset() < poll.fetchOffset) {
                this.logger.error(String.format("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data", Long.valueOf(this.currentTopicInfo.getConsumedOffset()), Long.valueOf(poll.fetchOffset), this.currentTopicInfo));
                this.currentTopicInfo.resetConsumeOffset(poll.fetchOffset);
            }
            it = poll.messages.iterator();
            this.current.set(it);
        }
        MessageAndOffset next = it.next();
        while (true) {
            messageAndOffset = next;
            if (messageAndOffset.offset >= this.currentTopicInfo.getConsumedOffset() || !it.hasNext()) {
                break;
            }
            next = it.next();
        }
        this.consumedOffset = messageAndOffset.offset;
        return this.decoder.toEvent(messageAndOffset.message);
    }

    public void clearCurrentChunk() {
        if (this.current.get() != null) {
            this.logger.info("Clearing the current data chunk for this consumer iterator");
            this.current.set(null);
        }
    }
}
