/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.cassandra;

import com.datastax.driver.core.Cluster;
import java.time.Duration;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.connectors.cassandra.CassandraCommitter;
import org.apache.flink.streaming.connectors.cassandra.CassandraFailureHandler;
import org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraRowSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraRowWriteAheadSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraScalaProductSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraSinkBaseConfig;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleSink;
import org.apache.flink.streaming.connectors.cassandra.CassandraTupleWriteAheadSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.streaming.connectors.cassandra.MapperOptions;
import org.apache.flink.streaming.connectors.cassandra.NoOpCassandraFailureHandler;
import org.apache.flink.streaming.runtime.operators.CheckpointCommitter;
import org.apache.flink.types.Row;
import scala.Product;

public class CassandraSink<IN> {
    private final boolean useDataStreamSink;
    private DataStreamSink<IN> sink1;
    private SingleOutputStreamOperator<IN> sink2;

    private CassandraSink(DataStreamSink<IN> sink) {
        this.sink1 = sink;
        this.useDataStreamSink = true;
    }

    private CassandraSink(SingleOutputStreamOperator<IN> sink) {
        this.sink2 = sink;
        this.useDataStreamSink = false;
    }

    private SinkTransformation<IN> getSinkTransformation() {
        return this.sink1.getTransformation();
    }

    private Transformation<IN> getTransformation() {
        return this.sink2.getTransformation();
    }

    public CassandraSink<IN> name(String name) {
        if (this.useDataStreamSink) {
            this.getSinkTransformation().setName(name);
        } else {
            this.getTransformation().setName(name);
        }
        return this;
    }

    @PublicEvolving
    public CassandraSink<IN> uid(String uid) {
        if (this.useDataStreamSink) {
            this.getSinkTransformation().setUid(uid);
        } else {
            this.getTransformation().setUid(uid);
        }
        return this;
    }

    @PublicEvolving
    public CassandraSink<IN> setUidHash(String uidHash) {
        if (this.useDataStreamSink) {
            this.getSinkTransformation().setUidHash(uidHash);
        } else {
            this.getTransformation().setUidHash(uidHash);
        }
        return this;
    }

    public CassandraSink<IN> setParallelism(int parallelism) {
        if (this.useDataStreamSink) {
            this.sink1.setParallelism(parallelism);
        } else {
            this.sink2.setParallelism(parallelism);
        }
        return this;
    }

    public CassandraSink<IN> disableChaining() {
        if (this.useDataStreamSink) {
            this.sink1.disableChaining();
        } else {
            this.sink2.disableChaining();
        }
        return this;
    }

    public CassandraSink<IN> slotSharingGroup(String slotSharingGroup) {
        if (this.useDataStreamSink) {
            this.getSinkTransformation().setSlotSharingGroup(slotSharingGroup);
        } else {
            this.getTransformation().setSlotSharingGroup(slotSharingGroup);
        }
        return this;
    }

    public static <IN> CassandraSinkBuilder<IN> addSink(org.apache.flink.streaming.api.scala.DataStream<IN> input) {
        return CassandraSink.addSink(input.javaStream());
    }

