package com.sohu.jafka.producer;

import com.github.zkclient.IZkChildListener;
import com.github.zkclient.IZkStateListener;
import com.github.zkclient.ZkClient;
import com.sohu.jafka.cluster.Broker;
import com.sohu.jafka.cluster.Partition;
import com.sohu.jafka.common.NoBrokersForPartitionException;
import com.sohu.jafka.producer.BrokerPartitionInfo;
import com.sohu.jafka.utils.ZKConfig;
import com.sohu.jafka.utils.zookeeper.ZkUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sohu/jafka/producer/ZKBrokerPartitionInfo.class */
public class ZKBrokerPartitionInfo implements BrokerPartitionInfo {
    final ZKConfig zkConfig;
    final BrokerPartitionInfo.Callback callback;
    private final ZkClient zkClient;
    private final Logger logger = LoggerFactory.getLogger(ZKBrokerPartitionInfo.class);
    private final Object zkWatcherLock = new Object();
    private Map<Integer, Broker> allBrokers = getZKBrokerInfo();
    private Map<String, SortedSet<Partition>> topicBrokerPartitions = getZKTopicPartitionInfo(this.allBrokers);
    private BrokerTopicsListener brokerTopicsListener = new BrokerTopicsListener(this.topicBrokerPartitions, this.allBrokers);

    /* loaded from: input_file:com/sohu/jafka/producer/ZKBrokerPartitionInfo$BrokerTopicsListener.class */
    class BrokerTopicsListener implements IZkChildListener {
        private Map<String, SortedSet<Partition>> originalBrokerTopicsParitions;
        private Map<Integer, Broker> originBrokerIds;

        public BrokerTopicsListener(Map<String, SortedSet<Partition>> map, Map<Integer, Broker> map2) {
            this.originalBrokerTopicsParitions = new LinkedHashMap(map);
            this.originBrokerIds = new LinkedHashMap(map2);
            ZKBrokerPartitionInfo.this.logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n/broker/topics, /broker/topics/<topic>, /broker/<ids>");
            ZKBrokerPartitionInfo.this.logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to partition id per topic with " + map);
        }

        public void handleChildChange(String str, List<String> list) throws Exception {
            List<String> arrayList = list != null ? list : new ArrayList<>();
            synchronized (ZKBrokerPartitionInfo.this.zkWatcherLock) {
                if (ZkUtils.BrokerTopicsPath.equals(str)) {
                    Iterator<String> it = arrayList.iterator();
                    while (it.hasNext()) {
                        if (this.originalBrokerTopicsParitions.containsKey(it.next())) {
                            it.remove();
                        }
                    }
                    for (String str2 : arrayList) {
                        processNewBrokerInExistingTopic(str2, ZkUtils.getChildrenParentMayNotExist(ZKBrokerPartitionInfo.this.zkClient, "/brokers/topics/" + str2));
                        ZKBrokerPartitionInfo.this.zkClient.subscribeChildChanges("/brokers/topics/" + str2, ZKBrokerPartitionInfo.this.brokerTopicsListener);
                    }
                } else if (ZkUtils.BrokerIdsPath.equals(str)) {
                    processBrokerChange(str, arrayList);
                } else {
                    String[] split = str.split("/");
                    if (split.length == 4 && "topics".equals(split[2])) {
                        ZKBrokerPartitionInfo.this.logger.debug("[BrokerTopicsListener] List of brokers changed at " + str + "\t Currently registered  list of brokers -> " + arrayList + " for topic -> " + split[3]);
                        processNewBrokerInExistingTopic(split[3], arrayList);
                    }
                }
                resetState();
            }
        }

        private void processBrokerChange(String str, List<String> list) {
            HashMap hashMap = new HashMap(this.originBrokerIds);
            for (int size = list.size() - 1; size >= 0; size--) {
                if (hashMap.remove(Integer.valueOf(list.get(size))) != null) {
                    list.remove(size);
                }
            }
            for (String str2 : list) {
                String readData = ZkUtils.readData(ZKBrokerPartitionInfo.this.zkClient, "/brokers/ids/" + str2);
                Integer valueOf = Integer.valueOf(str2);
                Broker createBroker = Broker.createBroker(valueOf.intValue(), readData);
                ZKBrokerPartitionInfo.this.allBrokers.put(valueOf, createBroker);
                ZKBrokerPartitionInfo.this.callback.producerCbk(createBroker.id, createBroker.host, createBroker.port, createBroker.autocreated);
            }
            for (Map.Entry entry : hashMap.entrySet()) {
                ZKBrokerPartitionInfo.this.allBrokers.remove(entry.getKey());
                Iterator it = ZKBrokerPartitionInfo.this.topicBrokerPartitions.entrySet().iterator();
                while (it.hasNext()) {
                    Iterator it2 = ((SortedSet) ((Map.Entry) it.next()).getValue()).iterator();
                    while (it2.hasNext()) {
                        if (((Integer) entry.getKey()).intValue() == ((Partition) it2.next()).brokerId) {
                            it2.remove();
                        }
                    }
                }
            }
        }

