package com.sohu.jafka.producer.async;

import com.sohu.jafka.common.IllegalQueueStateException;
import com.sohu.jafka.producer.SyncProducer;
import com.sohu.jafka.producer.serializer.Encoder;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sohu/jafka/producer/async/ProducerSendThread.class */
public class ProducerSendThread<T> extends Thread {
    final String threadName;
    final BlockingQueue<QueueItem<T>> queue;
    final Encoder<T> serializer;
    final SyncProducer underlyingProducer;
    final EventHandler<T> eventHandler;
    final CallbackHandler<T> callbackHandler;
    final long queueTime;
    final int batchSize;
    private final Logger logger = LoggerFactory.getLogger(ProducerSendThread.class);
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
    private volatile boolean shutdown = false;

    public ProducerSendThread(String str, BlockingQueue<QueueItem<T>> blockingQueue, Encoder<T> encoder, SyncProducer syncProducer, EventHandler<T> eventHandler, CallbackHandler<T> callbackHandler, long j, int i) {
        this.threadName = str;
        this.queue = blockingQueue;
        this.serializer = encoder;
        this.underlyingProducer = syncProducer;
        this.eventHandler = eventHandler;
        this.callbackHandler = callbackHandler;
        this.queueTime = j;
        this.batchSize = i;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            try {
                List<QueueItem<T>> processEvents = processEvents();
                if (processEvents.size() > 0) {
                    this.logger.debug(String.format("Dispatching last batch of %d events to the event handler", Integer.valueOf(processEvents.size())));
                    tryToHandle(processEvents);
                }
                this.shutdownLatch.countDown();
            } catch (Exception e) {
                this.logger.error("Error in sending events: ", e);
                this.shutdownLatch.countDown();
            }
        } catch (Throwable th) {
            this.shutdownLatch.countDown();
            throw th;
        }
    }

    public void awaitShutdown() {
        try {
            this.shutdownLatch.await();
        } catch (InterruptedException e) {
            this.logger.warn(e.getMessage());
        }
    }

    public void shutdown() {
        this.shutdown = true;
        this.eventHandler.close();
        this.logger.info("Shutdown thread complete");
    }

    private List<QueueItem<T>> processEvents() {
        List<QueueItem<T>> lastBatchBeforeClose;
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        boolean z = false;
        while (!this.shutdown) {
            try {
                QueueItem<T> poll = this.queue.poll(Math.max(0L, (currentTimeMillis + this.queueTime) - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                boolean z2 = poll == null;
                if (poll != null) {
                    if (this.callbackHandler != null) {
                        List<QueueItem<T>> afterDequeuingExistingData = this.callbackHandler.afterDequeuingExistingData(poll);
                        if (afterDequeuingExistingData != null) {
                            arrayList.addAll(afterDequeuingExistingData);
                        }
                    } else {
                        arrayList.add(poll);
                    }
                    z = arrayList.size() >= this.batchSize;
                }
                if (z || z2) {
                    if (this.logger.isDebugEnabled()) {
                        if (z2) {
                            this.logger.debug(currentTimeMillis2 + " ms elapsed. Queue time reached. Sending..");
                        } else {
                            this.logger.debug(String.format("Batch(%d) full. Sending..", Integer.valueOf(this.batchSize)));
                        }
                    }
                    tryToHandle(arrayList);
                    currentTimeMillis = System.currentTimeMillis();
                    arrayList.clear();
                }
            } catch (InterruptedException e) {
                this.logger.warn(e.getMessage(), e);
            }
        }
        if (this.queue.size() > 0) {
            throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, " + this.queue.size() + " remaining items in the queue");
        }
        if (this.callbackHandler != null && (lastBatchBeforeClose = this.callbackHandler.lastBatchBeforeClose()) != null) {
            arrayList.addAll(lastBatchBeforeClose);
        }
        return arrayList;
    }

    private void tryToHandle(List<QueueItem<T>> list) {
        if (this.logger.isDebugEnabled()) {
            this.logger.debug("handling " + list.size() + " events");
        }
        if (list.size() > 0) {
            try {
                this.eventHandler.handle(list, this.underlyingProducer, this.serializer);
            } catch (RuntimeException e) {
                this.logger.error("Error in handling batch of " + list.size() + " events", e);
            }
        }
    }
}
