/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive;

import java.util.Collection;
import java.util.LinkedList;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

public class SequentialSubscriber<T>
implements Flow.Subscriber<T> {
    private Flow.Subscriber<T> subscriber;
    private ReentrantLock seqLock = new ReentrantLock();
    private LinkedList<Runnable> queue = new LinkedList();
    private volatile boolean done;
    private boolean draining;
    private AtomicBoolean subscribedAlready = new AtomicBoolean(false);

    protected SequentialSubscriber(Flow.Subscriber<T> subscriber) {
        this.subscriber = subscriber;
    }

    public static <T> SequentialSubscriber<T> create(Flow.Subscriber<T> subscriber) {
        return new SequentialSubscriber<T>(subscriber);
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        boolean cancel;
        Objects.requireNonNull(subscription);
        if (this.subscribedAlready.getAndSet(true)) {
            subscription.cancel();
            return;
        }
        if (!this.done) {
            try {
                this.seqLock.lock();
                if (this.done) {
                    cancel = true;
                }
                if (this.draining) {
                    this.queue.addFirst(() -> this.subscriber.onSubscribe(subscription));
                    return;
                }
                this.draining = true;
                cancel = false;
            }
            finally {
                this.seqLock.unlock();
            }
        } else {
            cancel = true;
        }
        if (cancel) {
            subscription.cancel();
        } else {
            this.subscriber.onSubscribe(subscription);
            this.drainQueue();
        }
    }

    @Override
    public void onNext(T item) {
        Objects.requireNonNull(item);
        if (this.done) {
            return;
        }
        try {
            this.seqLock.lock();
            if (this.done) {
                return;
            }
            if (this.draining) {
                this.queue.add(() -> this.submit(item));
                return;
            }
            this.draining = true;
        }
        finally {
            this.seqLock.unlock();
        }
        this.submit(item);
        this.drainQueue();
    }

    @Override
    public void onError(Throwable throwable) {
        Objects.requireNonNull(throwable);
        if (this.done) {
            return;
        }
        try {
            this.seqLock.lock();
            if (this.done) {
                return;
            }
            this.done = true;
            if (this.draining) {
                this.queue = new ReadOnlySignalQueue(this.queue, () -> this.subscriber.onError(throwable));
                return;
            }
            this.draining = true;
        }
        finally {
            this.seqLock.unlock();
        }
        this.subscriber.onError(throwable);
    }

    @Override
    public void onComplete() {
        if (this.done) {
            return;
        }
        try {
            this.seqLock.lock();
            if (this.done) {
                return;
            }
            this.done = true;
            if (this.draining) {
                this.queue = new ReadOnlySignalQueue(this.queue, () -> this.subscriber.onComplete());
                return;
            }
            this.draining = true;
        }
        finally {
            this.seqLock.unlock();
        }
        this.subscriber.onComplete();
    }

    private void drainQueue() {
        while (true) {
            Runnable job;
            try {
                this.seqLock.lock();
                if (this.queue.isEmpty()) {
                    this.draining = false;
                    return;
                }
                job = this.queue.removeFirst();
            }
            finally {
                this.seqLock.unlock();
            }
            job.run();
        }
    }

    private void submit(T item) {
        try {
            this.subscriber.onNext(item);
        }
        catch (Throwable ex) {
            this.onError(ex);
        }
    }

    private class ReadOnlySignalQueue
    extends LinkedList<Runnable> {
        ReadOnlySignalQueue(Collection<Runnable> unfinishedQueue, Runnable single) {
            super.addAll(unfinishedQueue);
            super.add(single);
        }

        @Override
        public boolean add(Runnable runnable) {
            return false;
        }
    }
}

