/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.rule.engine.cluster.scheduler;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import org.jetlinks.core.rpc.RpcServiceFactory;
import org.jetlinks.rule.engine.api.scheduler.ScheduleJob;
import org.jetlinks.rule.engine.api.scheduler.Scheduler;
import org.jetlinks.rule.engine.api.task.Task;
import org.jetlinks.rule.engine.api.worker.Worker;
import org.jetlinks.rule.engine.api.worker.WorkerSelector;
import org.jetlinks.rule.engine.cluster.scheduler.LocalSchedulerRpcService;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link Scheduler} implementation that schedules jobs onto workers registered
 * in this instance and keeps track of the tasks those workers created.
 *
 * <p>Tasks are tracked per rule instance id and, within an instance, per node id.
 * A {@link LocalSchedulerRpcService} is created alongside the scheduler and is
 * shut down by {@link #cleanup()}.
 */
public class ClusterLocalScheduler implements Scheduler {
    /** Identifier of this scheduler; also handed to workers when they create a task. */
    private final String id;

    /** Default selection strategy: keep only the first candidate worker. */
    static final WorkerSelector defaultSelector = (workers1, rule) -> workers1.take(1L);

    /** Strategy used by {@link #findWorker(String, ScheduleJob)} to pick among candidates. */
    private WorkerSelector workerSelector = defaultSelector;

    /** Resources disposed by {@link #cleanup()}. */
    private final List<Disposable> disposables = new CopyOnWriteArrayList<>();

    /** Registered workers, ordered (and de-duplicated) by worker id. */
    private final Set<Worker> localWorkers = new ConcurrentSkipListSet<>(Comparator.comparing(Worker::getId));

    /** Tracked tasks: instance id -> node id -> tasks. */
    private final Map<String, Map<String, List<Task>>> localTasks = new ConcurrentHashMap<>();

    private final LocalSchedulerRpcService rpcService;

    /**
     * Creates the scheduler and its companion RPC service.
     *
     * @param id             identifier of this scheduler
     * @param serviceFactory factory handed to the {@link LocalSchedulerRpcService}
     */
    public ClusterLocalScheduler(String id, RpcServiceFactory serviceFactory) {
        this.id = id;
        this.rpcService = new LocalSchedulerRpcService(this, serviceFactory);
    }

    /**
     * Disposes every registered {@link Disposable}, forgets them, and shuts down
     * the RPC service.
     */
    public void cleanup() {
        this.disposables.forEach(Disposable::dispose);
        this.disposables.clear();
        this.rpcService.shutdown();
    }

    /**
     * Registers a worker with this scheduler. A worker whose id is already
     * registered is ignored, because the backing set compares workers by id.
     *
     * @param worker the worker to register
     */
    public void addWorker(Worker worker) {
        this.localWorkers.add(this.wrapLocalWorker(worker));
    }

    /** Extension point for decorating a worker before registration; currently returns it unchanged. */
    private Worker wrapLocalWorker(Worker localWorker) {
        return localWorker;
    }

    /**
     * @return all registered workers, in worker-id order
     */
    public Flux<Worker> getWorkers() {
        return Flux.fromIterable(this.localWorkers);
    }

    /**
     * Looks up a registered worker by id.
     *
     * @param workerId id of the wanted worker
     * @return the first worker whose id equals {@code workerId}, or empty when none matches
     */
    public Mono<Worker> getWorker(String workerId) {
        return this.getWorkers()
                .filter(worker -> worker.getId().equals(workerId))
                .next();
    }

    /**
     * Schedules a job. When tasks are already tracked for the job's instance and
     * node, each of them receives the new job and is reloaded; otherwise a new
     * task is created on a selected worker.
     *
     * <p>The lookup of already-tracked tasks happens when this method is called,
     * not when the returned flux is subscribed.
     *
     * @param job the job to schedule
     * @return the tasks now running the job
     */
    public Flux<Task> schedule(ScheduleJob job) {
        List<Task> tasks = this.getTasks(job.getInstanceId(), job.getNodeId());
        if (tasks.isEmpty()) {
            return this.createExecutor(job);
        }
        return Flux.fromIterable(tasks)
                .flatMap(task -> task.setJob(job).then(task.reload()).thenReturn(task));
    }

    /**
     * Creates a task for the job on the worker(s) chosen by the selector and
     * records each created task under the job's instance and node.
     *
     * @return the created tasks; errors with {@link UnsupportedOperationException}
     *         when no worker is selected for the job's executor
     */
    private Flux<Task> createExecutor(ScheduleJob job) {
        return this.findWorker(job.getExecutor(), job)
                .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("unsupported executor:" + job.getExecutor())))
                .flatMap(worker -> worker.createTask(this.id, job))
                .doOnNext(task -> this.getTasks(job.getInstanceId(), job.getNodeId()).add(task));
    }

    /**
     * Shuts down every tracked task of the given instance and, once all
     * shutdowns have completed, stops tracking the instance.
     *
     * <p>If a task's shutdown signals an error, that error is propagated and the
     * instance stays tracked.
     *
     * @param instanceId id of the rule instance to shut down
     * @return completion signal
     */
    public Mono<Void> shutdown(String instanceId) {
        return this.getSchedulingTask(instanceId)
                .flatMap(Task::shutdown)
                // Remove the whole entry rather than clearing it, so finished
                // instances do not leave empty maps behind.
                .then(Mono.fromRunnable(() -> this.localTasks.remove(instanceId)));
    }

    /**
     * Lists the tasks tracked for one rule instance. Unknown instance ids yield
     * an empty flux and do not create any tracking entry.
     *
     * @param instanceId id of the rule instance
     * @return the instance's tracked tasks across all of its nodes
     */
    public Flux<Task> getSchedulingTask(String instanceId) {
        // Deferred so the lookup reflects the state at subscription time.
        return Flux.defer(() -> {
            Map<String, List<Task>> tasksByNode = this.localTasks.get(instanceId);
            if (tasksByNode == null) {
                return Flux.<Task>empty();
            }
            return Flux.fromIterable(tasksByNode.values()).flatMapIterable(Function.identity());
        });
    }

    /**
     * @return every task tracked by this scheduler, across all instances and nodes
     */
    public Flux<Task> getSchedulingTasks() {
        return Flux.fromIterable(this.localTasks.values())
                .flatMapIterable(Map::values)
                .flatMapIterable(Function.identity());
    }

    /**
     * @return the number of tasks tracked by this scheduler
     */
    public Mono<Long> totalTask() {
        return this.getSchedulingTasks().count();
    }

    /**
     * @param job the job to check
     * @return {@code true} when at least one worker is selected for the job's executor
     */
    public Mono<Boolean> canSchedule(ScheduleJob job) {
        return this.findWorker(job.getExecutor(), job).hasElements();
    }

    /**
     * Finds the workers that should run a job: registered workers whose supported
     * executors contain {@code executor}, narrowed down by the worker selector.
     * A worker that reports no supported-executor list is treated as not supporting it.
     *
     * @param executor executor name the worker must support
     * @param job      the job being scheduled, passed on to the selector
     * @return the selected workers
     */
    protected Flux<Worker> findWorker(String executor, ScheduleJob job) {
        Flux<Worker> candidates = Flux.fromIterable(this.localWorkers)
                .filterWhen(worker -> worker.getSupportExecutors()
                        .map(supported -> supported.contains(executor))
                        .defaultIfEmpty(false));
        return this.workerSelector.select(candidates, job);
    }

    /** Returns the (created-on-demand) task list for an instance's node. */
    private List<Task> getTasks(String instanceId, String nodeId) {
        return this.getTasks(instanceId).computeIfAbsent(nodeId, ignore -> new CopyOnWriteArrayList<>());
    }

    /** Returns the (created-on-demand) node-to-tasks map for an instance. */
    private Map<String, List<Task>> getTasks(String instanceId) {
        return this.localTasks.computeIfAbsent(instanceId, ignore -> new ConcurrentHashMap<>());
    }

    /**
     * @return the identifier of this scheduler
     */
    public String getId() {
        return this.id;
    }

    /**
     * Replaces the worker selection strategy.
     *
     * @param workerSelector the new strategy; {@code null} restores the default
     *                       selector instead of causing a failure on the next lookup
     */
    public void setWorkerSelector(WorkerSelector workerSelector) {
        this.workerSelector = workerSelector == null ? defaultSelector : workerSelector;
    }
}

