/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.io.Closeable;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
import org.apache.flink.runtime.state.AsynchronousStateHandle;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.StreamTaskStateList;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
extends AbstractInvokable
implements StatefulTask<StreamTaskStateList> {
    /** Thread group used for the "Time Trigger" threads that {@link #invoke()} creates. */
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");

    /** Logger shared by this class and its nested helper classes. */
    protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);

    /**
     * Monitor guarding operator open/close, checkpointing, checkpoint notifications and timer
     * callbacks. Exposed to callers through {@link #getCheckpointLock()}.
     */
    private final Object lock = new Object();

    /** First operator of the chain; loaded from the stream configuration in {@link #invoke()}. */
    protected Operator headOperator;

    /** All operators executed by this task; created in {@link #invoke()}. */
    private OperatorChain<OUT> operatorChain;

    /** Stream configuration wrapping the task configuration; set in {@link #invoke()}. */
    private StreamConfig configuration;

    /** User-code class loader; set in {@link #invoke()}. */
    private ClassLoader userClassLoader;

    /**
     * Time service used for processing-time queries and timers. Either injected through
     * {@link #setTimeService(TimeServiceProvider)} or created by {@link #invoke()}.
     */
    private TimeServiceProvider timerService;

    /** User accumulators obtained from the environment's accumulator registry. */
    private Map<String, Accumulator<?, ?>> accumulatorMap;

    /** State handed in via {@link #setInitialState}; restored and cleared in {@link #invoke()}. */
    private StreamTaskStateList lazyRestoreState;

    /** First failure reported by a timer callback or an asynchronous checkpoint thread. */
    private volatile AsynchronousException asyncException;

    /**
     * Resources closed on cancellation and at the end of {@link #invoke()}. Every access is
     * synchronized on the set itself.
     */
    private final Set<Closeable> cancelables = new HashSet<Closeable>();

    /** True between the start of {@link #run()} and its end or the task's cancellation. */
    private volatile boolean isRunning;

    /** Set once by {@link #cancel()}. */
    private volatile boolean canceled;

    /**
     * Size of the most recently acknowledged non-empty checkpoint, reported through the
     * "lastCheckpointSize" gauge. Volatile because it is written both by the thread performing a
     * checkpoint and by asynchronous checkpoint threads, and read by whoever polls the gauge; a
     * plain {@code long} guarantees neither visibility nor atomic 64-bit access across threads.
     */
    private volatile long lastCheckpointSize = 0L;

    /**
     * Task-specific initialization hook. {@link #invoke()} calls it after the operator chain has
     * been created and the head operator has been set up, and before checkpointed state is
     * restored and the operators are opened.
     *
     * @throws Exception if initialization fails; the exception propagates out of {@code invoke()}
     */
    protected abstract void init() throws Exception;

    /**
     * The task's main work. {@link #invoke()} calls it once all operators are opened and the task
     * has been marked as running; when it returns normally the operators are closed and disposed.
     *
     * @throws Exception if processing fails; the exception propagates out of {@code invoke()}
     */
    protected abstract void run() throws Exception;

    /**
     * Task-specific cleanup hook. {@link #invoke()} calls it from its {@code finally} block, so it
     * runs whether or not the task completed normally. Anything it throws is logged by
     * {@code invoke()} and not propagated.
     *
     * @throws Exception if cleanup fails
     */
    protected abstract void cleanup() throws Exception;

    /**
     * Task-specific cancellation hook. {@link #cancel()} calls it after the task has been marked
     * as canceled and not running.
     *
     * @throws Exception if cancellation fails; the exception propagates out of {@code cancel()}
     */
    protected abstract void cancelTask() throws Exception;

    /**
     * Installs the time service this task uses for processing time and timers. When a provider is
     * installed before {@link #invoke()} runs, {@code invoke()} does not create its default one.
     *
     * @param timeProvider the provider to use; must not be {@code null}
     * @throws RuntimeException if {@code timeProvider} is {@code null}
     */
    public void setTimeService(TimeServiceProvider timeProvider) {
        if (timeProvider != null) {
            timerService = timeProvider;
            return;
        }
        throw new RuntimeException("The timeProvider cannot be set to null.");
    }

    /**
     * Returns the current processing time as reported by the task's time service.
     *
     * <p>The time service is dereferenced without a null check, so this must only be called once
     * a service has been installed or created.
     *
     * @return the value of the time service's {@code getCurrentProcessingTime()}
     */
    public long getCurrentProcessingTime() {
        final TimeServiceProvider service = timerService;
        return service.getCurrentProcessingTime();
    }

    /**
     * Runs the full life cycle of the task.
     *
     * <ol>
     *   <li>Initialization: loads class loader, configuration and accumulators, creates a default
     *       time service if none was injected, builds the operator chain, registers the
     *       "lastCheckpointSize" gauge and calls {@link #init()}.</li>
     *   <li>Invocation: restores checkpointed state, opens all operators under the checkpoint
     *       lock, and calls {@link #run()}.</li>
     *   <li>Orderly shutdown: closes all operators under the checkpoint lock, flushes the outputs
     *       and disposes the operators.</li>
     *   <li>Cleanup (always, in {@code finally}): stops the time service, closes registered
     *       closeables, releases the outputs, calls {@link #cleanup()} and, if the orderly
     *       disposal did not complete, disposes the operators on a best-effort basis.</li>
     * </ol>
     *
     * @throws CancelTaskException if the task was canceled during initialization or while the
     *         operators were being opened
     * @throws Exception any failure from the initialization, invocation or orderly-shutdown phase
     */
    public final void invoke() throws Exception {
        boolean operatorsDisposed = false;
        try {
            // -------- Initialization --------
            LOG.debug("Initializing {}", getName());

            userClassLoader = getUserCodeClassLoader();
            configuration = new StreamConfig(getTaskConfiguration());
            accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();

            // No time service injected: fall back to a single-threaded scheduled executor.
            if (timerService == null) {
                ThreadFactory triggerThreads =
                        new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName());
                ScheduledThreadPoolExecutor triggerExecutor = new ScheduledThreadPoolExecutor(1, triggerThreads);
                // Cancelled timers are removed from the executor's queue right away.
                triggerExecutor.setRemoveOnCancelPolicy(true);
                timerService = DefaultTimeServiceProvider.create((ScheduledExecutorService) triggerExecutor);
            }

            // The configuration stores the operator untyped; the cast only erases to StreamOperator.
            @SuppressWarnings("unchecked")
            Operator configuredHead = (Operator) configuration.getStreamOperator(userClassLoader);
            headOperator = configuredHead;

            operatorChain = new OperatorChain(this, headOperator, getEnvironment().getAccumulatorRegistry().getReadWriteReporter());
            if (headOperator != null) {
                headOperator.setup(this, configuration, operatorChain.getChainEntryPoint());
            }

            getEnvironment().getMetricGroup().gauge("lastCheckpointSize", new Gauge<Long>() {
                @Override
                public Long getValue() {
                    return StreamTask.this.lastCheckpointSize;
                }
            });

            init();

            // Do not start working if cancel() arrived during initialization.
            if (canceled) {
                throw new CancelTaskException();
            }

            // -------- Invocation --------
            LOG.debug("Invoking {}", getName());

            restoreState(lazyRestoreState);
            lazyRestoreState = null; // the restored state is no longer needed

            // Opening happens under the checkpoint lock, like every other operator interaction.
            synchronized (lock) {
                openAllOperators();
            }

            // Opening may have taken a while; re-check for cancellation before running.
            if (canceled) {
                throw new CancelTaskException();
            }

            isRunning = true;
            run();

            // -------- Orderly shutdown --------
            LOG.debug("Finished task {}", getName());

            // Flip the flag under the lock so that no checkpoint or timer interleaves with close().
            synchronized (lock) {
                isRunning = false;
                closeAllOperators();
            }
            LOG.debug("Closed operators for task {}", getName());

            operatorChain.flushOutputs();
            tryDisposeAllOperators();
            operatorsDisposed = true;
        }
        finally {
            // -------- Cleanup: runs on success, failure and cancellation --------
            isRunning = false;

            if (timerService != null) {
                try {
                    timerService.shutdownService();
                }
                catch (Throwable t) {
                    // Best effort: a failing shutdown must not prevent the remaining cleanup.
                    LOG.error("Could not shut down timer service", t);
                }
            }

            try {
                closeAllClosables();
            }
            catch (Throwable t) {
                LOG.error("Could not shut down async checkpoint threads", t);
            }

            // The chain is null when initialization failed before it was created.
            if (operatorChain != null) {
                operatorChain.releaseOutputs();
            }

            try {
                cleanup();
            }
            catch (Throwable t) {
                LOG.error("Error during cleanup of stream task", t);
            }

            // The orderly disposal above did not complete: dispose without propagating failures.
            if (!operatorsDisposed) {
                disposeAllOperators();
            }
        }
    }

    /**
     * Reports a failure to the task's environment by delegating to its {@code failExternally}.
     *
     * @param cause the failure to report
     */
    public void failExternally(Throwable cause) {
        getEnvironment().failExternally(cause);
    }

    /**
     * Cancels the task: marks it as not running and canceled, invokes the task-specific
     * {@link #cancelTask()} hook and closes all registered closeables.
     *
     * <p>The closeables are closed even when {@code cancelTask()} throws, so that asynchronous
     * checkpoint threads and a pending state restore are still interrupted.
     *
     * @throws Exception if {@code cancelTask()} fails
     */
    public final void cancel() throws Exception {
        isRunning = false;
        canceled = true;
        try {
            cancelTask();
        }
        finally {
            closeAllClosables();
        }
    }

    /**
     * Tells whether the task is currently in its running phase.
     *
     * @return the current value of the volatile running flag
     */
    public final boolean isRunning() {
        return isRunning;
    }

    /**
     * Tells whether {@link #cancel()} has been called on this task.
     *
     * @return the current value of the volatile canceled flag
     */
    public final boolean isCanceled() {
        return canceled;
    }

    /**
     * Opens every non-null operator of the chain, in the order the chain reports them.
     *
     * @throws Exception the first failure raised by an operator's {@code open()}
     */
    private void openAllOperators() throws Exception {
        final StreamOperator<?>[] chained = operatorChain.getAllOperators();
        for (int idx = 0; idx < chained.length; idx++) {
            if (chained[idx] != null) {
                chained[idx].open();
            }
        }
    }

    /**
     * Closes every non-null operator of the chain, walking the chain's operator array from its
     * last element to its first.
     *
     * @throws Exception the first failure raised by an operator's {@code close()}
     */
    private void closeAllOperators() throws Exception {
        final StreamOperator<?>[] chained = operatorChain.getAllOperators();
        int idx = chained.length;
        while (--idx >= 0) {
            final StreamOperator<?> current = chained[idx];
            if (current != null) {
                current.close();
            }
        }
    }

    /**
     * Disposes every non-null operator of the chain and lets the first failure propagate. Used on
     * the orderly shutdown path of {@link #invoke()}.
     *
     * @throws Exception the first failure raised by an operator's {@code dispose()}
     */
    private void tryDisposeAllOperators() throws Exception {
        final StreamOperator<?>[] chained = operatorChain.getAllOperators();
        for (int idx = 0; idx < chained.length; idx++) {
            if (chained[idx] != null) {
                chained[idx].dispose();
            }
        }
    }

    /**
     * Best-effort disposal of every non-null operator of the chain. A failure of one operator is
     * logged and does not stop the remaining operators from being disposed. Does nothing when the
     * operator chain was never created.
     */
    private void disposeAllOperators() {
        if (operatorChain == null) {
            return;
        }
        for (StreamOperator<?> chained : operatorChain.getAllOperators()) {
            if (chained == null) {
                continue;
            }
            try {
                chained.dispose();
            }
            catch (Throwable t) {
                LOG.error("Error during disposal of stream operator.", t);
            }
        }
    }

    /**
     * Last-resort release of resources: shuts down the time service, if there is one, and closes
     * all registered closeables.
     *
     * <p>The steps are chained with {@code try/finally} so that a failing time-service shutdown
     * neither skips closing the closeables nor skips {@code super.finalize()}, which runs last.
     *
     * @throws Throwable a failure of the time-service shutdown or of {@code super.finalize()}
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            if (timerService != null) {
                timerService.shutdownService();
            }
        }
        finally {
            try {
                closeAllClosables();
            }
            finally {
                super.finalize();
            }
        }
    }

    /**
     * Closes and unregisters every currently registered closeable.
     *
     * <p>The set is snapshotted and cleared while holding its monitor; the {@code close()} calls
     * themselves run outside the monitor. A failure to close one resource is logged and does not
     * stop the remaining ones from being closed.
     */
    private void closeAllClosables() {
        final Closeable[] pending;
        synchronized (cancelables) {
            if (cancelables.isEmpty()) {
                return;
            }
            pending = cancelables.toArray(new Closeable[cancelables.size()]);
            cancelables.clear();
        }

        for (Closeable resource : pending) {
            try {
                resource.close();
            }
            catch (Throwable t) {
                LOG.error("Error on canceling operation", t);
            }
        }
    }

    /**
     * Tells whether the configured time characteristic is one of the two for which this method
     * reports timestamp serialization.
     *
     * @return {@code true} if the configured time characteristic is {@code EventTime} or
     *         {@code IngestionTime}, {@code false} otherwise
     */
    protected boolean isSerializingTimestamps() {
        TimeCharacteristic tc = configuration.getTimeCharacteristic();
        // Short-circuit OR: the bitwise '|' form evaluated both comparisons unconditionally.
        return tc == TimeCharacteristic.EventTime || tc == TimeCharacteristic.IngestionTime;
    }

    /**
     * Returns the name of this task as reported by the environment's task info.
     *
     * @return the task name including its subtask information
     */
    public String getName() {
        final String nameWithSubtasks = getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
        return nameWithSubtasks;
    }

    /**
     * Returns the monitor this task holds while opening and closing operators, performing
     * checkpoints, delivering checkpoint notifications and firing timers.
     *
     * @return the checkpoint lock object
     */
    public Object getCheckpointLock() {
        return lock;
    }

    /**
     * Returns the stream configuration of this task.
     *
     * @return the configuration created in {@link #invoke()}, or {@code null} before that
     */
    public StreamConfig getConfiguration() {
        return configuration;
    }

    /**
     * Returns the user accumulator map of this task.
     *
     * @return the map obtained from the accumulator registry in {@link #invoke()}, or
     *         {@code null} before that
     */
    public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
        return accumulatorMap;
    }

    /**
     * Returns the entry point of the operator chain, i.e. the output that the head operator is
     * set up with.
     *
     * @return the chain entry point reported by the operator chain
     */
    public Output<StreamRecord<OUT>> getHeadOutput() {
        final Output<StreamRecord<OUT>> entryPoint = operatorChain.getChainEntryPoint();
        return entryPoint;
    }

    /**
     * Returns the record-writer outputs of the operator chain.
     *
     * @return the stream outputs reported by the operator chain
     */
    public RecordWriterOutput<?>[] getStreamOutputs() {
        final RecordWriterOutput<?>[] outputs = operatorChain.getStreamOutputs();
        return outputs;
    }

    /**
     * Stores the state to restore. The state is not applied here; {@link #invoke()} restores it
     * to the operators before opening them and then drops the reference.
     *
     * @param initialState the checkpointed state to restore, may be {@code null}
     */
    public void setInitialState(StreamTaskStateList initialState) {
        lazyRestoreState = initialState;
    }

    /**
     * Restores checkpointed state to the operators of the chain. The state at index {@code i} is
     * handed to the operator at index {@code i}; null states and null operators are skipped.
     *
     * <p>While the restore is in progress the state list is registered as a cancelable, so that
     * {@link #cancel()} closes it; it is unregistered again when the restore finishes or fails.
     *
     * @param restoredState the state to restore; {@code null} makes this method a no-op
     * @throws Exception wrapping any failure raised while reading or applying the state
     */
    private void restoreState(StreamTaskStateList restoredState) throws Exception {
        if (restoredState == null) {
            return;
        }

        LOG.info("Restoring checkpointed state to task {}", getName());

        synchronized (cancelables) {
            cancelables.add((Closeable) ((Object) restoredState));
        }

        try {
            // Typed as an array: it is indexed below, which an Object-typed local does not allow.
            final StreamOperator<?>[] allOperators = operatorChain.getAllOperators();
            final StreamTaskState[] states = restoredState.getState(userClassLoader);

            for (int i = 0; i < states.length; ++i) {
                final StreamTaskState state = states[i];
                final StreamOperator<?> operator = allOperators[i];

                if (state != null && operator != null) {
                    LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
                    operator.restoreState(state);
                }
                else if (operator != null) {
                    LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
                }
            }
        }
        catch (Exception e) {
            throw new Exception("Could not restore checkpointed state to operators and functions", e);
        }
        finally {
            synchronized (cancelables) {
                cancelables.remove(restoredState);
            }
        }
    }

    /**
     * Triggers a checkpoint on this task by delegating to {@link #performCheckpoint}.
     *
     * <p>A failure is only propagated while the task is still running; a failure on a task that
     * is no longer running is reported as "no checkpoint performed".
     *
     * @param checkpointId the id of the checkpoint
     * @param timestamp the timestamp of the checkpoint
     * @return the result of {@code performCheckpoint}, or {@code false} if it failed on a task
     *         that is not running
     * @throws Exception if the checkpoint failed while the task is running
     */
    public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
        final boolean performed;
        try {
            performed = performCheckpoint(checkpointId, timestamp);
        }
        catch (Exception failure) {
            if (!isRunning) {
                return false;
            }
            throw failure;
        }
        return performed;
    }

    /**
     * Takes a checkpoint of all operators in the chain while holding the checkpoint lock.
     *
     * <p>If the task is running, the checkpoint barrier is broadcast first and then every
     * non-null operator is asked for a snapshot. The outcome is acknowledged in one of three ways:
     * <ul>
     *   <li>no state at all: acknowledged without a state handle;</li>
     *   <li>only synchronous state: the state size is recorded and the checkpoint is acknowledged
     *       with the state;</li>
     *   <li>at least one asynchronous handle or key/value snapshot: a thread is registered as a
     *       cancelable and started to materialize the state and acknowledge afterwards.</li>
     * </ul>
     *
     * @param checkpointId the id of the checkpoint
     * @param timestamp the timestamp of the checkpoint
     * @return {@code true} if a checkpoint was taken, {@code false} if the task is not running
     * @throws CancelTaskException if the task stopped running while the snapshots were drawn
     * @throws Exception if broadcasting the barrier or snapshotting an operator fails
     */
    protected boolean performCheckpoint(long checkpointId, long timestamp) throws Exception {
        LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());

        synchronized (lock) {
            if (!isRunning) {
                return false;
            }

            // The barrier goes out before any snapshot is drawn.
            operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);

            final StreamOperator<?>[] chained = operatorChain.getAllOperators();
            final StreamTaskState[] snapshots = new StreamTaskState[chained.length];
            boolean needsMaterialization = false;

            for (int idx = 0; idx < snapshots.length; idx++) {
                final StreamOperator<?> op = chained[idx];
                if (op == null) {
                    continue;
                }

                final StreamTaskState snapshot = op.snapshotOperatorState(checkpointId, timestamp);

                if (snapshot.getOperatorState() instanceof AsynchronousStateHandle) {
                    needsMaterialization = true;
                }
                if (snapshot.getFunctionState() instanceof AsynchronousStateHandle) {
                    needsMaterialization = true;
                }
                if (snapshot.getKvStates() != null) {
                    for (KvStateSnapshot<?, ?, ?, ?, ?> kv : snapshot.getKvStates().values()) {
                        if (kv instanceof AsynchronousKvStateSnapshot) {
                            needsMaterialization = true;
                        }
                    }
                }

                // Empty snapshots are recorded as null entries.
                if (snapshot.isEmpty()) {
                    snapshots[idx] = null;
                }
                else {
                    snapshots[idx] = snapshot;
                }
            }

            // The task may have been canceled while the snapshots were drawn.
            if (!isRunning) {
                throw new CancelTaskException();
            }

            final StreamTaskStateList allStates = new StreamTaskStateList(snapshots);

            if (allStates.isEmpty()) {
                getEnvironment().acknowledgeCheckpoint(checkpointId);
            }
            else if (needsMaterialization) {
                final String threadName = "Materialize checkpoint state " + checkpointId + " - " + getName();
                final AsyncCheckpointThread materializer =
                        new AsyncCheckpointThread(threadName, this, cancelables, snapshots, checkpointId);

                // Register before starting so that cancellation can interrupt the thread.
                synchronized (cancelables) {
                    cancelables.add(materializer);
                }
                materializer.start();
            }
            else {
                lastCheckpointSize = allStates.getStateSize();
                getEnvironment().acknowledgeCheckpoint(checkpointId, (StateHandle) allStates);
            }
            return true;
        }
    }

    /**
     * Forwards the notification of a completed checkpoint to every non-null operator of the
     * chain. Runs under the checkpoint lock; the notification is ignored, with a debug message,
     * when the task is not running.
     *
     * @param checkpointId the id of the completed checkpoint
     * @throws Exception the first failure raised by an operator's notification callback
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        synchronized (lock) {
            if (!isRunning) {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", getName());
                return;
            }

            LOG.debug("Notification of complete checkpoint for task {}", getName());
            final StreamOperator<?>[] chained = operatorChain.getAllOperators();
            for (int idx = 0; idx < chained.length; idx++) {
                if (chained[idx] != null) {
                    chained[idx].notifyOfCompletedCheckpoint(checkpointId);
                }
            }
        }
    }

    /**
     * Creates and initializes the state backend for one operator.
     *
     * <p>A backend defined in the stream configuration takes precedence. Otherwise the
     * {@code state.backend} key of the task manager configuration selects the backend
     * (case-insensitively for the two built-in names):
     * <ul>
     *   <li>{@code jobmanager} (also the default when the key is absent): memory state backend;</li>
     *   <li>{@code filesystem}: file system state backend created from the configuration;</li>
     *   <li>anything else: treated as the class name of a {@link StateBackendFactory}, loaded
     *       through the user-code class loader.</li>
     * </ul>
     *
     * @param operatorIdentifier identifier passed on to the backend's {@code initializeForJob}
     * @param keySerializer key serializer passed on to the backend's {@code initializeForJob}
     * @return the initialized state backend
     * @throws IllegalConfigurationException if a configured factory class cannot be found, is not
     *         a {@code StateBackendFactory}, or fails to create the backend
     * @throws Exception if creating a built-in backend or initializing the backend fails
     */
    public AbstractStateBackend createStateBackend(String operatorIdentifier, TypeSerializer<?> keySerializer) throws Exception {
        AbstractStateBackend stateBackend = configuration.getStateBackend(userClassLoader);

        if (stateBackend != null) {
            LOG.info("Using user-defined state backend: " + stateBackend);
        } else {
            final Configuration flinkConfig = getEnvironment().getTaskManagerInfo().getConfiguration();
            String backendName = flinkConfig.getString("state.backend", null);
            if (backendName == null) {
                LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
                backendName = "jobmanager";
            }

            // Locale.ROOT keeps the match independent of the JVM's default locale (for example the
            // Turkish dotless-i mapping would turn "FILESYSTEM" into a non-matching string).
            switch (backendName.toLowerCase(Locale.ROOT)) {
                case "jobmanager": {
                    LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
                    stateBackend = MemoryStateBackend.create();
                    break;
                }
                case "filesystem": {
                    FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
                    LOG.info("State backend is set to heap memory (checkpoints to filesystem \"" + backend.getBasePath() + "\")");
                    stateBackend = backend;
                    break;
                }
                default: {
                    // The original (not lower-cased) value is used as the factory class name.
                    try {
                        @SuppressWarnings("rawtypes")
                        Class<? extends StateBackendFactory> clazz =
                                Class.forName(backendName, false, userClassLoader).asSubclass(StateBackendFactory.class);
                        stateBackend = clazz.newInstance().createFromConfig(flinkConfig);
                    }
                    catch (ClassNotFoundException e) {
                        throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName, e);
                    }
                    catch (ClassCastException e) {
                        throw new IllegalConfigurationException("The class configured under 'state.backend' is not a valid state backend factory (" + backendName + ')', e);
                    }
                    catch (Throwable t) {
                        throw new IllegalConfigurationException("Cannot create configured state backend", t);
                    }
                    break;
                }
            }
        }

        stateBackend.initializeForJob(getEnvironment(), operatorIdentifier, keySerializer);
        return stateBackend;
    }

    /**
     * Registers a timer with the task's time service. When the timer fires, {@code target} is
     * triggered with {@code timestamp} while the checkpoint lock is held.
     *
     * @param timestamp the time passed to the time service and later to the target
     * @param target the object to trigger
     * @return the future returned by the time service for the registered timer
     * @throws IllegalStateException if no time service has been installed or created yet
     */
    public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
        final TimeServiceProvider service = timerService;
        if (service == null) {
            throw new IllegalStateException("The timer service has not been initialized.");
        }
        final TriggerTask callback = new TriggerTask(this, lock, target, timestamp);
        return service.registerTimer(timestamp, callback);
    }

    /**
     * Rethrows the failure recorded by a timer callback or an asynchronous checkpoint thread, if
     * there is one. Does nothing when no asynchronous failure has been recorded.
     *
     * @throws AsynchronousException the recorded asynchronous failure
     */
    public void checkTimerException() throws AsynchronousException {
        final AsynchronousException recorded = asyncException;
        if (recorded == null) {
            return;
        }
        throw recorded;
    }

    /**
     * Returns the task name, see {@link #getName()}.
     *
     * @return the name of this task
     */
    @Override
    public String toString() {
        return getName();
    }

    /**
     * Creates a listener that performs a checkpoint on this task for every checkpoint barrier it
     * receives, using the barrier's id and timestamp.
     *
     * <p>A {@link CancelTaskException} raised by the checkpoint is passed through unchanged; any
     * other exception is wrapped in a {@link RuntimeException}.
     *
     * @return a new listener bound to this task
     */
    protected final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
        final EventListener<CheckpointBarrier> barrierListener = new EventListener<CheckpointBarrier>() {
            @Override
            public void onEvent(CheckpointBarrier barrier) {
                try {
                    performCheckpoint(barrier.getId(), barrier.getTimestamp());
                }
                catch (CancelTaskException cancellation) {
                    // Cancellation is not an error of the checkpoint; let it through as is.
                    throw cancellation;
                }
                catch (Exception failure) {
                    throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", failure);
                }
            }
        };
        return barrierListener;
    }

    private static class AsyncCheckpointThread
    extends Thread
    implements Closeable {
        private final StreamTask<?, ?> owner;
        private final Set<Closeable> cancelables;
        private final StreamTaskState[] states;
        private final long checkpointId;

        AsyncCheckpointThread(String name, StreamTask<?, ?> owner, Set<Closeable> cancelables, StreamTaskState[] states, long checkpointId) {
            super(name);
            this.setDaemon(true);
            this.owner = owner;
            this.cancelables = cancelables;
            this.states = states;
            this.checkpointId = checkpointId;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                for (StreamTaskState state : this.states) {
                    AsynchronousStateHandle asyncState;
                    if (state == null) continue;
                    if (state.getFunctionState() instanceof AsynchronousStateHandle) {
                        asyncState = (AsynchronousStateHandle)state.getFunctionState();
                        state.setFunctionState((StateHandle<Serializable>)asyncState.materialize());
                    }
                    if (state.getOperatorState() instanceof AsynchronousStateHandle) {
                        asyncState = (AsynchronousStateHandle)state.getOperatorState();
                        state.setOperatorState(asyncState.materialize());
                    }
                    if (state.getKvStates() == null) continue;
                    Set<String> keys = state.getKvStates().keySet();
                    HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> kvStates = state.getKvStates();
                    for (String key : keys) {
                        if (!(kvStates.get(key) instanceof AsynchronousKvStateSnapshot)) continue;
                        AsynchronousKvStateSnapshot asyncHandle = (AsynchronousKvStateSnapshot)kvStates.get(key);
                        kvStates.put(key, asyncHandle.materialize());
                    }
                }
                StreamTaskStateList allStates = new StreamTaskStateList(this.states);
                ((StreamTask)this.owner).lastCheckpointSize = allStates.getStateSize();
                this.owner.getEnvironment().acknowledgeCheckpoint(this.checkpointId, (StateHandle)allStates);
                LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}", (Object)this.checkpointId, (Object)this.getName());
            }
            catch (Exception e) {
                if (this.owner.isRunning()) {
                    LOG.error("Caught exception while materializing asynchronous checkpoints.", (Throwable)e);
                }
                if (((StreamTask)this.owner).asyncException == null) {
                    ((StreamTask)this.owner).asyncException = new AsynchronousException(e);
                }
            }
            finally {
                Set<Closeable> set = this.cancelables;
                synchronized (set) {
                    this.cancelables.remove(this);
                }
            }
        }

        @Override
        public void close() {
            this.interrupt();
        }
    }

    /**
     * Runnable handed to the time service for one registered timer. When run, it triggers the
     * target with the registered timestamp while holding the task's checkpoint lock.
     */
    private static final class TriggerTask implements Runnable {

        /** Monitor held while the target is triggered. */
        private final Object lock;

        /** Object to trigger when the timer fires. */
        private final Triggerable target;

        /** Timestamp passed to the target. */
        private final long timestamp;

        /** Task that registered the timer; receives the failure if the target throws. */
        private final StreamTask<?, ?> task;

        TriggerTask(StreamTask<?, ?> task, Object lock, Triggerable target, long timestamp) {
            this.task = task;
            this.lock = lock;
            this.target = target;
            this.timestamp = timestamp;
        }

        /**
         * Triggers the target under the lock. Anything the target throws is logged if the task is
         * still running and is stored as the task's asynchronous exception unless one is already
         * recorded; nothing is propagated to the calling thread.
         */
        @Override
        public void run() {
            synchronized (lock) {
                try {
                    target.trigger(timestamp);
                }
                catch (Throwable t) {
                    if (task.isRunning) {
                        LOG.error("Caught exception while processing timer.", t);
                    }
                    // Only the first asynchronous failure is kept.
                    if (task.asyncException == null) {
                        task.asyncException = new TimerException(t);
                    }
                }
            }
        }
    }
}

