/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.BiConsumerChain;
import io.helidon.common.reactive.ConsumerChain;
import io.helidon.common.reactive.MultiError;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/**
 * A {@link Flow.Publisher} that is fed imperatively through {@link #emit} and delivers the
 * items to a single subscriber according to the demand signalled via
 * {@link Flow.Subscription#request(long)}.
 *
 * <p>Items that cannot be handed to the subscriber immediately are kept in an unbounded
 * {@link ConcurrentLinkedQueue} until demand arrives, the publisher is terminated with
 * {@link #completeNow()} / {@link #fail(Throwable)}, or the subscription is cancelled.
 *
 * <p>Delivery is serialized with the {@code contenders} counter: the caller that moves the
 * counter away from zero runs {@link #drain()}, everybody else only increments the counter,
 * which makes the running drain loop go around once more. The counter starts at one, so no
 * draining happens until {@link #subscribe} itself calls {@code drain()} after
 * {@code onSubscribe} has been signalled.
 *
 * @param <T> type of the emitted items
 */
public class BufferedEmittingPublisher<T> implements Flow.Publisher<T> {
    /** State bit set by {@link #subscribe} once a subscriber has arrived. */
    private static final int SUBSCRIBED = 1;
    /** State bit set by {@link #complete()}. */
    private static final int COMPLETED = 2;

    /** Items waiting for demand. */
    private final ConcurrentLinkedQueue<T> buffer = new ConcurrentLinkedQueue<>();
    /** Failure to signal through {@code onError}; {@code null} means {@code onComplete}. */
    private volatile Throwable error;
    private BiConsumer<Long, Long> requestCallback = null;
    private Consumer<? super T> onEmitCallback = null;
    private Consumer<? super T> onCleanup = null;
    private Consumer<? super Throwable> onAbort = null;
    private volatile Flow.Subscriber<? super T> subscriber;
    /** Combination of the {@link #SUBSCRIBED} and {@link #COMPLETED} bits; bits are only ever added. */
    private final AtomicInteger state = new AtomicInteger();
    /** Number of callers that want a drain to happen; non-zero while a drain is running. */
    private final AtomicInteger contenders = new AtomicInteger(1);
    /** Total demand requested so far, saturating at {@code Long.MAX_VALUE}. */
    private final AtomicLong requested = new AtomicLong();
    /** When set, buffered items are no longer delivered; drain passes them to {@link #cleanup()} instead. */
    private volatile boolean ignorePending;
    /** Number of items passed to {@code onNext}; written only by {@link #emit} and {@link #drain()}. */
    private long emitted;
    /** Set by {@code Subscription.cancel()} and by {@link #drain()} right before the terminal signal. */
    private boolean cancelled;

    /**
     * Creates a publisher with an empty buffer and no subscriber.
     */
    protected BufferedEmittingPublisher() {
    }

    /**
     * Creates a new publisher instance.
     *
     * @param <T> type of the emitted items
     * @return new publisher
     */
    public static <T> BufferedEmittingPublisher<T> create() {
        return new BufferedEmittingPublisher<>();
    }

    /**
     * Registers the one and only subscriber.
     *
     * <p>If a subscriber has already been registered, {@code sub} is instead subscribed to
     * {@code MultiError.create(...)} built from an {@link IllegalStateException}.
     *
     * @param sub subscriber to deliver items to
     */
    @Override
    public void subscribe(Flow.Subscriber<? super T> sub) {
        if (stateChange(SUBSCRIBED)) {
            MultiError.create(new IllegalStateException("Only single subscriber is allowed!")).subscribe(sub);
            return;
        }
        sub.onSubscribe(new Flow.Subscription() {

            @Override
            public void request(long n) {
                if (n < 1L) {
                    abort(new IllegalArgumentException("Expected request() with a positive increment"));
                    return;
                }
                // Add n to the demand, saturating at Long.MAX_VALUE; curr ends up holding the
                // value that was read before the increment was applied.
                long curr;
                do {
                    curr = requested.get();
                } while (curr != Long.MAX_VALUE
                        && !requested.compareAndSet(curr, Long.MAX_VALUE - curr > n ? curr + n : Long.MAX_VALUE));
                // Read the field once: drain() sets it to null on termination, possibly from
                // another thread between a null check and the call.
                BiConsumer<Long, Long> callback = requestCallback;
                if (callback != null) {
                    callback.accept(n, curr);
                }
                maybeDrain();
            }

            @Override
            public void cancel() {
                // The plain write to cancelled precedes the volatile write to ignorePending.
                cancelled = true;
                ignorePending = true;
                maybeDrain();
                abort(null);
            }
        });
        // contenders is still non-zero here (it starts at one), so nothing has been drained
        // yet; this drain() is the one that releases the initial count.
        subscriber = sub;
        drain();
    }