        private void processNewBrokerInExistingTopic(String str, List<String> list) {
            SortedSet brokerPartitions = ZKBrokerPartitionInfo.getBrokerPartitions(ZKBrokerPartitionInfo.this.zkClient, str, list);
            SortedSet sortedSet = (SortedSet) ZKBrokerPartitionInfo.this.topicBrokerPartitions.get(str);
            TreeSet treeSet = new TreeSet();
            if (sortedSet != null) {
                treeSet.addAll(sortedSet);
            }
            treeSet.addAll(brokerPartitions);
            Iterator it = treeSet.iterator();
            while (it.hasNext()) {
                if (!ZKBrokerPartitionInfo.this.allBrokers.containsKey(Integer.valueOf(((Partition) it.next()).brokerId))) {
                    it.remove();
                }
            }
            ZKBrokerPartitionInfo.this.topicBrokerPartitions.put(str, treeSet);
            ZKBrokerPartitionInfo.this.logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + str + " are " + treeSet);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void resetState() {
            ZKBrokerPartitionInfo.this.logger.debug("[BrokerTopicsListener] Before reseting broker topic partitions state " + this.originalBrokerTopicsParitions);
            this.originalBrokerTopicsParitions = new HashMap(ZKBrokerPartitionInfo.this.topicBrokerPartitions);
            ZKBrokerPartitionInfo.this.logger.debug("[BrokerTopicsListener] After reseting broker topic partitions state " + this.originalBrokerTopicsParitions);
            ZKBrokerPartitionInfo.this.logger.debug("[BrokerTopicsListener] Before reseting broker id map state " + this.originBrokerIds);
            this.originBrokerIds = new HashMap(ZKBrokerPartitionInfo.this.allBrokers);
            ZKBrokerPartitionInfo.this.logger.debug("[BrokerTopicsListener] After reseting broker id map state " + this.originBrokerIds);
        }
    }

    /* loaded from: input_file:com/sohu/jafka/producer/ZKBrokerPartitionInfo$ZKSessionExpirationListener.class */
    class ZKSessionExpirationListener implements IZkStateListener {
        ZKSessionExpirationListener() {
        }

        public void handleNewSession() throws Exception {
            ZKBrokerPartitionInfo.this.logger.info("ZK expired; release old list of broker partitions for topics ");
            ZKBrokerPartitionInfo.this.allBrokers = ZKBrokerPartitionInfo.this.getZKBrokerInfo();
            ZKBrokerPartitionInfo.this.topicBrokerPartitions = ZKBrokerPartitionInfo.this.getZKTopicPartitionInfo(ZKBrokerPartitionInfo.this.allBrokers);
            ZKBrokerPartitionInfo.this.brokerTopicsListener.resetState();
            Iterator it = ZKBrokerPartitionInfo.this.topicBrokerPartitions.keySet().iterator();
            while (it.hasNext()) {
                ZKBrokerPartitionInfo.this.zkClient.subscribeChildChanges("/brokers/topics/" + ((String) it.next()), ZKBrokerPartitionInfo.this.brokerTopicsListener);
            }
        }

        public void handleStateChanged(Watcher.Event.KeeperState keeperState) throws Exception {
        }
    }

