package com.sohu.jafka.consumer;

import com.github.zkclient.IZkChildListener;
import com.github.zkclient.IZkStateListener;
import com.github.zkclient.ZkClient;
import com.github.zkclient.exception.ZkNodeExistsException;
import com.sohu.jafka.api.OffsetRequest;
import com.sohu.jafka.cluster.Broker;
import com.sohu.jafka.cluster.Cluster;
import com.sohu.jafka.cluster.Partition;
import com.sohu.jafka.common.ConsumerRebalanceFailedException;
import com.sohu.jafka.common.InvalidConfigException;
import com.sohu.jafka.producer.serializer.Decoder;
import com.sohu.jafka.utils.Closer;
import com.sohu.jafka.utils.KV;
import com.sohu.jafka.utils.Pool;
import com.sohu.jafka.utils.Scheduler;
import com.sohu.jafka.utils.zookeeper.ZkGroupDirs;
import com.sohu.jafka.utils.zookeeper.ZkGroupTopicDirs;
import com.sohu.jafka.utils.zookeeper.ZkUtils;
import java.io.Closeable;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sohu/jafka/consumer/ZookeeperConsumerConnector.class */
public class ZookeeperConsumerConnector implements ConsumerConnector {
    /** Sentinel chunk that {@code sendShutdownToAllQueues()} places on every queue when the connector closes. */
    public static final FetchedDataChunk SHUTDOWN_COMMAND = new FetchedDataChunk(null, null, -1);
    private final Logger logger;
    /** Flipped to {@code true} once by {@code close()}; polled by the watcher threads and the rebalance retry loop. */
    private final AtomicBoolean isShuttingDown;
    /** Monitor held for the whole of {@code ZKRebalancerListener.syncedRebalance()}. */
    private final Object rebalanceLock;
    /** Created by {@code createFetcher()}; remains {@code null} when {@code enableFetcher} is false. */
    private Fetcher fetcher;
    /** Opened by {@code connectZk()}; reset to {@code null} by {@code close()}. */
    private ZkClient zkClient;
    /** topic -> (partition -> info); replaced by the rebalancer after a successful rebalance. */
    private Pool<String, Pool<Partition, PartitionTopicInfo>> topicRegistry;
    /** One bounded queue per (topic, consumer thread id) pair, populated by {@code consume()}. */
    private final Pool<KV.StringTuple, BlockingQueue<FetchedDataChunk>> queues;
    /** Runs {@code AutoCommitTask} at a fixed rate when auto commit is enabled. */
    private final Scheduler scheduler;
    final ConsumerConfig config;
    /** When false no {@code Fetcher} is created and all fetcher-related steps are skipped. */
    final boolean enableFetcher;
    /** One listener per {@code consume()} call; all are closed by {@code close()}. */
    private List<ZKRebalancerListener<?>> rebalancerListeners;

    /* loaded from: input_file:com/sohu/jafka/consumer/ZookeeperConsumerConnector$AutoCommitTask.class */
    /**
     * Scheduled task that commits the consumed offsets of the enclosing connector.
     * Every failure is logged and swallowed so that nothing propagates out of {@link #run()}.
     */
    class AutoCommitTask implements Runnable {
        AutoCommitTask() {
        }