    /**
     * Adds a callback invoked from {@code Subscription.request(n)}.
     *
     * @param requestCallback receives the requested increment and the demand counter value
     *                        read before the increment was applied
     */
    public void onRequest(BiConsumer<Long, Long> requestCallback) {
        this.requestCallback = BiConsumerChain.combine(this.requestCallback, requestCallback);
    }

    /**
     * Adds a callback invoked with each item right after it was passed to {@code onNext}.
     *
     * @param onEmitCallback callback receiving the delivered item
     */
    public void onEmit(Consumer<T> onEmitCallback) {
        this.onEmitCallback = ConsumerChain.combine(this.onEmitCallback, onEmitCallback);
    }

    /**
     * Adds a callback invoked by {@link #cleanup()} for each buffered item that is discarded.
     *
     * @param onCleanup callback receiving the discarded item
     */
    public void onCleanup(Consumer<? super T> onCleanup) {
        this.onCleanup = ConsumerChain.combine(this.onCleanup, onCleanup);
    }

    /**
     * Adds a callback invoked when the stream is aborted: on cancellation (with {@code null}),
     * on an invalid {@code request(n)}, or when delivering an item throws a
     * {@link RuntimeException} (with that exception).
     *
     * @param onAbort callback receiving the cause, or {@code null} for a cancellation
     */
    public void onAbort(Consumer<? super Throwable> onAbort) {
        this.onAbort = ConsumerChain.combine(this.onAbort, onAbort);
    }

    /**
     * Fails the publisher when {@code th} is non-null, then notifies the abort callback.
     *
     * @param th cause of the abort, {@code null} when the subscription was cancelled
     */
    private void abort(Throwable th) {
        if (th != null) {
            fail(th);
        }
        if (onAbort != null) {
            onAbort.accept(th);
        }
    }

    /**
     * Emits an item: it is passed straight to the subscriber when this call can take the
     * drain lock, there is outstanding demand and nothing is buffered ahead of it; otherwise
     * it is appended to the buffer.
     *
     * <p>A {@link RuntimeException} thrown while delivering the item directly is routed to
     * {@code abort(...)} rather than rethrown.
     *
     * @param item item to emit
     * @throws NullPointerException if {@code item} is {@code null} and has to be buffered
     *                              (the backing {@link ConcurrentLinkedQueue} rejects nulls)
     */
    public void emit(T item) {
        // The lock can only be taken when subscribed and not completed, and never before
        // subscribe() has released the initial contenders count.
        boolean locked = state.get() == SUBSCRIBED
                && contenders.get() == 0
                && contenders.compareAndSet(0, 1);

        if (!locked) {
            buffer.add(item);
            maybeDrain();
            return;
        }

        // Holding the lock: emitted may be read and the subscriber may be called.
        if (!ignorePending && requested.get() > emitted && buffer.isEmpty()) {
            try {
                subscriber.onNext(item);
                if (onEmitCallback != null) {
                    onEmitCallback.accept(item);
                }
                ++emitted;
            } catch (RuntimeException re) {
                abort(re);
            } finally {
                drain();
            }
            return;
        }

        // Holding the lock but the item must be buffered. drain() is what releases the lock,
        // so it has to run even if add() throws; otherwise contenders would stay non-zero
        // and no later call could ever drain again.
        try {
            buffer.add(item);
        } finally {
            drain();
        }
    }

    /**
     * Terminates the publisher with an error: buffered items are discarded and
     * {@code onError} is signalled by the next drain.
     *
     * @param throwable failure to signal to the subscriber
     */
    public void fail(Throwable throwable) {
        error = throwable;
        completeNow();
    }

    /**
     * Marks the publisher as completed. The terminal signal is sent by the drain loop once
     * the buffer is empty. A drain is requested only when a subscriber is already present
     * (or the subscription was cancelled); otherwise {@link #subscribe} will drain later.
     */
    public void complete() {
        if (cancelled || stateChange(COMPLETED)) {
            maybeDrain();
        }
    }

    /**
     * Adds the bits of {@code s} to the state unless they are already set.
     *
     * @param s state bit to set
     * @return {@code true} when the {@link #SUBSCRIBED} bit was set in the state value
     *         observed before the change
     */
    private boolean stateChange(int s) {
        int curr;
        do {
            curr = state.get();
            // (curr & s) != s guarantees that curr + s sets the bit without a carry
        } while ((curr & s) != s && !state.compareAndSet(curr, curr + s));
        return (curr & SUBSCRIBED) != 0;
    }

