package org.jetlinks.rule.engine.defaults;

import org.jetlinks.rule.engine.api.RuleConstants;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.Task;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/jetlinks/rule/engine/defaults/FunctionTaskExecutor.class */
public abstract class FunctionTaskExecutor extends AbstractTaskExecutor {
    private final String name;

    public FunctionTaskExecutor(String str, ExecutionContext executionContext) {
        super(executionContext);
        this.name = str;
    }

    protected abstract Publisher<RuleData> apply(RuleData ruleData);

    @Override // org.jetlinks.rule.engine.defaults.AbstractTaskExecutor
    protected Disposable doStart() {
        return this.context.getInput().accept().filter(ruleData -> {
            return this.state == Task.State.running;
        }).flatMap(ruleData2 -> {
            return this.context.getOutput().write((Publisher<RuleData>) Flux.from(apply(ruleData2)).flatMap(ruleData2 -> {
                return this.context.fireEvent(RuleConstants.Event.result, ruleData2).thenReturn(ruleData2);
            })).then(this.context.fireEvent(RuleConstants.Event.complete, ruleData2)).onErrorResume(th -> {
                return this.context.onError(th, ruleData2);
            });
        }).onErrorResume(th -> {
            return this.context.onError(th, (RuleData) null);
        }).subscribe();
    }

    @Override // org.jetlinks.rule.engine.defaults.AbstractTaskExecutor, org.jetlinks.rule.engine.api.task.TaskExecutor
    public String getName() {
        return this.name;
    }
}