        /** Commits offsets once; any {@link Throwable} is reported through the connector's logger. */
        @Override // java.lang.Runnable
        public void run() {
            final ZookeeperConsumerConnector connector = ZookeeperConsumerConnector.this;
            try {
                connector.commitOffsets();
            } catch (Throwable failure) {
                connector.logger.error("exception during autoCommit: ", failure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/sohu/jafka/consumer/ZookeeperConsumerConnector$ZKRebalancerListener.class */
    public class ZKRebalancerListener<T> implements IZkChildListener, Runnable, Closeable {
        /** Consumer group used to build the ZooKeeper paths for ownership and offsets. */
        final String group;
        /** Identifier of this consumer; also used as the prefix of the watcher thread name. */
        final String consumerIdString;
        /** Streams handed to the fetcher when connections are (re)started or queues are cleared. */
        Map<String, List<MessageStream<T>>> messagesStreams;
        /** Thread that executes {@link #run()}; created in the constructor, started by {@code start()}. */
        private final Thread watcherExecutorThread;
        // Decompiler-emitted assertion flag; initialised in the static block of this class.
        static final /* synthetic */ boolean $assertionsDisabled;
        /** Set by {@code handleChildChange}; consumed (and cleared) by the watcher loop. Guarded by {@code lock}. */
        private boolean isWatcherTriggered = false;
        private final ReentrantLock lock = new ReentrantLock();
        /** Signalled whenever {@code isWatcherTriggered} changes or the listener is closed. */
        private final Condition cond = this.lock.newCondition();
        /** Counted down once when the watcher loop in {@code run()} exits. */
        private CountDownLatch shutDownLatch = new CountDownLatch(1);

        public ZKRebalancerListener(String str, String str2, Map<String, List<MessageStream<T>>> map) {
            this.group = str;
            this.consumerIdString = str2;
            this.messagesStreams = map;
            this.watcherExecutorThread = new Thread(this, str2 + "_watcher_executor");
        }

        /** Starts the watcher thread that was created in the constructor. */
        public void start() {
            final Thread watcher = this.watcherExecutorThread;
            watcher.start();
        }

        /**
         * Child-change callback: flags that a rebalance is needed and wakes the watcher thread.
         * Both arguments are ignored; only the fact that something changed matters.
         */
        public void handleChildChange(String str, List<String> list) throws Exception {
            final ReentrantLock guard = this.lock;
            guard.lock();
            try {
                this.isWatcherTriggered = true;
                this.cond.signalAll();
            } finally {
                guard.unlock();
            }
        }

        /**
         * Wakes the watcher thread and waits up to five seconds for it to finish.
         *
         * <p>The latch is awaited only after {@code lock} has been released. The watcher thread
         * must re-acquire that lock to return from {@code cond.await(...)}, so waiting while
         * still holding it would stall this method until the timeout expires.
         *
         * <p>If the calling thread is interrupted while waiting, the wait is abandoned and the
         * interrupt status is restored for the caller.
         */
        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            this.lock.lock();
            try {
                this.isWatcherTriggered = false;
                this.cond.signalAll();
            } finally {
                this.lock.unlock();
            }
            try {
                this.shutDownLatch.await(5L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Best-effort wait: stop waiting but keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Watcher loop: until the connector is shutting down, waits for a trigger from
         * {@code handleChildChange} and runs {@link #syncedRebalance()} when one was seen.
         *
         * <p>The lock is held only while the trigger flag is inspected and cleared, and is
         * released exactly once per iteration before the rebalance starts. Any error from the
         * wait or from the rebalance is logged and the loop continues. On exit the shutdown
         * latch is counted down so that {@code close()} can return.
         */
        @Override // java.lang.Runnable
        public void run() {
            ZookeeperConsumerConnector.this.logger.info("starting watcher executor thread for consumer " + this.consumerIdString);
            while (!ZookeeperConsumerConnector.this.isShuttingDown.get()) {
                try {
                    boolean doRebalance;
                    this.lock.lock();
                    try {
                        if (!this.isWatcherTriggered) {
                            // Wake up periodically so the shutdown flag is re-checked.
                            this.cond.await(1000L, TimeUnit.MILLISECONDS);
                        }
                    } finally {
                        doRebalance = this.isWatcherTriggered;
                        this.isWatcherTriggered = false;
                        this.lock.unlock();
                    }
                    if (doRebalance) {
                        syncedRebalance();
                    }
                } catch (Throwable th2) {
                    ZookeeperConsumerConnector.this.logger.error("error during syncedRebalance", th2);
                }
            }
            ZookeeperConsumerConnector.this.logger.info("stopped thread " + this.watcherExecutorThread.getName());
            this.shutDownLatch.countDown();
        }

        /**
         * Runs {@code rebalance} under the connector-wide rebalance lock, retrying up to
         * {@code config.getMaxRebalanceRetries()} times.
         *
         * <p>Returns silently when the connector is shutting down or as soon as one attempt
         * succeeds. After a failed attempt the fetcher queues are cleared and the thread sleeps
         * for {@code config.getRebalanceBackoffMs()} before trying again.
         *
         * @throws ConsumerRebalanceFailedException when every attempt failed
         */
        public void syncedRebalance() {
            final ZookeeperConsumerConnector connector = ZookeeperConsumerConnector.this;
            synchronized (connector.rebalanceLock) {
                int attempt = 0;
                while (attempt < connector.config.getMaxRebalanceRetries()) {
                    if (connector.isShuttingDown.get()) {
                        return;
                    }
                    connector.logger.info(String.format("[%s] rebalancing starting. try #%d", this.consumerIdString, attempt));
                    final long startedAt = System.currentTimeMillis();
                    final Cluster cluster = ZkUtils.getCluster(connector.zkClient);
                    boolean succeeded = false;
                    try {
                        succeeded = rebalance(cluster);
                    } catch (Exception e) {
                        connector.logger.info("exception during rebalance ", e);
                    }
                    final String outcome = succeeded ? "OK" : "FAILED";
                    final long elapsedMs = System.currentTimeMillis() - startedAt;
                    connector.logger.info(String.format("[%s] rebalanced %s. try #%d, cost %d ms", this.consumerIdString, outcome, attempt, elapsedMs));
                    if (succeeded) {
                        return;
                    }
                    connector.logger.warn("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered");
                    closeFetchersForQueues(cluster, this.messagesStreams, connector.queues.values());
                    try {
                        Thread.sleep(connector.config.getRebalanceBackoffMs());
                    } catch (InterruptedException e2) {
                        connector.logger.warn(e2.getMessage());
                    }
                    attempt++;
                }
                throw new ConsumerRebalanceFailedException(this.consumerIdString + " can't rebalance after " + connector.config.getMaxRebalanceRetries() + " retries");
            }
        }

        /**
         * Performs one rebalance attempt for this consumer.
         *
         * <p>Steps, in order: read the current topic/consumer/partition layout from ZooKeeper,
         * stop the fetchers, release every partition currently held in the topic registry,
         * compute the contiguous partition range each of this consumer's threads should own,
         * and try to claim those partitions in ZooKeeper. Only when every claim succeeds is the
         * new registry installed and the fetcher restarted.
         *
         * <p>Range assignment: with {@code P} partitions and {@code C} entries in the consumer
         * list, the entry at index {@code i} owns {@code P / C} partitions, plus one more when
         * {@code i < P % C}, starting at {@code (P / C) * i + min(i, P % C)}.
         *
         * @param cluster broker cluster passed through to the fetcher
         * @return {@code true} if all partitions were claimed, {@code false} if any claim
         *         conflicted (already-made claims are rolled back by the callee)
         */
        private boolean rebalance(Cluster cluster) {
            Map<String, Set<String>> consumerThreadIdsPerTopic = ZkUtils.getTopicCount(ZookeeperConsumerConnector.this.zkClient, this.group, this.consumerIdString).getConsumerThreadIdsPerTopic();
            Map<String, List<String>> consumersPerTopic = ZkUtils.getConsumersPerTopic(ZookeeperConsumerConnector.this.zkClient, this.group);
            Map<String, List<String>> partitionsForTopics = ZkUtils.getPartitionsForTopics(ZookeeperConsumerConnector.this.zkClient, consumerThreadIdsPerTopic.keySet());
            closeFetchers(cluster, this.messagesStreams, consumerThreadIdsPerTopic);
            releasePartitionOwnership(ZookeeperConsumerConnector.this.topicRegistry);
            // (topic, partition) -> consumer thread id that should own it after this rebalance
            Map<KV.StringTuple, String> ownershipDecision = new HashMap<KV.StringTuple, String>();
            Pool<String, Pool<Partition, PartitionTopicInfo>> newTopicRegistry = new Pool<>();
            for (Map.Entry<String, Set<String>> entry : consumerThreadIdsPerTopic.entrySet()) {
                String topic = entry.getKey();
                newTopicRegistry.put(topic, new Pool<Partition, PartitionTopicInfo>());
                ZkGroupTopicDirs zkGroupTopicDirs = new ZkGroupTopicDirs(this.group, topic);
                List<String> consumers = consumersPerTopic.get(topic);
                List<String> partitions = partitionsForTopics.get(topic);
                int partitionsPerConsumer = partitions.size() / consumers.size();
                int consumersWithExtraPartition = partitions.size() % consumers.size();
                ZookeeperConsumerConnector.this.logger.info("Consumer {} rebalancing the following {} partitions of topic {}:\n\t{}\n\twith {} consumers:\n\t{}", new Object[]{this.consumerIdString, Integer.valueOf(partitions.size()), topic, partitions, Integer.valueOf(consumers.size()), consumers});
                if (ZookeeperConsumerConnector.this.logger.isDebugEnabled()) {
                    StringBuilder sb = new StringBuilder(1024);
                    sb.append("[").append(topic).append("] preassigning details:");
                    for (int i = 0; i < consumers.size(); i++) {
                        int start = (partitionsPerConsumer * i) + Math.min(i, consumersWithExtraPartition);
                        int count = partitionsPerConsumer + (i < consumersWithExtraPartition ? 1 : 0);
                        for (int p = start; p < start + count; p++) {
                            sb.append("\n    ").append(consumers.get(i)).append(" ==> ").append(partitions.get(p));
                        }
                    }
                    ZookeeperConsumerConnector.this.logger.debug(sb.toString());
                }
                for (String consumerThreadId : entry.getValue()) {
                    int myConsumerPosition = consumers.indexOf(consumerThreadId);
                    if (!$assertionsDisabled && myConsumerPosition < 0) {
                        throw new AssertionError();
                    }
                    int start = (partitionsPerConsumer * myConsumerPosition) + Math.min(myConsumerPosition, consumersWithExtraPartition);
                    int count = partitionsPerConsumer + (myConsumerPosition < consumersWithExtraPartition ? 1 : 0);
                    if (count <= 0) {
                        ZookeeperConsumerConnector.this.logger.warn("No broker partition of topic {} for consumer {}, {} partitions and {} consumers", new Object[]{topic, consumerThreadId, Integer.valueOf(partitions.size()), Integer.valueOf(consumers.size())});
                    } else {
                        for (int p = start; p < start + count; p++) {
                            String partition = partitions.get(p);
                            ZookeeperConsumerConnector.this.logger.info("[" + consumerThreadId + "] ==> " + partition + " claimming");
                            addPartitionTopicInfo(newTopicRegistry, zkGroupTopicDirs, partition, topic, consumerThreadId);
                            ownershipDecision.put(new KV.StringTuple(topic, partition), consumerThreadId);
                        }
                    }
                }
            }
            if (!reflectPartitionOwnershipDecision(ownershipDecision)) {
                return false;
            }
            ZookeeperConsumerConnector.this.logger.debug("Updating the cache");
            ZookeeperConsumerConnector.this.logger.debug("Partitions per topic cache " + partitionsForTopics);
            ZookeeperConsumerConnector.this.logger.debug("Consumers per topic cache " + consumersPerTopic);
            ZookeeperConsumerConnector.this.topicRegistry = newTopicRegistry;
            updateFetcher(cluster, this.messagesStreams);
            return true;
        }

        /**
         * Restarts the fetcher connections for every partition currently in the topic registry.
         * Does nothing when the connector has no fetcher.
         */
        private void updateFetcher(Cluster cluster, Map<String, List<MessageStream<T>>> map) {
            if (ZookeeperConsumerConnector.this.fetcher == null) {
                return;
            }
            List<PartitionTopicInfo> allPartitionInfos = new ArrayList<PartitionTopicInfo>();
            for (Pool<Partition, PartitionTopicInfo> perTopic : ZookeeperConsumerConnector.this.topicRegistry.values()) {
                allPartitionInfos.addAll(perTopic.values());
            }
            ZookeeperConsumerConnector.this.fetcher.startConnections(allPartitionInfos, cluster, map);
        }

        /* JADX WARN: Multi-variable type inference failed */
        /**
         * Tries to create one ephemeral owner node in ZooKeeper per decided (topic, partition).
         *
         * <p>A node that already exists counts as a conflict and is logged. If there was at
         * least one conflict, every node created during this call is deleted again.
         *
         * @param map (topic, partition) -> consumer thread id that should own the partition
         * @return {@code true} when every node was created, {@code false} on any conflict
         */
        private boolean reflectPartitionOwnershipDecision(Map<KV.StringTuple, String> map) {
            List<KV.StringTuple> claimed = new ArrayList<KV.StringTuple>();
            int conflicts = 0;
            for (Map.Entry<KV.StringTuple, String> decision : map.entrySet()) {
                KV.StringTuple topicAndPartition = decision.getKey();
                String topic = (String) topicAndPartition.k;
                String partition = (String) topicAndPartition.v;
                String consumerThreadId = decision.getValue();
                String ownerPath = new ZkGroupTopicDirs(this.group, topic).consumerOwnerDir + "/" + partition;
                try {
                    ZkUtils.createEphemeralPathExpectConflict(ZookeeperConsumerConnector.this.zkClient, ownerPath, consumerThreadId);
                    claimed.add(new KV.StringTuple(topic, partition));
                } catch (ZkNodeExistsException e) {
                    ZookeeperConsumerConnector.this.logger.warn(String.format("[%s] waiting [%s] to release => %s", consumerThreadId, ZkUtils.readDataMaybeNull(ZookeeperConsumerConnector.this.zkClient, ownerPath), partition));
                    conflicts++;
                }
            }
            if (conflicts > 0) {
                // Roll back the partial claim so another attempt starts from a clean state.
                for (KV.StringTuple owned : claimed) {
                    deletePartitionOwnershipFromZK((String) owned.k, (String) owned.v);
                }
                return false;
            }
            return true;
        }

        /**
         * Creates the {@code PartitionTopicInfo} for one partition and stores it in the
         * per-topic pool of {@code pool}.
         *
         * <p>The starting offset is the value stored under the group's offset directory in
         * ZooKeeper when present. Otherwise it is resolved from the broker according to
         * {@code config.getAutoOffsetReset()}: request time {@code -2} for the "smallest"
         * setting and {@code -1} for the "largest" setting.
         *
         * @param pool             registry under construction; must already contain an entry for {@code str2}
         * @param zkGroupTopicDirs ZooKeeper directory layout for the group/topic
         * @param str              partition in the string form understood by {@code Partition.parse}
         * @param str2             topic name
         * @param str3             consumer thread id whose queue will receive the partition's data
         * @throws InvalidConfigException if no offset is stored and the reset policy is neither
         *                                of the two recognised values
         */
        private void addPartitionTopicInfo(Pool<String, Pool<Partition, PartitionTopicInfo>> pool, ZkGroupTopicDirs zkGroupTopicDirs, String str, String str2, String str3) {
            long offset;
            Partition partition = Partition.parse(str);
            Pool<Partition, PartitionTopicInfo> partTopicInfoMap = pool.get(str2);
            String offsetString = ZkUtils.readDataMaybeNull(ZookeeperConsumerConnector.this.zkClient, zkGroupTopicDirs.consumerOffsetDir + "/" + partition.getName());
            if (offsetString != null) {
                offset = Long.parseLong(offsetString);
            } else if (OffsetRequest.SMALLES_TIME_STRING.equals(ZookeeperConsumerConnector.this.config.getAutoOffsetReset())) {
                offset = earliestOrLatestOffset(str2, partition.brokerId, partition.partId, -2L);
            } else {
                if (!OffsetRequest.LARGEST_TIME_STRING.equals(ZookeeperConsumerConnector.this.config.getAutoOffsetReset())) {
                    throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig");
                }
                offset = earliestOrLatestOffset(str2, partition.brokerId, partition.partId, -1L);
            }
            BlockingQueue<FetchedDataChunk> queue = ZookeeperConsumerConnector.this.queues.get(new KV.StringTuple(str2, str3));
            // Two separate counters, both starting at the resolved offset.
            PartitionTopicInfo partitionTopicInfo = new PartitionTopicInfo(str2, partition, queue, new AtomicLong(offset), new AtomicLong(offset));
            partTopicInfoMap.put(partition, partitionTopicInfo);
            ZookeeperConsumerConnector.this.logger.debug(partitionTopicInfo + " selected new offset " + offset);
        }

        /**
         * Asks the broker that hosts a partition for one offset before the given time marker.
         *
         * <p>Any failure (including an unknown broker) is logged and results in {@code -1};
         * the temporary {@code SimpleConsumer} is always closed.
         *
         * @param str topic name
         * @param i   id of the broker to query
         * @param i2  partition id on that broker
         * @param j   time marker passed to {@code getOffsetsBefore} (callers use -2 and -1)
         * @return the first offset returned by the broker, or {@code -1} when none was returned
         *         or the lookup failed
         */
        private long earliestOrLatestOffset(String str, int i, int i2, long j) {
            SimpleConsumer simpleConsumer = null;
            long producedOffset = -1;
            try {
                Broker broker = ZkUtils.getCluster(ZookeeperConsumerConnector.this.zkClient).getBroker(Integer.valueOf(i));
                if (broker == null) {
                    throw new IllegalStateException("Broker " + i + " is unavailable. Cannot issue getOffsetsBefore request");
                }
                simpleConsumer = new SimpleConsumer(broker.host, broker.port, ZookeeperConsumerConnector.this.config.getSocketTimeoutMs(), ZookeeperConsumerConnector.this.config.getSocketBufferSize());
                long[] offsetsBefore = simpleConsumer.getOffsetsBefore(str, i2, j, 1);
                if (offsetsBefore.length > 0) {
                    producedOffset = offsetsBefore[0];
                }
            } catch (Exception e) {
                ZookeeperConsumerConnector.this.logger.error("error in earliestOrLatestOffset() ", e);
            } finally {
                if (simpleConsumer != null) {
                    Closer.closeQuietly(simpleConsumer);
                }
            }
            return producedOffset;
        }

        /**
         * Deletes the ZooKeeper owner node of every partition in {@code pool}, then empties
         * the pool.
         */
        private void releasePartitionOwnership(Pool<String, Pool<Partition, PartitionTopicInfo>> pool) {
            ZookeeperConsumerConnector.this.logger.info("Releasing partition ownership => " + pool.values());
            for (Map.Entry<String, Pool<Partition, PartitionTopicInfo>> topicEntry : pool.entrySet()) {
                for (Partition ownedPartition : topicEntry.getValue().keySet()) {
                    deletePartitionOwnershipFromZK(topicEntry.getKey(), ownedPartition);
                }
            }
            pool.clear();
        }

        /**
         * Removes the owner node {@code <consumerOwnerDir>/<str2>} for topic {@code str}
         * from ZooKeeper and logs the released path at debug level.
         */
        private void deletePartitionOwnershipFromZK(String str, String str2) {
            final ZkGroupTopicDirs topicDirs = new ZkGroupTopicDirs(this.group, str);
            final String ownerPath = topicDirs.consumerOwnerDir + "/" + str2;
            ZkUtils.deletePath(ZookeeperConsumerConnector.this.zkClient, ownerPath);
            ZookeeperConsumerConnector.this.logger.debug("Consumer [" + this.consumerIdString + "] released " + ownerPath);
        }

        /** Overload that uses {@code partition.toString()} as the owner node name. */
        private void deletePartitionOwnershipFromZK(String str, Partition partition) {
            deletePartitionOwnershipFromZK(str, partition.toString());
        }

        /**
         * Stops the fetchers and clears the queues that belong to the topics in {@code map2}.
         * A queue is selected when the first component of its key is a key of {@code map2}.
         */
        private void closeFetchers(Cluster cluster, Map<String, List<MessageStream<T>>> map, Map<String, Set<String>> map2) {
            List<BlockingQueue<FetchedDataChunk>> queuesToClear = new ArrayList<BlockingQueue<FetchedDataChunk>>();
            for (Map.Entry<KV.StringTuple, BlockingQueue<FetchedDataChunk>> queueEntry : ZookeeperConsumerConnector.this.queues.entrySet()) {
                boolean topicIsRelevant = map2.containsKey(queueEntry.getKey().k);
                if (topicIsRelevant) {
                    queuesToClear.add(queueEntry.getValue());
                }
            }
            closeFetchersForQueues(cluster, map, queuesToClear);
        }

        /**
         * Stops all broker connections, clears the given queues and streams, and commits the
         * offsets when auto commit is enabled. Skipped entirely when there is no fetcher.
         */
        private void closeFetchersForQueues(Cluster cluster, Map<String, List<MessageStream<T>>> map, Collection<BlockingQueue<FetchedDataChunk>> collection) {
            final ZookeeperConsumerConnector connector = ZookeeperConsumerConnector.this;
            if (connector.fetcher != null) {
                connector.fetcher.stopConnectionsToAllBrokers();
                connector.fetcher.clearFetcherQueues(collection, map.values());
                if (connector.config.isAutoCommit()) {
                    connector.logger.info("Committing all offsets after clearing the fetcher queues");
                    connector.commitOffsets();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Empties the connector's topic registry (called when a new ZooKeeper session starts). */
        public void resetState() {
            final Pool<String, Pool<Partition, PartitionTopicInfo>> registry = ZookeeperConsumerConnector.this.topicRegistry;
            registry.clear();
        }

        // Decompiler rendering of the assertion flag: true when assertions are
        // NOT enabled for the outer class, so assertion checks can be skipped.
        static {
            $assertionsDisabled = !ZookeeperConsumerConnector.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/sohu/jafka/consumer/ZookeeperConsumerConnector$ZKSessionExpireListener.class */
    /**
     * ZooKeeper state listener that restores this consumer's registration after a new
     * session has been established.
     */
    public class ZKSessionExpireListener<T> implements IZkStateListener {
        private final ZkGroupDirs zkGroupDirs;
        private String consumerIdString;
        private TopicCount topicCount;
        private ZKRebalancerListener<T> loadRebalancerListener;

        /**
         * @param zkGroupDirs          ZooKeeper directory layout of the consumer group
         * @param str                  consumer id to re-register
         * @param topicCount           topic count data written on re-registration
         * @param zKRebalancerListener rebalancer whose state is reset and which performs the rebalance
         */
        public ZKSessionExpireListener(ZkGroupDirs zkGroupDirs, String str, TopicCount topicCount, ZKRebalancerListener<T> zKRebalancerListener) {
            this.loadRebalancerListener = zKRebalancerListener;
            this.topicCount = topicCount;
            this.consumerIdString = str;
            this.zkGroupDirs = zkGroupDirs;
        }

        /**
         * On a new session: clears the cached topic registry, re-registers the consumer in
         * ZooKeeper and then runs a synchronous rebalance.
         */
        public void handleNewSession() throws Exception {
            final ZookeeperConsumerConnector connector = ZookeeperConsumerConnector.this;
            final ZKRebalancerListener<T> rebalancer = this.loadRebalancerListener;
            connector.logger.info("Zk expired; release old broker partition ownership; re-register consumer " + this.consumerIdString);
            rebalancer.resetState();
            connector.registerConsumerInZK(this.zkGroupDirs, this.consumerIdString, this.topicCount);
            rebalancer.syncedRebalance();
        }

        /** Intentionally a no-op: plain state changes require no action here. */
        public void handleStateChanged(Watcher.Event.KeeperState keeperState) throws Exception {
        }
    }

    /**
     * Creates a connector with the fetcher enabled.
     *
     * @param consumerConfig consumer configuration
     */
    public ZookeeperConsumerConnector(ConsumerConfig consumerConfig) {
        this(consumerConfig, true);
    }

    /**
     * Creates a connector: connects to ZooKeeper, optionally creates the fetcher and,
     * when auto commit is enabled, schedules the periodic offset commit.
     *
     * @param consumerConfig consumer configuration
     * @param z              whether a {@code Fetcher} should be created
     */
    public ZookeeperConsumerConnector(ConsumerConfig consumerConfig, boolean z) {
        this.config = consumerConfig;
        this.enableFetcher = z;
        this.logger = LoggerFactory.getLogger(ZookeeperConsumerConnector.class);
        this.isShuttingDown = new AtomicBoolean(false);
        this.rebalanceLock = new Object();
        this.scheduler = new Scheduler(1, "consumer-autocommit-", false);
        this.rebalancerListeners = new ArrayList<ZKRebalancerListener<?>>();
        this.topicRegistry = new Pool<>();
        this.queues = new Pool<>();

        connectZk();
        createFetcher();

        if (!this.config.isAutoCommit()) {
            return;
        }
        this.logger.info("starting auto committer every " + consumerConfig.getAutoCommitIntervalMs() + " ms");
        // Initial delay and period are both the configured commit interval.
        this.scheduler.scheduleWithRate(new AutoCommitTask(), consumerConfig.getAutoCommitIntervalMs(), consumerConfig.getAutoCommitIntervalMs());
    }

    /** Instantiates the fetcher unless fetching was disabled at construction time. */
    private void createFetcher() {
        if (!this.enableFetcher) {
            return;
        }
        this.fetcher = new Fetcher(this.config, this.zkClient);
    }

    /**
     * Creates the message streams for the requested topics; delegates to {@code consume}.
     *
     * @param map     topic name -> number of streams requested for that topic
     * @param decoder decoder handed to every created stream
     * @return topic name -> list of streams for that topic
     * @throws IllegalArgumentException if {@code map} is {@code null} (raised by {@code consume})
     */
    @Override // com.sohu.jafka.consumer.ConsumerConnector
    public <T> Map<String, List<MessageStream<T>>> createMessageStreams(Map<String, Integer> map, Decoder<T> decoder) {
        return consume(map, decoder);
    }

    /**
     * Builds the queues and message streams for the requested topics, registers this
     * consumer in ZooKeeper, subscribes to the relevant change notifications and performs
     * the initial rebalance.
     *
     * <p>One bounded queue (capacity {@code config.getMaxQueuedChunks()}) and one stream are
     * created per consumer thread id of each topic. The consumer id comes from the
     * configuration, or is generated when none is configured.
     *
     * @param map     topic name -> number of streams requested for that topic
     * @param decoder decoder handed to every created stream
     * @return topic name -> list of streams for that topic
     * @throws IllegalArgumentException if {@code map} is {@code null}
     */
    private <T> Map<String, List<MessageStream<T>>> consume(Map<String, Integer> map, Decoder<T> decoder) {
        if (map == null) {
            throw new IllegalArgumentException("topicCountMap is null");
        }
        ZkGroupDirs zkGroupDirs = new ZkGroupDirs(this.config.getGroupId());
        Map<String, List<MessageStream<T>>> streamsByTopic = new HashMap<String, List<MessageStream<T>>>();
        String consumerId = this.config.getConsumerId();
        if (consumerId == null) {
            consumerId = generateConsumerId();
        }
        this.logger.info(String.format("create message stream by consumerid [%s] with groupid [%s]", consumerId, this.config.getGroupId()));
        String consumerIdString = this.config.getGroupId() + "_" + consumerId;
        TopicCount topicCount = new TopicCount(consumerIdString, map);
        for (Map.Entry<String, Set<String>> entry : topicCount.getConsumerThreadIdsPerTopic().entrySet()) {
            String topic = entry.getKey();
            List<MessageStream<T>> streams = new ArrayList<MessageStream<T>>();
            for (String threadId : entry.getValue()) {
                BlockingQueue<FetchedDataChunk> queue = new LinkedBlockingQueue<FetchedDataChunk>(this.config.getMaxQueuedChunks());
                this.queues.put(new KV.StringTuple(topic, threadId), queue);
                streams.add(new MessageStream<T>(topic, queue, this.config.getConsumerTimeoutMs(), decoder));
            }
            streamsByTopic.put(topic, streams);
            this.logger.debug("adding topic " + topic + " and stream to map.");
        }
        ZKRebalancerListener<T> rebalancerListener = new ZKRebalancerListener<T>(this.config.getGroupId(), consumerIdString, streamsByTopic);
        this.rebalancerListeners.add(rebalancerListener);
        rebalancerListener.start();
        registerConsumerInZK(zkGroupDirs, consumerIdString, topicCount);
        // Re-register and rebalance whenever a new ZooKeeper session is created.
        this.zkClient.subscribeStateChanges(new ZKSessionExpireListener<T>(zkGroupDirs, consumerIdString, topicCount, rebalancerListener));
        // Rebalance when the set of registered consumers or a topic's broker children change.
        this.zkClient.subscribeChildChanges(zkGroupDirs.consumerRegistryDir, rebalancerListener);
        for (String topic : streamsByTopic.keySet()) {
            this.zkClient.subscribeChildChanges("/brokers/topics/" + topic, rebalancerListener);
        }
        rebalancerListener.syncedRebalance();
        return streamsByTopic;
    }

    /**
     * Returns the textual address of the first IPv4 address found on a non-loopback
     * network interface.
     *
     * @throws UnknownHostException if no such address exists or the interfaces could not
     *                              be enumerated
     */
    private String getLocalHost() throws UnknownHostException {
        try {
            for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
                if (nic.isLoopback()) {
                    continue;
                }
                for (InetAddress address : Collections.list(nic.getInetAddresses())) {
                    if (address instanceof Inet4Address) {
                        return address.getHostAddress();
                    }
                }
            }
        } catch (Exception e) {
            // Any enumeration problem is reported as "unknown host" below.
        }
        throw new UnknownHostException();
    }

    /**
     * Generates a consumer id of the form {@code <host>-<currentTimeMillis>-<8 hex digits>}.
     *
     * <p>The host part is the local host name. When that cannot be resolved, the address
     * returned by {@code getLocalHost()} (first non-loopback IPv4 address) is used instead
     * of repeating the lookup that just failed. The hex part is the first eight digits of
     * the zero-padded most significant bits of a random UUID, so it is always eight
     * characters long.
     *
     * @throws IllegalArgumentException if neither a host name nor an address is available
     */
    private String generateConsumerId() {
        long uuidBits = UUID.randomUUID().getMostSignificantBits();
        // Zero-pad to 16 digits: Long.toHexString drops leading zeros and could be shorter than 8.
        String suffix = String.format("%016x", Long.valueOf(uuidBits)).substring(0, 8);
        String host;
        try {
            host = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            try {
                host = getLocalHost();
            } catch (UnknownHostException e2) {
                throw new IllegalArgumentException("can not generate consume id by auto, set the 'consumerid' parameter to fix this", e2);
            }
        }
        return String.format("%s-%d-%s", host, Long.valueOf(System.currentTimeMillis()), suffix);
    }

    /**
     * Writes the consumed offset of every partition in the topic registry to ZooKeeper.
     *
     * <p>Partitions whose change counter is zero are skipped. A failed write is logged as
     * a warning and does not stop the remaining partitions. The change counter is reset
     * whether or not the write succeeded; the "Committed" debug line is emitted only after
     * a successful write.
     */
    @Override // com.sohu.jafka.consumer.ConsumerConnector
    public void commitOffsets() {
        // Snapshot the client so the null check and the writes below use the same reference.
        final ZkClient client = this.zkClient;
        if (client == null) {
            this.logger.error("zk client is null. Cannot commit offsets");
            return;
        }
        for (Map.Entry<String, Pool<Partition, PartitionTopicInfo>> entry : this.topicRegistry.entrySet()) {
            ZkGroupTopicDirs zkGroupTopicDirs = new ZkGroupTopicDirs(this.config.getGroupId(), entry.getKey());
            for (PartitionTopicInfo partitionTopicInfo : entry.getValue().values()) {
                long lastChanged = partitionTopicInfo.getConsumedOffsetChanged().get();
                if (lastChanged == 0) {
                    this.logger.trace("consume offset not changed");
                    continue;
                }
                long consumedOffset = partitionTopicInfo.getConsumedOffset();
                String path = zkGroupTopicDirs.consumerOffsetDir + "/" + partitionTopicInfo.partition.getName();
                boolean committed = false;
                try {
                    ZkUtils.updatePersistentPath(client, path, "" + consumedOffset);
                    committed = true;
                } catch (Throwable th) {
                    this.logger.warn("exception during commitOffsets, path=" + path + ",offset=" + consumedOffset, th);
                } finally {
                    partitionTopicInfo.resetComsumedOffsetChanged(lastChanged);
                }
                if (committed && this.logger.isDebugEnabled()) {
                    this.logger.debug("Committed [" + path + "] for topic " + partitionTopicInfo);
                }
            }
        }
    }

    /**
     * Shuts the connector down; only the first call has any effect.
     *
     * <p>Order: stop the auto-commit scheduler, stop the fetcher, push the shutdown command
     * onto every queue, commit offsets (when auto commit is enabled) and close the
     * rebalancer listeners. The ZooKeeper client is closed in a {@code finally} block so it
     * is released even if one of the earlier steps throws. Failures are logged and not
     * propagated.
     */
    @Override // com.sohu.jafka.consumer.ConsumerConnector, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (!this.isShuttingDown.compareAndSet(false, true)) {
            return;
        }
        this.logger.info("ZkConsumerConnector shutting down");
        try {
            this.scheduler.shutdown();
            if (this.fetcher != null) {
                this.fetcher.stopConnectionsToAllBrokers();
            }
            sendShutdownToAllQueues();
            if (this.config.isAutoCommit()) {
                commitOffsets();
            }
            for (ZKRebalancerListener<?> listener : this.rebalancerListeners) {
                Closer.closeQuietly(listener);
            }
        } catch (Exception e) {
            this.logger.error("error during consumer connector shutdown", e);
        } finally {
            if (this.zkClient != null) {
                try {
                    this.zkClient.close();
                } catch (Exception e) {
                    this.logger.error("error during consumer connector shutdown", e);
                }
                this.zkClient = null;
            }
        }
        this.logger.info("ZkConsumerConnector shut down completed");
    }

    /**
     * Empties every consumer queue and then enqueues {@code SHUTDOWN_COMMAND} on it.
     * An interruption while enqueueing is logged and the remaining queues are still processed.
     */
    private void sendShutdownToAllQueues() {
        final Collection<BlockingQueue<FetchedDataChunk>> allQueues = this.queues.values();
        for (BlockingQueue<FetchedDataChunk> queue : allQueues) {
            queue.clear();
            try {
                queue.put(SHUTDOWN_COMMAND);
            } catch (InterruptedException interrupted) {
                this.logger.warn(interrupted.getMessage(), interrupted);
            }
        }
    }

    /**
     * Creates the ZooKeeper client from the configured connect string, session timeout and
     * connection timeout, logging before and after.
     */
    private void connectZk() {
        final ConsumerConfig cfg = this.config;
        this.logger.info("Connecting to zookeeper instance at " + cfg.getZkConnect());
        this.zkClient = new ZkClient(cfg.getZkConnect(), cfg.getZkSessionTimeoutMs(), cfg.getZkConnectionTimeoutMs());
        this.logger.info("Connected to zookeeper at " + cfg.getZkConnect());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Registers the consumer by creating an ephemeral node at
     * {@code <consumerRegistryDir>/<str>} whose data is the JSON form of {@code topicCount}.
     *
     * @param zkGroupDirs ZooKeeper directory layout of the consumer group
     * @param str         consumer id used as the node name
     * @param topicCount  topic count serialised into the node
     */
    public void registerConsumerInZK(ZkGroupDirs zkGroupDirs, String str, TopicCount topicCount) {
        final String registrationPath = zkGroupDirs.consumerRegistryDir + "/" + str;
        final String payload = topicCount.toJsonString();
        this.logger.info(String.format("register consumer in zookeeper [%s] => [%s]", registrationPath, payload));
        ZkUtils.createEphemeralPathExpectConflict(this.zkClient, registrationPath, payload);
    }
}
