/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.common.reactive.valve;

import io.helidon.common.reactive.Flow;
import io.helidon.common.reactive.valve.Pausable;
import io.helidon.common.reactive.valve.Valve;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Exposes a {@link Valve} as a {@link Flow.Publisher} that accepts exactly one subscriber.
 *
 * <p>The valve is not handled until the subscriber's first valid {@code request(n)} call. From
 * then on the outstanding demand is tracked by a {@link PausableFeeder}, which pauses the valve
 * when its counter reaches zero and resumes it whenever more items are requested.
 *
 * @param <T> type of the items handed from the valve to the subscriber
 */
class ValvePublisher<T> implements Flow.Publisher<T> {
    private final Valve<T> valve;
    /** Guards the one-time creation of {@link #pausableFeeder} on the first request. */
    private final ReentrantReadWriteLock.WriteLock pausableFeederNullLock = new ReentrantReadWriteLock().writeLock();
    /** The only subscriber; assigned once inside {@code synchronized (this)} in {@link #subscribe}. */
    private volatile Flow.Subscriber<? super T> singleSubscriber;
    /** Demand tracker; stays {@code null} until the first valid request arrives. */
    private volatile PausableFeeder pausableFeeder;

    /**
     * Creates a publisher backed by the given valve.
     *
     * @param valve the valve whose data, error and completion signals are forwarded
     */
    ValvePublisher(Valve<T> valve) {
        this.valve = valve;
    }

    /**
     * Registers the single subscriber and hands it a subscription.
     *
     * <p>If a subscriber is already registered, the new one only receives
     * {@code onError(IllegalStateException)} and is otherwise ignored.
     *
     * @param subscriber the subscriber to register
     */
    @Override
    public void subscribe(final Flow.Subscriber<? super T> subscriber) {
        synchronized (this) {
            if (this.singleSubscriber != null) {
                subscriber.onError(new IllegalStateException("Multiple subscribers aren't allowed!"));
                return;
            }
            this.singleSubscriber = subscriber;
        }
        subscriber.onSubscribe(new Flow.Subscription() {

            /**
             * Adds {@code n} to the outstanding demand; the first valid call also starts
             * handling the valve.
             *
             * @param n number of additional items requested; a non-positive value is reported
             *          to the subscriber through {@code onError(IllegalArgumentException)}
             */
            @Override
            public void request(long n) {
                if (n <= 0L) {
                    // Report the value that was actually requested, not a hard-coded 0.
                    subscriber.onError(new IllegalArgumentException("Requested illegal item count: " + n));
                    return;
                }
                PausableFeeder feeder = ValvePublisher.this.pausableFeeder;
                if (feeder != null) {
                    // Feeder already exists: a plain release, no need for the creation lock.
                    feeder.release(n);
                    return;
                }
                // Acquire before the try block so that the finally clause never attempts
                // to unlock a lock this thread does not hold.
                ValvePublisher.this.pausableFeederNullLock.lock();
                try {
                    if (ValvePublisher.this.pausableFeeder == null) {
                        // NOTE(review): the counter is seeded with n - 1 while acquire() also
                        // decrements after every onNext, so a first request of n >= 2 appears
                        // to pause after n - 1 items - confirm against Valve pause semantics.
                        ValvePublisher.this.pausableFeeder = new PausableFeeder(n - 1L, ValvePublisher.this.valve);
                        ValvePublisher.this.handleValve();
                    } else {
                        // Another thread created the feeder in the meantime.
                        ValvePublisher.this.pausableFeeder.release(n);
                    }
                } finally {
                    ValvePublisher.this.pausableFeederNullLock.unlock();
                }
            }

            /** Cancels the subscription by pausing the underlying valve. */
            @Override
            public void cancel() {
                ValvePublisher.this.valve.pause();
            }
        });
    }

    /**
     * Registers handlers on the valve: each data item is passed to {@code onNext} and then
     * accounted for in the feeder, an error is wrapped in an {@link IllegalStateException} and
     * passed to {@code onError}, and completion is passed to {@code onComplete}.
     */
    private void handleValve() {
        this.valve.handle(
                data -> {
                    this.singleSubscriber.onNext(data);
                    this.pausableFeeder.acquire();
                },
                throwable -> this.singleSubscriber.onError(
                        new IllegalStateException("Valve to Publisher in an error.", throwable)),
                this.singleSubscriber::onComplete);
    }

    /**
     * Keeps a demand counter for a {@link Pausable}: pauses it when the counter is zero after
     * an item has been accounted for, and resumes it whenever the counter is increased.
     * A counter equal to {@link Long#MAX_VALUE} is never decremented.
     */
    private static class PausableFeeder {
        private final Pausable pausable;
        /** Serializes every update of {@link #count} together with the pause/resume call. */
        private final ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
        private volatile long count;

        /**
         * @param count    initial value of the demand counter
         * @param pausable the object to pause and resume according to the counter
         */
        PausableFeeder(long count, Pausable pausable) {
            this.count = count;
            this.pausable = pausable;
        }

        /**
         * Accounts for one delivered item: decrements the counter unless it is already zero or
         * equals {@link Long#MAX_VALUE}, then pauses the pausable if the counter is zero.
         */
        private void acquire() {
            this.lock.lock();
            try {
                if (this.count != Long.MAX_VALUE && this.count != 0L) {
                    this.count = this.count - 1L;
                }
                if (this.count == 0L) {
                    this.pausable.pause();
                }
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Adds {@code n} to the counter, saturating at {@link Long#MAX_VALUE} when the sum
         * overflows, and resumes the pausable.
         *
         * @param n the amount to add to the counter
         */
        private void release(long n) {
            this.lock.lock();
            try {
                long r = this.count + n;
                // The sign test is true when the addition overflowed (both operands have a
                // sign different from the result's).
                boolean overflow = ((this.count ^ r) & (n ^ r)) < 0L;
                this.count = r == Long.MAX_VALUE || overflow ? Long.MAX_VALUE : r;
                this.pausable.resume();
            } finally {
                this.lock.unlock();
            }
        }
    }
}

