package com.sohu.jafka.server;

import com.sohu.jafka.consumer.Consumer;
import com.sohu.jafka.consumer.ConsumerConfig;
import com.sohu.jafka.consumer.ConsumerConnector;
import com.sohu.jafka.consumer.MessageStream;
import com.sohu.jafka.consumer.TopicEventHandler;
import com.sohu.jafka.consumer.ZookeeperTopicEventWatcher;
import com.sohu.jafka.message.Message;
import com.sohu.jafka.producer.Producer;
import com.sohu.jafka.producer.ProducerConfig;
import com.sohu.jafka.producer.serializer.MessageEncoders;
import com.sohu.jafka.utils.Closer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sohu/jafka/server/EmbeddedConsumer.class */
public class EmbeddedConsumer implements TopicEventHandler<String> {
    private final ConsumerConfig consumerConfig;
    private final ServerStartable serverStartable;
    private final List<String> whiteListTopics;
    private final List<String> blackListTopics;
    private final Producer<Void, Message> producer;
    private ZookeeperTopicEventWatcher topicEventWatcher;
    private ConsumerConnector consumerConnector;
    private static final Logger logger = LoggerFactory.getLogger(EmbeddedConsumer.class);
    private List<MirroringThread> threadList = new ArrayList();
    private List<String> mirrorTopics = new ArrayList();

    public EmbeddedConsumer(ConsumerConfig consumerConfig, ProducerConfig producerConfig, ServerStartable serverStartable) {
        this.consumerConfig = consumerConfig;
        this.serverStartable = serverStartable;
        this.whiteListTopics = Arrays.asList(consumerConfig.getMirrorTopicsWhitelist().split(","));
        this.blackListTopics = Arrays.asList(consumerConfig.getMirrorTopicsWhitelist().split(","));
        this.producer = new Producer<>(producerConfig);
    }

    public void startup() {
        logger.info("staring up embedded consumer");
        this.topicEventWatcher = new ZookeeperTopicEventWatcher(this.consumerConfig, this, this.serverStartable);
    }

    public void shutdown() {
        if (this.topicEventWatcher != null) {
            Closer.closeQuietly(this.topicEventWatcher);
        }
        if (this.consumerConnector != null) {
            Closer.closeQuietly(this.consumerConnector);
        }
        Iterator<MirroringThread> it = this.threadList.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.producer.close();
    }

    @Override // com.sohu.jafka.consumer.TopicEventHandler
    public void handleTopicEvent(List<String> list) {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        for (String str : list) {
            if (this.whiteListTopics.isEmpty() ? this.whiteListTopics.contains(str) : !this.blackListTopics.contains(str)) {
                arrayList.add(str);
                linkedHashMap.put(str, Integer.valueOf(this.consumerConfig.getMirrorConsumerNumThreads()));
                if (!this.mirrorTopics.contains(str)) {
                    arrayList2.add(str);
                }
            }
        }
        for (String str2 : this.mirrorTopics) {
            if (!arrayList.contains(str2)) {
                arrayList3.add(str2);
            }
        }
        this.mirrorTopics = arrayList;
        if (arrayList2.isEmpty() && arrayList3.isEmpty()) {
            return;
        }
        startNewConsumerThreads(linkedHashMap);
    }

    private void startNewConsumerThreads(Map<String, Integer> map) {
        if (map.isEmpty()) {
            return;
        }
        if (this.consumerConnector != null) {
            Closer.closeQuietly(this.consumerConnector);
        }
        Iterator<MirroringThread> it = this.threadList.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.threadList.clear();
        this.consumerConnector = Consumer.create(this.consumerConfig);
        for (Map.Entry entry : this.consumerConnector.createMessageStreams(map, new MessageEncoders()).entrySet()) {
            int i = 0;
            Iterator it2 = ((List) entry.getValue()).iterator();
            while (it2.hasNext()) {
                int i2 = i;
                i++;
                this.threadList.add(new MirroringThread((MessageStream) it2.next(), (String) entry.getKey(), i2, this.producer));
            }
        }
        Iterator<MirroringThread> it3 = this.threadList.iterator();
        while (it3.hasNext()) {
            it3.next().start();
        }
    }
}
