/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.webserver.netty;

import io.helidon.common.http.DataChunk;
import io.helidon.common.reactive.Flow;
import io.helidon.webserver.netty.ByteBufRequestChunk;
import io.helidon.webserver.netty.ReferenceHoldingQueue;
import io.helidon.webserver.netty.UnboundedSemaphore;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;

class OriginThreadPublisher
implements Flow.Publisher<DataChunk> {
    private static final Logger LOGGER = Logger.getLogger(OriginThreadPublisher.class.getName());
    private final UnboundedSemaphore semaphore;
    private final AtomicBoolean hasSingleSubscriber = new AtomicBoolean(false);
    private final Lock reentrantLock = new ReentrantLock();
    private volatile Flow.Subscriber<? super DataChunk> singleSubscriber;
    private volatile boolean completed;
    private volatile Throwable t;
    private final BlockingQueue<ByteBufRequestChunk> queue = new ArrayBlockingQueue<ByteBufRequestChunk>(256);
    private final ReferenceHoldingQueue<ByteBufRequestChunk> referenceQueue;
    private AtomicLong nextCount = new AtomicLong();
    private volatile long reqCount = 0L;

    OriginThreadPublisher(UnboundedSemaphore semaphore, ReferenceHoldingQueue<ByteBufRequestChunk> referenceQueue) {
        this.semaphore = semaphore;
        this.referenceQueue = referenceQueue;
    }

    OriginThreadPublisher(ReferenceHoldingQueue<ByteBufRequestChunk> referenceQueue) {
        this(new UnboundedSemaphore(), referenceQueue);
    }

    public void subscribe(Flow.Subscriber<? super DataChunk> originalSubscriber) {
        if (!this.hasSingleSubscriber.compareAndSet(false, true)) {
            originalSubscriber.onError((Throwable)new IllegalStateException("Only single subscriber is allowed!"));
            return;
        }
        this.singleSubscriber = originalSubscriber;
        this.reentrantLock.lock();
        try {
            originalSubscriber.onSubscribe(new Flow.Subscription(){
                private boolean nexting;

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void request(long n) {
                    if (n <= 0L) {
                        OriginThreadPublisher.this.error(new IllegalArgumentException("[3.9] Illegal value requested: " + n));
                    }
                    try {
                        OriginThreadPublisher.this.reentrantLock.lock();
                        OriginThreadPublisher.this.reqCount += n;
                        long release = n;
                        if (this.nexting) {
                            return;
                        }
                        while (OriginThreadPublisher.this.singleSubscriber != null && !OriginThreadPublisher.this.queue.isEmpty() && OriginThreadPublisher.this.reqCount > OriginThreadPublisher.this.nextCount.get()) {
                            OriginThreadPublisher.this.nextCount.incrementAndGet();
                            try {
                                this.nexting = true;
                                --release;
                                ByteBufRequestChunk item = (ByteBufRequestChunk)OriginThreadPublisher.this.queue.remove();
                                LOGGER.finest(() -> "Publishing request chunk: " + item.id());
                                OriginThreadPublisher.this.singleSubscriber.onNext((Object)item);
                            }
                            finally {
                                this.nexting = false;
                            }
                        }
                        if (OriginThreadPublisher.this.singleSubscriber == null) {
                            return;
                        }
                        if (OriginThreadPublisher.this.t != null) {
                            LOGGER.finest("Completing with an error from request.");
                            OriginThreadPublisher.this.singleSubscriber.onError(OriginThreadPublisher.this.t);
                        } else if (OriginThreadPublisher.this.completed && OriginThreadPublisher.this.queue.isEmpty()) {
                            LOGGER.finest("Completing from request.");
                            OriginThreadPublisher.this.singleSubscriber.onComplete();
                        } else if (OriginThreadPublisher.this.queue.isEmpty()) {
                            long released = n == Long.MAX_VALUE ? Long.MAX_VALUE : release;
                            long result = OriginThreadPublisher.this.semaphore.release(released);
                            LOGGER.finest(() -> "Semaphore released: " + result);
                            OriginThreadPublisher.this.hookOnRequested(released, result);
                        }
                    }
                    finally {
                        OriginThreadPublisher.this.reentrantLock.unlock();
                    }
                }

                public void cancel() {
                    OriginThreadPublisher.this.hookOnCancel();
                    OriginThreadPublisher.this.singleSubscriber = null;
                }
            });
        }
        finally {
            this.reentrantLock.unlock();
        }
    }

    void hookOnRequested(long n, long result) {
    }

    void hookOnCancel() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void submit(ByteBuf data) {
        try {
            this.reentrantLock.lock();
            ByteBufRequestChunk chunk = new ByteBufRequestChunk(data, this.referenceQueue);
            if (!this.queue.offer(chunk)) {
                LOGGER.severe("Unable to add an element to the publisher cache.");
                this.error(new IllegalStateException("Unable to add an element to the publisher cache."));
                return;
            }
            if (this.nextCount.get() < this.reqCount) {
                this.nextCount.incrementAndGet();
                ByteBufRequestChunk item = (ByteBufRequestChunk)this.queue.poll();
                LOGGER.finest(() -> "Publishing request chunk: " + (null == item ? "null" : Long.valueOf(item.id())));
                this.singleSubscriber.onNext((Object)item);
            } else {
                LOGGER.finest(() -> "Not publishing due to low request count: " + this.nextCount + " <= " + this.reqCount);
            }
        }
        catch (RuntimeException e) {
            if (this.singleSubscriber == null) {
                this.t = e;
            } else {
                this.error(new IllegalStateException("An error occurred when submitting data.", e));
            }
        }
        finally {
            this.reentrantLock.unlock();
            this.referenceQueue.release();
        }
    }

    void error(Throwable throwable) {
        try {
            this.reentrantLock.lock();
            if (this.singleSubscriber != null && this.queue.isEmpty()) {
                this.singleSubscriber.onError(throwable);
                this.singleSubscriber = null;
            } else {
                this.t = throwable;
            }
        }
        catch (RuntimeException e) {
            throw new IllegalStateException("On error threw an exception!", e);
        }
        finally {
            this.reentrantLock.unlock();
            this.referenceQueue.release();
        }
    }

    void complete() {
        try {
            this.reentrantLock.lock();
            this.completed = true;
            if (this.singleSubscriber != null && this.queue.isEmpty()) {
                LOGGER.finest("Completing by the producing thread.");
                this.singleSubscriber.onComplete();
                this.singleSubscriber = null;
            } else {
                LOGGER.finest("Not completing by the producing thread.");
            }
        }
        finally {
            this.reentrantLock.unlock();
            this.referenceQueue.release();
        }
    }

    long tryAcquire() {
        return this.semaphore.tryAcquire();
    }

    boolean isCompleted() {
        return this.completed;
    }
}

