/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client;

import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.api.common.Program;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;

/**
 * A {@link PlanExecutor} that runs a {@link Plan} on a {@link LocalFlinkMiniCluster}
 * created inside the current JVM.
 *
 * <p>The executor can be used in two ways: either the caller manages the mini cluster
 * explicitly through {@link #start()} and {@link #stop()}, or {@link #executePlan(Plan)}
 * starts a cluster for a single execution and stops it again afterwards.
 *
 * <p>{@link #start()}, {@link #stop()} and {@link #executePlan(Plan)} synchronize on a
 * private lock object. The simple setters and getters do not.
 */
public class LocalExecutor extends PlanExecutor {
    /** Default for whether file outputs overwrite existing files. */
    private static final boolean DEFAULT_OVERWRITE = false;

    /** Marker meaning "number of task slots has not been set explicitly". */
    private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = -1;

    /** Guards {@link #flink} and the start/stop/execute life cycle. */
    private final Object lock = new Object();

    /** The running mini cluster, or {@code null} while the executor is stopped. */
    private LocalFlinkMiniCluster flink;

    /** Optional user configuration that is merged over the generated one in {@link #start()}. */
    private Configuration configuration;

    private int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;
    private boolean defaultOverwriteFiles = DEFAULT_OVERWRITE;

    /**
     * Creates an executor without additional user configuration.
     *
     * @throws InvalidProgramException if {@link ExecutionEnvironment#localExecutionIsAllowed()}
     *         reports that local execution is not allowed
     */
    public LocalExecutor() {
        if (!ExecutionEnvironment.localExecutionIsAllowed()) {
            throw new InvalidProgramException("The LocalEnvironment cannot be used when submitting a program through a client.");
        }
    }

    /**
     * Creates an executor whose mini cluster configuration is extended by {@code conf}.
     *
     * @param conf configuration that is added on top of the generated configuration
     *        when the cluster is started; may be {@code null}
     * @throws InvalidProgramException if local execution is not allowed
     */
    public LocalExecutor(Configuration conf) {
        this();
        this.configuration = conf;
    }

    /** @return whether file outputs overwrite existing files by default */
    public boolean isDefaultOverwriteFiles() {
        return this.defaultOverwriteFiles;
    }

    /** @param defaultOverwriteFiles whether file outputs overwrite existing files by default */
    public void setDefaultOverwriteFiles(boolean defaultOverwriteFiles) {
        this.defaultOverwriteFiles = defaultOverwriteFiles;
    }

    /** @param taskManagerNumSlots number of task slots; {@code -1} means "not set" */
    public void setTaskManagerNumSlots(int taskManagerNumSlots) {
        this.taskManagerNumSlots = taskManagerNumSlots;
    }

    /** @return the configured number of task slots, or {@code -1} if none was set */
    public int getTaskManagerNumSlots() {
        return this.taskManagerNumSlots;
    }

    /**
     * Builds a fresh {@link Configuration} from the settings held by the given executor.
     *
     * <p>Only the slot count ({@code taskmanager.numberOfTaskSlots}) and the overwrite flag
     * ({@code fs.overwrite-files}) are written; the executor's user configuration is not included.
     *
     * @param le the executor to read the settings from
     * @return a new configuration containing the two settings
     */
    public static Configuration createConfiguration(LocalExecutor le) {
        Configuration generated = new Configuration();
        generated.setInteger("taskmanager.numberOfTaskSlots", le.getTaskManagerNumSlots());
        generated.setBoolean("fs.overwrite-files", le.isDefaultOverwriteFiles());
        return generated;
    }

    /**
     * Starts the local mini cluster.
     *
     * <p>The cluster configuration is the result of {@link #createConfiguration(LocalExecutor)}
     * with the user configuration (if any) added on top.
     *
     * @throws IllegalStateException if the executor has already been started
     * @throws Exception if creating the mini cluster fails
     */
    public void start() throws Exception {
        synchronized (this.lock) {
            if (this.flink != null) {
                throw new IllegalStateException("The local executor was already started.");
            }
            Configuration effective = LocalExecutor.createConfiguration(this);
            if (this.configuration != null) {
                effective.addAll(this.configuration);
            }
            this.flink = new LocalFlinkMiniCluster(effective, true);
        }
    }

    /**
     * Stops the local mini cluster and clears the reference to it.
     *
     * @throws IllegalStateException if the executor has not been started
     * @throws Exception if stopping the mini cluster fails
     */
    public void stop() throws Exception {
        synchronized (this.lock) {
            if (this.flink == null) {
                throw new IllegalStateException("The local executor was not started.");
            }
            this.flink.stop();
            this.flink = null;
        }
    }

