/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import io.helidon.common.reactive.RequestedCounter;
import io.helidon.common.reactive.SingleSubscriberHolder;
import io.helidon.common.reactive.StreamValidationUtils;
import io.helidon.common.reactive.SubscriberReference;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Skeleton of a {@link Flow.Processor} that supports a single downstream subscriber.
 *
 * <p>The processor is a {@link Flow.Subscriber} of upstream items of type {@code T}, a
 * {@link Flow.Publisher} of items of type {@code U}, and also the {@link Flow.Subscription}
 * handed to its downstream subscriber. Subclasses customize behavior through the
 * {@code hookOn*} methods and emit items downstream with {@link #submit(Object)}.
 *
 * <p>Upstream signals {@code onSubscribe}/{@code onNext} and downstream registration in
 * {@code subscribe} are serialized through a shared {@link ReentrantLock}.
 *
 * @param <T> type of items received from upstream
 * @param <U> type of items published downstream
 */
abstract class BaseProcessor<T, U>
implements Flow.Processor<T, U>,
Flow.Subscription {
    /** Upstream subscription; {@code null} until {@link #onSubscribe(Flow.Subscription)} accepts one. */
    private Flow.Subscription subscription;
    private final SingleSubscriberHolder<U> subscriber;
    private final RequestedCounter requested;
    /** Set once a downstream subscriber has been registered successfully. */
    private final AtomicBoolean ready;
    /** Guards {@link #doSubscribe(Flow.Publisher)} so the delegate is subscribed at most once. */
    private final AtomicBoolean subscribed;
    /** Reference to the most recent subscriber passed to {@code subscribe}; {@code null} before that. */
    private SubscriberReference<? super U> referencedSubscriber;
    private final ReentrantLock publisherSequentialLock = new ReentrantLock();
    /** Terminal flag; volatile so that a write of {@link #error} preceding it is visible to readers. */
    private volatile boolean done;
    /** First failure recorded by {@link #fail(Throwable)}; must be written before {@link #done}. */
    private Throwable error;

    BaseProcessor() {
        this.requested = new RequestedCounter();
        this.ready = new AtomicBoolean();
        this.subscribed = new AtomicBoolean();
        this.subscriber = new SingleSubscriberHolder<>();
    }

    /**
     * Records downstream demand and forwards the accumulated demand upstream.
     *
     * <p>Validation failures reported by {@code StreamValidationUtils} and counter failures are
     * routed to {@link #failAndCancel(Throwable)}. If the processor is already done, completion
     * of the downstream subscriber is attempted.
     *
     * @param n number of items requested by the downstream subscriber
     */
    @Override
    public void request(long n) {
        StreamValidationUtils.checkRequestParam(n, this::failAndCancel);
        StreamValidationUtils.checkRecursionDepth(5, (actDepth, t) -> this.failAndCancel((Throwable)t));
        this.requested.increment(n, this::failAndCancel);
        this.tryRequest(this.subscription);
        if (this.done) {
            this.tryComplete();
        }
    }

    /**
     * Cancels the downstream subscriber holder and invokes {@link #hookOnCancel(Flow.Subscription)}.
     * Anything thrown by the hook is turned into a failure via {@link #failAndCancel(Throwable)}.
     */
    @Override
    public void cancel() {
        this.subscriber.cancel();
        try {
            this.hookOnCancel(this.subscription);
        }
        catch (Throwable ex) {
            this.failAndCancel(ex);
        }
    }

    /**
     * Accepts the first upstream subscription and cancels any later one.
     *
     * @param s upstream subscription
     * @throws NullPointerException if {@code s} is {@code null}
     */
    @Override
    public void onSubscribe(Flow.Subscription s) {
        Objects.requireNonNull(s);
        // Acquire before the try block: if lock() itself failed, the finally clause must not
        // attempt to unlock a lock this thread does not hold.
        this.publisherSequentialLock.lock();
        try {
            if (this.subscription == null) {
                this.subscription = s;
                this.tryRequest(s);
            } else {
                s.cancel();
            }
        }
        finally {
            this.publisherSequentialLock.unlock();
        }
    }

    /**
     * Passes an upstream item to {@link #hookOnNext(Object)}; anything the hook throws is routed
     * to {@link #failAndCancel(Throwable)}.
     *
     * @param item upstream item
     * @throws IllegalStateException if the processor is already done
     * @throws NullPointerException if {@code item} is {@code null}
     */
    @Override
    public void onNext(T item) {
        this.publisherSequentialLock.lock();
        try {
            if (this.done) {
                throw new IllegalStateException("Subscriber is closed!");
            }
            Objects.requireNonNull(item);
            try {
                this.hookOnNext(item);
            }
            catch (Throwable ex) {
                this.failAndCancel(ex);
            }
        }
        finally {
            this.publisherSequentialLock.unlock();
        }
    }

    /**
     * Marks the processor as failed and attempts to signal the failure downstream.
     * Only the first failure is retained.
     *
     * @param ex cause of the failure
     * @throws NullPointerException if {@code ex} is {@code null}
     */
    protected void fail(Throwable ex) {
        Objects.requireNonNull(ex);
        // Store the error BEFORE the volatile write of 'done'. A thread that observes
        // done == true (e.g. in request() or subscribe()) and then calls tryComplete() is thereby
        // guaranteed to see the error as well, instead of signalling a normal completion.
        if (this.error == null) {
            this.error = ex;
        }
        this.done = true;
        this.tryComplete();
    }

    /**
     * Cancels the upstream subscription, if one is present, and then fails the processor.
     *
     * @param ex cause of the failure
     */
    protected void failAndCancel(Throwable ex) {
        Flow.Subscription upstream = this.subscription;
        if (upstream != null) {
            upstream.cancel();
        }
        this.fail(ex);
    }

    /**
     * Upstream failure signal; delegates to {@link #fail(Throwable)}.
     *
     * @param ex failure reported by upstream
     */
    @Override
    public void onError(Throwable ex) {
        this.fail(ex);
    }

    /** Upstream completion signal; marks the processor done and attempts to complete downstream. */
    @Override
    public void onComplete() {
        this.done = true;
        this.tryComplete();
    }

    /**
     * Registers the downstream subscriber. When registration succeeds the subscriber receives
     * this processor as its subscription, and is completed right away if the processor is
     * already done.
     *
     * @param s downstream subscriber
     */
    @Override
    public void subscribe(Flow.Subscriber<? super U> s) {
        this.referencedSubscriber = SubscriberReference.create(s);
        this.publisherSequentialLock.lock();
        try {
            if (this.subscriber.register(s)) {
                this.ready.set(true);
                s.onSubscribe(this);
                if (this.done) {
                    this.tryComplete();
                }
            }
        }
        finally {
            this.publisherSequentialLock.unlock();
        }
    }

    /**
     * @return the upstream subscription, or an empty optional if none has been accepted yet
     */
    protected Optional<Flow.Subscription> getSubscription() {
        return Optional.ofNullable(this.subscription);
    }

    /**
     * @return the holder of the single downstream subscriber
     */
    protected SingleSubscriberHolder<U> getSubscriber() {
        return this.subscriber;
    }

    /**
     * @return the counter tracking downstream demand
     */
    protected RequestedCounter getRequestedCounter() {
        return this.requested;
    }

    /**
     * Emits an item to the downstream subscriber if the demand counter can be decremented;
     * otherwise delegates to {@link #notEnoughRequest(Object)}.
     *
     * <p>Failures while obtaining the subscriber or delivering the item are routed to
     * {@link #failAndCancel(Throwable)}; on interruption the thread's interrupt flag is restored
     * first.
     *
     * @param item item to emit downstream
     */
    protected void submit(U item) {
        if (!this.requested.tryDecrement()) {
            this.notEnoughRequest(item);
            return;
        }
        try {
            this.subscriber.get().onNext(item);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            this.failAndCancel(ex);
        }
        catch (Throwable ex) {
            this.failAndCancel(ex);
        }
    }

    /**
     * Called by {@link #submit(Object)} when an item cannot be emitted because the demand counter
     * could not be decremented. The default implementation fails the processor.
     *
     * @param item the item that could not be emitted
     */
    protected void notEnoughRequest(U item) {
        this.onError(new IllegalStateException("Not enough request to submit item"));
    }

    /**
     * Hook invoked for every upstream item. The default implementation does nothing.
     *
     * @param item upstream item
     */
    protected void hookOnNext(T item) {
    }

    /**
     * Hook invoked just before the downstream subscriber receives {@code onError}.
     * The default implementation does nothing.
     *
     * @param error the failure about to be signalled downstream
     */
    protected void hookOnError(Throwable error) {
    }

    /**
     * Hook invoked just before the downstream subscriber receives {@code onComplete}.
     * The default implementation does nothing.
     */
    protected void hookOnComplete() {
    }

    /**
     * Hook invoked from {@link #cancel()}. The default implementation cancels the given upstream
     * subscription when present and releases the subscriber reference when one exists.
     *
     * @param subscription upstream subscription, may be {@code null}
     */
    protected void hookOnCancel(Flow.Subscription subscription) {
        if (subscription != null) {
            subscription.cancel();
        }
        // cancel() may be invoked before any subscribe(); without this guard the resulting
        // NullPointerException would be caught by cancel() and recorded as the processor's error.
        SubscriberReference<? super U> reference = this.referencedSubscriber;
        if (reference != null) {
            reference.releaseReference();
        }
    }

    /**
     * Subscribes this processor's downstream side to the given publisher, at most once per
     * processor instance. Items from the delegate are emitted via {@link #submit(Object)}.
     *
     * @param delegate publisher whose signals are relayed downstream
     */
    protected final void doSubscribe(Flow.Publisher<U> delegate) {
        if (this.subscribed.compareAndSet(false, true)) {
            delegate.subscribe(new Flow.Subscriber<U>(){

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    BaseProcessor.this.tryRequest(subscription);
                }

                @Override
                public void onNext(U item) {
                    BaseProcessor.this.submit(item);
                }

                @Override
                public void onError(Throwable ex) {
                    BaseProcessor.this.failAndCancel(ex);
                }

                @Override
                public void onComplete() {
                    BaseProcessor.this.onComplete();
                }
            });
        }
    }

    /** Runs the error hook and then signals {@code onError} to the given subscriber. */
    private void completeOnError(Flow.Subscriber<? super U> sub, Throwable ex) {
        this.hookOnError(ex);
        sub.onError(ex);
    }

    /**
     * Closes the downstream subscriber if one has been registered and the holder is not closed
     * yet: with {@code onError} when a failure was recorded or {@link #hookOnComplete()} throws,
     * otherwise with {@code onComplete}.
     */
    protected void tryComplete() {
        if (!this.ready.get() || this.subscriber.isClosed()) {
            return;
        }
        if (this.error != null) {
            this.subscriber.close(sub -> this.completeOnError((Flow.Subscriber<? super U>)sub, this.error));
            return;
        }
        try {
            this.hookOnComplete();
        }
        catch (Throwable ex) {
            this.subscriber.close(sub -> this.completeOnError((Flow.Subscriber<? super U>)sub, ex));
            return;
        }
        this.subscriber.close(Flow.Subscriber::onComplete);
    }

    /**
     * Requests the currently recorded demand from the given subscription, provided the
     * subscription is non-null, the downstream holder is not closed and the demand is positive.
     *
     * @param subscription subscription to request from, may be {@code null}
     */
    protected void tryRequest(Flow.Subscription subscription) {
        if (subscription == null || this.subscriber.isClosed()) {
            return;
        }
        long demand = this.requested.get();
        if (demand > 0L) {
            subscription.request(demand);
        }
    }
}

