/*
 * Decompiled with CFR 0.152.
 */
package reactor.core;

import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import reactor.core.Observable;
import reactor.event.Event;
import reactor.function.Consumer;
import reactor.function.Predicate;
import reactor.queue.BlockingQueueFactory;
import reactor.util.Assert;

/**
 * A {@link Consumer} that buffers incoming {@link Event}s in a queue and later
 * re-publishes them, one at a time, to an {@link Observable} under a fixed key.
 *
 * <p>Two optional predicates, both evaluated against the backing queue, steer
 * the batching: {@code queueWhile} decides whether an incoming event is
 * buffered at all, and {@code flushWhen} decides whether {@link #flush()} is
 * triggered after an event has been offered.
 *
 * <p>NOTE(review): the counter is atomic, but the add-then-count sequence in
 * {@link #accept(Event)} is not a single atomic step; whether instances may be
 * shared between threads depends on the supplied queue and dispatcher — confirm
 * against callers.
 *
 * @param <T> payload type of the batched events
 */
public class EventBatcher<T>
implements Consumer<Event<T>> {
    /** Number of buffered events that have not yet been handed to the observable. */
    private final AtomicLong count = new AtomicLong();
    private final Observable observable;
    private final Object key;
    private final Queue<Event<T>> queue;
    private final Predicate<Queue<Event<T>>> queueWhile;
    private final Predicate<Queue<Event<T>>> flushWhen;

    /**
     * Creates a batcher that publishes to {@code observable} using {@code key}.
     *
     * @param observable target that receives the events on flush; must not be {@code null}
     * @param key        notification key passed to the observable; must not be {@code null}
     * @param queue      backing buffer, or {@code null} to use
     *                   {@code BlockingQueueFactory.createQueue()}
     * @param queueWhile buffers an event only while this tests {@code true} on the queue;
     *                   {@code null} means every event is buffered
     * @param flushWhen  triggers a flush when this tests {@code true} on the queue;
     *                   {@code null} means flushing happens only via {@link #flush()}
     */
    public EventBatcher(@Nonnull Observable observable, @Nonnull Object key, @Nullable Queue<Event<T>> queue, @Nullable Predicate<Queue<Event<T>>> queueWhile, @Nullable Predicate<Queue<Event<T>>> flushWhen) {
        Assert.notNull(observable, "Reactor cannot be null.");
        Assert.notNull(key, "Event key cannot be null.");
        this.observable = observable;
        this.key = key;
        this.queue = null == queue ? BlockingQueueFactory.createQueue() : queue;
        this.queueWhile = queueWhile;
        this.flushWhen = flushWhen;
        // A caller-supplied queue may already hold events; count them as pending
        // so they are covered by the per-flush budget instead of being stranded.
        this.count.set(this.queue.size());
    }

    /**
     * Publishes the events that were pending when this call started.
     *
     * <p>The number of pending events is snapshotted up front and used as a
     * budget, so events that arrive while the flush is running (for example
     * from a consumer that feeds this batcher again) are left for a later
     * flush. The loop also stops as soon as the queue reports empty.
     */
    public void flush() {
        // Local budget: an overlapping or re-entrant flush cannot overwrite it.
        long budget = this.count.get();
        Event<T> ev;
        while (budget-- > 0L && null != (ev = this.queue.poll())) {
            // Decrement before notifying so the counter stays accurate even if
            // the observable throws while handling this event.
            this.count.decrementAndGet();
            this.observable.notify(this.key, ev);
        }
    }

    /**
     * Offers an event to the batcher.
     *
     * <p>The event is buffered when no {@code queueWhile} predicate was given
     * or when that predicate accepts the current queue; otherwise it is
     * dropped. Afterwards, regardless of whether the event was buffered, the
     * {@code flushWhen} predicate (if any) is consulted and a flush is run
     * when it accepts the queue.
     *
     * @param ev the event to buffer
     */
    @Override
    public final void accept(Event<T> ev) {
        if (null == this.queueWhile || this.queueWhile.test(this.queue)) {
            this.queue.add(ev);
            this.count.incrementAndGet();
        }
        if (null != this.flushWhen && this.flushWhen.test(this.queue)) {
            this.flush();
        }
    }
}

