/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.composable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import reactor.core.Observable;
import reactor.core.composable.Composable;
import reactor.core.composable.Deferred;
import reactor.event.Event;
import reactor.event.dispatch.Dispatcher;
import reactor.event.dispatch.SynchronousDispatcher;
import reactor.event.selector.Selector;
import reactor.event.selector.Selectors;
import reactor.event.support.EventConsumer;
import reactor.function.Consumer;
import reactor.function.Function;
import reactor.function.Functions;
import reactor.function.Predicate;
import reactor.function.Supplier;
import reactor.function.support.Tap;
import reactor.tuple.Tuple;
import reactor.tuple.Tuple2;
import reactor.util.Assert;

public class Stream<T>
extends Composable<T> {
    private final Tuple2<Selector, Object> first = Selectors.$();
    private final Tuple2<Selector, Object> last = Selectors.$();
    private final int batchSize;
    private final Iterable<T> values;

    public Stream(@Nonnull Dispatcher dispatcher, int batchSize, @Nullable Iterable<T> values, @Nullable Composable<?> parent) {
        super(dispatcher, parent);
        this.batchSize = batchSize;
        this.values = values;
        this.getObservable().on((Selector)this.getFlush().getT1(), new Consumer<Event<Void>>(){

            @Override
            public void accept(Event<Void> ev) {
                if (null == Stream.this.values) {
                    return;
                }
                for (Object val : Stream.this.values) {
                    Stream.this.notifyValue(val);
                }
            }
        });
    }

    @Override
    public Stream<T> consume(@Nonnull Consumer<T> consumer) {
        return (Stream)super.consume(consumer);
    }

    @Override
    public Stream<T> consume(@Nonnull Composable<T> consumer) {
        return (Stream)super.consume(consumer);
    }

    @Override
    public Stream<T> consume(@Nonnull Object key, @Nonnull Observable observable) {
        return (Stream)super.consume(key, observable);
    }

    @Override
    public Stream<T> flush() {
        return (Stream)super.flush();
    }

    @Override
    public <E extends Throwable> Stream<T> when(@Nonnull Class<E> exceptionType, @Nonnull Consumer<E> onError) {
        return (Stream)super.when(exceptionType, onError);
    }

    @Override
    public <V> Stream<V> map(@Nonnull Function<T, V> fn) {
        return (Stream)super.map(fn);
    }

    @Override
    public Stream<T> filter(@Nonnull Predicate<T> p) {
        return (Stream)super.filter(p);
    }

    @Override
    public Stream<T> filter(@Nonnull Predicate<T> p, Composable<T> composable) {
        return (Stream)super.filter(p, composable);
    }

    public Stream<T> first() {
        Deferred<T, Stream<T>> d = this.createDeferredChildStream();
        this.getObservable().on((Selector)this.first.getT1(), new EventConsumer(d));
        return d.compose();
    }

    public Stream<T> last() {
        Deferred<T, Stream<T>> d = this.createDeferredChildStream();
        this.getObservable().on((Selector)this.last.getT1(), new EventConsumer(d));
        return d.compose();
    }

    public Stream<T> batch(int batchSize) {
        Deferred d = this.createDeferred(batchSize);
        this.consume((Consumer)d);
        return (Stream)d.compose();
    }

    public boolean isBatch() {
        return this.batchSize > 0;
    }

    public Tap<T> tap() {
        Tap tap = new Tap();
        this.consume((Consumer)tap);
        return tap;
    }

    public Stream<List<T>> collect() {
        Assert.state(this.batchSize > 0, "Cannot collect() an unbounded Stream. Try extracting a batch first.");
        final Deferred d = this.createDeferred(this.batchSize);
        final ArrayList values = new ArrayList();
        this.consumeEvent(new Consumer<Event<T>>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void accept(Event<T> value) {
                List list = values;
                synchronized (list) {
                    values.add(value.getData());
                    if (values.size() % Stream.this.batchSize != 0) {
                        return;
                    }
                    d.acceptEvent(value.copy(new ArrayList(values)));
                    values.clear();
                }
            }
        });
        this.getObservable().on((Selector)this.getFlush().getT1(), new Consumer<Event<Void>>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void accept(Event<Void> ev) {
                List list = values;
                synchronized (list) {
                    if (values.isEmpty()) {
                        return;
                    }
                    d.accept(new ArrayList(values));
                    values.clear();
                }
            }
        });
        return (Stream)d.compose();
    }

    public <A> Stream<A> reduce(@Nonnull Function<Tuple2<T, A>, A> fn, A initial) {
        return this.reduce(fn, Functions.supplier(initial));
    }

    public <A> Stream<A> reduce(final @Nonnull Function<Tuple2<T, A>, A> fn, final @Nullable Supplier<A> accumulators) {
        final Deferred d = this.createDeferred();
        this.consumeEvent(new Consumer<Event<T>>(){
            private final AtomicLong count = new AtomicLong(0L);
            private A acc;

            @Override
            public void accept(Event<T> value) {
                if (null == this.acc) {
                    this.acc = null != accumulators ? accumulators.get() : null;
                }
                this.acc = fn.apply(Tuple.of(value.getData(), this.acc));
                if (Stream.this.isBatch() && this.count.incrementAndGet() % (long)Stream.this.batchSize == 0L) {
                    d.acceptEvent(value.copy(this.acc));
                } else if (!Stream.this.isBatch()) {
                    d.acceptEvent(value.copy(this.acc));
                }
            }
        });
        return (Stream)d.compose();
    }

    public <A> Stream<A> reduce(@Nonnull Function<Tuple2<T, A>, A> fn) {
        return this.reduce(fn, (Supplier)null);
    }

    @Override
    protected <V, C extends Composable<V>> Deferred<V, C> createDeferred() {
        return this.createDeferred(this.batchSize);
    }

    protected <V, C extends Composable<V>> Deferred<V, C> createDeferred(int batchSize) {
        return this.createDeferredChildStream(batchSize);
    }

    private Deferred<T, Stream<T>> createDeferredChildStream() {
        return this.createDeferredChildStream(-1);
    }

    private Deferred<T, Stream<T>> createDeferredChildStream(int batchSize) {
        return new Deferred(new Stream<T>(new SynchronousDispatcher(), batchSize, null, this));
    }

    @Override
    protected void errorAccepted(Throwable error) {
    }

    @Override
    protected void valueAccepted(T value) {
        if (!this.isBatch()) {
            return;
        }
        long accepted = (this.getAcceptCount() + 1L) % (long)this.batchSize;
        if (accepted == 1L) {
            this.getObservable().notify(this.first.getT2(), Event.wrap(value));
        } else if (accepted == 0L) {
            this.getObservable().notify(this.last.getT2(), Event.wrap(value));
        }
    }

    public String toString() {
        return "Stream{acceptCount=" + this.getAcceptCount() + ", errorCount=" + this.getErrorCount() + ", batchSize=" + this.batchSize + ", values=" + this.values + '}';
    }
}

