package com.azure.messaging.servicebus;

import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.ProcessKind;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.servicebus.ServiceBusClientBuilder;
import com.azure.messaging.servicebus.implementation.ServiceBusConstants;
import com.azure.messaging.servicebus.implementation.models.ServiceBusProcessorClientOptions;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Signal;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/messaging/servicebus/ServiceBusProcessorClient.class */
/**
 * Push-style message processor built on top of {@link ServiceBusReceiverAsyncClient}.
 *
 * <p>Once {@link #start() started}, the processor subscribes to the receiver's message stream with
 * {@code maxConcurrentCalls} subscribers, hands every received message to the {@code processMessage}
 * callback and reports failures to the {@code processError} callback. A periodic task recreates the
 * receiver when its connection is reported closed.</p>
 *
 * <p>{@link #start()}, {@link #stop()}, {@link #close()} and the internal restart logic are
 * {@code synchronized} on this instance.</p>
 */
public final class ServiceBusProcessorClient implements AutoCloseable {
    /** Initial delay and delay between two connection health checks, in seconds. */
    private static final int SCHEDULER_INTERVAL_IN_SECONDS = 10;

    private final ClientLogger logger = new ClientLogger(ServiceBusProcessorClient.class);

    /** Exactly one of the two builders is non-null; it is used to (re)create the receiver. */
    private final ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder;
    private final ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder;

    private final Consumer<ServiceBusReceivedMessageContext> processMessage;
    private final Consumer<ServiceBusErrorContext> processError;
    private final ServiceBusProcessorClientOptions processorOptions;
    private final TracerProvider tracerProvider;

    /** Active upstream subscriptions, used as a concurrent set (key and value are the same object). */
    private final Map<Subscription, Subscription> receiverSubscriptions = new ConcurrentHashMap<>();
    /** Current receiver; {@code null} after {@link #close()} until the next {@link #start()}. */
    private final AtomicReference<ServiceBusReceiverAsyncClient> asyncClient = new AtomicReference<>();
    private final AtomicBoolean isRunning = new AtomicBoolean();

    /** Runs the periodic connection health check; created lazily by {@link #start()}. */
    private ScheduledExecutorService scheduledExecutor;

    /**
     * Creates a processor backed by a session receiver builder.
     *
     * @param sessionReceiverBuilder builder used to create (and later recreate) the receiver.
     * @param processMessage callback invoked for every received message.
     * @param processError callback invoked for every error.
     * @param processorOptions processor configuration (concurrency, auto-complete, tracing).
     * @throws NullPointerException if any argument is {@code null}.
     */
    public ServiceBusProcessorClient(
        ServiceBusClientBuilder.ServiceBusSessionReceiverClientBuilder sessionReceiverBuilder,
        Consumer<ServiceBusReceivedMessageContext> processMessage,
        Consumer<ServiceBusErrorContext> processError,
        ServiceBusProcessorClientOptions processorOptions) {
        this.sessionReceiverBuilder =
            Objects.requireNonNull(sessionReceiverBuilder, "'sessionReceiverBuilder' cannot be null");
        this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
        this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
        this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
        this.asyncClient.set(sessionReceiverBuilder.buildAsyncClientForProcessor());
        this.receiverBuilder = null;
        this.tracerProvider = processorOptions.getTracerProvider();
    }

