/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.client.program;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.List;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.ExecutionEnvironmentFactory;
import org.apache.flink.api.java.operators.translation.JavaPlan;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.CostEstimator;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

public class Client {
    // Logger shared by all instances of this class.
    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
    // Configuration passed to the optimizer and to the actor-system start, JobManager lookup and timeout lookup in run(JobGraph, boolean).
    private final Configuration configuration;
    // Address of the JobManager; the constructors resolve it from a host name when necessary.
    private final InetSocketAddress jobManagerAddress;
    // Optimizer used to compile plans into FlinkPlan instances.
    private final Optimizer compiler;
    // Class loader handed to SerializedJobExecutionResult.toJobExecutionResult when a blocking submission returns.
    private final ClassLoader userCodeClassLoader;
    // Forwarded to JobClient.submitJobAndWait; defaults to true and is changed via setPrintStatusDuringExecution.
    private boolean printStatusDuringExecution = true;
    // Stays at -1 unless a value is supplied through the four-argument constructor; exposed via getMaxSlots().
    private int maxSlots = -1;
    // ID of the job graph most recently handed to one of the run(...) methods; null until then.
    private JobID lastJobId = null;

    public Client(InetSocketAddress jobManagerAddress, Configuration config, ClassLoader userCodeClassLoader, int maxSlots) throws UnknownHostException {
        Preconditions.checkNotNull(jobManagerAddress, "JobManager address is null");
        Preconditions.checkNotNull(config, "Configuration is null");
        Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
        this.configuration = config;
        if (jobManagerAddress.isUnresolved()) {
            String host = jobManagerAddress.getHostName();
            if (host == null) {
                throw new IllegalArgumentException("Host in jobManagerAddress is null");
            }
            try {
                InetAddress address = InetAddress.getByName(host);
                this.jobManagerAddress = new InetSocketAddress(address, jobManagerAddress.getPort());
            }
            catch (UnknownHostException e) {
                throw new UnknownHostException("Cannot resolve JobManager host name '" + host + "'.");
            }
        } else {
            this.jobManagerAddress = jobManagerAddress;
        }
        this.compiler = new Optimizer(new DataStatistics(), (CostEstimator)new DefaultCostEstimator(), this.configuration);
        this.userCodeClassLoader = userCodeClassLoader;
        this.maxSlots = maxSlots;
    }

    public Client(Configuration config, ClassLoader userCodeClassLoader) throws UnknownHostException {
        Preconditions.checkNotNull(config, "Configuration is null");
        Preconditions.checkNotNull(userCodeClassLoader, "User code ClassLoader is null");
        this.configuration = config;
        this.userCodeClassLoader = userCodeClassLoader;
        String address = config.getString("jobmanager.rpc.address", null);
        if (address == null) {
            throw new IllegalConfigurationException("Cannot find address to job manager's RPC service in the global configuration.");
        }
        int port = config.getInteger("jobmanager.rpc.port", 6123);
        if (port < 0) {
            throw new IllegalConfigurationException("Cannot find port to job manager's RPC service in the global configuration.");
        }
        try {
            InetAddress inetAddress = InetAddress.getByName(address);
            this.jobManagerAddress = new InetSocketAddress(inetAddress, port);
        }
        catch (UnknownHostException e) {
            throw new UnknownHostException("Cannot resolve the JobManager hostname '" + address + "' specified in the configuration");
        }
        this.compiler = new Optimizer(new DataStatistics(), (CostEstimator)new DefaultCostEstimator(), this.configuration);
    }

    /**
     * Sets the flag that is forwarded to the job client when a job is submitted in
     * blocking mode.
     *
     * @param print the new value of the status-printing flag
     */
    public void setPrintStatusDuringExecution(boolean print) {
        printStatusDuringExecution = print;
    }

    /**
     * Returns the slot limit this client was created with.
     *
     * @return the value given to the four-argument constructor, or -1 if none was given
     */
    public int getMaxSlots() {
        return maxSlots;
    }

