package com.azure.messaging.servicebus;

import com.azure.core.amqp.implementation.ClientConstants;
import com.azure.core.util.logging.ClientLogger;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

/* loaded from: input_file:com/azure/messaging/servicebus/FluxAutoComplete.class */
final class FluxAutoComplete extends FluxOperator<ServiceBusMessageContext, ServiceBusMessageContext> {
    private final Semaphore completionLock;
    private final Function<ServiceBusMessageContext, Mono<Void>> onComplete;
    private final Function<ServiceBusMessageContext, Mono<Void>> onAbandon;
    private final ClientLogger logger;

    /* loaded from: input_file:com/azure/messaging/servicebus/FluxAutoComplete$AutoCompleteSubscriber.class */
    static final class AutoCompleteSubscriber extends BaseSubscriber<ServiceBusMessageContext> {
        private final CoreSubscriber<? super ServiceBusMessageContext> downstream;
        private final Function<ServiceBusMessageContext, Mono<Void>> onComplete;
        private final Function<ServiceBusMessageContext, Mono<Void>> onAbandon;
        private final Semaphore semaphore;
        private final ClientLogger logger;

        AutoCompleteSubscriber(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber, Semaphore semaphore, Function<ServiceBusMessageContext, Mono<Void>> function, Function<ServiceBusMessageContext, Mono<Void>> function2, ClientLogger clientLogger) {
            this.downstream = coreSubscriber;
            this.onComplete = function;
            this.onAbandon = function2;
            this.semaphore = semaphore;
            this.logger = clientLogger;
        }

        @Override // reactor.core.publisher.BaseSubscriber
        protected void hookOnSubscribe(Subscription subscription) {
            this.logger.info("Subscription received. Subscribing downstream. {}", subscription);
            this.downstream.onSubscribe(this);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // reactor.core.publisher.BaseSubscriber
        public void hookOnNext(ServiceBusMessageContext serviceBusMessageContext) {
            ServiceBusReceivedMessage message = serviceBusMessageContext.getMessage();
            String valueOf = message != null ? String.valueOf(message.getSequenceNumber()) : ClientConstants.NOT_APPLICABLE;
            this.logger.verbose("ON NEXT: Passing message downstream. sequenceNumber[{}]", valueOf);
            try {
                this.semaphore.acquire();
            } catch (InterruptedException e) {
                this.logger.info("Unable to acquire semaphore.", e);
            }
            try {
                try {
                    this.downstream.onNext(serviceBusMessageContext);
                    applyWithCatch(this.onComplete, serviceBusMessageContext, "complete");
                    this.logger.verbose("ON NEXT: Finished. sequenceNumber[{}]", valueOf);
                    this.semaphore.release();
                } catch (Throwable th) {
                    this.logger.verbose("ON NEXT: Finished. sequenceNumber[{}]", valueOf);
                    this.semaphore.release();
                    throw th;
                }
            } catch (Exception e2) {
                this.logger.error("Error occurred processing message. Abandoning. sequenceNumber[{}]", valueOf, e2);
                applyWithCatch(this.onAbandon, serviceBusMessageContext, "abandon");
                this.logger.verbose("ON NEXT: Finished. sequenceNumber[{}]", valueOf);
                this.semaphore.release();
            }
        }

        @Override // reactor.core.publisher.BaseSubscriber
        protected void hookOnError(Throwable th) {
            this.logger.error("Error occurred. Passing downstream.", th);
            this.downstream.onError(th);
        }

        @Override // reactor.core.publisher.BaseSubscriber
        protected void hookOnComplete() {
            this.downstream.onComplete();
        }

        @Override // reactor.core.CoreSubscriber
        public Context currentContext() {
            return this.downstream.currentContext();
        }

        private void applyWithCatch(Function<ServiceBusMessageContext, Mono<Void>> function, ServiceBusMessageContext serviceBusMessageContext, String str) {
            if (serviceBusMessageContext.getMessage() == null || !serviceBusMessageContext.getMessage().isSettled()) {
                try {
                    function.apply(serviceBusMessageContext).block();
                } catch (Exception e) {
                    this.logger.warning("Unable to '{}' message.", str, e);
                    upstream().cancel();
                    onError(e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FluxAutoComplete(Flux<? extends ServiceBusMessageContext> flux, Semaphore semaphore, Function<ServiceBusMessageContext, Mono<Void>> function, Function<ServiceBusMessageContext, Mono<Void>> function2) {
        super(flux);
        this.logger = new ClientLogger((Class<?>) FluxAutoComplete.class);
        this.completionLock = semaphore;
        this.onComplete = (Function) Objects.requireNonNull(function, "'onComplete' cannot be null.");
        this.onAbandon = (Function) Objects.requireNonNull(function2, "'onAbandon' cannot be null.");
    }

    @Override // reactor.core.publisher.Flux, reactor.core.CorePublisher
    public void subscribe(CoreSubscriber<? super ServiceBusMessageContext> coreSubscriber) {
        Objects.requireNonNull(coreSubscriber, "'coreSubscriber' cannot be null.");
        this.source.subscribe((CoreSubscriber) new AutoCompleteSubscriber(coreSubscriber, this.completionLock, this.onComplete, this.onAbandon, this.logger));
    }
}
