/*
 * Decompiled with CFR 0.152.
 */
package com.taosdata.jdbc.tmq;

import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.common.Consumer;
import com.taosdata.jdbc.common.ConsumerManager;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.Deserializer;
import com.taosdata.jdbc.tmq.MapDeserializer;
import com.taosdata.jdbc.tmq.OffsetCommitCallback;
import com.taosdata.jdbc.tmq.TopicPartition;
import com.taosdata.jdbc.utils.StringUtils;
import com.taosdata.jdbc.utils.Utils;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class TaosConsumer<V>
implements AutoCloseable {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(-1L);
    private final AtomicInteger refcount = new AtomicInteger(0);
    private volatile boolean closed = false;
    private final Consumer<V> consumer;
    private final Deserializer<V> deserializer;

    public TaosConsumer(Properties properties) throws SQLException {
        String s2;
        if (null == properties) {
            throw TSDBError.createSQLException(9073, "consumer properties must not be null!");
        }
        String servers = properties.getProperty("bootstrap.servers");
        if (!StringUtils.isEmpty(servers)) {
            Arrays.stream(servers.split(",")).filter(s -> !StringUtils.isEmpty(s)).findFirst().ifPresent(s -> {
                String[] host = s.split(":");
                properties.setProperty("td.connect.ip", host[0]);
                if (host.length > 1) {
                    properties.setProperty("td.connect.port", host[1]);
                }
            });
        }
        this.deserializer = !StringUtils.isEmpty(s2 = properties.getProperty("value.deserializer")) ? (Deserializer)Utils.newInstance(Utils.parseClassType(s2)) : new MapDeserializer();
        this.deserializer.configure(properties);
        String type = properties.getProperty("td.connect.type");
        this.consumer = ConsumerManager.getConsumer(type);
        this.consumer.create(properties);
    }

    public void subscribe(Collection<String> topics) throws SQLException {
        this.acquireAndEnsureOpen();
        try {
            this.consumer.subscribe(topics);
        }
        finally {
            this.release();
        }
    }

    public void unsubscribe() throws SQLException {
        this.acquireAndEnsureOpen();
        try {
            this.consumer.unsubscribe();
        }
        finally {
            this.release();
        }
    }

    public Set<String> subscription() throws SQLException {
        this.acquireAndEnsureOpen();
        try {
            Set<String> set = this.consumer.subscription();
            return set;
        }
        finally {
            this.release();
        }
    }

    public ConsumerRecords<V> poll(Duration timeout) throws SQLException {
        this.acquireAndEnsureOpen();
        try {
            ConsumerRecords<V> consumerRecords = this.consumer.poll(timeout, this.deserializer);
            return consumerRecords;
        }
        finally {
            this.release();
        }
    }

    @Deprecated
    public void commitCallbackHandler(int code) {
    }

    public void commitAsync() {
        this.consumer.commitAsync((r, e) -> {});
    }

    public void commitAsync(OffsetCommitCallback<V> callback) {
        this.consumer.commitAsync(callback);
    }

    public void commitSync() throws SQLException {
        this.consumer.commitSync();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void seek(TopicPartition partition, long offset) throws SQLException {
        if (offset < 0L) {
            throw TSDBError.createIllegalArgumentException(9081);
        }
        this.acquireAndEnsureOpen();
        try {
            this.consumer.seek(partition, offset);
        }
        finally {
            this.release();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public long position(TopicPartition tp) throws SQLException {
        this.acquireAndEnsureOpen();
        try {
            long l = this.consumer.position(tp);
            return l;
        }
        finally {
            this.release();
        }
    }

    public Map<TopicPartition, Long> position(String topic) throws SQLException {
        this.acquireAndEnsureOpen();
        try {
            Map<TopicPartition, Long> map = this.consumer.position(topic);
            return map;
        }
        finally {
            this.release();
        }
    }

    public Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException {
        this.acquireAndEnsureOpen();
        try {
            Map<TopicPartition, Long> map = this.consumer.beginningOffsets(topic);
            return map;
        }
        finally {
            this.release();
        }
    }

    public Map<TopicPartition, Long> endOffsets(String topic) throws SQLException {
        this.acquireAndEnsureOpen();
        try {
            Map<TopicPartition, Long> map = this.consumer.endOffsets(topic);
            return map;
        }
        finally {
            this.release();
        }
    }

    private void acquireAndEnsureOpen() {
        this.acquire();
        if (this.closed) {
            this.release();
            throw new IllegalStateException("This consumer has already been closed.");
        }
    }

    private void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            throw new ConcurrentModificationException("Consumer is not safe for multi-threaded access");
        }
        this.refcount.incrementAndGet();
    }

    private void release() {
        if (this.refcount.decrementAndGet() == 0) {
            this.currentThread.set(-1L);
        }
    }

    @Override
    public void close() throws SQLException {
        this.acquire();
        try {
            this.consumer.close();
        }
        finally {
            this.closed = true;
            this.release();
        }
    }
}

