/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.util.Preconditions;

/**
 * {@link StreamTaskInput} that produces {@link StreamElement}s from the buffers delivered by a
 * {@link CheckpointedInputGate}.
 *
 * <p>One {@link RecordDeserializer} is kept per input channel of the gate. Every buffer polled from
 * the gate is handed to the deserializer belonging to the channel it arrived on, and elements are
 * then drained from that deserializer until it reports the buffer as consumed.
 *
 * <p>NOTE(review): no synchronization is visible in this class; presumably it is only used from a
 * single thread - confirm against the caller.
 */
@Internal
public final class StreamTaskNetworkInput implements StreamTaskInput {
    /** Source of buffers and events. */
    private final CheckpointedInputGate checkpointedInputGate;

    /** Target object the deserializers write each element into. */
    private final DeserializationDelegate<StreamElement> deserializationDelegate;

    /** One deserializer per input channel, indexed by channel index. */
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;

    /** Value reported by {@link #getInputIndex()}. */
    private final int inputIndex;

    /** Channel index of the most recently received buffer; {@code -1} until the first buffer. */
    private int lastChannel = -1;

    /** Deserializer currently being drained, or {@code null} when a new buffer has to be polled. */
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer = null;

    /** Set once the gate reported itself as finished during a poll that yielded nothing. */
    private boolean isFinished = false;

    /**
     * Creates the input and allocates one spilling deserializer per input channel of the gate.
     *
     * @param checkpointedInputGate gate to poll buffers and events from
     * @param inputSerializer serializer of the user records, wrapped in a {@link StreamElementSerializer}
     * @param ioManager supplies the spilling directories passed to every deserializer
     * @param inputIndex index returned by {@link #getInputIndex()}
     */
    public StreamTaskNetworkInput(CheckpointedInputGate checkpointedInputGate, TypeSerializer<?> inputSerializer, IOManager ioManager, int inputIndex) {
        this.checkpointedInputGate = checkpointedInputGate;
        this.deserializationDelegate = new NonReusingDeserializationDelegate<>(new StreamElementSerializer<>(inputSerializer));

        // Java forbids generic array creation, so the array itself has to be created raw; the
        // suppression is limited to this one declaration.
        @SuppressWarnings("unchecked")
        RecordDeserializer<DeserializationDelegate<StreamElement>>[] deserializers =
                new SpillingAdaptiveSpanningRecordDeserializer[checkpointedInputGate.getNumberOfInputChannels()];
        for (int channel = 0; channel < deserializers.length; channel++) {
            deserializers[channel] = new SpillingAdaptiveSpanningRecordDeserializer<>(ioManager.getSpillingDirectoriesPaths());
        }
        this.recordDeserializers = deserializers;

        this.inputIndex = inputIndex;
    }

    /**
     * Returns the next fully deserialized element, polling further buffers from the gate as needed.
     *
     * @return the next element, or {@code null} when the gate currently has nothing to hand out
     * @throws IllegalStateException if the gate is finished but not available or not empty
     * @throws IOException if the gate delivers an event other than {@link EndOfPartitionEvent}
     * @throws Exception anything raised by the gate or the deserializer
     */
    @Nullable
    public StreamElement pollNextNullable() throws Exception {
        while (true) {
            // First drain whatever the active deserializer still holds.
            if (this.currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult result = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);
                // The buffer is released before a possible return; this flag is evaluated
                // independently of the full-record flag, so both are honoured for one result.
                if (result.isBufferConsumed()) {
                    this.currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    this.currentRecordDeserializer = null;
                }
                if (result.isFullRecord()) {
                    return this.deserializationDelegate.getInstance();
                }
            }

            Optional<BufferOrEvent> next = this.checkpointedInputGate.pollNext();
            if (next.isPresent()) {
                this.processBufferOrEvent(next.get());
            } else {
                // Nothing to read right now; record end of input if the gate is done.
                if (this.checkpointedInputGate.isFinished()) {
                    this.isFinished = true;
                    Preconditions.checkState(this.checkpointedInputGate.isAvailable().isDone(), "Finished BarrierHandler should be available");
                    if (!this.checkpointedInputGate.isEmpty()) {
                        throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
                    }
                }
                return null;
            }
        }
    }

    /**
     * Routes a buffer to the deserializer of its channel and makes that deserializer the active
     * one. The only event accepted is {@link EndOfPartitionEvent} (exact class match), which is
     * ignored here.
     *
     * @throws IOException if any other event type is received
     */
    private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws IOException {
        if (!bufferOrEvent.isBuffer()) {
            AbstractEvent event = bufferOrEvent.getEvent();
            if (event.getClass() != EndOfPartitionEvent.class) {
                throw new IOException("Unexpected event: " + event);
            }
            return;
        }
        this.lastChannel = bufferOrEvent.getChannelIndex();
        this.currentRecordDeserializer = this.recordDeserializers[this.lastChannel];
        this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
    }

    /** Returns the channel index of the last received buffer, or {@code -1} if none arrived yet. */
    @Override
    public int getLastChannel() {
        return this.lastChannel;
    }

    /** Returns the input index passed to the constructor. */
    @Override
    public int getInputIndex() {
        return this.inputIndex;
    }

    /** Returns {@code true} once a poll found the gate finished. */
    public boolean isFinished() {
        return this.isFinished;
    }

    /**
     * Returns {@code AVAILABLE} while a deserializer is still active (its buffer has not been
     * reported as consumed yet); otherwise returns the availability future of the gate.
     */
    public CompletableFuture<?> isAvailable() {
        return this.currentRecordDeserializer != null ? AVAILABLE : this.checkpointedInputGate.isAvailable();
    }

    /**
     * Recycles the current buffer of every deserializer unless it is absent or already recycled,
     * clears the deserializers, and finally cleans up the gate.
     *
     * @throws IOException declared by the interface; raised by the gate cleanup, if at all
     */
    @Override
    public void close() throws IOException {
        for (RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer : this.recordDeserializers) {
            Buffer pending = deserializer.getCurrentBuffer();
            if (pending != null && !pending.isRecycled()) {
                pending.recycleBuffer();
            }
            deserializer.clear();
        }
        this.checkpointedInputGate.cleanup();
    }
}