    /**
     * Creates a processor backed by a (non-session) receiver builder.
     *
     * @param receiverBuilder builder used to create (and later recreate) the receiver.
     * @param processMessage callback invoked for every received message.
     * @param processError callback invoked for every error.
     * @param processorOptions processor configuration (concurrency, auto-complete, tracing).
     * @throws NullPointerException if any argument is {@code null}.
     */
    public ServiceBusProcessorClient(
        ServiceBusClientBuilder.ServiceBusReceiverClientBuilder receiverBuilder,
        Consumer<ServiceBusReceivedMessageContext> processMessage,
        Consumer<ServiceBusErrorContext> processError,
        ServiceBusProcessorClientOptions processorOptions) {
        this.receiverBuilder = Objects.requireNonNull(receiverBuilder, "'receiverBuilder' cannot be null");
        this.processMessage = Objects.requireNonNull(processMessage, "'processMessage' cannot be null");
        this.processError = Objects.requireNonNull(processError, "'processError' cannot be null");
        this.processorOptions = Objects.requireNonNull(processorOptions, "'processorOptions' cannot be null");
        this.asyncClient.set(receiverBuilder.buildAsyncClient());
        this.sessionReceiverBuilder = null;
        this.tracerProvider = processorOptions.getTracerProvider();
    }

    /**
     * Starts (or resumes) message processing. Does nothing but log if the processor is already running.
     *
     * <p>Recreates the receiver if a previous {@link #close()} discarded it, then (re)subscribes and
     * lazily schedules the periodic connection health check.</p>
     */
    public synchronized void start() {
        if (isRunning.getAndSet(true)) {
            logger.info("Processor is already running");
            return;
        }
        if (asyncClient.get() == null) {
            asyncClient.set(createReceiver());
        }
        receiveMessages();
        if (scheduledExecutor == null) {
            scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
            scheduledExecutor.scheduleWithFixedDelay(() -> {
                // close() may null the reference while a check is in flight; an exception escaping
                // this task would suppress every subsequent execution, so guard against null.
                ServiceBusReceiverAsyncClient currentClient = asyncClient.get();
                if (currentClient != null && currentClient.isConnectionClosed()) {
                    restartMessageReceiver(null);
                }
            }, SCHEDULER_INTERVAL_IN_SECONDS, SCHEDULER_INTERVAL_IN_SECONDS, TimeUnit.SECONDS);
        }
    }

    /**
     * Marks the processor as not running. Subscribers stop requesting further messages after the
     * message they are currently handling; subscriptions and the receiver are left in place so that
     * {@link #start()} can resume them.
     */
    public synchronized void stop() {
        isRunning.set(false);
    }

    /**
     * Stops processing and releases resources: cancels all subscriptions, shuts down the health-check
     * executor and closes the receiver. A later {@link #start()} creates a new receiver.
     */
    @Override
    public synchronized void close() {
        isRunning.set(false);
        cancelReceiverSubscriptions();
        if (scheduledExecutor != null) {
            scheduledExecutor.shutdown();
            scheduledExecutor = null;
        }
        ServiceBusReceiverAsyncClient currentClient = asyncClient.get();
        if (currentClient != null) {
            currentClient.close();
            asyncClient.set(null);
        }
    }

    /**
     * Reports whether the processor is currently marked as running.
     *
     * @return {@code true} between a call to {@link #start()} and the next {@link #stop()} or
     *     {@link #close()}.
     */
    public synchronized boolean isRunning() {
        return isRunning.get();
    }

