/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.cluster.scheduler;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.jetlinks.core.rpc.RpcServiceFactory;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.task.TaskSnapshot;
import org.jetlinks.rule.engine.api.worker.Worker;
import org.jetlinks.rule.engine.cluster.scheduler.SchedulerRpcService;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LocalSchedulerRpcService
implements SchedulerRpcService {
    private final Scheduler localScheduler;
    private static final Map<SchedulerRpcService.TaskOperation, Function<Task, Mono<Void>>> operationMapping = new HashMap<SchedulerRpcService.TaskOperation, Function<Task, Mono<Void>>>();
    private final Disposable disposable;

    public LocalSchedulerRpcService(Scheduler localScheduler, RpcServiceFactory serviceFactory) {
        this.localScheduler = localScheduler;
        this.disposable = serviceFactory.createConsumer("/rule-engine/cluster-scheduler:" + localScheduler.getId(), SchedulerRpcService.class, (Object)this);
    }

    public void shutdown() {
        this.disposable.dispose();
    }

    public Scheduler getLocalScheduler() {
        return this.localScheduler;
    }

    @Override
    public Flux<SchedulerRpcService.WorkerInfo> getWorkers() {
        return this.localScheduler.getWorkers().map(worker -> new SchedulerRpcService.WorkerInfo(worker.getId(), worker.getName()));
    }

    @Override
    public Mono<SchedulerRpcService.WorkerInfo> getWorker(String id) {
        return this.localScheduler.getWorker(id).map(worker -> new SchedulerRpcService.WorkerInfo(worker.getId(), worker.getName()));
    }

    @Override
    public Flux<SchedulerRpcService.TaskInfo> schedule(ScheduleJob job) {
        return this.localScheduler.schedule(job).map(task -> new SchedulerRpcService.TaskInfo(task.getId(), task.getName(), task.getWorkerId(), task.getJob()));
    }

    @Override
    public Mono<Void> shutdown(String instanceId) {
        return this.localScheduler.shutdown(instanceId);
    }

    @Override
    public Flux<SchedulerRpcService.TaskInfo> getSchedulingTask(String instanceId) {
        return this.localScheduler.getSchedulingTask(instanceId).map(task -> new SchedulerRpcService.TaskInfo(task.getId(), task.getName(), task.getWorkerId(), task.getJob()));
    }

    @Override
    public Flux<SchedulerRpcService.TaskInfo> getSchedulingTasks() {
        return this.localScheduler.getSchedulingTasks().map(task -> new SchedulerRpcService.TaskInfo(task.getId(), task.getName(), task.getWorkerId(), task.getJob()));
    }

    @Override
    public Mono<Long> totalTask() {
        return this.localScheduler.totalTask();
    }

    @Override
    public Mono<Boolean> canSchedule(ScheduleJob job) {
        return this.localScheduler.canSchedule(job);
    }

    @Override
    public Mono<Void> executeTask(String taskId, RuleData data) {
        return this.getTask(taskId).flatMap(task -> task.execute(data)).then();
    }

    private Mono<Task> getTask(String taskId) {
        return this.localScheduler.getSchedulingTasks().filter(task -> task.getId().equals(taskId)).singleOrEmpty();
    }

    @Override
    public Mono<Task.State> getTaskState(String taskId) {
        return this.getTask(taskId).flatMap(Task::getState);
    }

    @Override
    public Mono<Void> taskOperation(String taskId, SchedulerRpcService.TaskOperation operation) {
        return this.getTask(taskId).flatMap(task -> operationMapping.get((Object)operation).apply((Task)task));
    }

    @Override
    public Mono<Void> setTaskJob(String taskId, ScheduleJob job) {
        return this.getTask(taskId).flatMap(task -> task.setJob(job));
    }

    @Override
    public Mono<Long> getLastStateTime(String taskId) {
        return this.getTask(taskId).flatMap(Task::getLastStateTime);
    }

    @Override
    public Mono<Long> getStartTime(String taskId) {
        return this.getTask(taskId).flatMap(Task::getStartTime);
    }

    @Override
    public Mono<SchedulerRpcService.TaskInfo> createTask(String workerId, ScheduleJob job) {
        return this.localScheduler.getWorker(workerId).flatMap(worker -> worker.createTask(this.localScheduler.getId(), job)).map(task -> new SchedulerRpcService.TaskInfo(task.getId(), task.getName(), task.getWorkerId(), task.getJob()));
    }

    @Override
    public Mono<List<String>> getSupportExecutors(String workerId) {
        return this.localScheduler.getWorker(workerId).flatMap(Worker::getSupportExecutors);
    }

    @Override
    public Mono<Worker.State> getWorkerState(String workerId) {
        return this.localScheduler.getWorker(workerId).flatMap(Worker::getState);
    }

    @Override
    public Mono<Boolean> isAlive() {
        return Mono.just((Object)true);
    }

    @Override
    public Mono<TaskSnapshot> dumpTask(String taskId) {
        return this.getTask(taskId).flatMap(Task::dump);
    }

    static {
        operationMapping.put(SchedulerRpcService.TaskOperation.PAUSE, Task::pause);
        operationMapping.put(SchedulerRpcService.TaskOperation.START, Task::start);
        operationMapping.put(SchedulerRpcService.TaskOperation.SHUTDOWN, Task::shutdown);
        operationMapping.put(SchedulerRpcService.TaskOperation.RELOAD, Task::reload);
        operationMapping.put(SchedulerRpcService.TaskOperation.ENABLE_DEBUG, task -> task.debug(true));
        operationMapping.put(SchedulerRpcService.TaskOperation.DISABLE_DEBUG, task -> task.debug(false));
    }
}

