/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.drc.clusterclient;

import com.aliyun.drc.clusterclient.impl.DrcClientListener;
import com.aliyun.drc.clusterclient.message.ClusterMessage;
import com.aliyun.drc.clusterclient.partition.Partition;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ClusterListener
extends Thread {
    private static final Logger logger = LoggerFactory.getLogger(ClusterListener.class);
    private static final long MAX_OCCUPY_TIME_IN_MS = 2000L;
    private static final long DEFAULT_WAIT_TIME_IN_MS = 10L;
    private volatile boolean exited = false;
    private long beginTime = 0L;
    private AtomicInteger count = new AtomicInteger(0);
    private Partition lastNotifiedPartition;
    private List<Partition> listenedPartitions = new ArrayList<Partition>();
    private List<DrcClientListener> listeners = new ArrayList<DrcClientListener>();
    private BlockingQueue<List<ClusterMessage>> msgQueue = new LinkedBlockingQueue<List<ClusterMessage>>(4096);

    public abstract void notify(List<ClusterMessage> var1) throws Exception;

    public abstract void noException(Exception var1);

    @Override
    public void run() {
        this.setName("Notify-Message-Thread");
        while (!this.exited) {
            try {
                List<ClusterMessage> messages = this.msgQueue.take();
                this.notify(messages);
            }
            catch (InterruptedException e) {
                logger.warn("notify messages thread interrupted...");
                this.noException(e);
            }
            catch (Exception e) {
                logger.error("notify messages thread exception: ", (Throwable)e);
                this.noException(e);
            }
        }
    }

    public synchronized long notifyMessages(Partition partition, List<ClusterMessage> messages) {
        try {
            this.msgQueue.put(messages);
        }
        catch (InterruptedException e) {
            logger.warn("put messages into msgQueue interrupted...");
        }
        catch (Exception e) {
            logger.error("put messages into msgQueue exception: ", (Throwable)e);
        }
        if (this.count.longValue() <= 1L) {
            return 0L;
        }
        if (this.beginTime <= 0L || this.lastNotifiedPartition == null || this.lastNotifiedPartition != partition) {
            this.lastNotifiedPartition = partition;
            this.beginTime = System.currentTimeMillis();
            return 0L;
        }
        long currTime = System.currentTimeMillis();
        if (currTime - this.beginTime >= 2000L) {
            this.beginTime = 0L;
            return 10L;
        }
        return 0L;
    }

    @Deprecated
    public synchronized void notifyWithoutHeartbeat(List<ClusterMessage> messages) throws Exception {
        if (this.beginTime == 0L) {
            this.beginTime = System.currentTimeMillis();
        }
        try {
            this.notify(messages);
        }
        catch (Exception e) {
            this.noException(e);
        }
    }

    @Deprecated
    public long setNotifiedPartition(Partition partition) {
        if (this.count.longValue() <= 1L) {
            this.beginTime = -1L;
            return 0L;
        }
        if (this.beginTime <= 0L || this.lastNotifiedPartition == null || this.lastNotifiedPartition != partition) {
            this.lastNotifiedPartition = partition;
            this.beginTime = 0L;
            return 0L;
        }
        long currTime = System.currentTimeMillis();
        if (currTime - this.beginTime >= 2000L) {
            this.beginTime = 0L;
            return 10L;
        }
        return 0L;
    }

    public synchronized void addListenedPartition(DrcClientListener listener, Partition partition) {
        this.listeners.add(listener);
        this.listenedPartitions.add(partition);
        this.count.incrementAndGet();
    }

    public synchronized void removeListenedPartition(DrcClientListener listener, Partition partition) {
        this.listeners.remove(listener);
        this.listenedPartitions.remove(partition);
        this.count.decrementAndGet();
    }

    public void shutdown() throws Exception {
        this.exited = true;
        this.interrupt();
        this.join();
        logger.info("cluster listener has been shutdown...");
    }

    public int getMessageQueueSize() {
        return this.msgQueue.size();
    }
}