    public static <IN> CassandraSinkBuilder<IN> addSink(DataStream<IN> input) {
        TypeInformation typeInfo = input.getType();
        if (typeInfo instanceof TupleTypeInfo) {
            DataStream<IN> tupleInput = input;
            return new CassandraTupleSinkBuilder<IN>(tupleInput, tupleInput.getType(), tupleInput.getType().createSerializer(tupleInput.getExecutionEnvironment().getConfig()));
        }
        if (typeInfo instanceof RowTypeInfo) {
            DataStream<IN> rowInput = input;
            return new CassandraRowSinkBuilder((DataStream<Row>)rowInput, (TypeInformation<Row>)rowInput.getType(), (TypeSerializer<Row>)rowInput.getType().createSerializer(rowInput.getExecutionEnvironment().getConfig()));
        }
        if (typeInfo instanceof PojoTypeInfo) {
            return new CassandraPojoSinkBuilder<IN>(input, input.getType(), input.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
        }
        if (typeInfo instanceof CaseClassTypeInfo) {
            DataStream<IN> productInput = input;
            return new CassandraScalaProductSinkBuilder<IN>(productInput, productInput.getType(), productInput.getType().createSerializer(input.getExecutionEnvironment().getConfig()));
        }
        throw new IllegalArgumentException("No support for the type of the given DataStream: " + input.getType());
    }

    public static class CassandraScalaProductSinkBuilder<IN extends Product>
    extends CassandraSinkBuilder<IN> {
        public CassandraScalaProductSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
            super(input, typeInfo, serializer);
        }

        @Override
        protected void sanityCheck() {
            super.sanityCheck();
            if (this.query == null || this.query.length() == 0) {
                throw new IllegalArgumentException("Query must not be null or empty.");
            }
            if (this.keyspace != null) {
                throw new IllegalArgumentException("Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
            }
        }

        @Override
        public CassandraSink<IN> createSink() throws Exception {
            CassandraScalaProductSink sink = new CassandraScalaProductSink(this.query, this.builder, this.configBuilder.build(), this.failureHandler);
            return new CassandraSink(this.input.addSink(sink).name("Cassandra Sink"));
        }

        @Override
        protected CassandraSink<IN> createWriteAheadSink() throws Exception {
            throw new IllegalArgumentException("Exactly-once guarantees can only be provided for flink tuple types.");
        }
    }

    public static class CassandraPojoSinkBuilder<IN>
    extends CassandraSinkBuilder<IN> {
        public CassandraPojoSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
            super(input, typeInfo, serializer);
        }

        @Override
        protected void sanityCheck() {
            super.sanityCheck();
            if (this.query != null) {
                throw new IllegalArgumentException("Specifying a query is not allowed when using a Pojo-Stream as input.");
            }
        }

        @Override
        public CassandraSink<IN> createSink() throws Exception {
            CassandraPojoSink sink = new CassandraPojoSink(this.typeInfo.getTypeClass(), this.builder, this.mapperOptions, this.keyspace, this.configBuilder.build(), this.failureHandler);
            return new CassandraSink(this.input.addSink(sink).name("Cassandra Sink"));
        }

        @Override
        protected CassandraSink<IN> createWriteAheadSink() throws Exception {
            throw new IllegalArgumentException("Exactly-once guarantees can only be provided for tuple types.");
        }
    }

