/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.optimizer.plantranslate;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.AggregatorRegistry;
import org.apache.flink.api.common.aggregators.AggregatorWithName;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.distributions.DataDistribution;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.dag.TempMode;
import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.optimizer.plan.IterationPlanNode;
import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
import org.apache.flink.optimizer.plan.NamedChannel;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.PlanNode;
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.plan.SolutionSetPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
import org.apache.flink.optimizer.plan.WorksetPlanNode;
import org.apache.flink.optimizer.plantranslate.JsonMapper;
import org.apache.flink.optimizer.util.Utils;
import org.apache.flink.runtime.io.network.DataExchangeMode;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
import org.apache.flink.runtime.iterative.task.IterationHeadTask;
import org.apache.flink.runtime.iterative.task.IterationIntermediateTask;
import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
import org.apache.flink.runtime.iterative.task.IterationTailTask;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.CoGroupDriver;
import org.apache.flink.runtime.operators.CoGroupWithSolutionSetFirstDriver;
import org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver;
import org.apache.flink.runtime.operators.DataSinkTask;
import org.apache.flink.runtime.operators.DataSourceTask;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.JoinDriver;
import org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver;
import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver;
import org.apache.flink.runtime.operators.NoOpDriver;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.Visitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobGraphGenerator
implements Visitor<PlanNode> {
    /** Logger used by this class. */
    private static final Logger LOG = LoggerFactory.getLogger(JobGraphGenerator.class);
    /** Configuration key whose boolean value is loaded into {@code mergeIterationAuxTasks}. */
    public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
    /** Value of the "compiler.merge-iteration-aux" key in the globally loaded configuration; {@code false} if unset. */
    private static final boolean mergeIterationAuxTasks = GlobalConfiguration.loadConfiguration().getBoolean("compiler.merge-iteration-aux", false);
    /** Marker registered in {@code chainedTasks} for solution set nodes so that they are not visited a second time. */
    private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null, null);
    /** Job vertices created so far, keyed by the plan node they were created for. Only set during a translation run. */
    private Map<PlanNode, JobVertex> vertices;
    /** Plan nodes that are translated into chained tasks instead of own vertices. Only set during a translation run. */
    private Map<PlanNode, TaskInChain> chainedTasks;
    /** Descriptors of all iterations encountered, keyed by their iteration plan node. Only set during a translation run. */
    private Map<IterationPlanNode, IterationDescriptor> iterations;
    /** Chained tasks in the order in which they were attached to their containing vertex. */
    private List<TaskInChain> chainedTasksInSequence;
    /** Additional vertices that are added to the job graph after the regular vertices. */
    private List<JobVertex> auxVertices;
    /** Maximum fan value, taken from {@link AlgorithmOptions#SPILLING_MAX_FAN}. */
    private final int defaultMaxFan;
    /** Sort spilling threshold, taken from {@link AlgorithmOptions#SORT_SPILLING_THRESHOLD}. */
    private final float defaultSortSpillingThreshold;
    /** Value of the "taskmanager.runtime.large-record-handler" setting; {@code false} when no configuration is given. */
    private final boolean useLargeRecordHandler;
    /** Next id to hand out to an iteration descriptor; incremented for every iteration node that is visited. */
    private int iterationIdEnumerator = 1;
    /** The iteration whose step function is currently being translated, or {@code null} outside of iterations. */
    private IterationPlanNode currentIteration;
    /** Enclosing iterations that were active when a further iteration node was entered. */
    private List<IterationPlanNode> iterationStack;
    /** Slot sharing group assigned to all vertices created during a translation run. */
    private SlotSharingGroup sharingGroup;

    public JobGraphGenerator() {
        this.defaultMaxFan = (Integer)AlgorithmOptions.SPILLING_MAX_FAN.defaultValue();
        this.defaultSortSpillingThreshold = ((Float)AlgorithmOptions.SORT_SPILLING_THRESHOLD.defaultValue()).floatValue();
        this.useLargeRecordHandler = false;
    }

    public JobGraphGenerator(Configuration config) {
        this.defaultMaxFan = config.getInteger(AlgorithmOptions.SPILLING_MAX_FAN);
        this.defaultSortSpillingThreshold = config.getFloat(AlgorithmOptions.SORT_SPILLING_THRESHOLD);
        this.useLargeRecordHandler = config.getBoolean("taskmanager.runtime.large-record-handler", false);
    }

    /**
     * Translates the given optimized plan into a job graph, letting the two-argument
     * variant pick a job id.
     *
     * @param program the optimized plan to translate
     * @return the job graph produced for the plan
     */
    public JobGraph compileJobGraph(OptimizedPlan program) {
        // no explicit id: the overload generates one
        final JobID noJobId = null;
        return compileJobGraph(program, noJobId);
    }

    /**
     * Translates the given optimized plan into a job graph.
     *
     * <p>The plan is traversed with this generator as the visitor. Afterwards all recorded
     * iterations are finalized, chained tasks are written into the configuration of their
     * containing vertex, and the vertices, auxiliary vertices and user artifacts are added
     * to the resulting graph.
     *
     * @param program the optimized plan to translate; must not be {@code null}
     * @param jobId the id for the job graph, or {@code null} to generate a new one
     * @return the job graph produced for the plan
     * @throws NullPointerException if {@code program} is {@code null}
     * @throws CompilerException if the translation leaves an iteration unclosed, an iteration
     *     node is of an unknown kind, or the execution config cannot be serialized
     */
    public JobGraph compileJobGraph(OptimizedPlan program, JobID jobId) {
        if (program == null) {
            throw new NullPointerException("Program is null, did you called ExecutionEnvironment.execute()");
        }
        if (jobId == null) {
            jobId = JobID.generate();
        }

        // Start every run from a clean slate. The current iteration is reset as well, because a
        // previous run that failed inside a step function would otherwise leave it dangling.
        this.vertices = new HashMap<>();
        this.chainedTasks = new HashMap<>();
        this.chainedTasksInSequence = new ArrayList<>();
        this.auxVertices = new ArrayList<>();
        this.iterations = new HashMap<>();
        this.iterationStack = new ArrayList<>();
        this.currentIteration = null;
        this.sharingGroup = new SlotSharingGroup();

        // Generate the vertices and edges by walking over the plan.
        program.accept(this);
        if (this.currentIteration != null) {
            throw new CompilerException("The graph translation ended prematurely, leaving an unclosed iteration.");
        }

        // Finalize the iterations that were recorded during the traversal.
        for (IterationDescriptor iteration : this.iterations.values()) {
            if (iteration.getIterationNode() instanceof BulkIterationPlanNode) {
                this.finalizeBulkIteration(iteration);
            } else if (iteration.getIterationNode() instanceof WorksetIterationPlanNode) {
                this.finalizeWorksetIteration(iteration);
            } else {
                throw new CompilerException("Bug: Unknown kind of iteration node: " + iteration.getIterationNode());
            }
        }

        // Write the chained tasks into the configuration of the vertex that contains them,
        // in the order in which they were chained.
        for (TaskInChain tic : this.chainedTasksInSequence) {
            TaskConfig containerConfig = new TaskConfig(tic.getContainingVertex().getConfiguration());
            containerConfig.addChainedTask(tic.getChainedTask(), tic.getTaskConfig(), tic.getTaskName());
        }
        this.attachOperatorNamesAndDescriptions();

        JobGraph graph = new JobGraph(jobId, program.getJobName());
        try {
            graph.setExecutionConfig(program.getOriginalPlan().getExecutionConfig());
        }
        catch (IOException e) {
            // keep the cause so that the offending type shows up in the stack trace
            throw new CompilerException("Could not serialize the ExecutionConfig. This indicates that non-serializable types (like custom serializers) were registered", e);
        }
        graph.setAllowQueuedScheduling(false);
        graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());

        for (JobVertex vertex : this.vertices.values()) {
            vertex.setInputDependencyConstraint(program.getOriginalPlan().getExecutionConfig().getDefaultInputDependencyConstraint());
            graph.addVertex(vertex);
        }
        for (JobVertex vertex : this.auxVertices) {
            graph.addVertex(vertex);
            vertex.setSlotSharingGroup(this.sharingGroup);
        }

        Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts = program.getOriginalPlan().getCachedFiles().stream()
                .map(entry -> Tuple2.of(entry.getKey(), entry.getValue()))
                .collect(Collectors.toList());
        JobGraphGenerator.addUserArtifactEntries(userArtifacts, graph);

        // Release the translation state; it is only meaningful during a run.
        this.vertices = null;
        this.chainedTasks = null;
        this.chainedTasksInSequence = null;
        this.auxVertices = null;
        this.iterations = null;
        this.iterationStack = null;
        return graph;
    }

    /**
     * Registers the given distributed-cache entries as user artifacts on the job graph.
     *
     * <p>Entries that point to a directory on a non-distributed file system are compressed into
     * a zip file inside a temporary directory and registered as zipped entries. All other
     * entries are registered with their original path. If it cannot be determined whether a
     * path is such a directory, a warning is logged and the entry is treated as a plain file.
     *
     * @param userArtifacts name/entry pairs to register; {@code null} or empty means nothing to do
     * @param jobGraph the job graph that receives the artifacts
     * @throws FlinkRuntimeException if creating the temporary directory or compressing a
     *     directory fails
     */
    public static void addUserArtifactEntries(Collection<Tuple2<String, DistributedCache.DistributedCacheEntry>> userArtifacts, JobGraph jobGraph) {
        if (userArtifacts == null || userArtifacts.isEmpty()) {
            return;
        }
        try {
            // Created on first use only, so that jobs without local directories do not leave
            // an empty temporary directory behind.
            java.nio.file.Path tmpDir = null;
            for (Tuple2<String, DistributedCache.DistributedCacheEntry> originalEntry : userArtifacts) {
                DistributedCache.DistributedCacheEntry original = originalEntry.f1;
                Path filePath = new Path(original.filePath);

                boolean isLocalDir = false;
                try {
                    FileSystem sourceFs = filePath.getFileSystem();
                    isLocalDir = !sourceFs.isDistributedFS() && sourceFs.getFileStatus(filePath).isDir();
                }
                catch (IOException ioe) {
                    // best effort: fall back to treating the path as a regular file
                    LOG.warn("Could not determine whether {} denotes a local path.", filePath, ioe);
                }

                DistributedCache.DistributedCacheEntry entry;
                if (isLocalDir) {
                    if (tmpDir == null) {
                        tmpDir = Files.createTempDirectory("flink-distributed-cache-" + jobGraph.getJobID());
                    }
                    Path zip = FileUtils.compressDirectory(filePath, new Path(tmpDir.toString(), filePath.getName() + ".zip"));
                    entry = new DistributedCache.DistributedCacheEntry(zip.toString(), original.isExecutable.booleanValue(), true);
                } else {
                    entry = new DistributedCache.DistributedCacheEntry(filePath.toString(), original.isExecutable.booleanValue(), false);
                }
                jobGraph.addUserArtifact(originalEntry.f0, entry);
            }
        }
        catch (IOException ioe) {
            throw new FlinkRuntimeException("Could not compress distributed-cache artifacts.", ioe);
        }
    }

    /**
     * Creates the job graph representation for a plan node before its inputs are visited.
     *
     * <p>Depending on the node type this creates a job vertex, records an iteration descriptor,
     * or rewires the drivers of the operators that access a solution set. Union nodes get no
     * representation of their own. A created vertex receives the parallelism of the node and the
     * shared slot sharing group and, inside an iteration, the id of that iteration.
     *
     * @param node the plan node that is being entered
     * @return {@code false} if the node was already translated and must not be descended into
     *     again, {@code true} otherwise
     * @throws CompilerException if the node cannot be translated
     */
    public boolean preVisit(PlanNode node) {
        // Nodes that were reached before through another path are not translated twice.
        if (this.vertices.containsKey(node) || this.chainedTasks.containsKey(node) || this.iterations.containsKey(node)) {
            return false;
        }

        JobVertex vertex;
        try {
            // The iteration checks must precede the single/dual input checks, mirroring the
            // order of the type tests in the original code.
            if (node instanceof SinkPlanNode) {
                vertex = this.createDataSinkVertex((SinkPlanNode) node);
            } else if (node instanceof SourcePlanNode) {
                vertex = this.createDataSourceVertex((SourcePlanNode) node);
            } else if (node instanceof BulkIterationPlanNode) {
                BulkIterationPlanNode iterationNode = (BulkIterationPlanNode) node;
                PlanNode root = iterationNode.getRootOfStepFunction();
                if (root.getParallelism() != node.getParallelism()) {
                    throw new CompilerException("Error: The final operator of the step function has a different parallelism than the iteration operator itself.");
                }
                IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++);
                this.iterations.put(iterationNode, descr);
                vertex = null;
            } else if (node instanceof WorksetIterationPlanNode) {
                WorksetIterationPlanNode iterationNode = (WorksetIterationPlanNode) node;
                PlanNode nextWorkSet = iterationNode.getNextWorkSetPlanNode();
                PlanNode solutionSetDelta = iterationNode.getSolutionSetDeltaPlanNode();
                if (nextWorkSet.getParallelism() != node.getParallelism()) {
                    throw new CompilerException("It is currently not supported that the final operator of the step function has a different parallelism than the iteration operator itself.");
                }
                if (solutionSetDelta.getParallelism() != node.getParallelism()) {
                    throw new CompilerException("It is currently not supported that the final operator of the step function has a different parallelism than the iteration operator itself.");
                }
                IterationDescriptor descr = new IterationDescriptor(iterationNode, this.iterationIdEnumerator++);
                this.iterations.put(iterationNode, descr);
                vertex = null;
            } else if (node instanceof SingleInputPlanNode) {
                vertex = this.createSingleInputVertex((SingleInputPlanNode) node);
            } else if (node instanceof DualInputPlanNode) {
                vertex = this.createDualInputVertex((DualInputPlanNode) node);
            } else if (node instanceof NAryUnionPlanNode) {
                // unions have no vertex of their own
                vertex = null;
            } else if (node instanceof BulkPartialSolutionPlanNode) {
                vertex = this.createBulkIterationHead((BulkPartialSolutionPlanNode) node);
            } else if (node instanceof SolutionSetPlanNode) {
                // The solution set itself has no vertex. Instead, every operator that reads it
                // is switched to the driver variant matching the input the solution set is on.
                for (Channel c : node.getOutgoingChannels()) {
                    DualInputPlanNode target = (DualInputPlanNode) c.getTarget();
                    JobVertex accessingVertex = this.vertices.get(target);
                    TaskConfig conf = new TaskConfig(accessingVertex.getConfiguration());

                    final int inputNum;
                    if (c == target.getInput1()) {
                        inputNum = 0;
                    } else if (c == target.getInput2()) {
                        inputNum = 1;
                    } else {
                        throw new CompilerException("Bug: Solution set channel is not an input of its target operator.");
                    }

                    if (conf.getDriver().equals(JoinDriver.class)) {
                        conf.setDriver(inputNum == 0 ? JoinWithSolutionSetFirstDriver.class : JoinWithSolutionSetSecondDriver.class);
                    } else if (conf.getDriver().equals(CoGroupDriver.class)) {
                        conf.setDriver(inputNum == 0 ? CoGroupWithSolutionSetFirstDriver.class : CoGroupWithSolutionSetSecondDriver.class);
                    } else {
                        throw new CompilerException("Found join with solution set using incompatible operator (only Join/CoGroup are valid).");
                    }
                }
                // Remember the node so that it is not visited again.
                this.chainedTasks.put(node, ALREADY_VISITED_PLACEHOLDER);
                vertex = null;
            } else if (node instanceof WorksetPlanNode) {
                vertex = this.createWorksetIterationHead((WorksetPlanNode) node);
            } else {
                throw new CompilerException("Unrecognized node type: " + node.getClass().getName());
            }
        }
        catch (Exception e) {
            throw new CompilerException("Error translating node '" + node + "': " + e.getMessage(), e);
        }

        if (vertex != null) {
            int pd = node.getParallelism();
            vertex.setParallelism(pd);
            vertex.setMaxParallelism(pd);
            vertex.setSlotSharingGroup(this.sharingGroup);

            // Inside an iteration, validate the parallelism and tag the vertex with the iteration id.
            if (this.currentIteration != null) {
                PlanNode iterationNode = (PlanNode) this.currentIteration;
                if (iterationNode.getParallelism() < pd) {
                    throw new CompilerException("Error: All functions that are part of an iteration must have the same, or a lower, parallelism than the iteration operator.");
                }
                IterationDescriptor descr = this.iterations.get(this.currentIteration);
                new TaskConfig(vertex.getConfiguration()).setIterationId(descr.getId());
            }
            this.vertices.put(node, vertex);
        }
        return true;
    }

    /**
     * Connects a plan node with its inputs after all of its predecessors have been visited.
     *
     * <p>Iteration nodes trigger the translation of their step function. Chained tasks are
     * attached to the vertex that contains them. For all other nodes the regular and the
     * broadcast input channels are translated into connections of the node's job vertex.
     *
     * @param node the plan node that is being left
     * @throws CompilerException if any step of the translation fails; the original exception
     *     is attached as the cause
     */
    public void postVisit(PlanNode node) {
        try {
            Iterator<Channel> inConns;
            // Sources have no inputs; unions and solution sets are handled where they are consumed.
            if (node instanceof SourcePlanNode || node instanceof NAryUnionPlanNode || node instanceof SolutionSetPlanNode) {
                return;
            }
            if (node instanceof IterationPlanNode) {
                if (node.isOnDynamicPath()) {
                    throw new CompilerException("Nested Iterations are not possible at the moment!");
                }
                // Remember an enclosing iteration, descend into the step function with this
                // iteration as the current one, and restore the previous state afterwards.
                if (this.currentIteration != null) {
                    this.iterationStack.add(this.currentIteration);
                }
                this.currentIteration = (IterationPlanNode)((Object)node);
                this.currentIteration.acceptForStepFunction(this);
                this.currentIteration = this.iterationStack.isEmpty() ? null : this.iterationStack.remove(this.iterationStack.size() - 1);
                if (node instanceof WorksetIterationPlanNode) {
                    // The initial solution set becomes an extra input of the head task, placed
                    // after the inputs of the head's driver strategy.
                    WorksetIterationPlanNode wsNode = (WorksetIterationPlanNode)node;
                    JobVertex headVertex = this.iterations.get(wsNode).getHeadTask();
                    TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
                    int inputIndex = headConfig.getDriverStrategy().getNumInputs();
                    headConfig.setIterationHeadSolutionSetInputIndex(inputIndex);
                    this.translateChannel(wsNode.getInitialSolutionSetInput(), inputIndex, headVertex, headConfig, false);
                }
                return;
            }
            JobVertex targetVertex = this.vertices.get(node);
            if (targetVertex == null) {
                // No vertex of its own: the node is either a chained task or an iteration
                // head placeholder.
                TaskInChain chainedTask = this.chainedTasks.get(node);
                if (chainedTask != null) {
                    // A chained task must have exactly one input without local strategy that
                    // is shipped with FORWARD.
                    Iterator<Channel> inConns2 = node.getInputs().iterator();
                    if (!inConns2.hasNext()) {
                        throw new CompilerException("Bug: Found chained task with no input.");
                    }
                    Channel inConn = inConns2.next();
                    if (inConns2.hasNext()) {
                        throw new CompilerException("Bug: Found a chained task with more than one input!");
                    }
                    if (inConn.getLocalStrategy() != null && inConn.getLocalStrategy() != LocalStrategy.NONE) {
                        throw new CompilerException("Bug: Found a chained task with an input local strategy.");
                    }
                    if (inConn.getShipStrategy() != null && inConn.getShipStrategy() != ShipStrategyType.FORWARD) {
                        throw new CompilerException("Bug: Found a chained task with an input ship strategy other than FORWARD.");
                    }
                    JobVertex container = chainedTask.getContainingVertex();
                    if (container == null) {
                        // Determine the containing vertex from the predecessor: either the
                        // predecessor's own vertex, or the vertex the predecessor is chained into.
                        PlanNode sourceNode = inConn.getSource();
                        container = this.vertices.get(sourceNode);
                        if (container == null) {
                            container = this.chainedTasks.get(sourceNode).getContainingVertex();
                            if (container == null) {
                                throw new IllegalStateException("Bug: Chained task predecessor has not been assigned its containing vertex.");
                            }
                        } else {
                            // The predecessor is a real vertex and heads the chain; register a
                            // FORWARD output on it.
                            new TaskConfig(container.getConfiguration()).addOutputShipStrategy(ShipStrategyType.FORWARD);
                        }
                        chainedTask.setContainingVertex(container);
                    }
                    chainedTask.getTaskConfig().setInputSerializer(inConn.getSerializer(), 0);
                    // Extend the name of the containing vertex so that it lists the chained tasks.
                    String containerTaskName = container.getName();
                    if (containerTaskName.startsWith("CHAIN ")) {
                        container.setName(containerTaskName + " -> " + chainedTask.getTaskName());
                    } else {
                        container.setName("CHAIN " + containerTaskName + " -> " + chainedTask.getTaskName());
                    }
                    // The container's resources are merged with those of the chained node.
                    container.setResources(container.getMinResources().merge(node.getMinResources()), container.getPreferredResources().merge(node.getPreferredResources()));
                    this.chainedTasksInSequence.add(chainedTask);
                    return;
                }
                if (node instanceof BulkPartialSolutionPlanNode || node instanceof WorksetPlanNode) {
                    // Iteration head placeholder without own vertex: nothing to connect here.
                    return;
                }
                throw new CompilerException("Bug: Unrecognized merged task vertex.");
            }
            if (this.currentIteration != null) {
                // Tasks on the dynamic path of an iteration are co-located with the head task.
                JobVertex head = this.iterations.get(this.currentIteration).getHeadTask();
                if (node.isOnDynamicPath()) {
                    targetVertex.setStrictlyCoLocatedWith(head);
                }
            }
            TaskConfig targetVertexConfig = new TaskConfig(targetVertex.getConfiguration());
            // Iteration heads take their inputs from the enclosing iteration node rather than
            // from the placeholder node itself.
            if (node instanceof BulkPartialSolutionPlanNode) {
                inConns = ((BulkPartialSolutionPlanNode)node).getContainingIterationNode().getInputs().iterator();
                targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
            } else if (node instanceof WorksetPlanNode) {
                WorksetPlanNode wspn = (WorksetPlanNode)node;
                // Only the second input of the workset iteration is connected here.
                inConns = Collections.singleton(wspn.getContainingIterationNode().getInput2()).iterator();
                targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
                targetVertexConfig.setIterationHeadSolutionSetInputIndex(1);
            } else {
                inConns = node.getInputs().iterator();
            }
            if (!inConns.hasNext()) {
                throw new CompilerException("Bug: Found a non-source task with no input.");
            }
            // translateChannel returns how many input slots it used (0 or 1), which advances
            // the running input index.
            int inputIndex = 0;
            while (inConns.hasNext()) {
                Channel input = inConns.next();
                inputIndex += this.translateChannel(input, inputIndex, targetVertex, targetVertexConfig, false);
            }
            // Broadcast inputs are numbered independently of the regular inputs.
            int broadcastInputIndex = 0;
            for (NamedChannel broadcastInput : node.getBroadcastInputs()) {
                int broadcastInputIndexDelta = this.translateChannel(broadcastInput, broadcastInputIndex, targetVertex, targetVertexConfig, true);
                targetVertexConfig.setBroadcastInputName(broadcastInput.getName(), broadcastInputIndex);
                targetVertexConfig.setBroadcastInputSerializer(broadcastInput.getSerializer(), broadcastInputIndex);
                broadcastInputIndex += broadcastInputIndexDelta;
            }
        }
        catch (Exception e) {
            throw new CompilerException("An error occurred while translating the optimized plan to a JobGraph: " + e.getMessage(), e);
        }
    }

    /**
     * Translates one input channel of a plan node into connections of the target job vertex.
     *
     * <p>A channel whose source is a union is expanded into the inputs of that union. A channel
     * whose source is an iteration placeholder without own vertex is replaced by the
     * corresponding input(s) of the enclosing iteration node. A channel whose source is the
     * solution set is skipped.
     *
     * @param input the channel to translate
     * @param inputIndex the index of the input on the target vertex
     * @param targetVertex the vertex that receives the input
     * @param targetVertexConfig the task configuration of the target vertex
     * @param isBroadcast whether the channel is a broadcast input
     * @return {@code 0} if the source is a solution set node and no input was created,
     *     {@code 1} otherwise
     * @throws Exception if connecting the vertices fails
     */
    private int translateChannel(Channel input, int inputIndex, JobVertex targetVertex, TaskConfig targetVertexConfig, boolean isBroadcast) throws Exception {
        PlanNode iterationNode;
        Iterator<Channel> allInChannels;
        PlanNode inputPlanNode = input.getSource();
        if (inputPlanNode instanceof NAryUnionPlanNode) {
            allInChannels = ((NAryUnionPlanNode)inputPlanNode).getListOfInputs().iterator();
            // Push a BATCH exchange mode and a broadcast ship strategy of the union's outgoing
            // channel down to the union's own inputs.
            for (Channel in : inputPlanNode.getInputs()) {
                if (input.getDataExchangeMode().equals((Object)DataExchangeMode.BATCH)) {
                    in.setDataExchangeMode(DataExchangeMode.BATCH);
                }
                if (!isBroadcast) continue;
                in.setShipStrategy(ShipStrategyType.BROADCAST, in.getDataExchangeMode());
            }
            // Apart from broadcast inputs, the channel leaving a union must be FORWARD.
            if (input.getShipStrategy() != ShipStrategyType.FORWARD && !isBroadcast) {
                throw new CompilerException("Optimized plan contains Union with non-forward outgoing ship strategy.");
            }
        } else if (inputPlanNode instanceof BulkPartialSolutionPlanNode) {
            if (this.vertices.get(inputPlanNode) == null) {
                // The partial solution has no vertex of its own: connect the target to the
                // input of the iteration node instead (expanded if that input is a union).
                BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode)inputPlanNode;
                iterationNode = pspn.getContainingIterationNode();
                allInChannels = ((SingleInputPlanNode)iterationNode).getInput().getSource() instanceof NAryUnionPlanNode ? ((SingleInputPlanNode)iterationNode).getInput().getSource().getInputs().iterator() : Collections.singletonList(((SingleInputPlanNode)iterationNode).getInput()).iterator();
                targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(inputIndex);
            } else {
                allInChannels = Collections.singletonList(input).iterator();
            }
        } else if (inputPlanNode instanceof WorksetPlanNode) {
            if (this.vertices.get(inputPlanNode) == null) {
                // The workset has no vertex of its own: connect the target to the second
                // input of the iteration node instead (expanded if that input is a union).
                WorksetPlanNode wspn = (WorksetPlanNode)inputPlanNode;
                iterationNode = wspn.getContainingIterationNode();
                allInChannels = ((DualInputPlanNode)iterationNode).getInput2().getSource() instanceof NAryUnionPlanNode ? ((DualInputPlanNode)iterationNode).getInput2().getSource().getInputs().iterator() : Collections.singletonList(((DualInputPlanNode)iterationNode).getInput2()).iterator();
                targetVertexConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(inputIndex);
            } else {
                allInChannels = Collections.singletonList(input).iterator();
            }
        } else {
            if (inputPlanNode instanceof SolutionSetPlanNode) {
                // The solution set is not connected through a channel; no input slot is used.
                return 0;
            }
            allInChannels = Collections.singletonList(input).iterator();
        }
        TypeSerializerFactory<?> typeSerFact = null;
        int numChannelsTotal = 0;
        int numChannelsDynamicPath = 0;
        int numDynamicSenderTasksTotal = 0;
        while (allInChannels.hasNext()) {
            TaskConfig sourceVertexConfig;
            Channel inConn = allInChannels.next();
            // All channels that feed the same input must use the same serializer.
            if (typeSerFact == null) {
                typeSerFact = inConn.getSerializer();
            } else if (!typeSerFact.equals(inConn.getSerializer())) {
                throw new CompilerException("Conflicting types in union operator.");
            }
            // Resolve the sending vertex and its configuration: a regular vertex, the vertex
            // containing a chained task, or the head task of an iteration.
            PlanNode sourceNode = inConn.getSource();
            JobVertex sourceVertex = this.vertices.get(sourceNode);
            if (sourceVertex == null) {
                TaskInChain chainedTask = this.chainedTasks.get(sourceNode);
                if (chainedTask != null) {
                    if (chainedTask.getContainingVertex() == null) {
                        throw new IllegalStateException("Bug: Chained task has not been assigned its containing vertex when connecting.");
                    }
                    sourceVertex = chainedTask.getContainingVertex();
                    sourceVertexConfig = chainedTask.getTaskConfig();
                } else {
                    IterationDescriptor iteration = this.iterations.get(sourceNode);
                    if (iteration == null) throw new CompilerException("Bug: Could not resolve source node for a channel.");
                    sourceVertex = iteration.getHeadTask();
                    sourceVertexConfig = iteration.getHeadFinalResultConfig();
                }
            } else {
                sourceVertexConfig = new TaskConfig(sourceVertex.getConfiguration());
            }
            DistributionPattern pattern = this.connectJobVertices(inConn, inputIndex, sourceVertex, sourceVertexConfig, targetVertex, targetVertexConfig, isBroadcast);
            ++numChannelsTotal;
            // For channels on the dynamic path, add up how many sender tasks feed one receiver.
            if (!inConn.isOnDynamicPath()) continue;
            ++numChannelsDynamicPath;
            numDynamicSenderTasksTotal += this.getNumberOfSendersPerReceiver(pattern, sourceVertex.getParallelism(), targetVertex.getParallelism());
        }
        // Either all or none of the channels of one input may be on the dynamic path.
        if (numChannelsDynamicPath > 0 && numChannelsTotal != numChannelsDynamicPath) {
            throw new CompilerException("Error: It is currently not supported to union between dynamic and static path in an iteration.");
        }
        // Mark the gate as iterative and tell it the number of events until interrupt,
        // which is the accumulated number of dynamic-path sender tasks.
        if (numDynamicSenderTasksTotal > 0) {
            if (isBroadcast) {
                targetVertexConfig.setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt(inputIndex, numDynamicSenderTasksTotal);
            } else {
                targetVertexConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(inputIndex, numDynamicSenderTasksTotal);
            }
        }
        this.addLocalInfoFromChannelToConfig(input, targetVertexConfig, inputIndex, isBroadcast);
        return 1;
    }

    /**
     * Computes how many sender tasks are connected to a single receiver task.
     *
     * @param pattern the distribution pattern of the connection
     * @param numSenders the parallelism of the sending vertex
     * @param numReceivers the parallelism of the receiving vertex
     * @return the number of senders each receiver is connected to
     * @throws CompilerException if the pattern is unknown, or if a pointwise connection
     *     changes the parallelism in a way other than to or from one
     */
    private int getNumberOfSendersPerReceiver(DistributionPattern pattern, int numSenders, int numReceivers) {
        // every receiver sees every sender
        if (pattern == DistributionPattern.ALL_TO_ALL) {
            return numSenders;
        }
        if (pattern != DistributionPattern.POINTWISE) {
            throw new CompilerException("Unknown distribution pattern for channels: " + pattern);
        }

        // pointwise: same parallelism, or a single sender fanning out
        if (numSenders == numReceivers || numSenders == 1) {
            return 1;
        }
        // pointwise: all senders funnel into the single receiver
        if (numReceivers == 1) {
            return numSenders;
        }
        throw new CompilerException("Error: A changing parallelism is currently not supported between tasks within an iteration.");
    }

    /**
     * Creates the representation of a single-input operator: either a job vertex of its own,
     * or a chained task that is registered in {@code chainedTasks}.
     *
     * @param node the single-input plan node to translate
     * @return the created vertex, or {@code null} if the operator is chained
     * @throws CompilerException if the node cannot be translated
     */
    private JobVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException {
        final String name = node.getNodeName();
        final DriverStrategy strategy = node.getDriverStrategy();
        final Channel inputChannel = node.getInput();
        final PlanNode predecessor = inputChannel.getSource();

        // Chaining needs a chainable driver and a plain forward connection to a regular
        // predecessor that has this node as its only consumer.
        boolean chaining = strategy.getPushChainDriverClass() != null
                && !(predecessor instanceof NAryUnionPlanNode)
                && !(predecessor instanceof BulkPartialSolutionPlanNode)
                && !(predecessor instanceof WorksetPlanNode)
                && !(predecessor instanceof IterationPlanNode)
                && inputChannel.getShipStrategy() == ShipStrategyType.FORWARD
                && inputChannel.getLocalStrategy() == LocalStrategy.NONE
                && predecessor.getOutgoingChannels().size() == 1
                && node.getParallelism() == predecessor.getParallelism()
                && node.getBroadcastInputs().isEmpty();

        // Inside a workset iteration, do not chain onto the next workset or the solution
        // set delta if this node has consumers itself.
        if (this.currentIteration instanceof WorksetIterationPlanNode && node.getOutgoingChannels().size() > 0) {
            WorksetIterationPlanNode worksetIteration = (WorksetIterationPlanNode) this.currentIteration;
            if (worksetIteration.getSolutionSetDeltaPlanNode() == predecessor || worksetIteration.getNextWorkSetPlanNode() == predecessor) {
                chaining = false;
            }
        }

        // Inside a bulk iteration, do not chain the termination criterion onto the step
        // function root, nor chain a consuming node onto either of the two roots.
        if (this.currentIteration instanceof BulkIterationPlanNode) {
            BulkIterationPlanNode bulkIteration = (BulkIterationPlanNode) this.currentIteration;
            boolean terminationOnStepRoot = node == bulkIteration.getRootOfTerminationCriterion()
                    && bulkIteration.getRootOfStepFunction() == predecessor;
            if (terminationOnStepRoot
                    || (node.getOutgoingChannels().size() > 0
                            && (bulkIteration.getRootOfStepFunction() == predecessor
                                    || bulkIteration.getRootOfTerminationCriterion() == predecessor))) {
                chaining = false;
            }
        }

        final JobVertex vertex;
        final TaskConfig config;
        if (!chaining) {
            vertex = new JobVertex(name);
            vertex.setResources(node.getMinResources(), node.getPreferredResources());
            if (this.currentIteration != null && node.isOnDynamicPath()) {
                vertex.setInvokableClass(IterationIntermediateTask.class);
            } else {
                vertex.setInvokableClass(BatchTask.class);
            }
            config = new TaskConfig(vertex.getConfiguration());
            config.setDriver(strategy.getDriverClass());
        } else {
            // chained operators get a stand-alone configuration and no vertex
            vertex = null;
            config = new TaskConfig(new Configuration());
            this.chainedTasks.put(node, new TaskInChain(node, strategy.getPushChainDriverClass(), config, name));
        }

        // settings shared by chained and stand-alone operators
        config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
        config.setStubParameters(node.getProgramOperator().getParameters());
        config.setDriverStrategy(strategy);
        for (int comparatorIndex = 0; comparatorIndex < strategy.getNumRequiredComparators(); ++comparatorIndex) {
            config.setDriverComparator(node.getComparator(comparatorIndex), comparatorIndex);
        }
        this.assignDriverResources(node, config);
        return vertex;
    }

    /**
     * Creates the {@link JobVertex} for a two-input plan node and fills in its task
     * configuration (user code, driver, driver strategy, comparators and driver resources).
     *
     * @param node the dual-input plan node to translate
     * @return the configured vertex
     * @throws CompilerException declared by the original signature
     */
    private JobVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException {
        final DriverStrategy strategy = node.getDriverStrategy();
        final JobVertex vertex = new JobVertex(node.getNodeName());
        final TaskConfig config = new TaskConfig(vertex.getConfiguration());

        vertex.setResources(node.getMinResources(), node.getPreferredResources());
        // Nodes on the dynamic path of an iteration run as intermediate iteration tasks.
        if (this.currentIteration != null && node.isOnDynamicPath()) {
            vertex.setInvokableClass(IterationIntermediateTask.class);
        } else {
            vertex.setInvokableClass(BatchTask.class);
        }

        config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
        config.setStubParameters(node.getProgramOperator().getParameters());
        config.setDriver(strategy.getDriverClass());
        config.setDriverStrategy(strategy);

        // Comparators are optional; only the ones present on the node are registered.
        if (node.getComparator1() != null) {
            config.setDriverComparator(node.getComparator1(), 0);
        }
        if (node.getComparator2() != null) {
            config.setDriverComparator(node.getComparator2(), 1);
        }
        if (node.getPairComparator() != null) {
            config.setDriverPairComparator(node.getPairComparator());
        }

        this.assignDriverResources(node, config);
        return vertex;
    }

    /**
     * Creates the input-format vertex for a data source node.
     *
     * @param node the source plan node to translate
     * @return the vertex running a {@code DataSourceTask}, configured with the source's user
     *     code, parameters and output serializer
     * @throws CompilerException declared by the original signature
     */
    private InputFormatVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException {
        final InputFormatVertex sourceVertex = new InputFormatVertex(node.getNodeName());
        final TaskConfig sourceConfig = new TaskConfig(sourceVertex.getConfiguration());

        sourceVertex.setResources(node.getMinResources(), node.getPreferredResources());
        sourceVertex.setInvokableClass(DataSourceTask.class);
        final String formatDescription = this.getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper());
        sourceVertex.setFormatDescription(formatDescription);

        sourceConfig.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
        sourceConfig.setStubParameters(node.getProgramOperator().getParameters());
        sourceConfig.setOutputSerializer(node.getSerializer());
        return sourceVertex;
    }

    /**
     * Creates the output-format vertex for a data sink node.
     *
     * @param node the sink plan node to translate
     * @return the vertex running a {@code DataSinkTask}, configured with the sink's user code
     *     and parameters
     * @throws CompilerException declared by the original signature
     */
    private JobVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException {
        final OutputFormatVertex sinkVertex = new OutputFormatVertex(node.getNodeName());
        final TaskConfig sinkConfig = new TaskConfig(sinkVertex.getConfiguration());

        sinkVertex.setResources(node.getMinResources(), node.getPreferredResources());
        sinkVertex.setInvokableClass(DataSinkTask.class);
        final String formatDescription = this.getDescriptionForUserCode(node.getProgramOperator().getUserCodeWrapper());
        sinkVertex.setFormatDescription(formatDescription);

        sinkConfig.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
        sinkConfig.setStubParameters(node.getProgramOperator().getParameters());
        return sinkVertex;
    }

    /**
     * Sets up the head task of a bulk iteration for the given partial solution node.
     *
     * <p>If merging of auxiliary iteration tasks is enabled and the partial solution has a
     * single, plain forward successor of equal parallelism (that is neither a union nor the
     * root of the step function, and the iteration input has no local strategy), the
     * successor's already created vertex is turned into the head task. Otherwise a dedicated
     * no-op head vertex is created.
     *
     * @param pspn the partial solution plan node of the bulk iteration
     * @return the newly created head vertex, or {@code null} if the head was merged into the
     *     successor's vertex
     * @throws CompilerException if the successor to merge with has no vertex yet, or no
     *     iteration descriptor is registered for the containing iteration
     */
    private JobVertex createBulkIterationHead(BulkPartialSolutionPlanNode pspn) {
        final BulkIterationPlanNode iteration = pspn.getContainingIterationNode();

        boolean merge = false;
        if (mergeIterationAuxTasks && pspn.getOutgoingChannels().size() == 1) {
            final Channel outgoing = pspn.getOutgoingChannels().get(0);
            final PlanNode target = outgoing.getTarget();
            merge = outgoing.getShipStrategy() == ShipStrategyType.FORWARD
                    && outgoing.getLocalStrategy() == LocalStrategy.NONE
                    && outgoing.getTempMode() == TempMode.NONE
                    && target.getParallelism() == pspn.getParallelism()
                    && !(target instanceof NAryUnionPlanNode)
                    && target != iteration.getRootOfStepFunction()
                    && iteration.getInput().getLocalStrategy() == LocalStrategy.NONE;
        }

        final JobVertex headVertex;
        final TaskConfig headConfig;
        final JobVertex toReturn;
        if (!merge) {
            // Dedicated head vertex that only forwards the partial solution.
            headVertex = new JobVertex("PartialSolution (" + iteration.getNodeName() + ")");
            headVertex.setResources(iteration.getMinResources(), iteration.getPreferredResources());
            headVertex.setInvokableClass(IterationHeadTask.class);
            headConfig = new TaskConfig(headVertex.getConfiguration());
            headConfig.setDriver(NoOpDriver.class);
            toReturn = headVertex;
        } else {
            // Reuse the successor's vertex as the iteration head.
            final PlanNode successor = pspn.getOutgoingChannels().get(0).getTarget();
            headVertex = this.vertices.get(successor);
            if (headVertex == null) {
                throw new CompilerException("Bug: Trying to merge solution set with its successor, but successor has not been created.");
            }
            headVertex.setInvokableClass(IterationHeadTask.class);
            headConfig = new TaskConfig(headVertex.getConfiguration());
            toReturn = null;
        }

        final IterationDescriptor descriptor = this.iterations.get(iteration);
        if (descriptor == null) {
            throw new CompilerException("Bug: Iteration descriptor was not created at when translating the iteration node.");
        }
        descriptor.setHeadTask(headVertex, headConfig);
        return toReturn;
    }

    /**
     * Sets up the head task of a workset iteration for the given workset node.
     *
     * <p>If merging of auxiliary iteration tasks is enabled and the workset has a single,
     * plain forward successor of equal parallelism (that is neither a union nor the next
     * workset node, and the initial workset input has no local strategy), the successor's
     * already created vertex is turned into the head task. Otherwise a dedicated no-op head
     * vertex is created. In both cases the head configuration records whether the solution
     * set is unmanaged.
     *
     * @param wspn the workset plan node of the workset iteration
     * @return the newly created head vertex, or {@code null} if the head was merged into the
     *     successor's vertex
     * @throws CompilerException if the successor to merge with has no vertex yet, or no
     *     iteration descriptor is registered for the containing iteration
     */
    private JobVertex createWorksetIterationHead(WorksetPlanNode wspn) {
        final WorksetIterationPlanNode iteration = wspn.getContainingIterationNode();

        boolean merge = false;
        if (mergeIterationAuxTasks && wspn.getOutgoingChannels().size() == 1) {
            final Channel outgoing = wspn.getOutgoingChannels().get(0);
            final PlanNode target = outgoing.getTarget();
            merge = outgoing.getShipStrategy() == ShipStrategyType.FORWARD
                    && outgoing.getLocalStrategy() == LocalStrategy.NONE
                    && outgoing.getTempMode() == TempMode.NONE
                    && target.getParallelism() == wspn.getParallelism()
                    && !(target instanceof NAryUnionPlanNode)
                    && target != iteration.getNextWorkSetPlanNode()
                    && iteration.getInitialWorksetInput().getLocalStrategy() == LocalStrategy.NONE;
        }

        final JobVertex headVertex;
        final TaskConfig headConfig;
        final JobVertex toReturn;
        if (!merge) {
            // Dedicated head vertex that only forwards the workset.
            headVertex = new JobVertex("IterationHead(" + iteration.getNodeName() + ")");
            headVertex.setResources(iteration.getMinResources(), iteration.getPreferredResources());
            headVertex.setInvokableClass(IterationHeadTask.class);
            headConfig = new TaskConfig(headVertex.getConfiguration());
            headConfig.setDriver(NoOpDriver.class);
            toReturn = headVertex;
        } else {
            // Reuse the successor's vertex as the iteration head.
            final PlanNode successor = wspn.getOutgoingChannels().get(0).getTarget();
            headVertex = this.vertices.get(successor);
            if (headVertex == null) {
                throw new CompilerException("Bug: Trying to merge solution set with its sucessor, but successor has not been created.");
            }
            headVertex.setInvokableClass(IterationHeadTask.class);
            headConfig = new TaskConfig(headVertex.getConfiguration());
            toReturn = null;
        }

        final boolean unmanagedSolutionSet = iteration.getIterationNode().getIterationContract().isSolutionSetUnManaged();
        headConfig.setSolutionSetUnmanaged(unmanagedSolutionSet);

        final IterationDescriptor descriptor = this.iterations.get(iteration);
        if (descriptor == null) {
            throw new CompilerException("Bug: Iteration descriptor was not created at when translating the iteration node.");
        }
        descriptor.setHeadTask(headVertex, headConfig);
        return toReturn;
    }

    /**
     * Writes the driver's memory share, file handle limit and spilling threshold into the
     * task configuration. Nothing is written when the node has no positive relative memory.
     *
     * @param node the plan node whose relative memory per subtask is read
     * @param config the task configuration to update
     */
    private void assignDriverResources(PlanNode node, TaskConfig config) {
        final double driverMemory = node.getRelativeMemoryPerSubTask();
        if (!(driverMemory > 0.0)) {
            return;
        }
        config.setRelativeMemoryDriver(driverMemory);
        config.setFilehandlesDriver(this.defaultMaxFan);
        config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold);
    }

    /**
     * Writes the resources of a channel's local strategy (memory share, file handle limit,
     * spilling threshold and the large-record-handler flag) into the task configuration for
     * the given input. Nothing is written when the channel has no positive relative memory
     * for its local strategy.
     *
     * @param c the channel whose local strategy memory is read
     * @param config the task configuration to update
     * @param inputNum the index of the input the channel feeds
     */
    private void assignLocalStrategyResources(Channel c, TaskConfig config, int inputNum) {
        final double localStrategyMemory = c.getRelativeMemoryLocalStrategy();
        if (!(localStrategyMemory > 0.0)) {
            return;
        }
        config.setRelativeMemoryInput(inputNum, localStrategyMemory);
        config.setFilehandlesInput(inputNum, this.defaultMaxFan);
        config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold);
        config.setUseLargeRecordHandler(this.useLargeRecordHandler);
    }

    /**
     * Connects two job vertices according to the given channel and records the channel's
     * shipping information in the source and target task configurations.
     *
     * <p>The ship strategy determines the distribution pattern ({@code FORWARD} is pointwise,
     * all partitioning/broadcast strategies are all-to-all) and the data exchange mode
     * determines the result partition type. Human-readable descriptions of the ship strategy,
     * local strategy and temp mode are attached to the created edge.
     *
     * @param channel the channel describing the connection
     * @param inputNumber the input index at the target
     * @param sourceVertex the producing vertex
     * @param sourceConfig the task configuration of the producing task
     * @param targetVertex the consuming vertex
     * @param targetConfig the task configuration of the consuming task
     * @param isBroadcast whether the channel is a broadcast-variable input
     * @return the distribution pattern used for the connection
     * @throws CompilerException if custom partitioning is requested without a partitioner
     * @throws RuntimeException if the ship strategy is unknown or range partitioning has no
     *     data distribution
     * @throws UnsupportedOperationException if the data exchange mode is unsupported or unknown
     */
    private DistributionPattern connectJobVertices(Channel channel, int inputNumber, JobVertex sourceVertex, TaskConfig sourceConfig, JobVertex targetVertex, TaskConfig targetConfig, boolean isBroadcast) throws CompilerException {
        // Ship strategy -> distribution pattern.
        final DistributionPattern distributionPattern;
        switch (channel.getShipStrategy()) {
            case FORWARD:
                distributionPattern = DistributionPattern.POINTWISE;
                break;
            case PARTITION_RANDOM:
            case BROADCAST:
            case PARTITION_HASH:
            case PARTITION_CUSTOM:
            case PARTITION_RANGE:
            case PARTITION_FORCED_REBALANCE:
                distributionPattern = DistributionPattern.ALL_TO_ALL;
                break;
            default:
                throw new RuntimeException("Unknown runtime ship strategy: " + channel.getShipStrategy());
        }

        // Data exchange mode -> result partition type.
        final ResultPartitionType resultType;
        switch (channel.getDataExchangeMode()) {
            case PIPELINED:
                resultType = ResultPartitionType.PIPELINED;
                break;
            case BATCH:
                // Sources on the dynamic path get a pipelined result even in batch mode.
                if (channel.getSource().isOnDynamicPath()) {
                    resultType = ResultPartitionType.PIPELINED;
                } else {
                    resultType = ResultPartitionType.BLOCKING;
                }
                break;
            case PIPELINE_WITH_BATCH_FALLBACK:
                throw new UnsupportedOperationException("Data exchange mode " + channel.getDataExchangeMode() + " currently not supported.");
            default:
                throw new UnsupportedOperationException("Unknown data exchange mode.");
        }

        final JobEdge edge = targetVertex.connectNewDataSetAsInput(sourceVertex, distributionPattern, resultType);

        // Source side: the new output's index is the number of outputs registered so far.
        final int outputIndex = sourceConfig.getNumOutputs();
        sourceConfig.addOutputShipStrategy(channel.getShipStrategy());
        if (outputIndex == 0) {
            sourceConfig.setOutputSerializer(channel.getSerializer());
        }
        if (channel.getShipStrategyComparator() != null) {
            sourceConfig.setOutputComparator(channel.getShipStrategyComparator(), outputIndex);
        }

        if (channel.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) {
            final DataDistribution distribution = channel.getDataDistribution();
            if (distribution == null) {
                throw new RuntimeException("Range partitioning requires data distribution.");
            }
            sourceConfig.setOutputDataDistribution(distribution, outputIndex);
        }

        if (channel.getShipStrategy() == ShipStrategyType.PARTITION_CUSTOM) {
            if (channel.getPartitioner() == null) {
                throw new CompilerException("The ship strategy was set to custom partitioning, but no partitioner was set.");
            }
            sourceConfig.setOutputPartitioner(channel.getPartitioner(), outputIndex);
        }

        // Target side: register the input in the matching group.
        if (isBroadcast) {
            targetConfig.addBroadcastInputToGroup(inputNumber);
        } else {
            targetConfig.addInputToGroup(inputNumber);
        }

        // Descriptive names attached to the edge.
        String shipStrategy = JsonMapper.getShipStrategyString(channel.getShipStrategy());
        if (channel.getShipStrategyKeys() != null && channel.getShipStrategyKeys().size() > 0) {
            final String shipKeys;
            if (channel.getShipStrategySortOrder() == null) {
                shipKeys = channel.getShipStrategyKeys().toString();
            } else {
                shipKeys = Utils.createOrdering(channel.getShipStrategyKeys(), channel.getShipStrategySortOrder()).toString();
            }
            shipStrategy = shipStrategy + " on " + shipKeys;
        }

        String localStrategy = null;
        if (channel.getLocalStrategy() != null && channel.getLocalStrategy() != LocalStrategy.NONE) {
            localStrategy = JsonMapper.getLocalStrategyString(channel.getLocalStrategy());
            if (localStrategy != null && channel.getLocalStrategyKeys() != null && channel.getLocalStrategyKeys().size() > 0) {
                final String localKeys;
                if (channel.getLocalStrategySortOrder() == null) {
                    localKeys = channel.getLocalStrategyKeys().toString();
                } else {
                    localKeys = Utils.createOrdering(channel.getLocalStrategyKeys(), channel.getLocalStrategySortOrder()).toString();
                }
                localStrategy = localStrategy + " on " + localKeys;
            }
        }

        String caching = null;
        if (channel.getTempMode() != TempMode.NONE) {
            caching = channel.getTempMode().toString();
        }

        edge.setShipStrategyName(shipStrategy);
        edge.setPreProcessingOperationName(localStrategy);
        edge.setOperatorLevelCachingDescription(caching);
        return distributionPattern;
    }

    /**
     * Copies the receiver-side information of a channel (serializer, local strategy,
     * comparator, local strategy resources and materialization settings) into the task
     * configuration of the consuming task.
     *
     * <p>Broadcast channels only get their serializer registered; they must not carry a local
     * strategy or a temp mode other than {@code NONE}.
     *
     * @param channel the channel to read the settings from
     * @param config the configuration of the consuming task
     * @param inputNum the index of the (broadcast) input at the consuming task
     * @param isBroadcastChannel whether the channel is a broadcast-variable input
     * @throws CompilerException if a broadcast channel has a local strategy or temp mode, or
     *     if materialization is required but the temp mode is {@code NONE} or no temp memory
     *     was assigned
     */
    private void addLocalInfoFromChannelToConfig(Channel channel, TaskConfig config, int inputNum, boolean isBroadcastChannel) {
        if (isBroadcastChannel) {
            config.setBroadcastInputSerializer(channel.getSerializer(), inputNum);
            if (channel.getLocalStrategy() != LocalStrategy.NONE
                    || (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE)) {
                throw new CompilerException("Found local strategy or temp mode on a broadcast variable channel.");
            }
            return;
        }

        config.setInputSerializer(channel.getSerializer(), inputNum);

        if (channel.getLocalStrategy() != LocalStrategy.NONE) {
            config.setInputLocalStrategy(inputNum, channel.getLocalStrategy());
            if (channel.getLocalStrategyComparator() != null) {
                config.setInputComparator(channel.getLocalStrategyComparator(), inputNum);
            }
        }

        this.assignLocalStrategyResources(channel, config, inputNum);

        final TempMode tempMode = channel.getTempMode();
        if (tempMode == null) {
            return;
        }

        // Pipeline-breaking temp modes are materialized asynchronously unless the exchange is
        // already a batch exchange outside the dynamic path.
        final boolean materialize = tempMode.breaksPipeline()
                && (channel.isOnDynamicPath() || channel.getDataExchangeMode() != DataExchangeMode.BATCH);
        if (materialize) {
            config.setInputAsynchronouslyMaterialized(inputNum, true);
        }

        final boolean cached = tempMode.isCached();
        if (cached) {
            config.setInputCached(inputNum, true);
        }

        if (!(materialize || cached)) {
            return;
        }

        // Either setting needs memory, so the channel must carry a consistent description.
        if (tempMode == TempMode.NONE || channel.getRelativeTempMemory() <= 0.0) {
            throw new CompilerException("Bug in compiler: Inconsistent description of input materialization.");
        }
        config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory());
    }

    /**
     * Completes the job graph for a bulk iteration once all of its nodes have been translated.
     *
     * <p>The method configures the iteration head (final output configuration, index of the
     * sync output, back channel memory), creates the single-parallelism synchronization
     * vertex and connects it to the head, marks the root of the step function (and, if
     * present and without outgoing channels, the root of the termination criterion) as
     * iteration tail, and registers the aggregators and the convergence criterion.
     *
     * @param descr the descriptor of the bulk iteration to finalize
     * @throws CompilerException if the step function has no operation, no back channel memory
     *     was assigned, the maximum number of iterations is below 1, a tail node cannot be
     *     found as vertex or chained task, or only one of convergence criterion and
     *     convergence aggregator name is set
     */
    private void finalizeBulkIteration(IterationDescriptor descr) {
        TaskConfig tailConfig;
        BulkIterationPlanNode bulkNode = (BulkIterationPlanNode)descr.getIterationNode();
        JobVertex headVertex = descr.getHeadTask();
        TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
        TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();

        // ---- head configuration ----
        int numStepFunctionOuts = headConfig.getNumOutputs();
        int numFinalOuts = headFinalOutputConfig.getNumOutputs();
        if (numStepFunctionOuts == 0) {
            throw new CompilerException("The iteration has no operation inside the step function.");
        }
        headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
        // The sync output is registered after all step function and final outputs.
        headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
        double relativeMemForBackChannel = bulkNode.getRelativeMemoryPerSubTask();
        if (relativeMemForBackChannel <= 0.0) {
            throw new CompilerException("Bug: No memory has been assigned to the iteration back channel.");
        }
        headConfig.setRelativeBackChannelMemory(relativeMemForBackChannel);

        // ---- synchronization vertex ----
        JobVertex sync = new JobVertex("Sync (" + bulkNode.getNodeName() + ")");
        sync.setResources(bulkNode.getMinResources(), bulkNode.getPreferredResources());
        sync.setInvokableClass(IterationSynchronizationSinkTask.class);
        sync.setParallelism(1);
        sync.setMaxParallelism(1);
        this.auxVertices.add(sync);
        TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
        syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getParallelism());
        int maxNumIterations = bulkNode.getIterationNode().getIterationContract().getMaximumNumberOfIterations();
        if (maxNumIterations < 1) {
            throw new CompilerException("Cannot create bulk iteration with unspecified maximum number of iterations.");
        }
        syncConfig.setNumberOfIterations(maxNumIterations);
        sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        // ---- tail of the step function ----
        PlanNode rootOfTerminationCriterion = bulkNode.getRootOfTerminationCriterion();
        PlanNode rootOfStepFunction = bulkNode.getRootOfStepFunction();
        JobVertex rootOfStepFunctionVertex = this.vertices.get(rootOfStepFunction);
        if (rootOfStepFunctionVertex == null) {
            // The root may have been chained; then its config lives with the chained task.
            TaskInChain taskInChain = this.chainedTasks.get(rootOfStepFunction);
            if (taskInChain == null) {
                throw new CompilerException("Bug: Tail of step function not found as vertex or chained task.");
            }
            rootOfStepFunctionVertex = taskInChain.getContainingVertex();
            tailConfig = taskInChain.getTaskConfig();
        } else {
            tailConfig = new TaskConfig(rootOfStepFunctionVertex.getConfiguration());
        }
        tailConfig.setIsWorksetUpdate();
        if (rootOfStepFunction.getOutgoingChannels().isEmpty()) {
            rootOfStepFunctionVertex.setInvokableClass(IterationTailTask.class);
            tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
        }

        // ---- tail of the termination criterion (optional) ----
        if (rootOfTerminationCriterion != null && rootOfTerminationCriterion.getOutgoingChannels().isEmpty()) {
            TaskConfig tailConfigOfTerminationCriterion;
            JobVertex rootOfTerminationCriterionVertex = this.vertices.get(rootOfTerminationCriterion);
            if (rootOfTerminationCriterionVertex == null) {
                TaskInChain taskInChain = this.chainedTasks.get(rootOfTerminationCriterion);
                if (taskInChain == null) {
                    throw new CompilerException("Bug: Tail of termination criterion not found as vertex or chained task.");
                }
                rootOfTerminationCriterionVertex = taskInChain.getContainingVertex();
                tailConfigOfTerminationCriterion = taskInChain.getTaskConfig();
            } else {
                tailConfigOfTerminationCriterion = new TaskConfig(rootOfTerminationCriterionVertex.getConfiguration());
            }
            rootOfTerminationCriterionVertex.setInvokableClass(IterationTailTask.class);
            tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
            tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
            headConfig.setWaitForSolutionSetUpdate();
        }

        // ---- aggregators and convergence criterion ----
        AggregatorRegistry aggs = bulkNode.getIterationNode().getIterationContract().getAggregators();
        // Parameterized instead of the raw Collection to avoid unchecked conversions.
        Collection<AggregatorWithName<?>> allAggregators = aggs.getAllRegisteredAggregators();
        headConfig.addIterationAggregators(allAggregators);
        syncConfig.addIterationAggregators(allAggregators);
        String convAggName = aggs.getConvergenceCriterionAggregatorName();
        ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion();
        // Criterion and aggregator name must be set together or not at all.
        if (convCriterion != null || convAggName != null) {
            if (convCriterion == null) {
                throw new CompilerException("Error: Convergence criterion aggregator set, but criterion is null.");
            }
            if (convAggName == null) {
                throw new CompilerException("Error: Aggregator convergence criterion set, but aggregator is null.");
            }
            syncConfig.setConvergenceCriterion(convAggName, convCriterion);
        }
    }

    /**
     * Completes the job graph for a workset iteration once all of its nodes have been
     * translated.
     *
     * <p>The method configures the iteration head (final output configuration, index of the
     * sync output, back channel and solution set memory split in half, solution set
     * serializer and comparator), creates the single-parallelism synchronization vertex and
     * connects it to the head, marks the next-workset and solution-set-delta nodes as
     * workset / solution set updates (making them iteration tails where applicable), and
     * registers the user aggregators, the convergence criterion and the built-in
     * workset-empty aggregator.
     *
     * @param descr the descriptor of the workset iteration to finalize
     * @throws CompilerException if the step function has no operation on the workset, no
     *     memory was assigned, the maximum number of iterations is below 1, a required node
     *     cannot be found as vertex or chained task, a user aggregator uses the built-in
     *     workset-empty aggregator name, or only one of convergence criterion and convergence
     *     aggregator name is set
     */
    private void finalizeWorksetIteration(IterationDescriptor descr) {
        // Name under which the built-in workset-empty aggregator is registered; user
        // aggregators must not reuse it.
        final String worksetEmptyAggregatorName = "pact.runtime.workset-empty-aggregator";

        WorksetIterationPlanNode iterNode = (WorksetIterationPlanNode)descr.getIterationNode();
        JobVertex headVertex = descr.getHeadTask();
        TaskConfig headConfig = new TaskConfig(headVertex.getConfiguration());
        TaskConfig headFinalOutputConfig = descr.getHeadFinalResultConfig();

        // ---- head configuration ----
        int numStepFunctionOuts = headConfig.getNumOutputs();
        int numFinalOuts = headFinalOutputConfig.getNumOutputs();
        if (numStepFunctionOuts == 0) {
            throw new CompilerException("The workset iteration has no operation on the workset inside the step function.");
        }
        headConfig.setIterationHeadFinalOutputConfig(headFinalOutputConfig);
        // The sync output is registered after all step function and final outputs.
        headConfig.setIterationHeadIndexOfSyncOutput(numStepFunctionOuts + numFinalOuts);
        double relativeMemory = iterNode.getRelativeMemoryPerSubTask();
        if (relativeMemory <= 0.0) {
            throw new CompilerException("Bug: No memory has been assigned to the workset iteration.");
        }
        headConfig.setIsWorksetIteration();
        // The iteration's memory is shared equally between back channel and solution set.
        headConfig.setRelativeBackChannelMemory(relativeMemory / 2.0);
        headConfig.setRelativeSolutionSetMemory(relativeMemory / 2.0);
        headConfig.setSolutionSetSerializer(iterNode.getSolutionSetSerializer());
        headConfig.setSolutionSetComparator(iterNode.getSolutionSetComparator());

        // ---- synchronization vertex ----
        JobVertex sync = new JobVertex("Sync (" + iterNode.getNodeName() + ")");
        sync.setResources(iterNode.getMinResources(), iterNode.getPreferredResources());
        sync.setInvokableClass(IterationSynchronizationSinkTask.class);
        sync.setParallelism(1);
        sync.setMaxParallelism(1);
        this.auxVertices.add(sync);
        TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
        syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, headVertex.getParallelism());
        int maxNumIterations = iterNode.getIterationNode().getIterationContract().getMaximumNumberOfIterations();
        if (maxNumIterations < 1) {
            throw new CompilerException("Cannot create workset iteration with unspecified maximum number of iterations.");
        }
        syncConfig.setNumberOfIterations(maxNumIterations);
        sync.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        // ---- tails ----
        PlanNode nextWorksetNode = iterNode.getNextWorkSetPlanNode();
        PlanNode solutionDeltaNode = iterNode.getSolutionSetDeltaPlanNode();
        boolean hasWorksetTail = nextWorksetNode.getOutgoingChannels().isEmpty();
        boolean hasSolutionSetTail = !iterNode.isImmediateSolutionSetUpdate() || !hasWorksetTail;

        // Next workset: either its own vertex or a chained task inside another vertex.
        TaskConfig worksetTailConfig;
        JobVertex nextWorksetVertex = this.vertices.get(nextWorksetNode);
        if (nextWorksetVertex == null) {
            TaskInChain worksetTaskInChain = this.chainedTasks.get(nextWorksetNode);
            if (worksetTaskInChain == null) {
                throw new CompilerException("Bug: Next workset node not found as vertex or chained task.");
            }
            nextWorksetVertex = worksetTaskInChain.getContainingVertex();
            worksetTailConfig = worksetTaskInChain.getTaskConfig();
        } else {
            worksetTailConfig = new TaskConfig(nextWorksetVertex.getConfiguration());
        }
        worksetTailConfig.setIsWorksetIteration();
        worksetTailConfig.setIsWorksetUpdate();
        if (hasWorksetTail) {
            nextWorksetVertex.setInvokableClass(IterationTailTask.class);
            worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
        }

        // Solution set delta: same lookup scheme as for the next workset.
        TaskConfig solutionDeltaConfig;
        JobVertex solutionDeltaVertex = this.vertices.get(solutionDeltaNode);
        if (solutionDeltaVertex == null) {
            TaskInChain deltaTaskInChain = this.chainedTasks.get(solutionDeltaNode);
            if (deltaTaskInChain == null) {
                throw new CompilerException("Bug: Solution Set Delta not found as vertex or chained task.");
            }
            solutionDeltaVertex = deltaTaskInChain.getContainingVertex();
            solutionDeltaConfig = deltaTaskInChain.getTaskConfig();
        } else {
            solutionDeltaConfig = new TaskConfig(solutionDeltaVertex.getConfiguration());
        }
        solutionDeltaConfig.setIsWorksetIteration();
        solutionDeltaConfig.setIsSolutionSetUpdate();
        if (hasSolutionSetTail) {
            solutionDeltaVertex.setInvokableClass(IterationTailTask.class);
            solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());
            headConfig.setWaitForSolutionSetUpdate();
        } else {
            // Defensive consistency check; by construction of hasSolutionSetTail the update
            // is immediate whenever this branch is reached.
            if (!iterNode.isImmediateSolutionSetUpdate()) {
                throw new CompilerException("A solution set update without dedicated tail is not set to perform immediate updates.");
            }
            solutionDeltaConfig.setIsSolutionSetUpdateWithoutReprobe();
        }

        // ---- aggregators and convergence criterion ----
        AggregatorRegistry aggs = iterNode.getIterationNode().getIterationContract().getAggregators();
        // Parameterized instead of the raw Collection so the typed loop below is well-formed.
        Collection<AggregatorWithName<?>> allAggregators = aggs.getAllRegisteredAggregators();
        for (AggregatorWithName<?> agg : allAggregators) {
            if (agg.getName().equals(worksetEmptyAggregatorName)) {
                throw new CompilerException("User defined aggregator used the same name as built-in workset termination check aggregator: " + worksetEmptyAggregatorName);
            }
        }
        headConfig.addIterationAggregators(allAggregators);
        syncConfig.addIterationAggregators(allAggregators);
        String convAggName = aggs.getConvergenceCriterionAggregatorName();
        ConvergenceCriterion<?> convCriterion = aggs.getConvergenceCriterion();
        // Criterion and aggregator name must be set together or not at all.
        if (convCriterion != null || convAggName != null) {
            if (convCriterion == null) {
                throw new CompilerException("Error: Convergence criterion aggregator set, but criterion is null.");
            }
            if (convAggName == null) {
                throw new CompilerException("Error: Aggregator convergence criterion set, but aggregator is null.");
            }
            syncConfig.setConvergenceCriterion(convAggName, convCriterion);
        }

        // Built-in aggregator and implicit criterion for the workset-empty check.
        headConfig.addIterationAggregator(worksetEmptyAggregatorName, new LongSumAggregator());
        syncConfig.addIterationAggregator(worksetEmptyAggregatorName, new LongSumAggregator());
        syncConfig.setImplicitConvergenceCriterion(worksetEmptyAggregatorName, new WorksetEmptyConvergenceCriterion());
    }

    /**
     * Produces a best-effort textual description of the user code held by the wrapper.
     *
     * <p>If the wrapper holds an object, its {@code toString()} is used, falling back to the
     * user code class name when that fails. Without an object, the class name is used.
     *
     * @param wrapper the wrapper around the user code
     * @return the description, or {@code null} if it could not be determined at all
     */
    private String getDescriptionForUserCode(UserCodeWrapper<?> wrapper) {
        try {
            if (!wrapper.hasObject()) {
                return wrapper.getUserCodeClass().getName();
            }
            try {
                return wrapper.getUserCodeObject().toString();
            }
            catch (Throwable objectFailure) {
                // The object could not be described; fall back to the class name.
                return wrapper.getUserCodeClass().getName();
            }
        }
        catch (Throwable failure) {
            // Deliberately best-effort: a missing description is reported as null.
            return null;
        }
    }

    /**
     * Attaches operator name, operator description, pretty name and optimizer properties to
     * every job vertex.
     *
     * <p>Chained tasks are processed first, from the last registered to the first, followed by
     * the plan nodes that own a vertex. Each processed node prepends its own label to whatever
     * the vertex already carries; optimizer properties are only set if the vertex has none yet.
     */
    private void attachOperatorNamesAndDescriptions() {
        final JsonFactory jsonFactory = new JsonFactory();

        // Chained tasks, walked backwards through the registration sequence.
        int position = this.chainedTasksInSequence.size();
        while (--position >= 0) {
            final TaskInChain chained = this.chainedTasksInSequence.get(position);
            final PlanNode chainedNode = chained.getPlanNode();
            final JobVertex container = chained.getContainingVertex();

            final String opName = chainedNode.getOptimizerNode().getOperatorName();
            container.setOperatorName(prependToChainLabel(opName, " -> ", container.getOperatorName()));

            final String opDescription = JsonMapper.getOperatorStrategyString(chainedNode.getDriverStrategy());
            container.setOperatorDescription(prependToChainLabel(opDescription, "\n -> ", container.getOperatorDescription()));

            final String prettyName = StringUtils.showControlCharacters(chainedNode.getNodeName());
            container.setOperatorPrettyName(prependToChainLabel(prettyName, "\n -> ", container.getOperatorPrettyName()));

            if (container.getResultOptimizerProperties() == null) {
                final String outputProps = JsonMapper.getOptimizerPropertiesJson(jsonFactory, chainedNode);
                container.setResultOptimizerProperties(outputProps);
            }
        }

        // Plan nodes that have their own vertex.
        for (Map.Entry<PlanNode, JobVertex> mapping : this.vertices.entrySet()) {
            final PlanNode node = mapping.getKey();
            final JobVertex vertex = mapping.getValue();

            // Only the names of the first two inputs are used in the description.
            String input1name = null;
            String input2name = null;
            int seen = 0;
            for (Channel in : node.getInputs()) {
                final int inputIndex = seen++;
                if (inputIndex == 0) {
                    input1name = in.getSource().getNodeName();
                } else if (inputIndex == 1) {
                    input2name = in.getSource().getNodeName();
                }
            }

            final String opName = node.getOptimizerNode().getOperatorName();
            vertex.setOperatorName(prependToChainLabel(opName, " -> ", vertex.getOperatorName()));

            final String firstInput = input1name != null ? input1name : "(unnamed)";
            final String secondInput = input2name != null ? input2name : "(unnamed)";
            final String opStrategy = JsonMapper.getOperatorStrategyString(node.getDriverStrategy(), firstInput, secondInput);
            vertex.setOperatorDescription(prependToChainLabel(opStrategy, "\n -> ", vertex.getOperatorDescription()));

            final String prettyName = StringUtils.showControlCharacters(node.getNodeName());
            vertex.setOperatorPrettyName(prependToChainLabel(prettyName, "\n -> ", vertex.getOperatorPrettyName()));

            if (vertex.getResultOptimizerProperties() == null) {
                vertex.setResultOptimizerProperties(JsonMapper.getOptimizerPropertiesJson(jsonFactory, node));
            }
        }
    }

    /**
     * Puts {@code label} in front of an existing chain label.
     *
     * @param label the label of the node being processed
     * @param separator the text placed between the new label and the existing one
     * @param existing the label already stored on the vertex, may be {@code null}
     * @return {@code label} alone if nothing was stored yet, otherwise
     *     {@code label + separator + existing}
     */
    private static String prependToChainLabel(String label, String separator, String existing) {
        if (existing == null) {
            return label;
        }
        return label + separator + existing;
    }

    /**
     * Bookkeeping holder for a single iteration: the iteration's plan node, a numeric id, and
     * the head task with its configurations once {@link #setHeadTask} has been called.
     */
    private static final class IterationDescriptor {
        /** Identifier handed in at construction time; never changes afterwards. */
        private final int id;
        /** Plan node of the iteration this descriptor belongs to. */
        private final IterationPlanNode iterationNode;
        /** Head vertex of the iteration; {@code null} until {@link #setHeadTask} is called. */
        private JobVertex headTask;
        /** Most recently registered head configuration; stored here but not exposed by a getter. */
        private TaskConfig headConfig;
        /** Fresh, empty configuration created on every {@link #setHeadTask} call. */
        private TaskConfig headFinalResultConfig;

        /**
         * Creates a descriptor with no head task assigned yet.
         *
         * @param iterationNode the iteration's plan node
         * @param id the identifier of the iteration
         */
        public IterationDescriptor(IterationPlanNode iterationNode, int id) {
            this.id = id;
            this.iterationNode = iterationNode;
        }

        /** @return the plan node passed to the constructor */
        public IterationPlanNode getIterationNode() {
            return iterationNode;
        }

        /**
         * Registers the head vertex and its configuration, and resets the final-result
         * configuration to a new empty one.
         *
         * <p>If a head configuration was already registered by an earlier call, all of its
         * entries are first added to {@code headConfig} so that they are not lost when the
         * stored reference is replaced.
         *
         * @param headTask the vertex acting as head of the iteration
         * @param headConfig the configuration of that vertex; receives the entries of any
         *     previously registered head configuration
         */
        public void setHeadTask(JobVertex headTask, TaskConfig headConfig) {
            final TaskConfig earlierConfig = this.headConfig;

            this.headTask = headTask;
            this.headFinalResultConfig = new TaskConfig(new Configuration());

            // Carry over whatever was configured on the head before this call.
            if (earlierConfig != null) {
                headConfig.getConfiguration().addAll(earlierConfig.getConfiguration());
            }
            this.headConfig = headConfig;
        }

        /** @return the head vertex, or {@code null} if {@link #setHeadTask} was never called */
        public JobVertex getHeadTask() {
            return headTask;
        }

        /**
         * @return the configuration created by the latest {@link #setHeadTask} call, or
         *     {@code null} if it was never called
         */
        public TaskConfig getHeadFinalResultConfig() {
            return headFinalResultConfig;
        }

        /** @return the identifier passed to the constructor */
        public int getId() {
            return id;
        }
    }

    /**
     * Holder describing one chained task: its plan node, driver class, configuration and name,
     * plus a settable reference to the vertex that contains it.
     */
    private static final class TaskInChain {
        /** Plan node this chained task was created for. */
        private final PlanNode planNode;
        /** Driver class of the chained task. */
        private final Class<? extends ChainedDriver<?, ?>> chainedTask;
        /** Configuration belonging to the chained task. */
        private final TaskConfig taskConfig;
        /** Name of the chained task. */
        private final String taskName;
        /** Vertex containing this task; {@code null} until {@link #setContainingVertex} is called. */
        private JobVertex containingVertex;

        /**
         * Creates the holder; the containing vertex is left unset.
         *
         * @param planNode the plan node of the chained task
         * @param chainedTask the driver class of the chained task
         * @param taskConfig the configuration of the chained task
         * @param taskName the name of the chained task
         */
        TaskInChain(PlanNode planNode, Class<? extends ChainedDriver<?, ?>> chainedTask, TaskConfig taskConfig, String taskName) {
            this.taskName = taskName;
            this.taskConfig = taskConfig;
            this.chainedTask = chainedTask;
            this.planNode = planNode;
        }

        /** @return the plan node passed to the constructor */
        public PlanNode getPlanNode() {
            return planNode;
        }

        /** @return the driver class passed to the constructor */
        public Class<? extends ChainedDriver<?, ?>> getChainedTask() {
            return chainedTask;
        }

        /** @return the task configuration passed to the constructor */
        public TaskConfig getTaskConfig() {
            return taskConfig;
        }

        /** @return the task name passed to the constructor */
        public String getTaskName() {
            return taskName;
        }

        /** @return the vertex set via {@link #setContainingVertex}, or {@code null} if none was set */
        public JobVertex getContainingVertex() {
            return containingVertex;
        }

        /**
         * Records the vertex that contains this chained task.
         *
         * @param containingVertex the containing vertex
         */
        public void setContainingVertex(JobVertex containingVertex) {
            this.containingVertex = containingVertex;
        }
    }
}

