package com.sohu.jafka.producer.async;

import com.sohu.jafka.api.ProducerRequest;
import com.sohu.jafka.message.ByteBufferMessageSet;
import com.sohu.jafka.message.CompressionCodec;
import com.sohu.jafka.message.Message;
import com.sohu.jafka.producer.ProducerConfig;
import com.sohu.jafka.producer.SyncProducer;
import com.sohu.jafka.producer.serializer.Encoder;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sohu/jafka/producer/async/DefaultEventHandler.class */
/**
 * {@link EventHandler} implementation that turns a batch of queued items into
 * {@link ProducerRequest}s (one per topic/partition pair) and pushes them through a
 * {@link SyncProducer}, retrying on {@link RuntimeException}.
 *
 * <p>All instance state is assigned once in the constructor and never modified afterwards.
 *
 * @param <T> type of the payload carried by each {@link QueueItem}
 */
public class DefaultEventHandler<T> implements EventHandler<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventHandler.class);

    /** Optional hook invoked before a batch is collated; may be {@code null}. */
    private final CallbackHandler<T> callbackHandler;
    /** Topics to compress; an empty set means "every topic" when a codec is configured. */
    private final Set<String> compressedTopics;
    /** Codec applied to topics selected for compression. */
    private final CompressionCodec codec;
    /** Number of additional send attempts after the first one fails. */
    private final int numRetries;

    /**
     * Creates a handler configured from the given producer configuration.
     *
     * @param producerConfig  source of the compressed-topic list, compression codec and retry count
     * @param callbackHandler optional pre-send hook; {@code null} disables it
     */
    public DefaultEventHandler(ProducerConfig producerConfig, CallbackHandler<T> callbackHandler) {
        this.callbackHandler = callbackHandler;
        // Copy into a set so the per-request membership check in convert() is a hash lookup.
        this.compressedTopics = new HashSet<String>(producerConfig.getCompressedTopics());
        this.codec = producerConfig.getCompressionCodec();
        this.numRetries = producerConfig.getNumRetries();
    }

    /**
     * No-op: this handler takes all of its settings from the constructor.
     *
     * @param properties ignored
     */
    @Override
    public void init(Properties properties) {
    }

    /**
     * Runs the optional callback over the batch, groups the result by topic and partition,
     * and sends the resulting requests.
     *
     * @param list         queued items to send; {@code null} or empty results in no send
     * @param syncProducer producer used to transmit the requests
     * @param encoder      converts each item's payload into a {@link Message}
     * @throws RuntimeException the failure of the final send attempt, once retries are exhausted
     */
    @Override
    public void handle(List<QueueItem<T>> list, SyncProducer syncProducer, Encoder<T> encoder) {
        List<QueueItem<T>> toSend = list;
        if (this.callbackHandler != null) {
            toSend = this.callbackHandler.beforeSendingData(list);
        }
        send(collate(toSend, encoder), syncProducer);
    }

    /**
     * Sends the requests in a single {@code multiSend} call, retrying the whole batch on failure.
     *
     * <p>At least one attempt is always made: a negative retry count is treated as zero so a
     * misconfigured value cannot cause the batch to be dropped silently.
     *
     * @throws RuntimeException the exception from the last attempt when every attempt failed
     */
    private void send(List<ProducerRequest> requests, SyncProducer syncProducer) {
        if (requests.isEmpty()) {
            return;
        }
        // Clamp instead of computing "1 + numRetries", which yields zero iterations for
        // negative values and overflows for Integer.MAX_VALUE.
        final int maxRetries = Math.max(0, this.numRetries);
        for (int attempt = 0; ; attempt++) {
            try {
                syncProducer.multiSend(requests);
                return;
            } catch (RuntimeException e) {
                LOGGER.warn("error sending message, attempts times: " + attempt, e);
                if (attempt >= maxRetries) {
                    throw e;
                }
            }
        }
    }

    /**
     * Groups the items by topic and then by partition, encoding each payload, and builds one
     * {@link ProducerRequest} per (topic, partition) group.
     *
     * @return the requests, or an empty list when {@code events} is {@code null} or empty
     */
    private List<ProducerRequest> collate(List<QueueItem<T>> events, Encoder<T> encoder) {
        if (events == null || events.isEmpty()) {
            return Collections.emptyList();
        }

        Map<String, Map<Integer, List<Message>>> byTopic =
                new HashMap<String, Map<Integer, List<Message>>>();
        for (QueueItem<T> event : events) {
            Map<Integer, List<Message>> byPartition = byTopic.get(event.topic);
            if (byPartition == null) {
                byPartition = new HashMap<Integer, List<Message>>();
                byTopic.put(event.topic, byPartition);
            }
            Integer partition = Integer.valueOf(event.partition);
            List<Message> messages = byPartition.get(partition);
            if (messages == null) {
                messages = new ArrayList<Message>();
                byPartition.put(partition, messages);
            }
            messages.add(encoder.toMessage(event.data));
        }

        List<ProducerRequest> requests = new ArrayList<ProducerRequest>();
        for (Map.Entry<String, Map<Integer, List<Message>>> topicEntry : byTopic.entrySet()) {
            String topic = topicEntry.getKey();
            for (Map.Entry<Integer, List<Message>> partitionEntry : topicEntry.getValue().entrySet()) {
                requests.add(new ProducerRequest(
                        topic,
                        partitionEntry.getKey().intValue(),
                        convert(topic, partitionEntry.getValue())));
            }
        }
        return requests;
    }

    /**
     * Wraps the messages in a {@link ByteBufferMessageSet}, compressed with the configured codec
     * only when a codec is set and the topic is selected for compression (the compressed-topic
     * set is empty or contains the topic).
     */
    private ByteBufferMessageSet convert(String topic, List<Message> messages) {
        boolean compress = this.codec != CompressionCodec.NoCompressionCodec
                && (this.compressedTopics.isEmpty() || this.compressedTopics.contains(topic));
        CompressionCodec effectiveCodec = compress ? this.codec : CompressionCodec.NoCompressionCodec;
        Message[] payload = messages.toArray(new Message[messages.size()]);
        return new ByteBufferMessageSet(effectiveCodec, payload);
    }

    /** No-op: this handler holds no resources of its own. */
    @Override
    public void close() {
    }
}
