/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AppendingState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MergingState;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.shaded.com.google.common.collect.HashMultiset;
import org.apache.flink.shaded.com.google.common.collect.Multiset;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet;
import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.Preconditions;

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>,
Triggerable,
InputTypeConfigurable {
    private static final long serialVersionUID = 1L;
    // ---- Configuration, assigned once in the constructor (inputSerializer: in setInputType) ----
    /** Maps every incoming element to the windows it belongs to (see processElement). */
    protected final WindowAssigner<? super IN, W> windowAssigner;
    /** Key extractor handed in at construction; within this class it is only stored and exposed via getKeySelector(). */
    protected final KeySelector<IN, K> keySelector;
    /** Trigger consulted (through {@link Context}) on elements, timers and window merges. */
    protected final Trigger<? super IN, ? super W> trigger;
    /** Descriptor of the per-window state that accumulates elements. */
    protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
    /** Serializer for input elements; set by setInputType() and required to be non-null in open(). */
    protected TypeSerializer<IN> inputSerializer;
    /** Serializer used to write/read timer keys in snapshotTimers()/restoreTimers(). */
    protected final TypeSerializer<K> keySerializer;
    /** Serializer for windows; used as state namespace serializer and for timer snapshots. */
    protected final TypeSerializer<W> windowSerializer;
    /** Non-negative amount added to a window's max timestamp to obtain its cleanup time (see cleanupTime). */
    protected final long allowedLateness;
    // ---- Runtime state, (re)created in open() or restoreTimers(); not Java-serialized ----
    /** Collector wrapping the operator output; fire() stamps results with the window's max timestamp. */
    protected transient TimestampedCollector<OUT> timestampedCollector;
    /** Futures of scheduled processing-time callbacks, keyed by timestamp; recreated empty in open(). */
    protected transient Map<Long, ScheduledFuture<?>> processingTimeTimerFutures;
    /** Timestamp of the last watermark handled by processWatermark(); Long.MIN_VALUE until then. */
    protected transient long currentWatermark = Long.MIN_VALUE;
    /** Reusable trigger context; its key/window fields are re-pointed before every trigger call. */
    protected transient Context context = new Context(this, null, null);
    /** Context passed to the window assigner; exposes the current processing time. */
    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext;
    /** Processing-time timers ordered by timestamp (Timer.compareTo). */
    protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue;
    /** Same timers as processingTimeTimersQueue, kept as a set for de-duplication. */
    protected transient Set<Timer<K, W>> processingTimeTimers;
    /** Number of registered processing-time timers per timestamp; decides when a callback is scheduled/cancelled. */
    protected transient Multiset<Long> processingTimeTimerTimestamps;
    /** Event-time timers, kept as a set for de-duplication. */
    protected transient Set<Timer<K, W>> watermarkTimers;
    /** Event-time timers ordered by timestamp (Timer.compareTo). */
    protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;
    /** Per-key cache of merging window sets; only created in open() when the assigner is a MergingWindowAssigner. */
    protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey;

    public WindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor, InternalWindowFunction<ACC, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger, long allowedLateness) {
        super(windowFunction);
        this.windowAssigner = Objects.requireNonNull(windowAssigner);
        this.windowSerializer = windowSerializer;
        this.keySelector = Objects.requireNonNull(keySelector);
        this.keySerializer = Objects.requireNonNull(keySerializer);
        this.windowStateDescriptor = windowStateDescriptor;
        this.trigger = Objects.requireNonNull(trigger);
        Preconditions.checkArgument((allowedLateness >= 0L ? 1 : 0) != 0);
        this.allowedLateness = allowedLateness;
        this.setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Custom Java deserialization hook.
     *
     * <p>Initializers of transient fields do not run on deserialization, so the
     * watermark is reset here explicitly. It is set to {@code Long.MIN_VALUE}, the
     * same "no watermark seen yet" value used by the field initializer and by
     * {@code open()}, so that the operator never observes two different initial
     * watermarks.
     *
     * @param in stream the operator is read from
     * @throws IOException if reading from the stream fails
     * @throws ClassNotFoundException if a serialized class cannot be resolved
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        this.currentWatermark = Long.MIN_VALUE;
    }

    /**
     * Derives the input serializer from the given type information.
     *
     * @param type type information of the input elements
     * @param executionConfig configuration used to create the serializer
     */
    @Override
    @SuppressWarnings("unchecked")
    public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
        final TypeSerializer<?> created = type.createSerializer(executionConfig);
        // The caller is trusted to pass the type information of IN.
        this.inputSerializer = (TypeSerializer<IN>) created;
    }

    /**
     * Initializes the transient runtime structures of the operator.
     *
     * <p>Timer collections are only created when they are still {@code null}; a
     * preceding {@code restoreState()} may already have filled them from a snapshot.
     * The map of scheduled futures is never part of a snapshot and is always
     * recreated. Because of that, restored processing-time timers would otherwise
     * have no scheduled callback, and {@code Context.registerProcessingTimeTimer}
     * would not create one either, since the restored per-timestamp count is already
     * non-zero. To prevent such timers from being stranded, a callback is
     * re-registered here for every timestamp present in
     * {@code processingTimeTimerTimestamps}.
     *
     * @throws IllegalStateException if no input serializer was set via {@code setInputType}
     * @throws Exception if the super class fails to open
     */
    @Override
    public final void open() throws Exception {
        super.open();
        this.timestampedCollector = new TimestampedCollector(this.output);
        if (this.inputSerializer == null) {
            throw new IllegalStateException("Input serializer was not set.");
        }
        // These may already have been populated by restoreTimers().
        if (this.watermarkTimers == null) {
            this.watermarkTimers = new HashSet<Timer<K, W>>();
            this.watermarkTimersQueue = new PriorityQueue<Timer<K, W>>(100);
        }
        if (this.processingTimeTimers == null) {
            this.processingTimeTimers = new HashSet<Timer<K, W>>();
            this.processingTimeTimerTimestamps = HashMultiset.create();
            this.processingTimeTimersQueue = new PriorityQueue<Timer<K, W>>(100);
        }
        // Scheduled futures are not checkpointed, so always start from an empty map.
        this.processingTimeTimerFutures = new HashMap<Long, ScheduledFuture<?>>();
        this.context = new Context(this, null, null);
        this.windowAssignerContext = new WindowAssigner.WindowAssignerContext(){

            @Override
            public long getCurrentProcessingTime() {
                return WindowOperator.this.getCurrentProcessingTime();
            }
        };
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            this.mergingWindowsByKey = new HashMap<K, MergingWindowSet<W>>();
        }
        this.currentWatermark = Long.MIN_VALUE;

        // Re-schedule callbacks for restored processing-time timers. This is done last so
        // that everything trigger() relies on is in place. The timestamps are copied to an
        // array first so a callback mutating the multiset cannot disturb the iteration.
        // For a freshly created (empty) multiset this loop does nothing.
        Long[] restoredTimestamps = this.processingTimeTimerTimestamps.elementSet().toArray(new Long[0]);
        for (Long restoredTimestamp : restoredTimestamps) {
            ScheduledFuture<?> future = this.registerTimer(restoredTimestamp.longValue(), this);
            this.processingTimeTimerFutures.put(restoredTimestamp, future);
        }
    }

    /**
     * Closes the operator via the super class and then drops the references to the
     * collector, the timer sets and queues, both contexts and the merging-window cache.
     *
     * @throws Exception if the super class fails to close
     */
    @Override
    public final void close() throws Exception {
        super.close();

        // Timer bookkeeping (event time and processing time).
        this.watermarkTimers = this.processingTimeTimers = null;
        this.watermarkTimersQueue = this.processingTimeTimersQueue = null;

        // Output and contexts.
        this.timestampedCollector = null;
        this.context = null;
        this.windowAssignerContext = null;

        // Per-key merging window sets.
        this.mergingWindowsByKey = null;
    }

    /**
     * Disposes the operator via the super class and then drops the same references
     * that {@code close()} releases: collector, timer sets and queues, both contexts
     * and the merging-window cache.
     */
    @Override
    public void dispose() {
        super.dispose();

        // Output and contexts.
        this.timestampedCollector = null;
        this.windowAssignerContext = null;
        this.context = null;

        // Timer bookkeeping (event time and processing time).
        this.watermarkTimersQueue = this.processingTimeTimersQueue = null;
        this.watermarkTimers = this.processingTimeTimers = null;

        // Per-key merging window sets.
        this.mergingWindowsByKey = null;
    }

    /**
     * Handles one incoming record: assigns it to windows, adds it to the state of
     * every non-late window, asks the trigger what to do and then fires, purges
     * and/or registers a cleanup timer accordingly.
     *
     * <p>Two paths exist, selected by whether the assigner is a
     * {@code MergingWindowAssigner}:
     * <ul>
     *   <li>merging: the window is first added to the key's {@code MergingWindowSet},
     *       which may merge it with existing windows via the callback below;</li>
     *   <li>non-merging: the assigned window is used directly.</li>
     * </ul>
     *
     * @param element the record to process
     * @throws IllegalStateException if, on the merging path, no state window is known for the resulting window
     * @throws Exception if the assigner, trigger, state access or window function fails
     */
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        Collection<W> elementWindows = this.windowAssigner.assignWindows(element.getValue(), element.getTimestamp(), this.windowAssignerContext);
        // The key of the current record, as tracked by the state backend.
        final Object key = this.getStateBackend().getCurrentKey();
        if (this.windowAssigner instanceof MergingWindowAssigner) {
            MergingWindowSet<Window> mergingWindows = this.getMergingWindowSet();
            for (Window window : elementWindows) {
                // Mutable holder so the merge callback can hand its trigger result back out;
                // starts as CONTINUE for the case that no merge happens.
                Tuple1 mergeTriggerResult;
                Window actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>(mergeTriggerResult = new Tuple1((Object)TriggerResult.CONTINUE)){
                    // NOTE(review): the synthetic field and initializer below are decompiler
                    // output for the captured local variable, not hand-written code.
                    final /* synthetic */ Tuple1 val$mergeTriggerResult;
                    {
                        this.val$mergeTriggerResult = tuple1;
                    }

                    @Override
                    public void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception {
                        // Let the trigger react to the merge in the context of the merged window.
                        WindowOperator.this.context.key = key;
                        WindowOperator.this.context.window = mergeResult;
                        this.val$mergeTriggerResult.f0 = WindowOperator.this.context.onMerge(mergedWindows);
                        // Clear trigger state and cleanup timer of every window that was merged away.
                        for (Window m : mergedWindows) {
                            WindowOperator.this.context.window = m;
                            WindowOperator.this.context.clear();
                            WindowOperator.this.deleteCleanupTimer(m);
                        }
                        // Fold the contents state of the merged state windows into the resulting state window.
                        WindowOperator.this.getStateBackend().mergePartitionedStates(stateWindowResult, mergedStateWindows, WindowOperator.this.windowSerializer, WindowOperator.this.windowStateDescriptor);
                    }
                });
                // A window whose cleanup time has already passed is dropped from the set and ignored.
                if (this.isLate(actualWindow)) {
                    mergingWindows.retireWindow(actualWindow);
                    continue;
                }
                // State is stored under the "state window", which may differ from the actual window after merges.
                Window stateWindow = mergingWindows.getStateWindow(actualWindow);
                if (stateWindow == null) {
                    throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                }
                AppendingState<IN, ACC> windowState = this.getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
                windowState.add(element.getValue());
                this.context.key = key;
                this.context.window = actualWindow;
                TriggerResult triggerResult = this.context.onElement(element);
                // The decision is the combination of the element trigger result and the merge trigger result.
                TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, (TriggerResult)((Object)mergeTriggerResult.f0));
                if (combinedTriggerResult.isFire()) {
                    Object contents = windowState.get();
                    // Without contents nothing is emitted; this also skips purge and cleanup-timer registration.
                    if (contents == null) continue;
                    this.fire(actualWindow, contents);
                }
                if (combinedTriggerResult.isPurge()) {
                    this.cleanup(actualWindow, windowState, mergingWindows);
                    continue;
                }
                // Window stays alive: make sure it is cleaned up once its cleanup time is reached.
                this.registerCleanupTimer(actualWindow);
            }
        } else {
            for (Window window : elementWindows) {
                // Elements for windows past their cleanup time are dropped.
                if (this.isLate(window)) continue;
                AppendingState<IN, ACC> windowState = this.getPartitionedState(window, this.windowSerializer, this.windowStateDescriptor);
                windowState.add(element.getValue());
                this.context.key = key;
                this.context.window = window;
                TriggerResult triggerResult = this.context.onElement(element);
                if (triggerResult.isFire()) {
                    Object contents = windowState.get();
                    // Without contents nothing is emitted; this also skips purge and cleanup-timer registration.
                    if (contents == null) continue;
                    this.fire(window, contents);
                }
                if (triggerResult.isPurge()) {
                    // No merging window set on this path, hence null.
                    this.cleanup(window, windowState, null);
                    continue;
                }
                // Window stays alive: make sure it is cleaned up once its cleanup time is reached.
                this.registerCleanupTimer(window);
            }
        }
    }

    /**
     * Fires all event-time timers whose timestamp is not after the given watermark,
     * then forwards the watermark and records its timestamp as the current watermark.
     *
     * <p>For each due timer the trigger context is pointed at the timer's key and
     * window. Timers whose window has no state window (merging case) or no contents
     * are dropped silently. Otherwise the trigger is consulted; the window function
     * is applied on FIRE, and the window is cleaned up on PURGE or when this is an
     * event-time assigner and the timer is the window's cleanup timer.
     *
     * @param mark the watermark to process
     * @throws Exception if the trigger, state access or window function fails
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        for (Timer<K, W> due = this.watermarkTimersQueue.peek();
                due != null && due.timestamp <= mark.getTimestamp();
                due = this.watermarkTimersQueue.peek()) {

            // Consume the timer before doing anything else.
            this.watermarkTimers.remove(due);
            this.watermarkTimersQueue.remove();

            this.context.key = due.key;
            this.context.window = due.window;
            this.setKeyContext(due.key);

            MergingWindowSet mergingWindows = null;
            AppendingState<IN, ACC> windowState;
            if (this.windowAssigner instanceof MergingWindowAssigner) {
                mergingWindows = this.getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(this.context.window);
                if (stateWindow == null) {
                    // No state window known for this window: nothing to fire or clean.
                    continue;
                }
                windowState = this.getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
            } else {
                windowState = this.getPartitionedState(this.context.window, this.windowSerializer, this.windowStateDescriptor);
            }

            Object contents = windowState.get();
            if (contents == null) {
                // Empty window: the trigger is not consulted.
                continue;
            }

            TriggerResult triggerResult = this.context.onEventTime(due.timestamp);
            if (triggerResult.isFire()) {
                this.fire(this.context.window, contents);
            }

            if (triggerResult.isPurge() || (this.windowAssigner.isEventTime() && this.isCleanupTime(this.context.window, due.timestamp))) {
                this.cleanup(this.context.window, windowState, mergingWindows);
            }
        }

        this.output.emitWatermark(mark);
        this.currentWatermark = mark.getTimestamp();
    }

    /**
     * Processing-time callback: fires all processing-time timers whose timestamp is
     * not after {@code time}.
     *
     * <p>The future and the per-timestamp count registered for exactly {@code time}
     * are discarded first. For each due timer the trigger context is pointed at the
     * timer's key and window. Timers whose window has no state window (merging case)
     * or no contents are dropped silently. Otherwise the trigger is consulted; the
     * window function is applied on FIRE, and the window is cleaned up on PURGE or
     * when this is not an event-time assigner and the timer is the window's cleanup
     * timer.
     *
     * @param time the processing time the callback was scheduled for
     * @throws Exception if the trigger, state access or window function fails
     */
    @Override
    public void trigger(long time) throws Exception {
        this.processingTimeTimerFutures.remove(time);
        this.processingTimeTimerTimestamps.remove(time, this.processingTimeTimerTimestamps.count(time));

        for (Timer<K, W> due = this.processingTimeTimersQueue.peek();
                due != null && due.timestamp <= time;
                due = this.processingTimeTimersQueue.peek()) {

            // Consume the timer before doing anything else.
            this.processingTimeTimers.remove(due);
            this.processingTimeTimersQueue.remove();

            this.context.key = due.key;
            this.context.window = due.window;
            this.setKeyContext(due.key);

            MergingWindowSet mergingWindows = null;
            AppendingState<IN, ACC> windowState;
            if (this.windowAssigner instanceof MergingWindowAssigner) {
                mergingWindows = this.getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(this.context.window);
                if (stateWindow == null) {
                    // No state window known for this window: nothing to fire or clean.
                    continue;
                }
                windowState = this.getPartitionedState(stateWindow, this.windowSerializer, this.windowStateDescriptor);
            } else {
                windowState = this.getPartitionedState(this.context.window, this.windowSerializer, this.windowStateDescriptor);
            }

            Object contents = windowState.get();
            if (contents == null) {
                // Empty window: the trigger is not consulted.
                continue;
            }

            TriggerResult triggerResult = this.context.onProcessingTime(due.timestamp);
            if (triggerResult.isFire()) {
                this.fire(this.context.window, contents);
            }

            if (triggerResult.isPurge() || (!this.windowAssigner.isEventTime() && this.isCleanupTime(this.context.window, due.timestamp))) {
                this.cleanup(this.context.window, windowState, mergingWindows);
            }
        }
    }

    /**
     * Discards everything kept for a window: clears its contents state, retires it
     * from the merging window set (if one is given) and clears the trigger state
     * through the current trigger context.
     *
     * @param window the window to clean up
     * @param windowState contents state of the window
     * @param mergingWindows merging window set of the current key, or {@code null} on the non-merging path
     * @throws Exception if clearing state or the trigger fails
     */
    private void cleanup(W window, AppendingState<IN, ACC> windowState, MergingWindowSet<W> mergingWindows) throws Exception {
        windowState.clear();

        final boolean trackedInMergingSet = mergingWindows != null;
        if (trackedInMergingSet) {
            mergingWindows.retireWindow(window);
        }

        this.context.clear();
    }

    /**
     * Emits the result for a window: stamps the collector with the window's max
     * timestamp and applies the window function to the given contents, using the key
     * and window currently set on the trigger context.
     *
     * @param window window whose max timestamp is used for the emitted records
     * @param contents accumulated contents passed to the window function
     * @throws Exception if the window function fails
     */
    private void fire(W window, ACC contents) throws Exception {
        final TimestampedCollector<OUT> collector = this.timestampedCollector;
        collector.setAbsoluteTimestamp(window.maxTimestamp());

        final InternalWindowFunction windowFunction = (InternalWindowFunction) this.userFunction;
        windowFunction.apply(this.context.key, this.context.window, contents, collector);
    }

    /**
     * Returns the merging window set of the state backend's current key.
     *
     * <p>Sets are cached per key in {@code mergingWindowsByKey}. On a cache miss the
     * set is built from the "merging-window-set" list state of the key; that list
     * state is cleared right after the set has been constructed from it, and the new
     * set is put into the cache.
     *
     * @return the merging window set for the current key
     * @throws Exception if state access fails
     */
    protected MergingWindowSet<W> getMergingWindowSet() throws Exception {
        MergingWindowSet<W> cached = this.mergingWindowsByKey.get(this.getStateBackend().getCurrentKey());
        if (cached != null) {
            return cached;
        }

        // Cache miss: build the set from the key's persisted (window, state window) pairs.
        TupleSerializer pairSerializer = new TupleSerializer(Tuple2.class, new TypeSerializer[]{this.windowSerializer, this.windowSerializer});
        ListStateDescriptor descriptor = new ListStateDescriptor("merging-window-set", (TypeSerializer)pairSerializer);
        ListState persisted = (ListState)this.getStateBackend().getPartitionedState(null, (TypeSerializer)VoidSerializer.INSTANCE, (StateDescriptor)descriptor);

        MergingWindowSet<W> loaded = new MergingWindowSet((MergingWindowAssigner)this.windowAssigner, persisted);
        persisted.clear();

        this.mergingWindowsByKey.put(this.getStateBackend().getCurrentKey(), loaded);
        return loaded;
    }

    /**
     * Tells whether a window is late, i.e. the assigner works in event time and the
     * window's cleanup time is not after the current watermark.
     *
     * @param window the window to check
     * @return {@code true} if the window is late; always {@code false} for non-event-time assigners
     */
    protected boolean isLate(W window) {
        if (!this.windowAssigner.isEventTime()) {
            return false;
        }
        return this.cleanupTime(window) <= this.currentWatermark;
    }

    /**
     * Registers a timer at the window's cleanup time through the current trigger
     * context: an event-time timer for event-time assigners, otherwise a
     * processing-time timer.
     *
     * @param window the window whose cleanup should be scheduled
     */
    protected void registerCleanupTimer(W window) {
        final long when = this.cleanupTime(window);
        if (!this.windowAssigner.isEventTime()) {
            this.context.registerProcessingTimeTimer(when);
            return;
        }
        this.context.registerEventTimeTimer(when);
    }

    /**
     * Deletes the timer at the window's cleanup time through the current trigger
     * context: the event-time timer for event-time assigners, otherwise the
     * processing-time timer.
     *
     * @param window the window whose cleanup timer should be removed
     */
    protected void deleteCleanupTimer(W window) {
        final long when = this.cleanupTime(window);
        if (!this.windowAssigner.isEventTime()) {
            this.context.deleteProcessingTimeTimer(when);
            return;
        }
        this.context.deleteEventTimeTimer(when);
    }

    /**
     * Computes the time at which a window is cleaned up: its max timestamp plus the
     * allowed lateness. If that sum wraps around below the max timestamp (long
     * overflow), {@code Long.MAX_VALUE} is returned instead.
     *
     * @param window the window in question
     * @return the cleanup time, saturated at {@code Long.MAX_VALUE}
     */
    private long cleanupTime(W window) {
        final long windowEnd = window.maxTimestamp();
        final long withLateness = windowEnd + this.allowedLateness;
        if (withLateness < windowEnd) {
            // Overflow of the addition.
            return Long.MAX_VALUE;
        }
        return withLateness;
    }

    /**
     * Tells whether the given time is exactly the cleanup time of the window.
     *
     * @param window the window in question
     * @param time the time to compare against
     * @return {@code true} if {@code time} equals the window's cleanup time
     */
    protected final boolean isCleanupTime(W window, long time) {
        return time == this.cleanupTime(window);
    }

    /**
     * Snapshots the operator: first writes every cached merging window set back to
     * its key's list state, then takes the super class snapshot and finally attaches
     * the serialized timers as operator state.
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp timestamp of the checkpoint
     * @return the task state including the timer snapshot
     * @throws Exception if persisting state or writing the timers fails
     */
    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        if (this.mergingWindowsByKey != null) {
            this.persistMergingWindowSets();
        }

        StreamTaskState snapshot = super.snapshotOperatorState(checkpointId, timestamp);

        AbstractStateBackend.CheckpointStateOutputView timerOutput = this.getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
        this.snapshotTimers((DataOutputView)timerOutput);
        snapshot.setOperatorState(timerOutput.closeAndGetHandle());

        return snapshot;
    }

    /**
     * Writes each cached merging window set into the "merging-window-set" list state
     * of its key, replacing whatever that state held before. Switches the key
     * context to every cached key in turn.
     */
    private void persistMergingWindowSets() throws Exception {
        TupleSerializer pairSerializer = new TupleSerializer(Tuple2.class, new TypeSerializer[]{this.windowSerializer, this.windowSerializer});
        ListStateDescriptor descriptor = new ListStateDescriptor("merging-window-set", (TypeSerializer)pairSerializer);
        for (Map.Entry<K, MergingWindowSet<W>> perKey : this.mergingWindowsByKey.entrySet()) {
            this.setKeyContext(perKey.getKey());
            ListState target = (ListState)this.getStateBackend().getPartitionedState(null, (TypeSerializer)VoidSerializer.INSTANCE, (StateDescriptor)descriptor);
            target.clear();
            perKey.getValue().persist(target);
        }
    }

    /**
     * Restores the operator: lets the super class restore its part, then reads the
     * timers back from the operator state handle.
     *
     * @param taskState the state to restore from
     * @throws Exception if the state cannot be read
     */
    @Override
    public void restoreState(StreamTaskState taskState) throws Exception {
        super.restoreState(taskState);

        final ClassLoader userCodeLoader = this.getUserCodeClassloader();
        final StateHandle<?> timerHandle = taskState.getOperatorState();
        this.restoreTimers((DataInputView) timerHandle.getState(userCodeLoader));
    }

    /**
     * Reads the timer snapshot written by {@code snapshotTimers}, in the same order:
     * event-time timers, processing-time timers, then the per-timestamp counts of
     * processing-time timers. All timer collections are replaced by fresh ones.
     *
     * @param in the view to read from
     * @throws IOException if reading fails
     */
    private void restoreTimers(DataInputView in) throws IOException {
        // 1) event-time timers
        final int eventTimeCount = in.readInt();
        this.watermarkTimers = new HashSet<Timer<K, W>>(eventTimeCount);
        this.watermarkTimersQueue = new PriorityQueue<Timer<K, W>>(Math.max(eventTimeCount, 1));
        for (int read = 0; read < eventTimeCount; read++) {
            Timer<K, W> restored = this.readTimer(in);
            this.watermarkTimers.add(restored);
            this.watermarkTimersQueue.add(restored);
        }

        // 2) processing-time timers
        final int processingTimeCount = in.readInt();
        this.processingTimeTimersQueue = new PriorityQueue<Timer<K, W>>(Math.max(processingTimeCount, 1));
        this.processingTimeTimers = new HashSet<Timer<K, W>>();
        for (int read = 0; read < processingTimeCount; read++) {
            Timer<K, W> restored = this.readTimer(in);
            this.processingTimeTimersQueue.add(restored);
            this.processingTimeTimers.add(restored);
        }

        // 3) number of processing-time timers per timestamp
        final int distinctTimestamps = in.readInt();
        this.processingTimeTimerTimestamps = HashMultiset.create();
        for (int read = 0; read < distinctTimestamps; read++) {
            long timestamp = in.readLong();
            int occurrences = in.readInt();
            this.processingTimeTimerTimestamps.add(timestamp, occurrences);
        }
    }

    /** Reads one timer in the layout key, window, timestamp. */
    private Timer<K, W> readTimer(DataInputView in) throws IOException {
        K key = this.keySerializer.deserialize(in);
        W window = this.windowSerializer.deserialize(in);
        long timestamp = in.readLong();
        return new Timer<K, W>(timestamp, key, window);
    }

    /**
     * Writes all timers to the given view. Layout: number of event-time timers
     * followed by those timers, number of processing-time timers followed by those
     * timers, number of distinct processing-time timestamps followed by
     * (timestamp, count) pairs.
     *
     * @param out the view to write to
     * @throws IOException if writing fails
     */
    private void snapshotTimers(DataOutputView out) throws IOException {
        // 1) event-time timers, in the iteration order of the queue
        out.writeInt(this.watermarkTimersQueue.size());
        for (Timer<K, W> pending : this.watermarkTimersQueue) {
            this.writeTimer(pending, out);
        }

        // 2) processing-time timers, in the iteration order of the set
        out.writeInt(this.processingTimeTimers.size());
        for (Timer<K, W> pending : this.processingTimeTimers) {
            this.writeTimer(pending, out);
        }

        // 3) number of processing-time timers per timestamp
        out.writeInt(this.processingTimeTimerTimestamps.entrySet().size());
        for (Multiset.Entry<Long> perTimestamp : this.processingTimeTimerTimestamps.entrySet()) {
            out.writeLong(perTimestamp.getElement().longValue());
            out.writeInt(perTimestamp.getCount());
        }
    }

    /** Writes one timer in the layout key, window, timestamp. */
    private void writeTimer(Timer<K, W> timer, DataOutputView out) throws IOException {
        this.keySerializer.serialize(timer.key, out);
        this.windowSerializer.serialize(timer.window, out);
        out.writeLong(timer.timestamp);
    }

    /**
     * Exposes the trigger for tests.
     *
     * @return the trigger this operator was constructed with
     */
    @VisibleForTesting
    public Trigger<? super IN, ? super W> getTrigger() {
        return trigger;
    }

    /**
     * Exposes the key selector for tests.
     *
     * @return the key selector this operator was constructed with
     */
    @VisibleForTesting
    public KeySelector<IN, K> getKeySelector() {
        return keySelector;
    }

    /**
     * Exposes the window assigner for tests.
     *
     * @return the window assigner this operator was constructed with
     */
    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return windowAssigner;
    }

    /**
     * Exposes the window state descriptor for tests.
     *
     * @return the descriptor of the per-window contents state
     */
    @VisibleForTesting
    public StateDescriptor<? extends AppendingState<IN, ACC>, ?> getStateDescriptor() {
        return windowStateDescriptor;
    }

    protected static class Timer<K, W extends Window>
    implements Comparable<Timer<K, W>> {
        protected long timestamp;
        protected K key;
        protected W window;

        public Timer(long timestamp, K key, W window) {
            this.timestamp = timestamp;
            this.key = key;
            this.window = window;
        }

        @Override
        public int compareTo(Timer<K, W> o) {
            return Long.compare(this.timestamp, o.timestamp);
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            Timer timer = (Timer)o;
            return this.timestamp == timer.timestamp && this.key.equals(timer.key) && this.window.equals(timer.window);
        }

        public int hashCode() {
            int result = (int)(this.timestamp ^ this.timestamp >>> 32);
            result = 31 * result + this.key.hashCode();
            result = 31 * result + this.window.hashCode();
            return result;
        }

        public String toString() {
            return "Timer{timestamp=" + this.timestamp + ", key=" + this.key + ", window=" + this.window + '}';
        }
    }

    /**
     * Trigger context of the operator. A single instance is reused: the operator
     * re-points {@link #key} and {@link #window} before every trigger invocation, and
     * all state and timer operations issued by the trigger are scoped to that pair.
     *
     * <p>NOTE(review): the {@code static} modifier, the {@code this$0} field and the
     * constructor body are decompiler output for what is presumably a non-static
     * inner class of the operator; they are kept as found.
     */
    public static class Context
    implements Trigger.OnMergeContext {
        /** Key the context currently refers to. */
        protected K key;
        /** Window the context currently refers to; also the namespace for trigger state. */
        protected W window;
        /** Windows being merged into {@link #window}; set by {@link #onMerge}. */
        protected Collection<W> mergedWindows;
        final /* synthetic */ WindowOperator this$0;

        /**
         * @param key initial key (may be {@code null}; set before use)
         * @param window initial window (may be {@code null}; set before use)
         */
        public Context(K key, W window) {
            this.this$0 = this$0;
            this.key = key;
            this.window = window;
        }

        /** Returns the metric group of the enclosing operator. */
        @Override
        public MetricGroup getMetricGroup() {
            return this.this$0.getMetricGroup();
        }

        /** Returns the watermark timestamp last recorded by the enclosing operator. */
        @Override
        public long getCurrentWatermark() {
            return this.this$0.currentWatermark;
        }

        /**
         * Retrieves a value state whose type information is extracted from the given class.
         *
         * @throws NullPointerException if {@code stateType} is null
         * @throws RuntimeException if no type information can be derived from the class
         */
        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState) {
            TypeInformation typeInfo;
            Objects.requireNonNull(stateType, "The state type class must not be null");
            try {
                typeInfo = TypeExtractor.getForClass(stateType);
            }
            catch (Exception e) {
                throw new RuntimeException("Cannot analyze type '" + stateType.getName() + "' from the class alone, due to generic type parameters. Please specify the TypeInformation directly.", e);
            }
            return this.getKeyValueState(name, typeInfo, defaultState);
        }

        /**
         * Retrieves a value state with the given name, type and default value, scoped
         * to the current window.
         *
         * @throws NullPointerException if {@code name} or {@code stateType} is null
         */
        @Override
        public <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState) {
            Objects.requireNonNull(name, "The name of the state must not be null");
            Objects.requireNonNull(stateType, "The state type information must not be null");
            ValueStateDescriptor stateDesc = new ValueStateDescriptor(name, stateType.createSerializer(this.this$0.getExecutionConfig()), defaultState);
            return (ValueState)this.getPartitionedState((StateDescriptor<S, ?>)stateDesc);
        }

        /**
         * Retrieves partitioned state for the descriptor, using the current window as namespace.
         *
         * @throws RuntimeException wrapping any failure of the state lookup
         */
        @Override
        public <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            try {
                return (S)this.this$0.getPartitionedState(this.window, this.this$0.windowSerializer, stateDescriptor);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not retrieve state", e);
            }
        }

        /**
         * Merges the state of the windows recorded by {@link #onMerge} into the state
         * of the current window. Does nothing when there are no merged windows.
         *
         * @throws RuntimeException wrapping any failure of the state backend
         */
        @Override
        public <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor) {
            if (this.mergedWindows != null && this.mergedWindows.size() > 0) {
                try {
                    this.this$0.getStateBackend().mergePartitionedStates(this.window, this.mergedWindows, this.this$0.windowSerializer, stateDescriptor);
                }
                catch (Exception e) {
                    throw new RuntimeException("Error while merging state.", e);
                }
            }
        }

        /** Returns the current processing time as reported by the enclosing operator. */
        @Override
        public long getCurrentProcessingTime() {
            return this.this$0.getCurrentProcessingTime();
        }

        /**
         * Registers a processing-time timer for the current key and window.
         *
         * <p>Registering the same (key, window, time) again is a no-op. A callback is
         * only scheduled for the first timer at a given timestamp; further timers at
         * that timestamp just increase the per-timestamp count.
         */
        @Override
        public void registerProcessingTimeTimer(long time) {
            Timer timer = new Timer(time, this.key, this.window);
            if (this.this$0.processingTimeTimers.add(timer)) {
                this.this$0.processingTimeTimersQueue.add(timer);
                // add() returns the count before the addition: 0 means first timer at this time.
                if (this.this$0.processingTimeTimerTimestamps.add(time, 1) == 0) {
                    ScheduledFuture scheduledFuture = this.this$0.registerTimer(time, this.this$0);
                    this.this$0.processingTimeTimerFutures.put(time, scheduledFuture);
                }
            }
        }

        /** Registers an event-time timer for the current key and window; duplicates are ignored. */
        @Override
        public void registerEventTimeTimer(long time) {
            Timer timer = new Timer(time, this.key, this.window);
            if (this.this$0.watermarkTimers.add(timer)) {
                this.this$0.watermarkTimersQueue.add(timer);
            }
        }

        /**
         * Deletes the processing-time timer of the current key and window at the given time.
         *
         * <p>The per-timestamp count is decremented, and the scheduled callback
         * cancelled when the count drops to zero, only if such a timer was actually
         * registered. Deleting a timer that does not exist must not touch this shared
         * bookkeeping: the count and the future are shared by all keys and windows
         * with a timer at this timestamp, and decrementing on their behalf could
         * cancel the callback they still depend on.
         */
        @Override
        public void deleteProcessingTimeTimer(long time) {
            Timer timer = new Timer(time, this.key, this.window);
            if (!this.this$0.processingTimeTimers.remove(timer)) {
                // Nothing registered for this (key, window, time).
                return;
            }
            this.this$0.processingTimeTimersQueue.remove(timer);
            // remove() returns the count before the removal: 1 means this was the last timer at this time.
            if (this.this$0.processingTimeTimerTimestamps.remove(time, 1) == 1) {
                ScheduledFuture<?> triggerTaskFuture = this.this$0.processingTimeTimerFutures.remove(timer.timestamp);
                if (triggerTaskFuture != null && !triggerTaskFuture.isDone()) {
                    triggerTaskFuture.cancel(false);
                }
            }
        }

        /** Deletes the event-time timer of the current key and window at the given time, if present. */
        @Override
        public void deleteEventTimeTimer(long time) {
            Timer timer = new Timer(time, this.key, this.window);
            if (this.this$0.watermarkTimers.remove(timer)) {
                this.this$0.watermarkTimersQueue.remove(timer);
            }
        }

        /** Forwards an element to the trigger for the current window. */
        public TriggerResult onElement(StreamRecord<IN> element) throws Exception {
            return this.this$0.trigger.onElement(element.getValue(), element.getTimestamp(), this.window, this);
        }

        /** Forwards a processing-time timer to the trigger for the current window. */
        public TriggerResult onProcessingTime(long time) throws Exception {
            return this.this$0.trigger.onProcessingTime(time, this.window, this);
        }

        /** Forwards an event-time timer to the trigger for the current window. */
        public TriggerResult onEventTime(long time) throws Exception {
            return this.this$0.trigger.onEventTime(time, this.window, this);
        }

        /**
         * Remembers the merged windows (for {@link #mergePartitionedState}) and
         * notifies the trigger that they were merged into the current window.
         */
        public TriggerResult onMerge(Collection<W> mergedWindows) throws Exception {
            this.mergedWindows = mergedWindows;
            return this.this$0.trigger.onMerge(this.window, this);
        }

        /** Asks the trigger to clear whatever it keeps for the current window. */
        public void clear() throws Exception {
            this.this$0.trigger.clear(this.window, this);
        }

        @Override
        public String toString() {
            return "Context{key=" + this.key + ", window=" + this.window + '}';
        }
    }
}

