package com.azure.messaging.servicebus;

import com.azure.core.util.logging.ClientLogger;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

/* loaded from: input_file:com/azure/messaging/servicebus/SynchronousMessageSubscriber.class */
class SynchronousMessageSubscriber extends BaseSubscriber<ServiceBusReceivedMessage> {
    private final long requested;
    private Disposable currentTimeoutOperation;
    private SynchronousReceiveWork currentWork;
    private boolean subscriberInitialized;
    private volatile Subscription subscription;
    private static final AtomicReferenceFieldUpdater<SynchronousMessageSubscriber, Subscription> UPSTREAM = AtomicReferenceFieldUpdater.newUpdater(SynchronousMessageSubscriber.class, Subscription.class, "subscription");
    private final ClientLogger logger = new ClientLogger((Class<?>) SynchronousMessageSubscriber.class);
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicInteger wip = new AtomicInteger();
    private final Queue<SynchronousReceiveWork> workQueue = new ConcurrentLinkedQueue();
    private final Queue<ServiceBusReceivedMessage> bufferMessages = new ConcurrentLinkedQueue();
    private final AtomicLong remaining = new AtomicLong();
    private final Object currentWorkLock = new Object();

    /* JADX INFO: Access modifiers changed from: package-private */
    public SynchronousMessageSubscriber(long j, SynchronousReceiveWork synchronousReceiveWork) {
        this.workQueue.add(synchronousReceiveWork);
        this.requested = ((long) synchronousReceiveWork.getNumberOfEvents()) > j ? synchronousReceiveWork.getNumberOfEvents() : j;
    }

    @Override // reactor.core.publisher.BaseSubscriber
    protected void hookOnSubscribe(Subscription subscription) {
        if (!Operators.setOnce(UPSTREAM, this, subscription)) {
            this.logger.error("Already subscribed once.");
            return;
        }
        this.subscription = subscription;
        this.subscriberInitialized = true;
        drain();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // reactor.core.publisher.BaseSubscriber
    public void hookOnNext(ServiceBusReceivedMessage serviceBusReceivedMessage) {
        this.bufferMessages.add(serviceBusReceivedMessage);
        drain();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void queueWork(SynchronousReceiveWork synchronousReceiveWork) {
        this.logger.info("[{}] Pending: {}, Scheduling receive timeout task '{}'.", Long.valueOf(synchronousReceiveWork.getId()), Integer.valueOf(synchronousReceiveWork.getNumberOfEvents()), synchronousReceiveWork.getTimeout());
        this.workQueue.add(synchronousReceiveWork);
        if (this.subscriberInitialized) {
            drain();
        }
    }

    private void drain() {
        if (this.wip.compareAndSet(0, 1)) {
            try {
                drainQueue();
                int decrementAndGet = this.wip.decrementAndGet();
                if (decrementAndGet != 0) {
                    this.logger.warning("There should be 0, but was: {}", Integer.valueOf(decrementAndGet));
                }
            } catch (Throwable th) {
                int decrementAndGet2 = this.wip.decrementAndGet();
                if (decrementAndGet2 != 0) {
                    this.logger.warning("There should be 0, but was: {}", Integer.valueOf(decrementAndGet2));
                }
                throw th;
            }
        }
    }

    private void drainQueue() {
        if (isTerminated()) {
            return;
        }
        synchronized (this.currentWorkLock) {
            if (this.currentWork != null && this.currentWork.isTerminal()) {
                this.workQueue.remove(this.currentWork);
                if (this.currentTimeoutOperation != null && !this.currentTimeoutOperation.isDisposed()) {
                    this.currentTimeoutOperation.dispose();
                }
                this.currentTimeoutOperation = null;
            }
            while (true) {
                SynchronousReceiveWork peek = this.workQueue.peek();
                this.currentWork = peek;
                if (peek == null || (this.currentWork.isProcessingStarted() && this.bufferMessages.size() <= 0)) {
                    break;
                }
                if (this.currentWork.isTerminal()) {
                    this.workQueue.remove(this.currentWork);
                    if (this.currentTimeoutOperation != null && !this.currentTimeoutOperation.isDisposed()) {
                        this.currentTimeoutOperation.dispose();
                    }
                } else {
                    if (!this.currentWork.isProcessingStarted()) {
                        this.currentTimeoutOperation = getTimeoutOperation(this.currentWork);
                        this.currentWork.startedProcessing();
                        long numberOfEvents = this.currentWork.getNumberOfEvents() - this.remaining.get();
                        this.remaining.addAndGet(numberOfEvents);
                        this.subscription.request(numberOfEvents);
                    }
                    while (this.bufferMessages.size() > 0 && !this.currentWork.isTerminal()) {
                        this.currentWork.next(this.bufferMessages.poll());
                        this.remaining.decrementAndGet();
                    }
                    if (this.currentWork.isTerminal()) {
                        if (this.currentWork.getError() == null) {
                            this.currentWork.complete();
                        }
                        this.workQueue.remove(this.currentWork);
                        if (this.currentTimeoutOperation != null && !this.currentTimeoutOperation.isDisposed()) {
                            this.currentTimeoutOperation.dispose();
                        }
                        this.logger.verbose("The work [{}] is complete.", Long.valueOf(this.currentWork.getId()));
                    }
                }
            }
        }
    }

    private Disposable getTimeoutOperation(SynchronousReceiveWork synchronousReceiveWork) {
        return Mono.delay(synchronousReceiveWork.getTimeout()).thenReturn(synchronousReceiveWork).subscribe((Consumer<? super V>) synchronousReceiveWork2 -> {
            synchronized (this.currentWorkLock) {
                if (this.currentWork == synchronousReceiveWork) {
                    synchronousReceiveWork.timeout();
                }
            }
        });
    }

    @Override // reactor.core.publisher.BaseSubscriber
    protected void hookOnError(Throwable th) {
        this.logger.error("[{}] Errors occurred upstream", Long.valueOf(this.currentWork.getId()), th);
        synchronized (this.currentWorkLock) {
            this.currentWork.error(th);
        }
        dispose();
    }

    @Override // reactor.core.publisher.BaseSubscriber
    protected void hookOnCancel() {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        synchronized (this.currentWorkLock) {
            if (this.currentWork != null) {
                this.currentWork.complete();
            }
            if (this.currentTimeoutOperation != null && !this.currentTimeoutOperation.isDisposed()) {
                this.currentTimeoutOperation.dispose();
            }
            this.currentTimeoutOperation = null;
        }
        this.subscription.cancel();
    }

    private boolean isTerminated() {
        return this.isDisposed.get();
    }

    int getWorkQueueSize() {
        return this.workQueue.size();
    }

    long getRequested() {
        return this.requested;
    }

    boolean isSubscriberInitialized() {
        return this.subscriberInitialized;
    }
}
