package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.logging.ClientLogger;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;

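/**
 * A {@link Mono} that sits between an upstream publisher of AMQP channels and any number of downstream
 * subscribers.
 *
 * <p>The most recently emitted channel is cached and handed to every subscriber. While no channel is cached,
 * subscribers are queued and a single item is requested from upstream. When the endpoint-state stream of the cached
 * channel completes, the channel is cleared and another one is requested; when it (or upstream) fails, the
 * configured {@link AmqpRetryPolicy} decides whether another channel is requested after a delay or the processor
 * terminates with that error.</p>
 *
 * @param <T> the type of AMQP channel being cached
 */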
public class AmqpChannelProcessor<T> extends Mono<T> implements Processor<T, T>, CoreSubscriber<T>, Disposable {
    /** Atomic accessor for {@link #upstream}; used to install the upstream subscription exactly once. */
    private static final AtomicReferenceFieldUpdater<AmqpChannelProcessor, Subscription> UPSTREAM = AtomicReferenceFieldUpdater.newUpdater(AmqpChannelProcessor.class, Subscription.class, "upstream");
    /** Structured-logging key for the retry attempt number. */
    private static final String TRY_COUNT_KEY = "tryCount";
    /** Logger created per instance with the logging context supplied to the constructor. */
    private final ClientLogger logger;
    /** Consulted in {@code onError} to decide whether, and after what delay, another channel is requested. */
    private final AmqpRetryPolicy retryPolicy;
    /** Maps a channel to the stream of its endpoint states; subscribed to for every channel received. */
    private final Function<T, Flux<AmqpEndpointState>> endpointStatesFunction;
    /** Error context attached to the {@link AmqpException} that wraps non-AMQP upstream errors. */
    private final AmqpErrorContext errorContext;
    /** Upstream subscription; written only through {@link #UPSTREAM}, so it must stay a volatile field with this name. */
    private volatile Subscription upstream;
    /** Terminal error recorded by {@code onError}; replayed to subscribers that arrive after termination. */
    private volatile Throwable lastError;
    /** Channel currently handed to subscribers, or {@code null} when none is available. Written under {@link #lock}. */
    private volatile T currentChannel;
    /** Subscription to the endpoint states of {@link #currentChannel}. */
    private volatile Disposable connectionSubscription;
    /** Subscription to the delay timer of the most recently scheduled retry, if any. */
    private volatile Disposable retrySubscription;
    /** Set once the processor has been disposed, completed, or failed terminally. */
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    /** {@code true} while a {@code request(1)} to upstream has not yet been answered by {@code onNext}. */
    private final AtomicBoolean isRequested = new AtomicBoolean();
    /** {@code true} while a delayed retry is scheduled; further retriable errors are ignored meanwhile. */
    private final AtomicBoolean isRetryPending = new AtomicBoolean();
    /** Number of errors counted towards the retry policy; reset to zero when a channel reports ACTIVE. */
    private final AtomicInteger retryAttempts = new AtomicInteger();
    /** Guards updates of {@link #currentChannel} and the hand-over of {@link #subscribers}. */
    private final Object lock = new Object();
    /** Subscribers waiting for a channel. The reference is swapped for a fresh deque when the processor terminates. */
    private volatile ConcurrentLinkedDeque<ChannelSubscriber<T>> subscribers = new ConcurrentLinkedDeque<>();

    /**
     * Subscription handed to each downstream subscriber of the processor. Every terminal path (value, completion,
     * error, cancellation) first removes this subscriber from the processor's list of waiting subscribers, then
     * forwards the signal, then logs it together with the subscriber's identifier.
     *
     * @param <T> the type of AMQP channel
     */
    public static final class ChannelSubscriber<T> extends Operators.MonoSubscriber<T, T> {
        private final AmqpChannelProcessor<T> processor;
        /** Identifier used in log statements; stays {@code null} until {@link #onAdd()} has run. */
        private String subscriberId;

        private ChannelSubscriber(CoreSubscriber<? super T> coreSubscriber, AmqpChannelProcessor<T> amqpChannelProcessor) {
            super(coreSubscriber);
            this.processor = amqpChannelProcessor;
        }

        /**
         * Resolves the identifier used for logging: the value stored in the downstream subscriber's context under
         * {@code ClientConstants.SUBSCRIBER_ID_KEY} when present, otherwise a generated one.
         */
        void onAdd() {
            final Object contextId = actual.currentContext().getOrDefault(ClientConstants.SUBSCRIBER_ID_KEY, null);
            subscriberId = contextId == null ? StringUtil.getRandomString("un") : contextId.toString();

            processor.logger.atVerbose()
                .addKeyValue(ClientConstants.SUBSCRIBER_ID_KEY, subscriberId)
                .log("Added subscriber.");
        }

        @Override
        public void cancel() {
            detach();
            super.cancel();

            processor.logger.atVerbose()
                .addKeyValue(ClientConstants.SUBSCRIBER_ID_KEY, subscriberId)
                .log("Canceled subscriber");
        }

        @Override
        public void onComplete() {
            if (!isCancelled()) {
                detach();
                actual.onComplete();

                processor.logger.atInfo()
                    .addKeyValue(ClientConstants.SUBSCRIBER_ID_KEY, subscriberId)
                    .log("AMQP channel processor completed.");
            }
        }

        @Override
        public void onNext(T t) {
            if (!isCancelled()) {
                detach();
                complete(t);

                processor.logger.atInfo()
                    .addKeyValue(ClientConstants.SUBSCRIBER_ID_KEY, subscriberId)
                    .log("Next AMQP channel received.");
            }
        }

        @Override
        public void onError(Throwable th) {
            if (isCancelled()) {
                // Nobody is listening any more; hand the error to Reactor's dropped-error hook.
                Operators.onErrorDropped(th, currentContext());
            } else {
                detach();
                actual.onError(th);

                processor.logger.atInfo()
                    .addKeyValue(ClientConstants.SUBSCRIBER_ID_KEY, subscriberId)
                    .log("Error in AMQP channel processor.");
            }
        }

        /** Removes this subscriber from the processor's collection of waiting subscribers. */
        private void detach() {
            processor.subscribers.remove(this);
        }
    }

    /**
     * Creates a processor whose logger carries the entity path as logging context.
     *
     * @param str value passed to the {@link AmqpErrorContext} of this processor (presumably the fully qualified
     *     namespace; confirm against callers)
     * @param str2 entity path, stored in the logging context under {@code ClientConstants.ENTITY_PATH_KEY}
     * @param function maps a channel to the stream of its endpoint states
     * @param amqpRetryPolicy policy consulted when an error is received
     * @param clientLogger not used by this constructor; a new logger is always created
     * @throws NullPointerException if {@code function}, {@code amqpRetryPolicy} or {@code str2} is null
     * @deprecated presumably superseded by the overload taking a logging-context map; confirm before removing.
     */
    @Deprecated
    public AmqpChannelProcessor(String str, String str2, Function<T, Flux<AmqpEndpointState>> function, AmqpRetryPolicy amqpRetryPolicy, ClientLogger clientLogger) {
        this.endpointStatesFunction = Objects.requireNonNull(function, "'endpointStates' cannot be null.");
        this.retryPolicy = Objects.requireNonNull(amqpRetryPolicy, "'retryPolicy' cannot be null.");

        final Map<String, Object> loggingContext = new HashMap<>(1);
        loggingContext.put(ClientConstants.ENTITY_PATH_KEY, Objects.requireNonNull(str2, "'entityPath' cannot be null."));

        this.logger = new ClientLogger(getClass(), loggingContext);
        this.errorContext = new AmqpErrorContext(str);
    }

    public AmqpChannelProcessor(String str, Function<T, Flux<AmqpEndpointState>> function, AmqpRetryPolicy amqpRetryPolicy, Map<String, Object> map) {
        this.endpointStatesFunction = (Function) Objects.requireNonNull(function, "'endpointStates' cannot be null.");
        this.retryPolicy = (AmqpRetryPolicy) Objects.requireNonNull(amqpRetryPolicy, "'retryPolicy' cannot be null.");
        this.logger = new ClientLogger(getClass(), (Map<String, Object>) Objects.requireNonNull(map, "'loggingContext' cannot be null."));
        this.errorContext = new AmqpErrorContext(str);
    }

    /**
     * Installs the upstream subscription and immediately requests the first channel. A second subscription is
     * rejected by {@code Operators.setOnce} and only produces a warning here.
     *
     * @param subscription the upstream subscription
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        if (Operators.setOnce(UPSTREAM, this, subscription)) {
            isRequested.set(true);
            subscription.request(1L);
            return;
        }

        logger.warning("Processors can only be subscribed to once.");
    }

    /**
     * Receives the next channel from upstream, makes it the current channel, hands it to every waiting subscriber
     * and starts observing its endpoint states. The previously cached channel (if any) is closed and its
     * endpoint-state subscription disposed.
     *
     * <p>A channel that arrives after the processor has been disposed is closed and dropped instead of being cached:
     * nothing could ever obtain or release it afterwards.</p>
     *
     * @param t the channel emitted by upstream
     * @throws NullPointerException if {@code t} is null
     */
    @Override
    public void onNext(T t) {
        logger.info("Setting next AMQP channel.");
        Objects.requireNonNull(t, "'amqpChannel' cannot be null.");

        final T previousChannel;
        final Disposable previousStateSubscription;
        final boolean arrivedAfterDisposal;

        synchronized (lock) {
            // Checked under the lock: dispose() sets the flag before it clears the channel under this same lock, so
            // a channel stored here while the flag is still false is guaranteed to be cleaned up by dispose().
            arrivedAfterDisposal = isDisposed();

            if (arrivedAfterDisposal) {
                previousChannel = null;
                previousStateSubscription = null;
            } else {
                previousChannel = currentChannel;
                previousStateSubscription = connectionSubscription;
                currentChannel = t;

                subscribers.forEach(waiting -> waiting.onNext(t));

                // NOTE(review): isRequested is still true at this point. If the endpoint-state stream completes
                // synchronously during subscribe, the requestUpstream() call below returns without requesting.
                // Confirm whether that is intended.
                connectionSubscription = endpointStatesFunction.apply(t).subscribe(state -> {
                    if (state == AmqpEndpointState.ACTIVE) {
                        // The channel opened successfully, so earlier failures no longer count.
                        retryAttempts.set(0);
                        logger.info("Channel is now active.");
                    }
                }, error -> {
                    setAndClearChannel();
                    onError(error);
                }, () -> {
                    if (isDisposed()) {
                        logger.info("Channel is disposed.");
                        return;
                    }
                    logger.info("Channel is closed. Requesting upstream.");
                    setAndClearChannel();
                    requestUpstream();
                });
            }
        }

        if (arrivedAfterDisposal) {
            logger.info("Processor is disposed. Closing AMQP channel received after disposal.");
            close(t);
            isRequested.set(false);
            return;
        }

        // Release the replaced channel outside the lock.
        close(previousChannel);
        if (previousStateSubscription != null) {
            previousStateSubscription.dispose();
        }

        isRequested.set(false);
    }

    /**
     * Handles an error from upstream or from the current channel's endpoint states.
     *
     * <p>If the retry policy yields a delay, a single delayed retry is scheduled that requests another channel from
     * upstream; errors arriving while that retry is pending are ignored. Otherwise the processor terminates: the
     * error is recorded for late subscribers, every waiting subscriber is failed with it, the pending retry timer is
     * cancelled and the cached channel is closed.</p>
     *
     * @param th the error that occurred
     * @throws NullPointerException if {@code th} is null
     */
    @Override
    public void onError(Throwable th) {
        Objects.requireNonNull(th, "'throwable' is required.");

        if (isRetryPending.get() && retryPolicy.calculateRetryDelay(th, retryAttempts.get()) != null) {
            logger.warning("Retry is already pending. Ignoring transient error.", th);
            return;
        }

        final int attemptsMade = retryAttempts.incrementAndGet();
        final int tryCount = attemptsMade - 1;

        final boolean treatAsTransient = ((th instanceof AmqpException) && ((AmqpException) th).isTransient())
            || (th instanceof IllegalStateException)
            || (th instanceof RejectedExecutionException);

        final Duration retryDelay;
        if (treatAsTransient) {
            // Non-AMQP errors are wrapped in a transient AmqpException so the policy sees them as retriable; the
            // attempt count is capped at the policy's maximum.
            final Throwable errorForPolicy = th instanceof AmqpException
                ? th
                : new AmqpException(true, "Non-AmqpException occurred upstream.", th, errorContext);
            retryDelay = retryPolicy.calculateRetryDelay(errorForPolicy, Math.min(attemptsMade, retryPolicy.getMaxRetries()));
        } else {
            retryDelay = retryPolicy.calculateRetryDelay(th, attemptsMade);
        }

        if (retryDelay != null) {
            if (isRetryPending.getAndSet(true)) {
                // Another thread scheduled the retry first; this error must not count as an extra attempt.
                retryAttempts.decrementAndGet();
                return;
            }

            logger.atInfo()
                .addKeyValue(TRY_COUNT_KEY, tryCount)
                .addKeyValue(ClientConstants.INTERVAL_KEY, retryDelay.toMillis())
                .log("Transient error occurred. Retrying.", th);

            retrySubscription = Mono.delay(retryDelay).subscribe(ignored -> {
                if (isDisposed()) {
                    logger.atInfo().addKeyValue(TRY_COUNT_KEY, tryCount).log("Not requesting from upstream. Processor is disposed.");
                    return;
                }

                logger.atInfo().addKeyValue(TRY_COUNT_KEY, tryCount).log("Requesting from upstream.");

                // Clear the flag BEFORE requesting: if upstream fails synchronously inside request(), that error
                // re-enters onError and would otherwise be discarded as "retry already pending", leaving the
                // processor with neither a channel nor a scheduled retry.
                isRetryPending.set(false);
                requestUpstream();
            });
            return;
        }

        logger.atWarning().addKeyValue(TRY_COUNT_KEY, tryCount).log("Retry attempts exhausted or exception was not retriable.", th);

        // Record the error before raising the flag so subscribe() always finds it once it observes disposal.
        lastError = th;
        isDisposed.set(true);

        // dispose() cannot be used for the cleanup: with the flag already set it returns immediately, and calling it
        // before raising the flag would complete the waiting subscribers instead of failing them.
        final Disposable pendingRetry = retrySubscription;
        if (pendingRetry != null && !pendingRetry.isDisposed()) {
            pendingRetry.dispose();
        }

        synchronized (lock) {
            final ConcurrentLinkedDeque<ChannelSubscriber<T>> waiting = subscribers;
            subscribers = new ConcurrentLinkedDeque<>();
            waiting.forEach(channelSubscriber -> channelSubscriber.onError(th));
        }

        // Close the cached channel; nothing can obtain it from a terminated processor.
        setAndClearChannel();
    }

    /**
     * Terminates the processor because upstream completed: marks it disposed and completes every subscriber that is
     * still waiting for a channel.
     */
    @Override
    public void onComplete() {
        logger.info("Upstream connection publisher was completed. Terminating processor.");
        isDisposed.set(true);

        synchronized (lock) {
            // Swap in a fresh deque so the subscribers being completed are detached from the processor.
            final ConcurrentLinkedDeque<ChannelSubscriber<T>> waiting = subscribers;
            subscribers = new ConcurrentLinkedDeque<>();
            waiting.forEach(ChannelSubscriber::onComplete);
        }
    }

    /**
     * Subscribes to the processor.
     *
     * <ul>
     *   <li>If the processor has terminated, the subscriber is failed with the recorded terminal error, or with an
     *       {@link IllegalStateException} when there is none.</li>
     *   <li>If a channel is cached, the subscriber completes with it immediately.</li>
     *   <li>Otherwise the subscriber is queued and, unless a retry is already scheduled, a channel is requested from
     *       upstream.</li>
     * </ul>
     *
     * @param coreSubscriber the downstream subscriber
     */
    @Override
    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
        if (isDisposed()) {
            final Throwable terminalError = lastError;
            if (terminalError == null) {
                Operators.error(coreSubscriber, logger.logExceptionAsError(new IllegalStateException("Cannot subscribe. Processor is already terminated.")));
            } else {
                coreSubscriber.onSubscribe(Operators.emptySubscription());
                coreSubscriber.onError(terminalError);
            }
            return;
        }

        final ChannelSubscriber<T> channelSubscriber = new ChannelSubscriber<>(coreSubscriber, this);
        coreSubscriber.onSubscribe(channelSubscriber);

        synchronized (lock) {
            if (channelSubscriber.isCancelled()) {
                // Cancelled during onSubscribe: do not queue it (it would never be removed again) and do not request
                // a channel on its behalf.
                return;
            }

            if (isDisposed()) {
                // The processor terminated after the check at the top. Termination drains the waiting subscribers
                // under this lock, so a subscriber queued now would never be signalled; fail it here instead.
                final Throwable terminalError = lastError;
                channelSubscriber.onError(terminalError != null
                    ? terminalError
                    : logger.logExceptionAsError(new IllegalStateException("Cannot subscribe. Processor is already terminated.")));
                return;
            }

            if (currentChannel != null) {
                channelSubscriber.complete(currentChannel);
                return;
            }

            channelSubscriber.onAdd();
            subscribers.add(channelSubscriber);

            // A scheduled retry will request upstream itself once its delay elapses.
            if (!isRetryPending.get()) {
                requestUpstream();
            }
        }
    }

    /**
     * Disposes the processor once: cancels a scheduled retry, completes the waiting subscribers and closes the
     * cached channel. Calls made after the processor is already marked disposed return immediately.
     */
    @Override
    public void dispose() {
        if (!isDisposed.compareAndSet(false, true)) {
            return;
        }

        final Disposable pendingRetry = retrySubscription;
        if (pendingRetry != null && !pendingRetry.isDisposed()) {
            pendingRetry.dispose();
        }

        onComplete();

        synchronized (lock) {
            setAndClearChannel();
        }
    }

    /**
     * Indicates whether the processor has been disposed, completed, or failed terminally.
     *
     * @return {@code true} once the disposed flag has been set
     */
    @Override
    public boolean isDisposed() {
        final boolean disposed = isDisposed.get();
        return disposed;
    }

    /**
     * Requests one channel from upstream unless a channel is already cached, the processor is disposed, there is no
     * upstream subscription yet, or a request is already outstanding.
     */
    private void requestUpstream() {
        if (currentChannel != null) {
            logger.verbose("Connection exists, not requesting another.");
            return;
        }

        if (isDisposed()) {
            logger.verbose("Is already disposed.");
            return;
        }

        final Subscription subscription = UPSTREAM.get(this);
        if (subscription == null) {
            logger.warning("There is no upstream subscription.");
            return;
        }

        // Only the caller that flips the flag from false to true issues the request.
        if (isRequested.compareAndSet(false, true)) {
            logger.info("Connection not requested, yet. Requesting one.");
            subscription.request(1L);
        }
    }

    /**
     * Detaches the cached channel under the lock and then closes it outside the lock's critical section of this
     * method. Does nothing observable when no channel is cached.
     */
    private void setAndClearChannel() {
        final T released;

        synchronized (lock) {
            released = currentChannel;
            currentChannel = null;
        }

        close(released);
    }

    /**
     * Reports whether there is currently no usable channel.
     *
     * @return {@code true} if no channel is cached or the processor is disposed
     */
    public boolean isChannelClosed() {
        synchronized (lock) {
            if (currentChannel == null) {
                return true;
            }
            return isDisposed();
        }
    }

    /**
     * Releases a channel using the first matching mechanism: {@link AsyncCloseable#closeAsync()}, then
     * {@link AutoCloseable#close()}, then {@link Disposable#dispose()}. A {@code null} channel or one implementing
     * none of these is ignored. Closing is best effort: failures in any branch are logged as warnings and never
     * propagated to the caller.
     *
     * @param t the channel to release; may be null
     */
    private void close(T t) {
        if (t instanceof AsyncCloseable) {
            try {
                // Subscribe with an error handler so an asynchronous close failure is reported through this class's
                // logger, like the other branches, instead of ending up in Reactor's dropped-error hook.
                ((AsyncCloseable) t).closeAsync().subscribe(
                    unused -> { },
                    error -> logger.warning("Error occurred closing AsyncCloseable channel.", error));
            } catch (Exception e) {
                logger.warning("Error occurred closing AsyncCloseable channel.", e);
            }
        } else if (t instanceof AutoCloseable) {
            try {
                ((AutoCloseable) t).close();
            } catch (Exception e) {
                logger.warning("Error occurred closing AutoCloseable channel.", e);
            }
        } else if (t instanceof Disposable) {
            try {
                ((Disposable) t).dispose();
            } catch (Exception e) {
                logger.warning("Error occurred closing Disposable channel.", e);
            }
        }
    }
}