    /**
     * Resumes existing subscriptions by requesting one message from each, or, when there are none,
     * subscribes {@code maxConcurrentCalls} new subscribers to the current receiver's message stream.
     */
    private synchronized void receiveMessages() {
        if (!receiverSubscriptions.isEmpty()) {
            // Subscriptions survived a stop(): just ask each of them for the next message.
            receiverSubscriptions.keySet().forEach(subscription -> subscription.request(1L));
            return;
        }

        final ServiceBusReceiverAsyncClient receiverClient = asyncClient.get();
        final int maxConcurrentCalls = processorOptions.getMaxConcurrentCalls();

        // Generic array creation is unavoidable here: ParallelFlux#subscribe takes an array.
        @SuppressWarnings({"unchecked", "rawtypes"})
        CoreSubscriber<? super ServiceBusMessageContext>[] subscribers = new CoreSubscriber[maxConcurrentCalls];

        for (int i = 0; i < maxConcurrentCalls; i++) {
            subscribers[i] = new CoreSubscriber<ServiceBusMessageContext>() {
                private Subscription subscription = null;

                @Override
                public void onSubscribe(Subscription subscription) {
                    this.subscription = subscription;
                    receiverSubscriptions.put(subscription, subscription);
                    subscription.request(1L);
                }

                @Override
                public void onNext(ServiceBusMessageContext messageContext) {
                    if (messageContext.hasError()) {
                        handleError(messageContext.getThrowable());
                    } else {
                        // Stays null if starting the span itself fails; endProcessTracingSpan tolerates that.
                        Context processSpanContext = null;
                        try {
                            ServiceBusReceivedMessageContext receivedMessageContext =
                                new ServiceBusReceivedMessageContext(receiverClient, messageContext);
                            processSpanContext = startProcessTracingSpan(messageContext.getMessage(),
                                receiverClient.getEntityPath(), receiverClient.getFullyQualifiedNamespace());
                            if (processSpanContext.getData(Tracer.SPAN_CONTEXT_KEY).isPresent()) {
                                messageContext.getMessage().addContext(Tracer.SPAN_CONTEXT_KEY, processSpanContext);
                            }
                            processMessage.accept(receivedMessageContext);
                            endProcessTracingSpan(processSpanContext, Signal.complete());
                        } catch (Exception e) {
                            handleError(new ServiceBusException(e, ServiceBusErrorSource.USER_CALLBACK));
                            endProcessTracingSpan(processSpanContext, Signal.error(e));
                            if (!processorOptions.isDisableAutoComplete()) {
                                logger.warning("Error when processing message. Abandoning message.", e);
                                abandonMessage(messageContext, receiverClient);
                            }
                        }
                    }
                    // One-at-a-time demand: only ask for more while the processor is running.
                    if (isRunning.get()) {
                        logger.verbose("Requesting 1 more message from upstream");
                        subscription.request(1L);
                    }
                }

                @Override
                public void onError(Throwable throwable) {
                    logger.info("Error receiving messages.", throwable);
                    handleError(throwable);
                    if (isRunning.get()) {
                        restartMessageReceiver(subscription);
                    }
                }

                @Override
                public void onComplete() {
                    logger.info("Completed receiving messages.");
                    if (isRunning.get()) {
                        restartMessageReceiver(subscription);
                    }
                }
            };
        }

        receiverClient.receiveMessagesWithContext()
            .parallel(maxConcurrentCalls, 1)
            .runOn(Schedulers.boundedElastic(), 1)
            .subscribe(subscribers);
    }

    /**
     * Closes the tracing scope stored under the {@code "scope"} key of the given context (when it is
     * {@link AutoCloseable}) and ends the span. Does nothing when the context is {@code null}, carries
     * no scope, or tracing is disabled.
     *
     * @param context span context returned by {@link #startProcessTracingSpan}; may be {@code null}.
     * @param signal completion or error signal passed on to the tracer when ending the span.
     */
    public void endProcessTracingSpan(Context context, Signal<Void> signal) {
        if (context == null) {
            return;
        }
        Optional<Object> scopeData = context.getData("scope");
        if (!scopeData.isPresent() || !tracerProvider.isEnabled()) {
            return;
        }

        Object scope = scopeData.get();
        if (scope instanceof AutoCloseable) {
            try {
                ((AutoCloseable) scope).close();
            } catch (Exception e) {
                logger.error("endTracingSpan().close() failed with an error {}", e);
            }
        } else {
            // Report the class of the scope object itself, not of the Optional wrapping it.
            logger.warning(String.format(Locale.US,
                "Process span scope type is not of type AutoCloseable, but type: %s. Not closing the scope and span",
                scope.getClass()));
        }
        tracerProvider.endSpan(context, signal);
    }

