/*
 * Decompiled with CFR 0.152.
 */
package reactor.function.support;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.function.Consumer;
import reactor.function.Supplier;
import reactor.support.NamedDaemonThreadFactory;

/**
 * Pulls items from a {@link Supplier} on a single background thread and hands every
 * non-{@code null} item to a {@link Consumer}. When the supplier returns {@code null}
 * the reader backs off for {@link #IDLE_SLEEP_MILLIS} milliseconds before asking again.
 *
 * <p>Polling can be suspended with {@link #pause()} and restarted with {@link #resume()}.
 * After {@link #shutdown()} the poller cannot be resumed. {@code resume()}, {@code pause()}
 * and {@code shutdown()} are synchronized on this instance.
 *
 * @param <T> type of the items moved from the supplier to the consumer
 */
public class Poller<T> {
    private static final Logger LOG = LoggerFactory.getLogger(Poller.class);
    /** Back-off, in milliseconds, applied when the supplier has nothing to offer. */
    private static final long IDLE_SLEEP_MILLIS = 100L;
    // Single worker thread; presumably a daemon thread given the factory's name — confirm in NamedDaemonThreadFactory.
    private final ExecutorService threadPool = Executors.newSingleThreadExecutor(new NamedDaemonThreadFactory("pipe"));
    private final Runnable reader = new Runnable(){

        @Override
        public void run() {
            try {
                // Checking the interrupt flag as well as 'active' guarantees that a cancelled
                // reader stops even if resume() flips 'active' back to true before this task
                // has noticed the pause.
                while (Poller.this.active && !Thread.currentThread().isInterrupted()) {
                    T item = Poller.this.supplier.get();
                    if (null != item) {
                        Poller.this.consumer.accept(item);
                        continue;
                    }
                    try {
                        Thread.sleep(IDLE_SLEEP_MILLIS);
                    }
                    catch (InterruptedException e) {
                        // Restore the flag and leave: staying in the loop would make every
                        // subsequent sleep throw immediately and spin the CPU.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
            catch (RuntimeException e) {
                // Nobody calls Future.get(), so without this the failure would vanish.
                // Failures triggered by a pause/shutdown interrupt are not reported.
                if (Poller.this.active) {
                    LOG.error("Pipe from " + Poller.this.supplier + " to " + Poller.this.consumer + " failed", e);
                }
                throw e;
            }
        }
    };
    private final Supplier<T> supplier;
    private final Consumer<T> consumer;
    private volatile boolean active;
    private volatile Future<?> readerFuture;

    /**
     * Creates a poller and starts polling immediately.
     *
     * @param supplier source of items; a {@code null} result means "nothing available right now"
     * @param consumer receiver of every non-{@code null} item
     */
    public Poller(Supplier<T> supplier, Consumer<T> consumer) {
        this(supplier, consumer, false);
    }

    /**
     * Creates a poller, optionally leaving it paused.
     *
     * @param supplier source of items; a {@code null} result means "nothing available right now"
     * @param consumer receiver of every non-{@code null} item
     * @param paused   {@code true} to defer polling until {@link #resume()} is called
     */
    public Poller(Supplier<T> supplier, Consumer<T> consumer, boolean paused) {
        this.supplier = supplier;
        this.consumer = consumer;
        if (!paused) {
            this.resume();
        }
    }

    /**
     * Starts polling if no reader is currently running. A reader that has already
     * finished (for example because the supplier or consumer threw) is replaced.
     *
     * @return this poller
     * @throws IllegalStateException if {@link #shutdown()} has been called
     */
    public synchronized Poller<T> resume() {
        // isTerminated() implies isShutdown(), so one check covers both states.
        if (this.threadPool.isShutdown()) {
            throw new IllegalStateException("Poller has been shutdown.");
        }
        Future<?> current = this.readerFuture;
        if (null != current && !current.isDone()) {
            return this;
        }
        this.active = true;
        this.readerFuture = this.threadPool.submit(this.reader);
        LOG.debug("Started pipe from {} to {}", this.supplier, this.consumer);
        return this;
    }

    /**
     * Stops polling. The running reader, if any, is cancelled with interruption.
     * Calling this on an already paused poller only emits the debug message.
     *
     * @return this poller
     */
    public synchronized Poller<T> pause() {
        this.stopReader();
        LOG.debug("Stopped pipe from {} to {}", this.supplier, this.consumer);
        return this;
    }

    /**
     * Stops polling and shuts down the worker thread pool. The poller cannot be
     * resumed afterwards.
     */
    public synchronized void shutdown() {
        // ExecutorService.shutdown() does not interrupt a running task, so the reader
        // has to be stopped explicitly or it would keep polling forever.
        this.stopReader();
        this.threadPool.shutdown();
    }

    /** Clears the active flag and cancels the current reader task, if there is one. */
    private void stopReader() {
        this.active = false;
        Future<?> current = this.readerFuture;
        if (null != current) {
            current.cancel(true);
            this.readerFuture = null;
        }
    }
}

