/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.cassandra;

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.cassandra.shaded.com.google.common.util.concurrent.FutureCallback;
import org.apache.flink.cassandra.shaded.com.google.common.util.concurrent.Futures;
import org.apache.flink.cassandra.shaded.com.google.common.util.concurrent.ListenableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.cassandra.CassandraFailureHandler;
import org.apache.flink.streaming.connectors.cassandra.CassandraSinkBaseConfig;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for sinks that write records to Cassandra asynchronously.
 *
 * <p>Every call to {@code invoke} takes one permit from a semaphore sized by
 * {@code config.getMaxConcurrentRequests()}, passes the record to {@link #send(Object)} and
 * registers a callback that gives the permit back once the returned future completes. The first
 * asynchronous failure is remembered and handed to the configured {@link CassandraFailureHandler}
 * the next time {@code invoke}, {@code snapshotState} or {@code close} runs.
 *
 * <p>{@code snapshotState} and {@code close} wait for all in-flight requests by acquiring every
 * permit of the semaphore.
 *
 * @param <IN> type of the records passed to {@code invoke} and {@code send}
 * @param <V> result type of the future returned by {@code send}
 */
public abstract class CassandraSinkBase<IN, V>
extends RichSinkFunction<IN>
implements CheckpointedFunction {
    protected final Logger log = LoggerFactory.getLogger(getClass());
    protected transient Cluster cluster;
    protected transient Session session;
    /** First failure reported by a request callback that has not been handled yet. */
    private AtomicReference<Throwable> throwable;
    /** Callback attached to every future returned by {@link #send(Object)}. */
    private FutureCallback<V> callback;
    /** Limits the number of in-flight requests; one permit per pending request. */
    private Semaphore semaphore;
    private final ClusterBuilder builder;
    private final CassandraSinkBaseConfig config;
    private final CassandraFailureHandler failureHandler;

    /**
     * Creates the sink base.
     *
     * @param builder used in {@code open} to obtain the {@link Cluster}; it is passed through the
     *     closure cleaner
     * @param config supplies the maximum number of concurrent requests and the permit timeout
     * @param failureHandler receives asynchronous send failures; must not be {@code null}
     */
    CassandraSinkBase(ClusterBuilder builder, CassandraSinkBaseConfig config, CassandraFailureHandler failureHandler) {
        this.builder = builder;
        this.config = config;
        this.failureHandler = Preconditions.checkNotNull(failureHandler);
        ClosureCleaner.clean(builder, ExecutionConfig.ClosureCleanerLevel.RECURSIVE, true);
    }

    /**
     * Prepares the bookkeeping for asynchronous requests and connects to the cluster.
     *
     * @param configuration not used by this implementation
     */
    @Override
    public void open(Configuration configuration) {
        // Set up the error holder and the semaphore before connecting, so that close() can still
        // run its normal path if obtaining the cluster or the session fails below.
        this.throwable = new AtomicReference<>();
        this.semaphore = new Semaphore(this.config.getMaxConcurrentRequests());
        this.callback = new FutureCallback<V>() {

            @Override
            public void onSuccess(V ignored) {
                CassandraSinkBase.this.semaphore.release();
            }

            @Override
            public void onFailure(Throwable t) {
                // Only the first unhandled failure is kept; later ones are logged only.
                CassandraSinkBase.this.throwable.compareAndSet(null, t);
                CassandraSinkBase.this.log.error("Error while sending value.", t);
                CassandraSinkBase.this.semaphore.release();
            }
        };
        this.cluster = this.builder.getCluster();
        this.session = this.createSession();
    }

    /**
     * Reports pending asynchronous errors, waits for in-flight requests, and then closes the
     * session and the cluster. Failures while closing the session or cluster are logged and not
     * rethrown.
     *
     * @throws Exception if the failure handler rethrows an asynchronous error, or if waiting for
     *     in-flight requests is interrupted or times out
     */
    @Override
    public void close() throws Exception {
        try {
            // If open() never ran there are no requests to wait for and no error holder to check;
            // skipping avoids a NullPointerException that would hide the real failure.
            if (this.throwable != null && this.semaphore != null) {
                this.checkAsyncErrors();
                this.flush();
                this.checkAsyncErrors();
            }
        }
        finally {
            try {
                if (this.session != null) {
                    this.session.close();
                }
            }
            catch (Exception e) {
                this.log.error("Error while closing session.", e);
            }
            try {
                if (this.cluster != null) {
                    this.cluster.close();
                }
            }
            catch (Exception e) {
                this.log.error("Error while closing cluster.", e);
            }
        }
    }

    /** No state is restored by this sink. */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    /**
     * Waits until all in-flight requests have completed and reports any asynchronous error, both
     * before and after waiting.
     *
     * @throws Exception if the failure handler rethrows an asynchronous error, or if waiting for
     *     in-flight requests is interrupted or times out
     */
    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        this.checkAsyncErrors();
        this.flush();
        this.checkAsyncErrors();
    }

    /**
     * Sends one record asynchronously, first waiting for a free request permit.
     *
     * @param value the record to send
     * @throws Exception if an earlier asynchronous error is rethrown by the failure handler, if no
     *     permit becomes available within the configured timeout, or if {@code send} fails
     */
    @Override
    public void invoke(IN value) throws Exception {
        this.checkAsyncErrors();
        this.tryAcquire(1);
        final ListenableFuture<V> result;
        try {
            result = this.send(value);
        }
        catch (Throwable e) {
            // No future exists, so no callback will ever return the permit; give it back here.
            // Catching Throwable (not just Exception) also covers Errors thrown by send().
            this.semaphore.release();
            throw e;
        }
        Futures.addCallback(result, this.callback);
    }

    /**
     * Creates the session used by this sink. The default connects via the current cluster.
     *
     * @return a session obtained from {@code cluster.connect()}
     */
    protected Session createSession() {
        return this.cluster.connect();
    }

    /**
     * Starts the asynchronous write of a single record.
     *
     * @param var1 the record to write
     * @return the future that completes when the write has finished
     */
    public abstract ListenableFuture<V> send(IN var1);

    /**
     * Acquires the given number of permits, waiting at most the configured timeout.
     *
     * @throws InterruptedException if interrupted while waiting
     * @throws TimeoutException if the permits could not be acquired in time
     */
    private void tryAcquire(int permits) throws InterruptedException, TimeoutException {
        if (!this.semaphore.tryAcquire(permits, this.config.getMaxConcurrentRequestsTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
            throw new TimeoutException(String.format("Failed to acquire %d out of %d permits to send value in %s.", permits, this.config.getMaxConcurrentRequests(), this.config.getMaxConcurrentRequestsTimeout()));
        }
    }

    /**
     * Clears the stored asynchronous error, if any, and passes it to the failure handler.
     *
     * @throws Exception whatever the failure handler chooses to throw
     */
    private void checkAsyncErrors() throws Exception {
        Throwable currentError = this.throwable.getAndSet(null);
        if (currentError != null) {
            this.failureHandler.onFailure(currentError);
        }
    }

    /**
     * Waits for all in-flight requests by acquiring every permit, then releases them again.
     *
     * @throws InterruptedException if interrupted while waiting
     * @throws TimeoutException if not all permits became available within the configured timeout
     */
    private void flush() throws InterruptedException, TimeoutException {
        int maxRequests = this.config.getMaxConcurrentRequests();
        this.tryAcquire(maxRequests);
        this.semaphore.release(maxRequests);
    }

    /** Returns the number of permits currently available in the semaphore. */
    @VisibleForTesting
    int getAvailablePermits() {
        return this.semaphore.availablePermits();
    }

    /** Returns the number of permits currently held, i.e. the configured maximum minus the available ones. */
    @VisibleForTesting
    int getAcquiredPermits() {
        return this.config.getMaxConcurrentRequests() - this.semaphore.availablePermits();
    }
}