    /**
     * Optimizes the plan, translates it into a job graph and runs it on the mini cluster.
     *
     * <p>If the executor is not running, a cluster is started for this call and stopped again
     * when the call finishes, whether it succeeds or fails. In that case, and if no slot count
     * was set, the slot count is taken from {@link Plan#getMaximumParallelism()} when that value
     * is positive. Note that this derived value is stored in the executor and therefore remains
     * in effect for later executions.
     *
     * <p>The internal lock is held for the whole duration of the call.
     *
     * @param plan the plan to execute
     * @return the result of the job execution
     * @throws IllegalArgumentException if {@code plan} is {@code null}
     * @throws Exception if starting the cluster, compiling the plan or running the job fails
     */
    public JobExecutionResult executePlan(Plan plan) throws Exception {
        if (plan == null) {
            throw new IllegalArgumentException("The plan may not be null.");
        }
        synchronized (this.lock) {
            // A cluster started here belongs to this call only and is shut down at the end.
            final boolean shutDownAtEnd = this.flink == null;
            if (shutDownAtEnd) {
                if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
                    int maxParallelism = plan.getMaximumParallelism();
                    if (maxParallelism > 0) {
                        this.taskManagerNumSlots = maxParallelism;
                    }
                }
                this.start();
            }
            try {
                Optimizer pc = new Optimizer(new DataStatistics(), this.flink.getConfiguration());
                OptimizedPlan op = pc.compile(plan);
                JobGraph jobGraph = new JobGraphGenerator().compileJobGraph(op);
                boolean sysoutPrint = this.isPrintingStatusDuringExecution();
                SerializedJobExecutionResult result = this.flink.submitJobAndWait(jobGraph, sysoutPrint);
                return result.toJobExecutionResult(ClassLoader.getSystemClassLoader());
            } finally {
                if (shutDownAtEnd) {
                    this.stop();
                }
            }
        }
    }

    /**
     * Compiles the plan with the optimizer and returns the optimized plan as JSON.
     *
     * <p>No cluster is started. The optimizer is configured with
     * {@link #createConfiguration(LocalExecutor)} only; the user configuration passed to the
     * constructor is not applied here.
     *
     * @param plan the plan to optimize
     * @return the JSON dump of the optimized plan
     * @throws Exception if the optimizer or the JSON generator fails
     */
    public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
        Optimizer pc = new Optimizer(new DataStatistics(), LocalExecutor.createConfiguration(this));
        OptimizedPlan op = pc.compile(plan);
        return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
    }

    /**
     * Obtains the plan from the program and executes it on a temporary local executor.
     *
     * @param pa the program providing the plan
     * @param args arguments handed to {@link Program#getPlan(String...)}
     * @return the result of the job execution
     * @throws Exception if the execution fails
     */
    public static JobExecutionResult execute(Program pa, String ... args) throws Exception {
        return LocalExecutor.execute(pa.getPlan(args));
    }

    /**
     * Executes the plan on a temporary local executor that is started and stopped by this call.
     *
     * @param plan the plan to execute
     * @return the result of the job execution
     * @throws Exception if starting the executor or executing the plan fails
     */
    public static JobExecutionResult execute(Plan plan) throws Exception {
        LocalExecutor exec = new LocalExecutor();
        // Start before entering the try block: if start() fails there is nothing to stop, and
        // stop() would throw an IllegalStateException that hides the actual startup failure.
        exec.start();
        try {
            return exec.executePlan(plan);
        } finally {
            exec.stop();
        }
    }

    /**
     * Returns the optimized plan as JSON, using the configuration of a temporary mini cluster
     * that is started and stopped by this call.
     *
     * @param plan the plan to optimize
     * @return the JSON dump of the optimized plan
     * @throws Exception if starting the executor, optimizing or dumping the plan fails
     */
    public static String optimizerPlanAsJSON(Plan plan) throws Exception {
        LocalExecutor exec = new LocalExecutor();
        // Start before entering the try block so that a startup failure is not replaced by the
        // IllegalStateException that stop() throws for an executor that never started.
        exec.start();
        try {
            Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.getConfiguration());
            OptimizedPlan op = pc.compile(plan);
            return new PlanJSONDumpGenerator().getOptimizerPlanAsJSON(op);
        } finally {
            exec.stop();
        }
    }

    /**
     * Returns the JSON dump of the plan before optimization, built from the result of
     * {@link Optimizer#createPreOptimizedPlan(Plan)}.
     *
     * @param plan the plan to dump
     * @return the JSON representation of the pre-optimized plan
     */
    public static String getPlanAsJSON(Plan plan) {
        PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
        // Passing the result straight through avoids the raw List local left by the decompiler.
        return gen.getPactPlanAsJSON(Optimizer.createPreOptimizedPlan(plan));
    }
}

