/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.BitSet;
import java.util.Collection;
import java.util.Optional;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.InputGateUtil;
import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public final class StreamTwoInputProcessor<IN1, IN2>
implements StreamInputProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
    private final DeserializationDelegate<StreamElement> deserializationDelegate1;
    private final DeserializationDelegate<StreamElement> deserializationDelegate2;
    private final CheckpointedInputGate barrierHandler;
    private final Object lock;
    private final OperatorChain<?, ?> operatorChain;
    private StreamStatus firstStatus;
    private StreamStatus secondStatus;
    private StatusWatermarkValve statusWatermarkValve1;
    private StatusWatermarkValve statusWatermarkValve2;
    private final int numInputChannels1;
    private final int numInputChannels2;
    private int currentChannel = -1;
    private final StreamStatusMaintainer streamStatusMaintainer;
    private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;
    private final WatermarkGauge input1WatermarkGauge;
    private final WatermarkGauge input2WatermarkGauge;
    private Counter numRecordsIn;
    private final BitSet finishedChannels1;
    private final BitSet finishedChannels2;
    private boolean isFinished;

    public StreamTwoInputProcessor(Collection<InputGate> inputGates1, Collection<InputGate> inputGates2, TypeSerializer<IN1> inputSerializer1, TypeSerializer<IN2> inputSerializer2, TwoInputStreamTask<IN1, IN2, ?> checkpointedTask, CheckpointingMode checkpointMode, Object lock, IOManager ioManager, Configuration taskManagerConfig, StreamStatusMaintainer streamStatusMaintainer, TwoInputStreamOperator<IN1, IN2, ?> streamOperator, TaskIOMetricGroup metrics, WatermarkGauge input1WatermarkGauge, WatermarkGauge input2WatermarkGauge, String taskName, OperatorChain<?, ?> operatorChain) throws IOException {
        InputGate inputGate = InputGateUtil.createInputGate(inputGates1, inputGates2);
        this.barrierHandler = InputProcessorUtil.createCheckpointedInputGate(checkpointedTask, checkpointMode, ioManager, inputGate, taskManagerConfig, taskName);
        this.lock = Preconditions.checkNotNull((Object)lock);
        StreamElementSerializer<IN1> ser1 = new StreamElementSerializer<IN1>(inputSerializer1);
        this.deserializationDelegate1 = new NonReusingDeserializationDelegate(ser1);
        StreamElementSerializer<IN2> ser2 = new StreamElementSerializer<IN2>(inputSerializer2);
        this.deserializationDelegate2 = new NonReusingDeserializationDelegate(ser2);
        this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
        for (int i = 0; i < this.recordDeserializers.length; ++i) {
            this.recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer(ioManager.getSpillingDirectoriesPaths());
        }
        int numInputChannels1 = 0;
        for (InputGate gate : inputGates1) {
            numInputChannels1 += gate.getNumberOfInputChannels();
        }
        this.numInputChannels1 = numInputChannels1;
        this.numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
        this.firstStatus = StreamStatus.ACTIVE;
        this.secondStatus = StreamStatus.ACTIVE;
        this.streamStatusMaintainer = (StreamStatusMaintainer)Preconditions.checkNotNull((Object)streamStatusMaintainer);
        this.streamOperator = (TwoInputStreamOperator)Preconditions.checkNotNull(streamOperator);
        this.statusWatermarkValve1 = new StatusWatermarkValve(numInputChannels1, new ForwardingValveOutputHandler1(streamOperator, lock));
        this.statusWatermarkValve2 = new StatusWatermarkValve(this.numInputChannels2, new ForwardingValveOutputHandler2(streamOperator, lock));
        this.input1WatermarkGauge = input1WatermarkGauge;
        this.input2WatermarkGauge = input2WatermarkGauge;
        metrics.gauge("checkpointAlignmentTime", this.barrierHandler::getAlignmentDurationNanos);
        this.operatorChain = (OperatorChain)Preconditions.checkNotNull(operatorChain);
        this.finishedChannels1 = new BitSet();
        this.finishedChannels2 = new BitSet();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public boolean processInput() throws Exception {
        if (this.isFinished) {
            return false;
        }
        if (this.numRecordsIn == null) {
            try {
                this.numRecordsIn = ((OperatorMetricGroup)this.streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
            }
            catch (Exception e) {
                LOG.warn("An exception occurred during the metrics setup.", (Throwable)e);
                this.numRecordsIn = new SimpleCounter();
            }
        }
        while (true) {
            Optional<BufferOrEvent> bufferOrEvent;
            if (this.currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult result = this.currentChannel < this.numInputChannels1 ? this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate1) : this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate2);
                if (result.isBufferConsumed()) {
                    this.currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    this.currentRecordDeserializer = null;
                }
                if (result.isFullRecord()) {
                    Object record;
                    StreamElement recordOrWatermark;
                    if (this.currentChannel < this.numInputChannels1) {
                        recordOrWatermark = (StreamElement)this.deserializationDelegate1.getInstance();
                        if (recordOrWatermark.isWatermark()) {
                            this.statusWatermarkValve1.inputWatermark(recordOrWatermark.asWatermark(), this.currentChannel);
                            continue;
                        }
                        if (recordOrWatermark.isStreamStatus()) {
                            this.statusWatermarkValve1.inputStreamStatus(recordOrWatermark.asStreamStatus(), this.currentChannel);
                            continue;
                        }
                        if (recordOrWatermark.isLatencyMarker()) {
                            Object object = this.lock;
                            synchronized (object) {
                                this.streamOperator.processLatencyMarker1(recordOrWatermark.asLatencyMarker());
                            }
                        }
                        record = recordOrWatermark.asRecord();
                        Object object = this.lock;
                        synchronized (object) {
                            this.numRecordsIn.inc();
                            this.streamOperator.setKeyContextElement1((StreamRecord<?>)record);
                            this.streamOperator.processElement1((StreamRecord<IN1>)record);
                        }
                        return true;
                    }
                    recordOrWatermark = (StreamElement)this.deserializationDelegate2.getInstance();
                    if (recordOrWatermark.isWatermark()) {
                        this.statusWatermarkValve2.inputWatermark(recordOrWatermark.asWatermark(), this.currentChannel - this.numInputChannels1);
                        continue;
                    }
                    if (recordOrWatermark.isStreamStatus()) {
                        this.statusWatermarkValve2.inputStreamStatus(recordOrWatermark.asStreamStatus(), this.currentChannel - this.numInputChannels1);
                        continue;
                    }
                    if (recordOrWatermark.isLatencyMarker()) {
                        record = this.lock;
                        synchronized (record) {
                            this.streamOperator.processLatencyMarker2(recordOrWatermark.asLatencyMarker());
                        }
                    }
                    record = recordOrWatermark.asRecord();
                    Object object = this.lock;
                    synchronized (object) {
                        this.numRecordsIn.inc();
                        this.streamOperator.setKeyContextElement2((StreamRecord<?>)record);
                        this.streamOperator.processElement2((StreamRecord<IN2>)record);
                    }
                    return true;
                }
            }
            if ((bufferOrEvent = this.barrierHandler.pollNext()).isPresent()) {
                this.processBufferOrEvent(bufferOrEvent.get());
                continue;
            }
            if (this.barrierHandler.isFinished()) break;
            this.barrierHandler.isAvailable().get();
        }
        this.isFinished = true;
        if (!this.barrierHandler.isEmpty()) {
            throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
        }
        return false;
    }

    private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws Exception {
        if (bufferOrEvent.isBuffer()) {
            this.currentChannel = bufferOrEvent.getChannelIndex();
            this.currentRecordDeserializer = this.recordDeserializers[this.currentChannel];
            this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
        } else {
            AbstractEvent event = bufferOrEvent.getEvent();
            if (event.getClass() != EndOfPartitionEvent.class) {
                throw new IOException("Unexpected event: " + event);
            }
            this.handleEndOfPartitionEvent(bufferOrEvent.getChannelIndex());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleEndOfPartitionEvent(int channelIndex) throws Exception {
        int finishedInputId = -1;
        if (channelIndex < this.numInputChannels1) {
            this.finishedChannels1.set(channelIndex);
            if (this.finishedChannels1.cardinality() == this.numInputChannels1) {
                finishedInputId = 1;
            }
        } else {
            this.finishedChannels2.set(channelIndex - this.numInputChannels1);
            if (this.finishedChannels2.cardinality() == this.numInputChannels2) {
                finishedInputId = 2;
            }
        }
        if (finishedInputId > 0) {
            Object object = this.lock;
            synchronized (object) {
                this.operatorChain.endInput(finishedInputId);
            }
        }
    }

    @Override
    public void close() throws IOException {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer : this.recordDeserializers) {
            Buffer buffer = deserializer.getCurrentBuffer();
            if (buffer != null && !buffer.isRecycled()) {
                buffer.recycleBuffer();
            }
            deserializer.clear();
        }
        this.barrierHandler.cleanup();
    }

    private class ForwardingValveOutputHandler2
    implements StatusWatermarkValve.ValveOutputHandler {
        private final TwoInputStreamOperator<IN1, IN2, ?> operator;
        private final Object lock;

        private ForwardingValveOutputHandler2(TwoInputStreamOperator<IN1, IN2, ?> operator, Object lock) {
            this.operator = (TwoInputStreamOperator)Preconditions.checkNotNull(operator);
            this.lock = Preconditions.checkNotNull((Object)lock);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handleWatermark(Watermark watermark) {
            try {
                Object object = this.lock;
                synchronized (object) {
                    StreamTwoInputProcessor.this.input2WatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                    this.operator.processWatermark2(watermark);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                Object object = this.lock;
                synchronized (object) {
                    StreamTwoInputProcessor.this.secondStatus = streamStatus;
                    if (!streamStatus.equals(StreamTwoInputProcessor.this.streamStatusMaintainer.getStreamStatus())) {
                        if (streamStatus.isActive()) {
                            StreamTwoInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                        } else if (StreamTwoInputProcessor.this.firstStatus.isIdle()) {
                            StreamTwoInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                        }
                    }
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
            }
        }
    }

    private class ForwardingValveOutputHandler1
    implements StatusWatermarkValve.ValveOutputHandler {
        private final TwoInputStreamOperator<IN1, IN2, ?> operator;
        private final Object lock;

        private ForwardingValveOutputHandler1(TwoInputStreamOperator<IN1, IN2, ?> operator, Object lock) {
            this.operator = (TwoInputStreamOperator)Preconditions.checkNotNull(operator);
            this.lock = Preconditions.checkNotNull((Object)lock);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handleWatermark(Watermark watermark) {
            try {
                Object object = this.lock;
                synchronized (object) {
                    StreamTwoInputProcessor.this.input1WatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                    this.operator.processWatermark1(watermark);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                Object object = this.lock;
                synchronized (object) {
                    StreamTwoInputProcessor.this.firstStatus = streamStatus;
                    if (!streamStatus.equals(StreamTwoInputProcessor.this.streamStatusMaintainer.getStreamStatus())) {
                        if (streamStatus.isActive()) {
                            StreamTwoInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                        } else if (StreamTwoInputProcessor.this.secondStatus.isIdle()) {
                            StreamTwoInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                        }
                    }
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
            }
        }
    }
}