    /**
     * Starts a {@link ProcessKind#PROCESS} span for the given message, linked to the diagnostic id
     * found in the message's application properties.
     *
     * @param serviceBusReceivedMessage message about to be processed.
     * @param str entity path recorded on the span context.
     * @param str2 fully qualified namespace recorded as the host name on the span context.
     * @return the started span's context, or {@link Context#NONE} when the message carries no
     *     diagnostic id or tracing is disabled.
     */
    public Context startProcessTracingSpan(ServiceBusReceivedMessage serviceBusReceivedMessage, String str,
        String str2) {
        Object diagnosticId = serviceBusReceivedMessage.getApplicationProperties().get(Tracer.DIAGNOSTIC_ID_KEY);
        if (diagnosticId == null || !tracerProvider.isEnabled()) {
            return Context.NONE;
        }

        Context spanContext = tracerProvider.extractContext(diagnosticId.toString(), Context.NONE)
            .addData(Tracer.ENTITY_PATH_KEY, str)
            .addData(Tracer.HOST_NAME_KEY, str2)
            .addData(Tracer.AZ_TRACING_NAMESPACE_KEY, ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE);
        if (serviceBusReceivedMessage.getEnqueuedTime() != null) {
            spanContext = spanContext.addData("x-opt-enqueued-time",
                Long.valueOf(serviceBusReceivedMessage.getEnqueuedTime().toInstant().getEpochSecond()));
        }
        return tracerProvider.startSpan(ServiceBusConstants.AZ_TRACING_SERVICE_NAME, spanContext,
            ProcessKind.PROCESS);
    }

    /**
     * Abandons the message, blocking until the operation finishes. Best effort: a failure is only
     * logged at verbose level.
     *
     * @param serviceBusMessageContext context holding the message to abandon.
     * @param serviceBusReceiverAsyncClient receiver the message was received on.
     */
    public void abandonMessage(ServiceBusMessageContext serviceBusMessageContext,
        ServiceBusReceiverAsyncClient serviceBusReceiverAsyncClient) {
        try {
            serviceBusReceiverAsyncClient.abandon(serviceBusMessageContext.getMessage()).block();
        } catch (Exception e) {
            logger.verbose("Failed to abandon message", e);
        }
    }

    /**
     * Passes the error to the {@code processError} callback together with the current receiver's
     * namespace and entity path. Exceptions raised while doing so are logged at verbose level and
     * otherwise ignored.
     *
     * @param th the error to report.
     */
    public void handleError(Throwable th) {
        try {
            ServiceBusReceiverAsyncClient currentClient = asyncClient.get();
            processError.accept(new ServiceBusErrorContext(th, currentClient.getFullyQualifiedNamespace(),
                currentClient.getEntityPath()));
        } catch (Exception e) {
            logger.verbose("Error from error handler. Ignoring error.", e);
        }
    }

    /**
     * Tears down all subscriptions and the current receiver, builds a new receiver and resubscribes.
     * Only acts while the processor is running, and only if {@code subscription} is {@code null}
     * (health-check initiated) or still registered, so that several subscribers failing together
     * trigger a single restart.
     *
     * @param subscription the subscription whose termination triggered the restart, or {@code null}.
     */
    public synchronized void restartMessageReceiver(Subscription subscription) {
        if (!isRunning()) {
            return;
        }
        if (subscription != null && !receiverSubscriptions.containsKey(subscription)) {
            return;
        }
        cancelReceiverSubscriptions();
        asyncClient.get().close();
        asyncClient.set(createReceiver());
        receiveMessages();
    }

    /** Builds a new receiver from whichever builder this processor was constructed with. */
    private ServiceBusReceiverAsyncClient createReceiver() {
        return receiverBuilder == null
            ? sessionReceiverBuilder.buildAsyncClientForProcessor()
            : receiverBuilder.buildAsyncClient();
    }

    /** Cancels every tracked upstream subscription and forgets them. */
    private void cancelReceiverSubscriptions() {
        receiverSubscriptions.keySet().forEach(Subscription::cancel);
        receiverSubscriptions.clear();
    }
}