    public static class CassandraRowSinkBuilder
    extends CassandraSinkBuilder<Row> {
        public CassandraRowSinkBuilder(DataStream<Row> input, TypeInformation<Row> typeInfo, TypeSerializer<Row> serializer) {
            super(input, typeInfo, serializer);
        }

        @Override
        protected void sanityCheck() {
            super.sanityCheck();
            if (this.query == null || this.query.length() == 0) {
                throw new IllegalArgumentException("Query must not be null or empty.");
            }
            if (this.keyspace != null) {
                throw new IllegalArgumentException("Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
            }
        }

        @Override
        protected CassandraSink<Row> createSink() throws Exception {
            CassandraRowSink sink = new CassandraRowSink(this.typeInfo.getArity(), this.query, this.builder, this.configBuilder.build(), this.failureHandler);
            return new CassandraSink<Row>(this.input.addSink((SinkFunction)sink).name("Cassandra Sink"));
        }

        @Override
        protected CassandraSink<Row> createWriteAheadSink() throws Exception {
            return this.committer == null ? new CassandraSink(this.input.transform("Cassandra Sink", null, (OneInputStreamOperator)new CassandraRowWriteAheadSink(this.query, (TypeSerializer<Row>)this.serializer, this.builder, new CassandraCommitter(this.builder)))) : new CassandraSink(this.input.transform("Cassandra Sink", null, (OneInputStreamOperator)new CassandraRowWriteAheadSink(this.query, (TypeSerializer<Row>)this.serializer, this.builder, this.committer)));
        }
    }

    public static class CassandraTupleSinkBuilder<IN extends Tuple>
    extends CassandraSinkBuilder<IN> {
        public CassandraTupleSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
            super(input, typeInfo, serializer);
        }

        @Override
        protected void sanityCheck() {
            super.sanityCheck();
            if (this.query == null || this.query.length() == 0) {
                throw new IllegalArgumentException("Query must not be null or empty.");
            }
            if (this.keyspace != null) {
                throw new IllegalArgumentException("Specifying a default keyspace is only allowed when using a Pojo-Stream as input.");
            }
        }

        @Override
        public CassandraSink<IN> createSink() throws Exception {
            CassandraTupleSink sink = new CassandraTupleSink(this.query, this.builder, this.configBuilder.build(), this.failureHandler);
            return new CassandraSink(this.input.addSink(sink).name("Cassandra Sink"));
        }

        @Override
        protected CassandraSink<IN> createWriteAheadSink() throws Exception {
            return this.committer == null ? new CassandraSink(this.input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink(this.query, this.serializer, this.builder, new CassandraCommitter(this.builder)))) : new CassandraSink(this.input.transform("Cassandra Sink", null, new CassandraTupleWriteAheadSink(this.query, this.serializer, this.builder, this.committer)));
        }
    }

    public static abstract class CassandraSinkBuilder<IN> {
        protected final DataStream<IN> input;
        protected final TypeSerializer<IN> serializer;
        protected final TypeInformation<IN> typeInfo;
        protected final CassandraSinkBaseConfig.Builder configBuilder;
        protected ClusterBuilder builder;
        protected String keyspace;
        protected MapperOptions mapperOptions;
        protected String query;
        protected CheckpointCommitter committer;
        protected boolean isWriteAheadLogEnabled;
        protected CassandraFailureHandler failureHandler;

        public CassandraSinkBuilder(DataStream<IN> input, TypeInformation<IN> typeInfo, TypeSerializer<IN> serializer) {
            this.input = input;
            this.typeInfo = typeInfo;
            this.serializer = serializer;
            this.configBuilder = CassandraSinkBaseConfig.newBuilder();
        }

        public CassandraSinkBuilder<IN> setQuery(String query) {
            this.query = query;
            return this;
        }

        public CassandraSinkBuilder<IN> setDefaultKeyspace(String keyspace) {
            this.keyspace = keyspace;
            return this;
        }

        public CassandraSinkBuilder<IN> setHost(String host) {
            return this.setHost(host, 9042);
        }

        public CassandraSinkBuilder<IN> setHost(final String host, final int port) {
            if (this.builder != null) {
                throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
            }
            this.builder = new ClusterBuilder(){

                @Override
                protected Cluster buildCluster(Cluster.Builder builder) {
                    return builder.addContactPoint(host).withPort(port).build();
                }
            };
            return this;
        }

        public CassandraSinkBuilder<IN> setClusterBuilder(ClusterBuilder builder) {
            if (this.builder != null) {
                throw new IllegalArgumentException("Builder was already set. You must use either setHost() or setClusterBuilder().");
            }
            this.builder = builder;
            return this;
        }

        public CassandraSinkBuilder<IN> enableWriteAheadLog() {
            this.isWriteAheadLogEnabled = true;
            return this;
        }

        public CassandraSinkBuilder<IN> enableWriteAheadLog(CheckpointCommitter committer) {
            this.isWriteAheadLogEnabled = true;
            this.committer = committer;
            return this;
        }

        public CassandraSinkBuilder<IN> setMapperOptions(MapperOptions options) {
            this.mapperOptions = options;
            return this;
        }

        public CassandraSinkBuilder<IN> setFailureHandler(CassandraFailureHandler failureHandler) {
            this.failureHandler = failureHandler;
            return this;
        }

        public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests, Duration timeout) {
            this.configBuilder.setMaxConcurrentRequests(maxConcurrentRequests);
            this.configBuilder.setMaxConcurrentRequestsTimeout(timeout);
            return this;
        }

        public CassandraSinkBuilder<IN> setMaxConcurrentRequests(int maxConcurrentRequests) {
            this.configBuilder.setMaxConcurrentRequests(maxConcurrentRequests);
            return this;
        }

        public CassandraSinkBuilder<IN> enableIgnoreNullFields() {
            this.configBuilder.setIgnoreNullFields(true);
            return this;
        }

        public CassandraSink<IN> build() throws Exception {
            this.sanityCheck();
            if (this.failureHandler == null) {
                this.failureHandler = new NoOpCassandraFailureHandler();
            }
            return this.isWriteAheadLogEnabled ? this.createWriteAheadSink() : this.createSink();
        }

        protected abstract CassandraSink<IN> createSink() throws Exception;

        protected abstract CassandraSink<IN> createWriteAheadSink() throws Exception;

        protected void sanityCheck() {
            if (this.builder == null) {
                throw new IllegalArgumentException("Cassandra host information must be supplied using either setHost() or setClusterBuilder().");
            }
        }
    }
}

