/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.batch.connectors.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.ResultSetFuture;
import com.datastax.driver.core.Session;
import java.io.IOException;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.cassandra.shaded.com.google.common.base.Strings;
import org.apache.flink.cassandra.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.flink.cassandra.shaded.com.google.common.util.concurrent.Futures;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class CassandraOutputFormatBase<OUT>
extends RichOutputFormat<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormatBase.class);
    private final String insertQuery;
    private final ClusterBuilder builder;
    private transient Cluster cluster;
    private transient Session session;
    private transient PreparedStatement prepared;
    private transient FutureCallback<ResultSet> callback;
    private transient Throwable exception = null;

    public CassandraOutputFormatBase(String insertQuery, ClusterBuilder builder) {
        Preconditions.checkArgument((!Strings.isNullOrEmpty(insertQuery) ? 1 : 0) != 0, (Object)"Query cannot be null or empty");
        Preconditions.checkNotNull((Object)builder, (String)"Builder cannot be null");
        this.insertQuery = insertQuery;
        this.builder = builder;
    }

    public void configure(Configuration parameters) {
        this.cluster = this.builder.getCluster();
    }

    public void open(int taskNumber, int numTasks) throws IOException {
        this.session = this.cluster.connect();
        this.prepared = this.session.prepare(this.insertQuery);
        this.callback = new FutureCallback<ResultSet>(){

            @Override
            public void onSuccess(ResultSet ignored) {
                CassandraOutputFormatBase.this.onWriteSuccess(ignored);
            }

            @Override
            public void onFailure(Throwable t) {
                CassandraOutputFormatBase.this.onWriteFailure(t);
            }
        };
    }

    public void writeRecord(OUT record) throws IOException {
        if (this.exception != null) {
            throw new IOException("write record failed", this.exception);
        }
        Object[] fields = this.extractFields(record);
        ResultSetFuture result = this.session.executeAsync(this.prepared.bind(fields));
        Futures.addCallback(result, this.callback);
    }

    protected abstract Object[] extractFields(OUT var1);

    protected void onWriteSuccess(ResultSet ignored) {
    }

    protected void onWriteFailure(Throwable t) {
        this.exception = t;
    }

    public void close() throws IOException {
        try {
            if (this.session != null) {
                this.session.close();
            }
        }
        catch (Exception e) {
            LOG.error("Error while closing session.", (Throwable)e);
        }
        try {
            if (this.cluster != null) {
                this.cluster.close();
            }
        }
        catch (Exception e) {
            LOG.error("Error while closing cluster.", (Throwable)e);
        }
    }
}

