/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;

public class CassandraCommitter
extends CheckpointCommitter {
    private static final long serialVersionUID = 1L;
    private final ClusterBuilder builder;
    private transient Cluster cluster;
    private transient Session session;
    private String keySpace = "flink_auxiliary";
    private String table = "checkpoints_";
    private final Map<Integer, Long> lastCommittedCheckpoints = new HashMap<Integer, Long>();

    public CassandraCommitter(ClusterBuilder builder) {
        this.builder = builder;
        ClosureCleaner.clean((Object)builder, (ExecutionConfig.ClosureCleanerLevel)ExecutionConfig.ClosureCleanerLevel.RECURSIVE, (boolean)true);
    }

    public CassandraCommitter(ClusterBuilder builder, String keySpace) {
        this(builder);
        this.keySpace = keySpace;
    }

    public void setJobId(String id) throws Exception {
        super.setJobId(id);
        this.table = this.table + id;
    }

    public void createResource() throws Exception {
        this.cluster = this.builder.getCluster();
        this.session = this.cluster.connect();
        this.session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s with replication={'class':'SimpleStrategy', 'replication_factor':1};", this.keySpace));
        this.session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (sink_id text, sub_id int, checkpoint_id bigint, PRIMARY KEY (sink_id, sub_id));", this.keySpace, this.table));
        try {
            this.session.close();
        }
        catch (Exception e) {
            LOG.error("Error while closing session.", (Throwable)e);
        }
        try {
            this.cluster.close();
        }
        catch (Exception e) {
            LOG.error("Error while closing cluster.", (Throwable)e);
        }
    }

    public void open() throws Exception {
        if (this.builder == null) {
            throw new RuntimeException("No ClusterBuilder was set.");
        }
        this.cluster = this.builder.getCluster();
        this.session = this.cluster.connect();
    }

    public void close() throws Exception {
        this.lastCommittedCheckpoints.clear();
        try {
            this.session.close();
        }
        catch (Exception e) {
            LOG.error("Error while closing session.", (Throwable)e);
        }
        try {
            this.cluster.close();
        }
        catch (Exception e) {
            LOG.error("Error while closing cluster.", (Throwable)e);
        }
    }

    public void commitCheckpoint(int subtaskIdx, long checkpointId) {
        String statement = String.format("UPDATE %s.%s set checkpoint_id=%d where sink_id='%s' and sub_id=%d;", this.keySpace, this.table, checkpointId, this.operatorId, subtaskIdx);
        this.session.execute(statement);
        this.lastCommittedCheckpoints.put(subtaskIdx, checkpointId);
    }

    public boolean isCheckpointCommitted(int subtaskIdx, long checkpointId) {
        String statement;
        Iterator<Row> resultIt;
        Long lastCommittedCheckpoint = this.lastCommittedCheckpoints.get(subtaskIdx);
        if (lastCommittedCheckpoint == null && (resultIt = this.session.execute(statement = String.format("SELECT checkpoint_id FROM %s.%s where sink_id='%s' and sub_id=%d;", this.keySpace, this.table, this.operatorId, subtaskIdx)).iterator()).hasNext()) {
            lastCommittedCheckpoint = resultIt.next().getLong("checkpoint_id");
            this.lastCommittedCheckpoints.put(subtaskIdx, lastCommittedCheckpoint);
        }
        return lastCommittedCheckpoint != null && checkpointId <= lastCommittedCheckpoint;
    }
}