    public ZKBrokerPartitionInfo(ZKConfig zKConfig, BrokerPartitionInfo.Callback callback) {
        this.zkConfig = zKConfig;
        this.callback = callback;
        this.zkClient = new ZkClient(zKConfig.getZkConnect(), zKConfig.getZkSessionTimeoutMs(), zKConfig.getZkConnectionTimeoutMs());
        this.zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, this.brokerTopicsListener);
        Iterator<String> it = this.topicBrokerPartitions.keySet().iterator();
        while (it.hasNext()) {
            this.zkClient.subscribeChildChanges("/brokers/topics/" + it.next(), this.brokerTopicsListener);
        }
        this.zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, this.brokerTopicsListener);
        this.zkClient.subscribeStateChanges(new ZKSessionExpirationListener());
    }

    @Override // com.sohu.jafka.producer.BrokerPartitionInfo
    public SortedSet<Partition> getBrokerPartitionInfo(String str) {
        synchronized (this.zkWatcherLock) {
            SortedSet<Partition> sortedSet = this.topicBrokerPartitions.get(str);
            if (sortedSet != null && sortedSet.size() != 0) {
                return new TreeSet((SortedSet) sortedSet);
            }
            SortedSet<Partition> bootstrapWithExistingBrokers = bootstrapWithExistingBrokers(str);
            this.topicBrokerPartitions.put(str, bootstrapWithExistingBrokers);
            return bootstrapWithExistingBrokers;
        }
    }

    private SortedSet<Partition> bootstrapWithExistingBrokers(String str) {
        List<String> childrenParentMayNotExist = ZkUtils.getChildrenParentMayNotExist(this.zkClient, ZkUtils.BrokerIdsPath);
        if (childrenParentMayNotExist == null) {
            throw new NoBrokersForPartitionException("no brokers");
        }
        TreeSet treeSet = new TreeSet();
        Iterator<String> it = childrenParentMayNotExist.iterator();
        while (it.hasNext()) {
            treeSet.add(new Partition(Integer.valueOf(it.next()).intValue(), 0));
        }
        return treeSet;
    }

    @Override // com.sohu.jafka.producer.BrokerPartitionInfo
    public Broker getBrokerInfo(int i) {
        Broker broker;
        synchronized (this.zkWatcherLock) {
            broker = this.allBrokers.get(Integer.valueOf(i));
        }
        return broker;
    }

    @Override // com.sohu.jafka.producer.BrokerPartitionInfo
    public Map<Integer, Broker> getAllBrokerInfo() {
        return this.allBrokers;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map<String, SortedSet<Partition>> getZKTopicPartitionInfo(Map<Integer, Broker> map) {
        HashMap hashMap = new HashMap();
        ZkUtils.makeSurePersistentPathExists(this.zkClient, ZkUtils.BrokerTopicsPath);
        for (String str : ZkUtils.getChildrenParentMayNotExist(this.zkClient, ZkUtils.BrokerTopicsPath)) {
            String str2 = "/brokers/topics/" + str;
            List<String> childrenParentMayNotExist = ZkUtils.getChildrenParentMayNotExist(this.zkClient, str2);
            TreeSet treeSet = new TreeSet();
            HashSet hashSet = new HashSet();
            for (String str3 : childrenParentMayNotExist) {
                int parseInt = Integer.parseInt(str3);
                Integer valueOf = Integer.valueOf(ZkUtils.readData(this.zkClient, str2 + "/" + str3));
                for (int i = 0; i < valueOf.intValue(); i++) {
                    treeSet.add(new Partition(parseInt, i));
                }
                hashSet.add(Integer.valueOf(parseInt));
            }
            for (Integer num : map.keySet()) {
                if (!hashSet.contains(num)) {
                    treeSet.add(new Partition(num.intValue(), 0));
                }
            }
            this.logger.debug("Broker ids and # of partitions on each for topic: " + str + " = " + treeSet);
            hashMap.put(str, treeSet);
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Map<Integer, Broker> getZKBrokerInfo() {
        HashMap hashMap = new HashMap();
        List<String> childrenParentMayNotExist = ZkUtils.getChildrenParentMayNotExist(this.zkClient, ZkUtils.BrokerIdsPath);
        if (childrenParentMayNotExist != null) {
            this.logger.info("read all brokers count: " + childrenParentMayNotExist.size());
            for (String str : childrenParentMayNotExist) {
                Broker createBroker = Broker.createBroker(Integer.valueOf(str).intValue(), ZkUtils.readData(this.zkClient, "/brokers/ids/" + str));
                hashMap.put(Integer.valueOf(str), createBroker);
                this.logger.info("Loading Broker " + createBroker);
            }
        }
        return hashMap;
    }

    @Override // com.sohu.jafka.producer.BrokerPartitionInfo
    public void updateInfo() {
        synchronized (this.zkWatcherLock) {
            this.allBrokers = getZKBrokerInfo();
            this.topicBrokerPartitions = getZKTopicPartitionInfo(this.allBrokers);
        }
    }

    @Override // com.sohu.jafka.producer.BrokerPartitionInfo, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.zkClient.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static SortedSet<Partition> getBrokerPartitions(ZkClient zkClient, String str, List<?> list) {
        String str2 = "/brokers/topics/" + str;
        TreeSet treeSet = new TreeSet();
        Iterator<?> it = list.iterator();
        while (it.hasNext()) {
            Integer valueOf = Integer.valueOf(it.next().toString());
            Integer valueOf2 = Integer.valueOf(ZkUtils.readData(zkClient, str2 + "/" + valueOf));
            for (int i = 0; i < valueOf2.intValue(); i++) {
                treeSet.add(new Partition(valueOf.intValue(), i));
            }
        }
        return treeSet;
    }
}
