package org.apache.flink.runtime.taskmanager;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.shaded.com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest.class */
public class TaskTest {
    private static OneShotLatch awaitLatch;
    private static OneShotLatch triggerLatch;
    private ActorGateway taskManagerGateway;
    private ActorGateway jobManagerGateway;
    private ActorGateway listenerGateway;
    private BlockingQueue<Object> taskManagerMessages;
    private BlockingQueue<Object> jobManagerMessages;
    private BlockingQueue<Object> listenerMessages;

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest$InvokableBlockingInInvoke.class */
    public static final class InvokableBlockingInInvoke extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() throws Exception {
            TaskTest.awaitLatch.trigger();
            synchronized (this) {
                wait();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest$InvokableBlockingInRegisterInOut.class */
    public static final class InvokableBlockingInRegisterInOut extends AbstractInvokable {
        public void registerInputOutput() {
            TaskTest.awaitLatch.trigger();
            try {
                TaskTest.triggerLatch.await();
            } catch (InterruptedException e) {
                throw new RuntimeException();
            }
        }

        public void invoke() {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest$InvokableNonInstantiable.class */
    public static abstract class InvokableNonInstantiable extends AbstractInvokable {
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest$InvokableWithCancelTaskExceptionInInvoke.class */
    public static final class InvokableWithCancelTaskExceptionInInvoke extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() throws Exception {
            TaskTest.awaitLatch.trigger();
            try {
                TaskTest.triggerLatch.await();
            } catch (Throwable th) {
            }
            throw new CancelTaskException();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest$InvokableWithExceptionInInvoke.class */
    public static final class InvokableWithExceptionInInvoke extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() throws Exception {
            throw new Exception("test");
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest$InvokableWithExceptionInRegisterInOut.class */
    public static final class InvokableWithExceptionInRegisterInOut extends AbstractInvokable {
        public void registerInputOutput() {
            throw new RuntimeException("test");
        }

        public void invoke() {
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest$InvokableWithExceptionOnTrigger.class */
    public static final class InvokableWithExceptionOnTrigger extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() {
            TaskTest.awaitLatch.trigger();
            while (true) {
                try {
                    TaskTest.triggerLatch.await();
                    throw new RuntimeException("test");
                } catch (InterruptedException e) {
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskTest$TestInvokableCorrect.class */
    public static final class TestInvokableCorrect extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() {
        }

        public void cancel() throws Exception {
            Assert.fail("This should not be called");
        }
    }

    @Before
    public void createQueuesAndActors() {
        this.taskManagerMessages = new LinkedBlockingQueue();
        this.jobManagerMessages = new LinkedBlockingQueue();
        this.listenerMessages = new LinkedBlockingQueue();
        this.taskManagerGateway = new ForwardingActorGateway(this.taskManagerMessages);
        this.jobManagerGateway = new ForwardingActorGateway(this.jobManagerMessages);
        this.listenerGateway = new ForwardingActorGateway(this.listenerMessages);
        awaitLatch = new OneShotLatch();
        triggerLatch = new OneShotLatch();
    }

    @After
    public void clearActorsAndMessages() {
        this.jobManagerMessages = null;
        this.taskManagerMessages = null;
        this.listenerMessages = null;
        this.taskManagerGateway = null;
        this.jobManagerGateway = null;
        this.listenerGateway = null;
    }

    @Test
    public void testRegularExecution() {
        try {
            Task createTask = createTask(TestInvokableCorrect.class);
            Assert.assertEquals(ExecutionState.CREATED, createTask.getExecutionState());
            Assert.assertFalse(createTask.isCanceledOrFailed());
            Assert.assertNull(createTask.getFailureCause());
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.run();
            Assert.assertEquals(ExecutionState.FINISHED, createTask.getExecutionState());
            Assert.assertFalse(createTask.isCanceledOrFailed());
            Assert.assertNull(createTask.getFailureCause());
            validateListenerMessage(ExecutionState.RUNNING, createTask, false);
            validateListenerMessage(ExecutionState.FINISHED, createTask, false);
            validateTaskManagerStateChange(ExecutionState.RUNNING, createTask, false);
            validateUnregisterTask(createTask.getExecutionId());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCancelRightAway() {
        try {
            Task createTask = createTask(TestInvokableCorrect.class);
            createTask.cancelExecution();
            Assert.assertEquals(ExecutionState.CANCELING, createTask.getExecutionState());
            createTask.run();
            Assert.assertEquals(ExecutionState.CANCELED, createTask.getExecutionState());
            validateUnregisterTask(createTask.getExecutionId());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testFailExternallyRightAway() {
        try {
            Task createTask = createTask(TestInvokableCorrect.class);
            createTask.failExternally(new Exception("fail externally"));
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            createTask.run();
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            validateUnregisterTask(createTask.getExecutionId());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testLibraryCacheRegistrationFailed() {
        try {
            Task createTask = createTask(TestInvokableCorrect.class, (LibraryCacheManager) Mockito.mock(LibraryCacheManager.class));
            Assert.assertEquals(ExecutionState.CREATED, createTask.getExecutionState());
            Assert.assertFalse(createTask.isCanceledOrFailed());
            Assert.assertNull(createTask.getFailureCause());
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.run();
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertNotNull(createTask.getFailureCause());
            Assert.assertTrue(createTask.getFailureCause().getMessage().contains("classloader"));
            validateListenerMessage(ExecutionState.FAILED, createTask, true);
            validateUnregisterTask(createTask.getExecutionId());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testExecutionFailsInNetworkRegistration() {
        try {
            LibraryCacheManager libraryCacheManager = (LibraryCacheManager) Mockito.mock(LibraryCacheManager.class);
            Mockito.when(libraryCacheManager.getClassLoader((JobID) Matchers.any(JobID.class))).thenReturn(getClass().getClassLoader());
            ResultPartitionManager resultPartitionManager = (ResultPartitionManager) Mockito.mock(ResultPartitionManager.class);
            ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = (ResultPartitionConsumableNotifier) Mockito.mock(ResultPartitionConsumableNotifier.class);
            NetworkEnvironment networkEnvironment = (NetworkEnvironment) Mockito.mock(NetworkEnvironment.class);
            Mockito.when(networkEnvironment.getPartitionManager()).thenReturn(resultPartitionManager);
            Mockito.when(networkEnvironment.getPartitionConsumableNotifier()).thenReturn(resultPartitionConsumableNotifier);
            Mockito.when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
            ((NetworkEnvironment) Mockito.doThrow(new RuntimeException("buffers")).when(networkEnvironment)).registerTask((Task) Matchers.any(Task.class));
            Task createTask = createTask(TestInvokableCorrect.class, libraryCacheManager, networkEnvironment);
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.run();
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertTrue(createTask.getFailureCause().getMessage().contains("buffers"));
            validateUnregisterTask(createTask.getExecutionId());
            validateListenerMessage(ExecutionState.FAILED, createTask, true);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testInvokableInstantiationFailed() {
        try {
            Task createTask = createTask(InvokableNonInstantiable.class);
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.run();
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertTrue(createTask.getFailureCause().getMessage().contains("instantiate"));
            validateUnregisterTask(createTask.getExecutionId());
            validateListenerMessage(ExecutionState.FAILED, createTask, true);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testExecutionFailsInRegisterInputOutput() {
        try {
            Task createTask = createTask(InvokableWithExceptionInRegisterInOut.class);
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.run();
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertTrue(createTask.getFailureCause().getMessage().contains("registerInputOutput"));
            validateUnregisterTask(createTask.getExecutionId());
            validateListenerMessage(ExecutionState.FAILED, createTask, true);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testExecutionFailsInInvoke() {
        try {
            Task createTask = createTask(InvokableWithExceptionInInvoke.class);
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.run();
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertTrue(createTask.getFailureCause().getMessage().contains("test"));
            validateTaskManagerStateChange(ExecutionState.RUNNING, createTask, false);
            validateUnregisterTask(createTask.getExecutionId());
            validateListenerMessage(ExecutionState.RUNNING, createTask, false);
            validateListenerMessage(ExecutionState.FAILED, createTask, true);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCancelDuringRegisterInputOutput() {
        try {
            Task createTask = createTask(InvokableBlockingInRegisterInOut.class);
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.startTaskThread();
            awaitLatch.await();
            createTask.cancelExecution();
            Assert.assertEquals(ExecutionState.CANCELING, createTask.getExecutionState());
            triggerLatch.trigger();
            createTask.getExecutingThread().join();
            Assert.assertEquals(ExecutionState.CANCELED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertNull(createTask.getFailureCause());
            validateUnregisterTask(createTask.getExecutionId());
            validateCancelingAndCanceledListenerMessage(createTask);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testFailDuringRegisterInputOutput() {
        try {
            Task createTask = createTask(InvokableBlockingInRegisterInOut.class);
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.startTaskThread();
            awaitLatch.await();
            createTask.failExternally(new Exception("test"));
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            triggerLatch.trigger();
            createTask.getExecutingThread().join();
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertTrue(createTask.getFailureCause().getMessage().contains("test"));
            validateUnregisterTask(createTask.getExecutionId());
            validateListenerMessage(ExecutionState.FAILED, createTask, true);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCancelDuringInvoke() {
        try {
            Task createTask = createTask(InvokableBlockingInInvoke.class);
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.startTaskThread();
            awaitLatch.await();
            createTask.cancelExecution();
            Assert.assertTrue(createTask.getExecutionState() == ExecutionState.CANCELING || createTask.getExecutionState() == ExecutionState.CANCELED);
            createTask.getExecutingThread().join();
            Assert.assertEquals(ExecutionState.CANCELED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertNull(createTask.getFailureCause());
            validateTaskManagerStateChange(ExecutionState.RUNNING, createTask, false);
            validateUnregisterTask(createTask.getExecutionId());
            validateListenerMessage(ExecutionState.RUNNING, createTask, false);
            validateCancelingAndCanceledListenerMessage(createTask);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testFailExternallyDuringInvoke() {
        try {
            Task createTask = createTask(InvokableBlockingInInvoke.class);
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.startTaskThread();
            awaitLatch.await();
            createTask.failExternally(new Exception("test"));
            Assert.assertTrue(createTask.getExecutionState() == ExecutionState.FAILED);
            createTask.getExecutingThread().join();
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertTrue(createTask.getFailureCause().getMessage().contains("test"));
            validateTaskManagerStateChange(ExecutionState.RUNNING, createTask, false);
            validateUnregisterTask(createTask.getExecutionId());
            validateListenerMessage(ExecutionState.RUNNING, createTask, false);
            validateListenerMessage(ExecutionState.FAILED, createTask, true);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCanceledAfterExecutionFailedInRegInOut() {
        try {
            Task createTask = createTask(InvokableWithExceptionInRegisterInOut.class);
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.run();
            createTask.cancelExecution();
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertTrue(createTask.getFailureCause().getMessage().contains("registerInputOutput"));
            validateUnregisterTask(createTask.getExecutionId());
            validateListenerMessage(ExecutionState.FAILED, createTask, true);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCanceledAfterExecutionFailedInInvoke() {
        try {
            Task createTask = createTask(InvokableWithExceptionInInvoke.class);
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.run();
            createTask.cancelExecution();
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertTrue(createTask.getFailureCause().getMessage().contains("test"));
            validateTaskManagerStateChange(ExecutionState.RUNNING, createTask, false);
            validateUnregisterTask(createTask.getExecutionId());
            validateListenerMessage(ExecutionState.RUNNING, createTask, false);
            validateListenerMessage(ExecutionState.FAILED, createTask, true);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testExecutionFailesAfterCanceling() {
        try {
            Task createTask = createTask(InvokableWithExceptionOnTrigger.class);
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.startTaskThread();
            awaitLatch.await();
            createTask.cancelExecution();
            Assert.assertEquals(ExecutionState.CANCELING, createTask.getExecutionState());
            triggerLatch.trigger();
            createTask.getExecutingThread().join();
            Assert.assertEquals(ExecutionState.CANCELED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertNull(createTask.getFailureCause());
            validateTaskManagerStateChange(ExecutionState.RUNNING, createTask, false);
            validateUnregisterTask(createTask.getExecutionId());
            validateListenerMessage(ExecutionState.RUNNING, createTask, false);
            validateCancelingAndCanceledListenerMessage(createTask);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testExecutionFailsAfterTaskMarkedFailed() {
        try {
            Task createTask = createTask(InvokableWithExceptionOnTrigger.class);
            createTask.registerExecutionListener(this.listenerGateway);
            createTask.startTaskThread();
            awaitLatch.await();
            createTask.failExternally(new Exception("external"));
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            triggerLatch.trigger();
            createTask.getExecutingThread().join();
            Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
            Assert.assertTrue(createTask.isCanceledOrFailed());
            Assert.assertTrue(createTask.getFailureCause().getMessage().contains("external"));
            validateTaskManagerStateChange(ExecutionState.RUNNING, createTask, false);
            validateUnregisterTask(createTask.getExecutionId());
            validateListenerMessage(ExecutionState.RUNNING, createTask, false);
            validateListenerMessage(ExecutionState.FAILED, createTask, true);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCancelTaskException() throws Exception {
        Task createTask = createTask(InvokableWithCancelTaskExceptionInInvoke.class);
        triggerLatch.trigger();
        createTask.run();
        Assert.assertEquals(ExecutionState.CANCELED, createTask.getExecutionState());
    }

    @Test
    public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
        Task createTask = createTask(InvokableWithCancelTaskExceptionInInvoke.class);
        createTask.startTaskThread();
        awaitLatch.await();
        createTask.failExternally(new Exception("external"));
        Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
        triggerLatch.trigger();
        createTask.getExecutingThread().join();
        Assert.assertEquals(ExecutionState.FAILED, createTask.getExecutionState());
        Assert.assertTrue(createTask.isCanceledOrFailed());
        Assert.assertTrue(createTask.getFailureCause().getMessage().contains("external"));
    }

    @Test
    public void testOnPartitionStateUpdate() throws Exception {
        IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
        ResultPartitionID resultPartitionID = new ResultPartitionID();
        SingleInputGate singleInputGate = (SingleInputGate) Mockito.mock(SingleInputGate.class);
        Mockito.when(singleInputGate.getConsumedResultId()).thenReturn(intermediateDataSetID);
        Task createTask = createTask(InvokableBlockingInInvoke.class);
        setInputGate(createTask, singleInputGate);
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(ExecutionState.values().length);
        for (ExecutionState executionState : ExecutionState.values()) {
            newHashMapWithExpectedSize.put(executionState, ExecutionState.FAILED);
        }
        newHashMapWithExpectedSize.put(ExecutionState.RUNNING, ExecutionState.RUNNING);
        newHashMapWithExpectedSize.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
        newHashMapWithExpectedSize.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
        newHashMapWithExpectedSize.put(ExecutionState.FAILED, ExecutionState.CANCELING);
        for (ExecutionState executionState2 : ExecutionState.values()) {
            setState(createTask, ExecutionState.RUNNING);
            createTask.onPartitionStateUpdate(intermediateDataSetID, resultPartitionID.getPartitionId(), executionState2);
            Assert.assertEquals(newHashMapWithExpectedSize.get(executionState2), createTask.getExecutionState());
        }
        ((SingleInputGate) Mockito.verify(singleInputGate, Mockito.times(1))).retriggerPartitionRequest((IntermediateResultPartitionID) Matchers.eq(resultPartitionID.getPartitionId()));
    }

    private void setInputGate(Task task, SingleInputGate singleInputGate) {
        try {
            Field declaredField = Task.class.getDeclaredField("inputGates");
            declaredField.setAccessible(true);
            declaredField.set(task, new SingleInputGate[]{singleInputGate});
            HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(1);
            newHashMapWithExpectedSize.put(singleInputGate.getConsumedResultId(), singleInputGate);
            Field declaredField2 = Task.class.getDeclaredField("inputGatesById");
            declaredField2.setAccessible(true);
            declaredField2.set(task, newHashMapWithExpectedSize);
        } catch (Exception e) {
            throw new RuntimeException("Modifying the task state failed", e);
        }
    }

    private void setState(Task task, ExecutionState executionState) {
        try {
            Field declaredField = Task.class.getDeclaredField("executionState");
            declaredField.setAccessible(true);
            declaredField.set(task, executionState);
        } catch (Exception e) {
            throw new RuntimeException("Modifying the task state failed", e);
        }
    }

    private Task createTask(Class<? extends AbstractInvokable> cls) {
        LibraryCacheManager libraryCacheManager = (LibraryCacheManager) Mockito.mock(LibraryCacheManager.class);
        Mockito.when(libraryCacheManager.getClassLoader((JobID) Matchers.any(JobID.class))).thenReturn(getClass().getClassLoader());
        return createTask(cls, libraryCacheManager);
    }

    private Task createTask(Class<? extends AbstractInvokable> cls, LibraryCacheManager libraryCacheManager) {
        ResultPartitionManager resultPartitionManager = (ResultPartitionManager) Mockito.mock(ResultPartitionManager.class);
        ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = (ResultPartitionConsumableNotifier) Mockito.mock(ResultPartitionConsumableNotifier.class);
        NetworkEnvironment networkEnvironment = (NetworkEnvironment) Mockito.mock(NetworkEnvironment.class);
        Mockito.when(networkEnvironment.getPartitionManager()).thenReturn(resultPartitionManager);
        Mockito.when(networkEnvironment.getPartitionConsumableNotifier()).thenReturn(resultPartitionConsumableNotifier);
        Mockito.when(networkEnvironment.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
        return createTask(cls, libraryCacheManager, networkEnvironment);
    }

    private Task createTask(Class<? extends AbstractInvokable> cls, LibraryCacheManager libraryCacheManager, NetworkEnvironment networkEnvironment) {
        return new Task(createTaskDeploymentDescriptor(cls), (MemoryManager) Mockito.mock(MemoryManager.class), (IOManager) Mockito.mock(IOManager.class), networkEnvironment, (BroadcastVariableManager) Mockito.mock(BroadcastVariableManager.class), this.taskManagerGateway, this.jobManagerGateway, new FiniteDuration(60L, TimeUnit.SECONDS), libraryCacheManager, (FileCache) Mockito.mock(FileCache.class), new TaskManagerRuntimeInfo("localhost", new Configuration()));
    }

    private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> cls) {
        return new TaskDeploymentDescriptor(new JobID(), new JobVertexID(), new ExecutionAttemptID(), "Test Task", 0, 1, new Configuration(), new Configuration(), cls.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 0);
    }

    private void validateUnregisterTask(ExecutionAttemptID executionAttemptID) {
        try {
            Object take = this.taskManagerMessages.take();
            Assert.assertNotNull("There is no additional TaskManager message", take);
            if (!(take instanceof TaskMessages.TaskInFinalState)) {
                Assert.fail("TaskManager message is not 'UnregisterTask', but " + take.getClass());
            }
            Assert.assertEquals(executionAttemptID, ((TaskMessages.TaskInFinalState) take).executionID());
        } catch (InterruptedException e) {
            Assert.fail("interrupted");
        }
    }

    private void validateTaskManagerStateChange(ExecutionState executionState, Task task, boolean z) {
        try {
            Object take = this.taskManagerMessages.take();
            Assert.assertNotNull("There is no additional TaskManager message", take);
            if (!(take instanceof TaskMessages.UpdateTaskExecutionState)) {
                Assert.fail("TaskManager message is not 'UpdateTaskExecutionState', but " + take.getClass());
            }
            TaskExecutionState taskExecutionState = ((TaskMessages.UpdateTaskExecutionState) take).taskExecutionState();
            Assert.assertEquals(task.getJobID(), taskExecutionState.getJobID());
            Assert.assertEquals(task.getExecutionId(), taskExecutionState.getID());
            Assert.assertEquals(executionState, taskExecutionState.getExecutionState());
            if (z) {
                Assert.assertNotNull(taskExecutionState.getError(getClass().getClassLoader()));
            } else {
                Assert.assertNull(taskExecutionState.getError(getClass().getClassLoader()));
            }
        } catch (InterruptedException e) {
            Assert.fail("interrupted");
        }
    }

    private void validateListenerMessage(ExecutionState executionState, Task task, boolean z) {
        try {
            TaskMessages.UpdateTaskExecutionState updateTaskExecutionState = (TaskMessages.UpdateTaskExecutionState) this.listenerMessages.take();
            Assert.assertNotNull("There is no additional listener message", updateTaskExecutionState);
            TaskExecutionState taskExecutionState = updateTaskExecutionState.taskExecutionState();
            Assert.assertEquals(task.getJobID(), taskExecutionState.getJobID());
            Assert.assertEquals(task.getExecutionId(), taskExecutionState.getID());
            Assert.assertEquals(executionState, taskExecutionState.getExecutionState());
            if (z) {
                Assert.assertNotNull(taskExecutionState.getError(getClass().getClassLoader()));
            } else {
                Assert.assertNull(taskExecutionState.getError(getClass().getClassLoader()));
            }
        } catch (InterruptedException e) {
            Assert.fail("interrupted");
        }
    }

    private void validateCancelingAndCanceledListenerMessage(Task task) {
        try {
            TaskMessages.UpdateTaskExecutionState updateTaskExecutionState = (TaskMessages.UpdateTaskExecutionState) this.listenerMessages.take();
            TaskMessages.UpdateTaskExecutionState updateTaskExecutionState2 = (TaskMessages.UpdateTaskExecutionState) this.listenerMessages.take();
            Assert.assertNotNull("There is no additional listener message", updateTaskExecutionState);
            Assert.assertNotNull("There is no additional listener message", updateTaskExecutionState2);
            TaskExecutionState taskExecutionState = updateTaskExecutionState.taskExecutionState();
            TaskExecutionState taskExecutionState2 = updateTaskExecutionState2.taskExecutionState();
            Assert.assertEquals(task.getJobID(), taskExecutionState.getJobID());
            Assert.assertEquals(task.getJobID(), taskExecutionState2.getJobID());
            Assert.assertEquals(task.getExecutionId(), taskExecutionState.getID());
            Assert.assertEquals(task.getExecutionId(), taskExecutionState2.getID());
            ExecutionState executionState = taskExecutionState.getExecutionState();
            ExecutionState executionState2 = taskExecutionState2.getExecutionState();
            Assert.assertTrue((executionState == ExecutionState.CANCELING && executionState2 == ExecutionState.CANCELED) || (executionState2 == ExecutionState.CANCELING && executionState == ExecutionState.CANCELED));
        } catch (InterruptedException e) {
            Assert.fail("interrupted");
        }
    }
}
