/*
 * Decompiled with CFR 0.152.
 */
package com.taosdata.jdbc.ws.tmq;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.TSDBError;
import com.taosdata.jdbc.common.Consumer;
import com.taosdata.jdbc.enums.TmqMessageType;
import com.taosdata.jdbc.enums.WSFunction;
import com.taosdata.jdbc.tmq.Assignment;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.Deserializer;
import com.taosdata.jdbc.tmq.OffsetAndMetadata;
import com.taosdata.jdbc.tmq.OffsetCommitCallback;
import com.taosdata.jdbc.tmq.TopicPartition;
import com.taosdata.jdbc.ws.FutureResponse;
import com.taosdata.jdbc.ws.InFlightRequest;
import com.taosdata.jdbc.ws.Transport;
import com.taosdata.jdbc.ws.entity.Code;
import com.taosdata.jdbc.ws.entity.FetchBlockResp;
import com.taosdata.jdbc.ws.entity.Request;
import com.taosdata.jdbc.ws.entity.Response;
import com.taosdata.jdbc.ws.tmq.ConsumerAction;
import com.taosdata.jdbc.ws.tmq.WSConsumerResultSet;
import com.taosdata.jdbc.ws.tmq.entity.AssignmentResp;
import com.taosdata.jdbc.ws.tmq.entity.CommitOffsetResp;
import com.taosdata.jdbc.ws.tmq.entity.CommitResp;
import com.taosdata.jdbc.ws.tmq.entity.CommittedResp;
import com.taosdata.jdbc.ws.tmq.entity.ConsumerParam;
import com.taosdata.jdbc.ws.tmq.entity.ListTopicsResp;
import com.taosdata.jdbc.ws.tmq.entity.PollResp;
import com.taosdata.jdbc.ws.tmq.entity.PositionResp;
import com.taosdata.jdbc.ws.tmq.entity.SeekResp;
import com.taosdata.jdbc.ws.tmq.entity.SubscribeResp;
import com.taosdata.jdbc.ws.tmq.entity.TMQRequestFactory;
import com.taosdata.jdbc.ws.tmq.entity.UnsubscribeResp;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * WebSocket-backed implementation of {@link Consumer}.
 *
 * <p>Every operation builds a {@link Request} through {@link TMQRequestFactory},
 * sends it over a {@link Transport} and checks the response code against
 * {@link Code#SUCCESS}; a non-success code is reported as a {@link SQLException}.
 *
 * <p>{@link #create(Properties)} must be called before any other method, because it is
 * the only place where the transport, parameters and request factory are initialised.
 *
 * <p>Only {@link #commitSync()} is {@code synchronized}; no other thread-safety
 * guarantee is visible in this class.
 *
 * @param <V> type of the values produced by the {@link Deserializer} passed to {@code poll}
 */
public class WSConsumer<V>
implements Consumer<V> {
    /**
     * Error code on which {@link #poll} tries to reconnect when auto-connect is enabled.
     * NOTE(review): presumably the driver's "connection closed" code (0x2301) - confirm.
     */
    private static final int ERROR_CODE_RECONNECT_IF_AUTO_CONNECT = 8961;
    /**
     * Error code on which {@link #poll} tries to reconnect when the transport reports a lost connection.
     * NOTE(review): presumably the driver's "query timeout" code (0x231e) - confirm.
     */
    private static final int ERROR_CODE_RECONNECT_IF_CONNECTION_LOST = 8990;
    /**
     * Error code handed to the callback of {@link #commitAsync(Map, OffsetCommitCallback)}.
     * NOTE(review): presumably the driver's "unsupported method" code (0x2302) - confirm.
     */
    private static final int ERROR_CODE_COMMIT_ASYNC_WITH_OFFSETS = 8962;

    /** Byte offset of the request id inside a binary message. */
    private static final int BINARY_REQ_ID_OFFSET = 8;
    /** Buffer position set before a binary message is handed to {@link FetchBlockResp}. */
    private static final int BINARY_PAYLOAD_OFFSET = 24;

    private Transport transport;
    private ConsumerParam param;
    private TMQRequestFactory factory;
    /** Wall-clock time (ms) of the last auto-commit issued from {@code doPoll}. */
    private long lastCommitTime = 0L;
    /** Id of the last polled data message that has not been committed yet; 0 means "nothing to commit". */
    private long messageId = 0L;
    /** Snapshot of the topics of the last successful {@link #subscribe}; used to re-subscribe after a reconnect. */
    private Collection<String> topics;

    /**
     * Initialises the consumer: parses the properties, creates the transport, installs
     * the text/binary message handlers and verifies the connection.
     *
     * @param properties consumer configuration, parsed by {@link ConsumerParam}
     * @throws SQLException if the transport or the connection check fails
     */
    @Override
    public void create(Properties properties) throws SQLException {
        this.factory = new TMQRequestFactory();
        this.param = new ConsumerParam(properties);
        InFlightRequest inFlightRequest = new InFlightRequest(this.param.getConnectionParam().getRequestTimeout(), this.param.getConnectionParam().getMaxRequest());
        this.transport = new Transport(WSFunction.TMQ, this.param.getConnectionParam(), inFlightRequest);

        // Text frames are JSON: the "action" field selects the response class, and the
        // pending request is looked up by (action, reqId) and completed with the response.
        this.transport.setTextMessageHandler(message -> {
            JSONObject jsonObject = JSON.parseObject((String)message);
            ConsumerAction action = ConsumerAction.of(jsonObject.getString("action"));
            Response response = (Response)jsonObject.toJavaObject(action.getResponseClazz());
            FutureResponse pending = inFlightRequest.remove(response.getAction(), response.getReqId());
            if (null != pending) {
                pending.getFuture().complete(response);
            }
        });

        // Binary frames answer FETCH_BLOCK requests: the request id is a little-endian
        // long at a fixed offset, and the buffer is positioned past the header before
        // being wrapped in a FetchBlockResp.
        this.transport.setBinaryMessageHandler(byteBuffer -> {
            byteBuffer.order(ByteOrder.LITTLE_ENDIAN);
            byteBuffer.position(BINARY_REQ_ID_OFFSET);
            long id = byteBuffer.getLong();
            byteBuffer.position(BINARY_PAYLOAD_OFFSET);
            FutureResponse pending = inFlightRequest.remove(ConsumerAction.FETCH_BLOCK.getAction(), id);
            if (null != pending) {
                pending.getFuture().complete(new FetchBlockResp(id, (ByteBuffer)byteBuffer));
            }
        });
        this.transport.checkConnection(this.param.getConnectionParam().getConnectTimeout());
    }

    /**
     * Subscribes to the given topics and remembers them for re-subscription after a reconnect.
     *
     * @param topics topic names to subscribe to; must not be {@code null}
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override
    public void subscribe(Collection<String> topics) throws SQLException {
        String[] topicNames = topics.toArray(new String[0]);
        Request request = this.factory.generateSubscribe(this.param.getConnectionParam().getUser(), this.param.getConnectionParam().getPassword(), this.param.getConnectionParam().getDatabase(), this.param.getGroupId(), this.param.getClientId(), this.param.getOffsetRest(), topicNames, String.valueOf(false), this.param.getMsgWithTableName());
        SubscribeResp response = (SubscribeResp)this.transport.send(request);
        if (Code.SUCCESS.getCode() != response.getCode()) {
            throw new SQLException("subscribe topic error, code: (0x" + Integer.toHexString(response.getCode()) + "), message: " + response.getMessage());
        }
        // Keep a private snapshot: later changes to the caller's collection must not
        // alter what is re-subscribed after a reconnect.
        this.topics = Arrays.asList(topics.toArray(new String[0]));
    }

    /**
     * Cancels the current subscription.
     *
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override
    public void unsubscribe() throws SQLException {
        Request request = this.factory.generateUnsubscribe();
        UnsubscribeResp response = (UnsubscribeResp)this.transport.send(request);
        if (Code.SUCCESS.getCode() != response.getCode()) {
            throw new SQLException("unsubscribe topic error, code: (0x" + Integer.toHexString(response.getCode()) + "), message: " + response.getMessage() + ", timing: " + response.getTiming());
        }
    }

    /**
     * Asks the server for the currently subscribed topics.
     *
     * @return the topic names reported by the server
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override
    public Set<String> subscription() throws SQLException {
        Request request = this.factory.generateSubscription();
        ListTopicsResp response = (ListTopicsResp)this.transport.send(request);
        if (Code.SUCCESS.getCode() != response.getCode()) {
            throw new SQLException("get subscription error, code: (0x" + Integer.toHexString(response.getCode()) + "), message: " + response.getMessage());
        }
        return Arrays.stream(response.getTopics()).collect(Collectors.toSet());
    }

    /**
     * Tries to reconnect to the current node and, on success, re-subscribes to the
     * remembered topics. Closes the transport when the reconnect fails.
     *
     * @return {@code true} if the connection was re-established
     * @throws SQLException if reconnecting or re-subscribing fails
     */
    private boolean handleReconnect() throws SQLException {
        if (!this.transport.doReconnectCurNode()) {
            this.transport.close();
            return false;
        }
        // Nothing to restore when no subscription has succeeded yet.
        if (this.topics != null) {
            this.subscribe(this.topics);
        }
        return true;
    }

    /**
     * Performs one poll round-trip: optionally auto-commits the previous message,
     * polls the server and deserializes every row of a data message.
     *
     * @param timeout      poll timeout, sent to the server in milliseconds
     * @param deserializer converts the current result-set row into a value
     * @return the polled records; empty when there is no message or the message is not a data message
     * @throws SQLException if committing, polling or fetching fails
     */
    private ConsumerRecords<V> doPoll(Duration timeout, Deserializer<V> deserializer) throws SQLException {
        // Auto-commit only when there is an uncommitted message and the interval has elapsed.
        if (this.param.isAutoCommit() && 0L != this.messageId) {
            long now = System.currentTimeMillis();
            if (now - this.lastCommitTime > this.param.getAutoCommitInterval()) {
                this.commitSync();
                this.lastCommitTime = now;
            }
        }

        Request request = this.factory.generatePoll(timeout.toMillis());
        PollResp pollResp = (PollResp)this.transport.send(request);
        if (Code.SUCCESS.getCode() != pollResp.getCode()) {
            throw new SQLException("consumer poll error, code: (0x" + Integer.toHexString(pollResp.getCode()) + "), message: " + pollResp.getMessage());
        }
        if (!pollResp.isHaveMessage()) {
            return ConsumerRecords.emptyRecord();
        }
        // Only plain data messages are turned into records; other message types are skipped.
        if (pollResp.getMessageType() != TmqMessageType.TMQ_RES_DATA.getCode()) {
            return ConsumerRecords.emptyRecord();
        }

        this.messageId = pollResp.getMessageId();
        ConsumerRecords<V> records = new ConsumerRecords<>();
        // Topic, database and vgroup are properties of the message, not of a row.
        String topic = pollResp.getTopic();
        String dbName = pollResp.getDatabase();
        int vGroupId = pollResp.getVgroupId();
        TopicPartition tp = new TopicPartition(topic, vGroupId);
        try (WSConsumerResultSet rs = new WSConsumerResultSet(this.transport, this.factory, pollResp.getMessageId(), pollResp.getDatabase())) {
            while (rs.next()) {
                V value = deserializer.deserialize(rs, topic, dbName);
                records.put(tp, new ConsumerRecord<>(topic, dbName, vGroupId, pollResp.getOffset(), value));
            }
        }
        return records;
    }

    /**
     * Polls for records. When polling fails with a recoverable error and the
     * reconnect succeeds, the pending message id is reset and an empty result is returned.
     *
     * @param timeout      poll timeout
     * @param deserializer converts the current result-set row into a value
     * @return the polled records, possibly empty
     * @throws SQLException the original poll failure when it is not recoverable or the reconnect fails
     */
    @Override
    public ConsumerRecords<V> poll(Duration timeout, Deserializer<V> deserializer) throws SQLException {
        try {
            return this.doPoll(timeout, deserializer);
        }
        catch (SQLException e) {
            if (this.isRecoverable(e) && this.handleReconnect()) {
                // The message polled before the failure can no longer be committed.
                this.messageId = 0L;
                return ConsumerRecords.emptyRecord();
            }
            throw e;
        }
    }

    /**
     * Decides whether a poll failure should trigger a reconnect attempt.
     *
     * @param e the failure raised by {@code doPoll}
     * @return {@code true} if the transport is still open and the error code, combined with
     *         the auto-connect setting or the lost-connection state, allows a reconnect
     */
    private boolean isRecoverable(SQLException e) {
        int code = e.getErrorCode();
        if (code == ERROR_CODE_RECONNECT_IF_AUTO_CONNECT) {
            return !this.transport.isClosed() && this.param.getConnectionParam().isEnableAutoConnect();
        }
        if (code == ERROR_CODE_RECONNECT_IF_CONNECTION_LOST) {
            return !this.transport.isClosed() && this.transport.isConnectionLost();
        }
        return false;
    }

    /**
     * Commits the last polled message, if any, and clears the pending message id.
     *
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override
    public synchronized void commitSync() throws SQLException {
        if (0L == this.messageId) {
            return;
        }
        CommitResp commitResp = (CommitResp)this.transport.send(this.factory.generateCommit(this.messageId));
        if (Code.SUCCESS.getCode() != commitResp.getCode()) {
            throw new SQLException("consumer commit error. code: (0x" + Integer.toHexString(commitResp.getCode()) + "), message: " + commitResp.getMessage());
        }
        this.messageId = 0L;
    }

    /**
     * Closes the underlying transport.
     *
     * @throws SQLException declared by the interface
     */
    @Override
    public void close() throws SQLException {
        this.transport.close();
    }

    /**
     * Not implemented: this method does nothing and never invokes {@code callback}.
     *
     * @param callback ignored
     */
    @Override
    public void commitAsync(OffsetCommitCallback<V> callback) {
        // Intentionally empty in the original implementation.
    }

    /**
     * Moves the consume position of one partition.
     *
     * @param partition topic/vgroup to seek
     * @param offset    target offset
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override
    public void seek(TopicPartition partition, long offset) throws SQLException {
        Request request = this.factory.generateSeek(partition.getTopic(), partition.getVGroupId(), offset);
        SeekResp resp = (SeekResp)this.transport.send(request);
        if (Code.SUCCESS.getCode() != resp.getCode()) {
            throw new SQLException("consumer seek error, code: (0x" + Integer.toHexString(resp.getCode()) + "), message: " + resp.getMessage() + ", timing: " + resp.getTiming());
        }
    }

    /**
     * Returns the current position of one partition.
     *
     * @param partition topic/vgroup to query
     * @return the first position reported by the server
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override
    public long position(TopicPartition partition) throws SQLException {
        Request request = this.factory.generatePosition(new TopicPartition[]{partition});
        PositionResp resp = (PositionResp)this.transport.send(request);
        if (Code.SUCCESS.getCode() != resp.getCode()) {
            throw new SQLException("consumer position error, code: (0x" + Integer.toHexString(resp.getCode()) + "), message: " + resp.getMessage() + ", timing: " + resp.getTiming());
        }
        return resp.getPosition()[0];
    }

    /**
     * Returns the current position of every assigned partition of a topic.
     *
     * @param topic topic name
     * @return positions keyed by partition; the i-th reported position is paired with the i-th assignment
     * @throws SQLException if a request fails or the server answers with a non-success code
     */
    @Override
    public Map<TopicPartition, Long> position(String topic) throws SQLException {
        TopicPartition[] topicPartitions = Arrays.stream(this.getAssignment(topic)).map(a -> new TopicPartition(topic, a.getVgId())).toArray(TopicPartition[]::new);
        Request request = this.factory.generatePosition(topicPartitions);
        PositionResp resp = (PositionResp)this.transport.send(request);
        if (Code.SUCCESS.getCode() != resp.getCode()) {
            throw new SQLException("consumer position error, code: (0x" + Integer.toHexString(resp.getCode()) + "), message: " + resp.getMessage() + ", timing: " + resp.getTiming());
        }
        // Pair by index directly instead of searching the array for every element.
        Map<TopicPartition, Long> positions = new HashMap<>();
        for (int i = 0; i < topicPartitions.length; ++i) {
            positions.put(topicPartitions[i], resp.getPosition()[i]);
        }
        return positions;
    }

    /**
     * Returns the begin offset of every assigned partition of a topic.
     *
     * @param topic topic name
     * @return begin offsets keyed by partition
     * @throws SQLException if the assignment request fails
     */
    @Override
    public Map<TopicPartition, Long> beginningOffsets(String topic) throws SQLException {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (Assignment assignment : this.getAssignment(topic)) {
            offsets.put(new TopicPartition(topic, assignment.getVgId()), assignment.getBegin());
        }
        return offsets;
    }

    /**
     * Returns the end offset of every assigned partition of a topic.
     *
     * @param topic topic name
     * @return end offsets keyed by partition
     * @throws SQLException if the assignment request fails
     */
    @Override
    public Map<TopicPartition, Long> endOffsets(String topic) throws SQLException {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (Assignment assignment : this.getAssignment(topic)) {
            offsets.put(new TopicPartition(topic, assignment.getVgId()), assignment.getEnd());
        }
        return offsets;
    }

    /**
     * Seeks each given partition to its begin offset.
     *
     * @param partitions partitions to rewind
     * @throws SQLException if an assignment or seek request fails
     */
    @Override
    public void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException {
        this.seekToBoundary(partitions, false);
    }

    /**
     * Seeks each given partition to its end offset.
     *
     * @param partitions partitions to fast-forward
     * @throws SQLException if an assignment or seek request fails
     */
    @Override
    public void seekToEnd(Collection<TopicPartition> partitions) throws SQLException {
        this.seekToBoundary(partitions, true);
    }

    /**
     * Shared implementation of {@link #seekToBeginning} and {@link #seekToEnd}.
     *
     * <p>Offsets fetched for a topic are cached for the partitions that were not the one
     * being processed, so sibling partitions of the same topic do not trigger another
     * assignment request.
     *
     * @param partitions partitions to seek
     * @param toEnd      {@code true} to use end offsets, {@code false} to use begin offsets
     * @throws SQLException if an assignment or seek request fails
     */
    private void seekToBoundary(Collection<TopicPartition> partitions, boolean toEnd) throws SQLException {
        Map<TopicPartition, Long> cachedOffsets = new HashMap<>();
        for (TopicPartition partition : partitions) {
            if (cachedOffsets.containsKey(partition)) {
                this.seek(partition, cachedOffsets.get(partition));
                continue;
            }
            String topic = partition.getTopic();
            Map<TopicPartition, Long> topicOffsets = toEnd ? this.endOffsets(topic) : this.beginningOffsets(topic);
            for (Map.Entry<TopicPartition, Long> entry : topicOffsets.entrySet()) {
                if (entry.getKey().getVGroupId() == partition.getVGroupId()) {
                    this.seek(entry.getKey(), entry.getValue());
                } else {
                    cachedOffsets.put(entry.getKey(), entry.getValue());
                }
            }
        }
    }

    /**
     * Returns every partition assigned to this consumer across all subscribed topics.
     *
     * @return the assigned partitions
     * @throws SQLException if a subscription or assignment request fails
     */
    @Override
    public Set<TopicPartition> assignment() throws SQLException {
        Set<TopicPartition> assigned = new HashSet<>();
        for (String topic : this.subscription()) {
            for (Assignment topicAssignment : this.getAssignment(topic)) {
                assigned.add(new TopicPartition(topic, topicAssignment.getVgId()));
            }
        }
        return assigned;
    }

    /**
     * Returns the committed offset of one partition.
     *
     * @param partition topic/vgroup to query
     * @return the first committed offset reported by the server, with {@code null} metadata
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override
    public OffsetAndMetadata committed(TopicPartition partition) throws SQLException {
        Request request = this.factory.generateCommitted(new TopicPartition[]{partition});
        CommittedResp resp = (CommittedResp)this.transport.send(request);
        if (Code.SUCCESS.getCode() != resp.getCode()) {
            throw new SQLException("consumer committed error, code: (0x" + Integer.toHexString(resp.getCode()) + "), message: " + resp.getMessage() + ", timing: " + resp.getTiming());
        }
        return new OffsetAndMetadata(resp.getCommitted()[0], null);
    }

    /**
     * Returns the committed offsets of several partitions.
     *
     * @param partitions partitions to query
     * @return committed offsets keyed by partition; the i-th reported offset is paired with the i-th requested partition
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException {
        TopicPartition[] topicPartitions = partitions.toArray(new TopicPartition[0]);
        Request request = this.factory.generateCommitted(topicPartitions);
        CommittedResp resp = (CommittedResp)this.transport.send(request);
        if (Code.SUCCESS.getCode() != resp.getCode()) {
            throw new SQLException("consumer committed error, code: (0x" + Integer.toHexString(resp.getCode()) + "), message: " + resp.getMessage() + ", timing: " + resp.getTiming());
        }
        Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>();
        for (int i = 0; i < topicPartitions.length; ++i) {
            committedOffsets.put(topicPartitions[i], new OffsetAndMetadata(resp.getCommitted()[i], null));
        }
        return committedOffsets;
    }

    /**
     * Commits explicit offsets, one request per partition. Entries with a negative
     * offset are skipped; the first failing commit aborts the remaining ones.
     *
     * @param offsets offsets to commit, keyed by partition
     * @throws SQLException if a request fails or the server answers with a non-success code
     */
    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            long offset = entry.getValue().offset();
            if (offset < 0L) {
                continue;
            }
            Request request = this.factory.generateCommitOffset(entry.getKey(), offset);
            CommitOffsetResp resp = (CommitOffsetResp)this.transport.send(request);
            if (Code.SUCCESS.getCode() != resp.getCode()) {
                throw new SQLException("consumer commit offset error, code: (0x" + Integer.toHexString(resp.getCode()) + "), message: " + resp.getMessage() + ", timing: " + resp.getTiming());
            }
        }
    }

    /**
     * Does not commit anything: immediately completes {@code callback} with the given
     * offsets and an exception created for a fixed error code.
     *
     * @param offsets  passed through to the callback unchanged
     * @param callback receives the offsets and the error
     */
    @Override
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback<V> callback) {
        callback.onComplete(offsets, TSDBError.createSQLException(ERROR_CODE_COMMIT_ASYNC_WITH_OFFSETS));
    }

    /**
     * Fetches the assignment of one topic from the server.
     *
     * @param topic topic name
     * @return the assignments reported by the server
     * @throws SQLException if the request fails or the server answers with a non-success code
     */
    private Assignment[] getAssignment(String topic) throws SQLException {
        Request request = this.factory.generateAssignment(topic);
        AssignmentResp resp = (AssignmentResp)this.transport.send(request);
        if (Code.SUCCESS.getCode() != resp.getCode()) {
            throw new SQLException("consumer assignment error, code: (0x" + Integer.toHexString(resp.getCode()) + "), message: " + resp.getMessage() + ", timing: " + resp.getTiming());
        }
        return resp.getAssignment();
    }
}