    /**
     * Optimizes the given program and renders the resulting plan as JSON.
     *
     * <p>The optimized plan is cast to {@link OptimizedPlan} before it is dumped, so a
     * program whose plan is of a different {@link FlinkPlan} type fails with a
     * {@link ClassCastException}.
     *
     * @param prog the packaged program to optimize
     * @param parallelism the parallelism passed on to the optimization step
     * @return the JSON dump produced by {@link PlanJSONDumpGenerator}
     * @throws CompilerException declared for failures of the optimizer
     * @throws ProgramInvocationException if the plan cannot be obtained from the program
     */
    public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
        PlanJSONDumpGenerator dumper = new PlanJSONDumpGenerator();
        FlinkPlan plan = getOptimizedPlan(prog, parallelism);
        OptimizedPlan optimized = (OptimizedPlan) plan;
        return dumper.getOptimizerPlanAsJSON(optimized);
    }

    /**
     * Produces the optimized plan for a packaged program.
     *
     * <p>The calling thread's context class loader is set to the program's user-code class
     * loader. Programs that use the program entry point are optimized directly from their
     * plan. Programs in interactive mode are run against an {@link OptimizerPlanEnvironment}
     * that captures the plan and then aborts the program by throwing; while the program
     * runs, {@code System.out} and {@code System.err} are redirected into buffers, and the
     * buffered text is printed to the restored streams afterwards.
     *
     * @param prog the packaged program to optimize
     * @param parallelism parallelism applied to the plan or environment when greater than zero
     * @return the optimized plan
     * @throws CompilerException declared for failures of the optimizer
     * @throws ProgramInvocationException if the program fails without having produced a plan,
     *         or returns normally without triggering plan creation
     * @throws RuntimeException if the program uses neither the program entry point nor interactive mode
     */
    public FlinkPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        if (prog.isUsingProgramEntryPoint()) {
            return this.getOptimizedPlan(prog.getPlanWithJars(), parallelism);
        }
        if (!prog.isUsingInteractiveMode()) {
            // Previously thrown without any message, which left callers with nothing to diagnose.
            throw new RuntimeException("Cannot create the optimized plan: the program uses neither the program entry point nor the interactive mode.");
        }

        OptimizerPlanEnvironment env = new OptimizerPlanEnvironment(this.compiler);
        if (parallelism > 0) {
            env.setParallelism(parallelism);
        }
        env.setAsContext();

        // Capture everything the user program prints while it is executed for plan extraction.
        PrintStream originalOut = System.out;
        PrintStream originalErr = System.err;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        System.setOut(new PrintStream(baos));
        ByteArrayOutputStream baes = new ByteArrayOutputStream();
        System.setErr(new PrintStream(baes));
        try {
            ContextEnvironment.enableLocalExecution(false);
            prog.invokeInteractiveModeForExecution();
        }
        catch (ProgramInvocationException e) {
            throw e;
        }
        catch (Throwable t) {
            // The environment aborts the program by throwing once the plan is captured,
            // so a throwable together with a captured plan is the expected success path.
            if (env.optimizerPlan != null) {
                return env.optimizerPlan;
            }
            throw new ProgramInvocationException("The program caused an error: ", t);
        }
        finally {
            ContextEnvironment.enableLocalExecution(true);
            System.setOut(originalOut);
            System.setErr(originalErr);
            System.err.println(baes);
            System.out.println(baos);
        }

        // Reaching this point means the program returned normally, i.e. it never triggered plan creation.
        throw new ProgramInvocationException("The program plan could not be fetched - the program aborted pre-maturely.\nSystem.err: " + baes.toString() + '\n' + "System.out: " + baos.toString() + '\n');
    }

    /**
     * Compiles the given plan with this client's optimizer.
     *
     * <p>The requested parallelism is written into the plan only when it is positive and
     * the plan does not already carry a positive default parallelism.
     *
     * @param p the plan to optimize; its default parallelism may be modified
     * @param parallelism the requested parallelism; values of zero or less are ignored
     * @return the plan produced by the optimizer
     * @throws CompilerException declared for failures of the optimizer
     */
    public FlinkPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
        if (parallelism > 0) {
            if (p.getDefaultParallelism() <= 0) {
                LOG.debug("Changing plan default parallelism from {} to {}", p.getDefaultParallelism(), parallelism);
                p.setDefaultParallelism(parallelism);
            }
        }
        LOG.debug("Set parallelism {}, plan default parallelism {}", parallelism, p.getDefaultParallelism());
        FlinkPlan optimized = compiler.compile(p);
        return optimized;
    }

    /**
     * Optimizes the plan contained in the given job.
     *
     * @param prog the job whose plan is optimized
     * @param parallelism the parallelism passed on to {@link #getOptimizedPlan(Plan, int)}
     * @return the plan produced by the optimizer
     * @throws CompilerException declared for failures of the optimizer
     * @throws ProgramInvocationException declared for failures while obtaining the plan from the job
     */
    public FlinkPlan getOptimizedPlan(JobWithJars prog, int parallelism) throws CompilerException, ProgramInvocationException {
        Plan plan = prog.getPlan();
        return getOptimizedPlan(plan, parallelism);
    }

    /**
     * Builds the job graph for an already optimized plan and attaches all libraries of the
     * given program to it.
     *
     * @param prog the program that supplies the library files
     * @param optPlan the optimized plan to translate
     * @return the job graph with the program's libraries added as JARs
     * @throws ProgramInvocationException declared for failures while collecting the program's libraries
     */
    public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
        List<File> libraries = prog.getAllLibraries();
        return getJobGraph(optPlan, libraries);
    }

    /**
     * Turns an optimized plan into a job graph and registers the given JAR files with it.
     *
     * <p>A {@link StreamingPlan} supplies its own job graph; any other plan is cast to
     * {@link OptimizedPlan} and translated by a {@link JobGraphGenerator}.
     *
     * @param optPlan the plan to translate
     * @param jarFiles files whose absolute paths are added to the job graph as JARs
     * @return the resulting job graph
     */
    private JobGraph getJobGraph(FlinkPlan optPlan, List<File> jarFiles) {
        final JobGraph graph = (optPlan instanceof StreamingPlan)
                ? ((StreamingPlan) optPlan).getJobGraph()
                : new JobGraphGenerator().compileJobGraph((OptimizedPlan) optPlan);

        for (File jarFile : jarFiles) {
            String absolutePath = jarFile.getAbsolutePath();
            graph.addJar(new Path(absolutePath));
        }
        return graph;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a packaged program.
     *
     * <p>The calling thread's context class loader is set to the program's user-code class
     * loader. Programs that use the program entry point are optimized and submitted directly.
     * Programs in interactive mode are invoked with this client installed as their context
     * environment; the returned result then carries the ID of the last job graph that was
     * handed to this client, which is {@code null} if the program never submitted one.
     *
     * @param prog the packaged program to run
     * @param parallelism the parallelism passed on to plan optimization or to the context environment
     * @param wait whether submission should block until the job has finished
     * @return the submission result of the job
     * @throws ProgramInvocationException if the program or its submission fails
     * @throws RuntimeException if the program uses neither the program entry point nor interactive mode
     */
    public JobSubmissionResult run(PackagedProgram prog, int parallelism, boolean wait) throws ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        if (prog.isUsingProgramEntryPoint()) {
            return this.run(prog.getPlanWithJars(), parallelism, wait);
        }
        if (!prog.isUsingInteractiveMode()) {
            // Previously thrown without any message, which left callers with nothing to diagnose.
            throw new RuntimeException("Cannot run the program: it uses neither the program entry point nor the interactive mode.");
        }

        LOG.info("Starting program in interactive mode");
        ContextEnvironment.setAsContext(this, prog.getAllLibraries(), prog.getUserCodeClassLoader(), parallelism, wait);
        ContextEnvironment.enableLocalExecution(false);
        try {
            prog.invokeInteractiveModeForExecution();
        }
        finally {
            // Always re-enable local execution, even when the user program fails.
            ContextEnvironment.enableLocalExecution(true);
        }
        return new JobSubmissionResult(this.lastJobId);
    }

    /**
     * Submits an already optimized plan together with all libraries of the given program.
     *
     * @param prog the program that supplies the library files
     * @param optimizedPlan the optimized plan to submit
     * @param wait whether submission should block until the job has finished
     * @return the submission result of the job
     * @throws ProgramInvocationException if collecting the libraries or submitting the job fails
     */
    public JobSubmissionResult run(PackagedProgram prog, OptimizedPlan optimizedPlan, boolean wait) throws ProgramInvocationException {
        List<File> libraries = prog.getAllLibraries();
        return run(optimizedPlan, libraries, wait);
    }

    /**
     * Optimizes the plan of the given job and submits it together with the job's JAR files.
     *
     * <p>The optimized plan is cast to {@link OptimizedPlan}; a plan of a different
     * {@link FlinkPlan} type fails with a {@link ClassCastException}.
     *
     * @param prog the job to optimize and submit
     * @param parallelism the parallelism passed on to plan optimization
     * @param wait whether submission should block until the job has finished
     * @return the submission result of the job
     * @throws CompilerException declared for failures of the optimizer
     * @throws ProgramInvocationException if obtaining the plan or submitting the job fails
     */
    public JobSubmissionResult run(JobWithJars prog, int parallelism, boolean wait) throws CompilerException, ProgramInvocationException {
        OptimizedPlan compiled = (OptimizedPlan) getOptimizedPlan(prog, parallelism);
        List<File> jars = prog.getJarFiles();
        return run(compiled, jars, wait);
    }

    /**
     * Translates an optimized plan into a job graph, records the graph's job ID as the
     * last submitted job, and submits the graph.
     *
     * @param compiledPlan the optimized plan to translate and submit
     * @param libraries files added to the job graph as JARs
     * @param wait whether submission should block until the job has finished
     * @return the submission result of the job
     * @throws ProgramInvocationException if submitting the job fails
     */
    public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
        JobGraph graph = getJobGraph(compiledPlan, libraries);
        lastJobId = graph.getJobID();
        return run(graph, wait);
    }

    /**
     * Submits a job graph to the JobManager.
     *
     * <p>The method records the graph's job ID as the last submitted job, starts a client
     * actor system, looks up the JobManager, uploads the job's JAR files and then submits
     * the job. The actor system is shut down before the method returns or throws,
     * whichever step fails.
     *
     * @param jobGraph the job graph to submit
     * @param wait if {@code true}, blocks until the job has finished and returns its
     *        deserialized execution result; otherwise submits in detached mode and returns
     *        a result that only carries the job ID
     * @return the execution result (blocking mode) or a result holding the job ID (detached mode)
     * @throws ProgramInvocationException if the actor system cannot be started, the
     *         JobManager cannot be resolved, the JAR upload fails, the job execution fails,
     *         or the execution result cannot be deserialized
     */
    public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
        this.lastJobId = jobGraph.getJobID();
        LOG.info("JobManager actor system address is " + this.jobManagerAddress);
        LOG.info("Starting client actor system");
        final ActorSystem actorSystem;
        try {
            actorSystem = JobClient.startJobClientActorSystem(this.configuration);
        }
        catch (Exception e) {
            throw new ProgramInvocationException("Could not start client actor system.", e);
        }

        // From here on the actor system exists, so every exit path must shut it down.
        try {
            LOG.info("Looking up JobManager");
            final ActorRef jobManager;
            try {
                jobManager = JobManager.getJobManagerRemoteReference(this.jobManagerAddress, actorSystem, this.configuration);
            }
            catch (IOException e) {
                throw new ProgramInvocationException("Failed to resolve JobManager", e);
            }
            LOG.info("JobManager runs at " + jobManager.path());

            FiniteDuration timeout = AkkaUtils.getTimeout(this.configuration);
            LOG.info("Communication between client and JobManager will have a timeout of " + timeout);

            LOG.info("Checking and uploading JAR files");
            try {
                JobClient.uploadJarFiles(jobGraph, jobManager, timeout);
            }
            catch (IOException e) {
                throw new ProgramInvocationException("Could not upload the program's JAR files to the JobManager.", e);
            }

            try {
                if (wait) {
                    SerializedJobExecutionResult result = JobClient.submitJobAndWait(actorSystem, jobManager, jobGraph, timeout, this.printStatusDuringExecution);
                    try {
                        return result.toJobExecutionResult(this.userCodeClassLoader);
                    }
                    catch (Exception e) {
                        throw new ProgramInvocationException("Failed to deserialize the accumulator result after the job execution", e);
                    }
                }
                JobClient.submitJobDetached(jobManager, jobGraph, timeout);
                return new JobSubmissionResult(jobGraph.getJobID());
            }
            catch (ProgramInvocationException e) {
                // Already carries a specific message (e.g. the deserialization failure);
                // do not let the generic handler below wrap it a second time.
                throw e;
            }
            catch (JobExecutionException e) {
                throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
            }
            catch (Exception e) {
                throw new ProgramInvocationException("Exception during program execution.", e);
            }
        }
        finally {
            actorSystem.shutdown();
            actorSystem.awaitTermination();
        }
    }

    public static final class ProgramAbortException
    extends Error {
        private static final long serialVersionUID = 1L;
    }

    public static final class OptimizerPlanEnvironment
    extends ExecutionEnvironment {
        private final Optimizer compiler;
        private FlinkPlan optimizerPlan;

        private OptimizerPlanEnvironment(Optimizer compiler) {
            this.compiler = compiler;
        }

        public JobExecutionResult execute(String jobName) throws Exception {
            JavaPlan plan = this.createProgramPlan(jobName);
            this.optimizerPlan = this.compiler.compile((Plan)plan);
            throw new ProgramAbortException();
        }

        public String getExecutionPlan() throws Exception {
            JavaPlan plan = this.createProgramPlan(null, false);
            this.optimizerPlan = this.compiler.compile((Plan)plan);
            throw new ProgramAbortException();
        }

        private void setAsContext() {
            ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory(){

                public ExecutionEnvironment createExecutionEnvironment() {
                    return OptimizerPlanEnvironment.this;
                }
            };
            OptimizerPlanEnvironment.initializeContextEnvironment((ExecutionEnvironmentFactory)factory);
        }

        public void setPlan(FlinkPlan plan) {
            this.optimizerPlan = plan;
        }
    }
}