    /**
     * Completes the publisher without delivering what is still buffered: the pending items
     * are handed to {@link #cleanup()} by the drain loop.
     */
    public void completeNow() {
        ignorePending = true;
        complete();
    }

    /**
     * Registers {@code consumer} as a cleanup callback and then calls {@link #completeNow()}.
     *
     * @param consumer receives each buffered item that is discarded
     */
    public void clearBuffer(Consumer<T> consumer) {
        onCleanup(consumer);
        completeNow();
    }

    /**
     * Tells whether the demand counter has reached {@code Long.MAX_VALUE}.
     *
     * @return {@code true} when the requested demand is saturated
     */
    public boolean isUnbounded() {
        return requested.get() == Long.MAX_VALUE;
    }

    /**
     * Tells whether more items were requested than were delivered so far. The delivered
     * count is a plain (non-volatile) field maintained by the draining caller.
     *
     * @return {@code true} when the requested count exceeds the emitted count
     */
    public boolean hasRequests() {
        return requested.get() > emitted;
    }

    /**
     * Tells whether {@link #complete()} has been called and the buffer is empty.
     *
     * @return {@code true} when the completed bit is set and nothing is buffered
     */
    public boolean isCompleted() {
        // state only ever holds combinations of the two bits, so >= COMPLETED means the
        // completed bit is set
        return state.get() >= COMPLETED && buffer.isEmpty();
    }

    /**
     * Tells whether both the {@code ignorePending} and {@code cancelled} flags are set while
     * {@link #isCompleted()} is {@code false}.
     *
     * @return {@code true} under the condition above
     */
    public boolean isCancelled() {
        return ignorePending && cancelled && !isCompleted();
    }

    /**
     * Returns the number of buffered items.
     *
     * @return current size of the buffer
     */
    public int bufferSize() {
        return buffer.size();
    }

    /**
     * Empties the buffer, passing each removed item to the cleanup callback when one is
     * registered.
     */
    protected void cleanup() {
        Consumer<? super T> handler = onCleanup;
        if (handler == null) {
            buffer.clear();
            return;
        }
        // Poll until the queue reports empty, so the callback never sees a null item.
        T item;
        while ((item = buffer.poll()) != null) {
            handler.accept(item);
        }
    }

    /**
     * Registers the need for a drain and runs it when nobody else is draining at the moment.
     */
    private void maybeDrain() {
        if (contenders.getAndIncrement() == 0) {
            drain();
        }
    }

    /**
     * Delivers buffered items while there is demand and sends the terminal signal when due.
     * Must be called only by the caller that holds the {@code contenders} lock; the loop
     * repeats for as long as other callers registered interest in the meantime.
     *
     * @throws IllegalStateException wrapping anything thrown by the subscriber's
     *                               {@code onError} / {@code onComplete}
     */
    private void drain() {
        IllegalStateException ise = null;
        int cont = 1;
        while (cont > 0) {
            boolean terminateNow = ignorePending;
            try {
                while (!terminateNow && requested.get() > emitted && !buffer.isEmpty()) {
                    T item = buffer.poll();
                    subscriber.onNext(item);
                    if (onEmitCallback != null) {
                        onEmitCallback.accept(item);
                    }
                    ++emitted;
                    // onNext may have cancelled or failed the stream
                    terminateNow = ignorePending;
                }
            } catch (RuntimeException re) {
                // abort() sets ignorePending and bumps contenders, so the next pass of the
                // outer loop observes the termination
                abort(re);
            }
            if (terminateNow) {
                cleanup();
            }
            if (terminateNow || isCompleted()) {
                try {
                    // cancelled doubles as "terminal signal already handled"
                    if (!cancelled) {
                        cancelled = true;
                        ignorePending = true;
                        if (error != null) {
                            subscriber.onError(error);
                        } else {
                            subscriber.onComplete();
                        }
                    }
                } catch (Throwable th) {
                    // rethrown only after the lock has been released below
                    ise = new IllegalStateException(th);
                } finally {
                    // drop references that are no longer needed after termination
                    error = null;
                    subscriber = null;
                    requestCallback = null;
                    onEmitCallback = null;
                }
            }
            // Subtract the work accounted for in this pass; a positive remainder means
            // somebody asked for another drain in the meantime.
            cont = contenders.addAndGet(-cont);
        }
        if (ise != null) {
            throw ise;
        }
    }
}

