package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpConnection;
import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpManagementNode;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.ClaimsBasedSecurityNode;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.implementation.handler.ConnectionHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.nimbusds.openid.connect.sdk.claims.PersonClaims;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.reactor.Reactor;
import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/core/amqp/implementation/ReactorConnection.class */
public class ReactorConnection implements AmqpConnection {
    private static final String CBS_SESSION_NAME = "cbs-session";
    private static final String CBS_ADDRESS = "$cbs";
    private static final String CBS_LINK_NAME = "cbs";
    private static final String MANAGEMENT_SESSION_NAME = "mgmt-session";
    private static final String MANAGEMENT_ADDRESS = "$management";
    private static final String MANAGEMENT_LINK_NAME = "mgmt";
    private final ClientLogger logger;
    private final Flux<AmqpEndpointState> endpointStates;
    private final String connectionId;
    private final Mono<Connection> connectionMono;
    private final ConnectionHandler handler;
    private final ReactorHandlerProvider handlerProvider;
    private final TokenManagerProvider tokenManagerProvider;
    private final MessageSerializer messageSerializer;
    private final ConnectionOptions connectionOptions;
    private final ReactorProvider reactorProvider;
    private final AmqpRetryPolicy retryPolicy;
    private final SenderSettleMode senderSettleMode;
    private final ReceiverSettleMode receiverSettleMode;
    private final Duration operationTimeout;
    private final Disposable.Composite subscriptions;
    private ReactorExecutor executor;
    private volatile ClaimsBasedSecurityChannel cbsChannel;
    private volatile AmqpChannelProcessor<RequestResponseChannel> cbsChannelProcessor;
    private volatile Connection connection;
    private final ConcurrentMap<String, SessionSubscription> sessionMap = new ConcurrentHashMap();
    private final ConcurrentHashMap<String, AmqpManagementNode> managementNodes = new ConcurrentHashMap<>();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Sinks.One<AmqpShutdownSignal> shutdownSignalSink = Sinks.one();
    private final Sinks.Empty<Void> isClosedMono = Sinks.empty();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/ReactorConnection$ReactorExceptionHandler.class */
    public final class ReactorExceptionHandler extends AmqpExceptionHandler {
        private ReactorExceptionHandler() {
        }

        @Override // com.azure.core.amqp.implementation.AmqpExceptionHandler
        public void onConnectionError(Throwable th) {
            ReactorConnection.this.logger.atInfo().addKeyValue("namespace", ReactorConnection.this.getFullyQualifiedNamespace()).log("onConnectionError, Starting new reactor", th);
            if (ReactorConnection.this.isDisposed.getAndSet(true)) {
                return;
            }
            ReactorConnection.this.logger.atVerbose().addKeyValue("namespace", ReactorConnection.this.getFullyQualifiedNamespace()).log("onReactorError: Disposing.");
            ReactorConnection.this.closeAsync(new AmqpShutdownSignal(false, false, "onReactorError: " + th.toString())).subscribe();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @Override // com.azure.core.amqp.implementation.AmqpExceptionHandler
        public void onConnectionShutdown(AmqpShutdownSignal amqpShutdownSignal) {
            AmqpLoggingUtils.addShutdownSignal(ReactorConnection.this.logger.atInfo(), amqpShutdownSignal).addKeyValue("namespace", ReactorConnection.this.getFullyQualifiedNamespace()).log("onConnectionShutdown. Shutting down.");
            if (ReactorConnection.this.isDisposed.getAndSet(true)) {
                return;
            }
            ReactorConnection.this.logger.atVerbose().addKeyValue("namespace", ReactorConnection.this.getFullyQualifiedNamespace()).log("onConnectionShutdown: disposing.");
            ReactorConnection.this.closeAsync(amqpShutdownSignal).subscribe();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/ReactorConnection$SessionSubscription.class */
    public static final class SessionSubscription {
        private final AtomicBoolean isDisposed;
        private final AmqpSession session;
        private final Disposable subscription;

        private SessionSubscription(AmqpSession amqpSession, Disposable disposable) {
            this.isDisposed = new AtomicBoolean();
            this.session = amqpSession;
            this.subscription = disposable;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public AmqpSession getSession() {
            return this.session;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void dispose() {
            if (this.isDisposed.getAndSet(true)) {
                return;
            }
            if (this.session instanceof ReactorSession) {
                ((ReactorSession) this.session).closeAsync("Closing session.", null, true).subscribe();
            } else {
                this.session.dispose();
            }
            this.subscription.dispose();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Mono<Void> isClosed() {
            return this.session instanceof ReactorSession ? ((ReactorSession) this.session).isClosed() : Mono.empty();
        }
    }

    public ReactorConnection(String str, ConnectionOptions connectionOptions, ReactorProvider reactorProvider, ReactorHandlerProvider reactorHandlerProvider, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) {
        this.connectionOptions = connectionOptions;
        this.reactorProvider = reactorProvider;
        this.connectionId = str;
        this.logger = new ClientLogger((Class<?>) ReactorConnection.class, AmqpLoggingUtils.createContextWithConnectionId(str));
        this.handlerProvider = reactorHandlerProvider;
        this.tokenManagerProvider = (TokenManagerProvider) Objects.requireNonNull(tokenManagerProvider, "'tokenManagerProvider' cannot be null.");
        this.messageSerializer = messageSerializer;
        this.handler = reactorHandlerProvider.createConnectionHandler(str, connectionOptions);
        this.retryPolicy = RetryUtil.getRetryPolicy(connectionOptions.getRetry());
        this.operationTimeout = connectionOptions.getRetry().getTryTimeout();
        this.senderSettleMode = senderSettleMode;
        this.receiverSettleMode = receiverSettleMode;
        this.connectionMono = Mono.fromCallable(this::getOrCreateConnection).flatMap(connection -> {
            return getEndpointStates().filter(amqpEndpointState -> {
                return amqpEndpointState == AmqpEndpointState.ACTIVE;
            }).next().timeout(this.operationTimeout, Mono.error((Supplier<? extends Throwable>) () -> {
                return new AmqpException(true, String.format("Connection '%s' not opened within AmqpRetryOptions.tryTimeout(): %s", str, this.operationTimeout), this.handler.getErrorContext());
            })).thenReturn(connection);
        }).doOnError(th -> {
            if (this.isDisposed.getAndSet(true)) {
                this.logger.verbose("Connection was already disposed: Error occurred while connection was starting.", th);
            } else {
                closeAsync(new AmqpShutdownSignal(false, false, "Error occurred while connection was starting. Error: " + th)).subscribe();
            }
        });
        this.endpointStates = this.handler.getEndpointStates().takeUntilOther(this.shutdownSignalSink.asMono()).map(endpointState -> {
            this.logger.verbose("State {}", endpointState);
            return AmqpEndpointStateUtil.getConnectionState(endpointState);
        }).onErrorResume(th2 -> {
            if (this.isDisposed.getAndSet(true)) {
                return Mono.error(th2);
            }
            this.logger.verbose("Disposing of active sessions due to error.");
            return closeAsync(new AmqpShutdownSignal(false, false, th2.getMessage())).then(Mono.error(th2));
        }).doOnComplete(() -> {
            if (this.isDisposed.getAndSet(true)) {
                return;
            }
            this.logger.verbose("Disposing of active sessions due to connection close.");
            closeAsync(new AmqpShutdownSignal(false, false, "Connection handler closed.")).subscribe();
        }).cache(1);
        this.subscriptions = Disposables.composite(this.endpointStates.subscribe());
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates;
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public Flux<AmqpShutdownSignal> getShutdownSignals() {
        return this.shutdownSignalSink.asMono().cache().flux();
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public Mono<AmqpManagementNode> getManagementNode(String str) {
        return Mono.defer(() -> {
            if (isDisposed()) {
                return FluxUtil.monoError(this.logger.atError().addKeyValue(ClientConstants.ENTITY_PATH_KEY, str), Exceptions.propagate(new IllegalStateException("Connection is disposed. Cannot get management instance.")));
            }
            AmqpManagementNode amqpManagementNode = this.managementNodes.get(str);
            if (amqpManagementNode != null) {
                return Mono.just(amqpManagementNode);
            }
            TokenManager tokenManager = new AzureTokenManagerProvider(this.connectionOptions.getAuthorizationType(), this.connectionOptions.getFullyQualifiedNamespace(), this.connectionOptions.getAuthorizationScope()).getTokenManager(getClaimsBasedSecurityNode(), str);
            return tokenManager.authorize().thenReturn(this.managementNodes.compute(str, (str2, amqpManagementNode2) -> {
                if (amqpManagementNode2 != null) {
                    this.logger.info("A management node exists already, returning it.");
                    tokenManager.close();
                    return amqpManagementNode2;
                }
                String str2 = str + "-" + MANAGEMENT_SESSION_NAME;
                String str3 = str + "-" + MANAGEMENT_LINK_NAME;
                String str4 = str + "/$management";
                this.logger.atInfo().addKeyValue(ClientConstants.ENTITY_PATH_KEY, str).addKeyValue(ClientConstants.LINK_NAME_KEY, str3).addKeyValue(PersonClaims.ADDRESS_CLAIM_NAME, str4).log("Creating management node.");
                return new ManagementChannel(createRequestResponseChannel(str2, str3, str4), getFullyQualifiedNamespace(), str, tokenManager);
            }));
        });
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public Mono<ClaimsBasedSecurityNode> getClaimsBasedSecurityNode() {
        return this.connectionMono.then(Mono.fromCallable(() -> {
            return getOrCreateCBSNode();
        }));
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public String getId() {
        return this.connectionId;
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public String getFullyQualifiedNamespace() {
        return this.handler.getHostname();
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public int getMaxFrameSize() {
        return this.handler.getMaxFrameSize();
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public Map<String, Object> getConnectionProperties() {
        return this.handler.getConnectionProperties();
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public Mono<AmqpSession> createSession(String str) {
        return this.connectionMono.map(connection -> {
            return this.sessionMap.computeIfAbsent(str, str2 -> {
                SessionHandler createSessionHandler = this.handlerProvider.createSessionHandler(this.connectionId, getFullyQualifiedNamespace(), str2, this.connectionOptions.getRetry().getTryTimeout());
                Session session = connection.session();
                BaseHandler.setHandler(session, createSessionHandler);
                AmqpSession createSession = createSession(str2, session, createSessionHandler);
                return new SessionSubscription(createSession, createSession.getEndpointStates().subscribe(amqpEndpointState -> {
                }, th -> {
                    if (this.isDisposed.get()) {
                        return;
                    }
                    this.logger.atInfo().addKeyValue(ClientConstants.SESSION_NAME_KEY, str).log("Error occurred. Removing and disposing session", th);
                    removeSession(str2);
                }, () -> {
                    if (this.isDisposed.get()) {
                        return;
                    }
                    this.logger.atVerbose().addKeyValue(ClientConstants.SESSION_NAME_KEY, str).log("Complete. Removing and disposing session.");
                    removeSession(str2);
                }));
            });
        }).flatMap(sessionSubscription -> {
            return sessionSubscription.getSession().getEndpointStates().filter(amqpEndpointState -> {
                return amqpEndpointState == AmqpEndpointState.ACTIVE;
            }).next().timeout(this.retryPolicy.getRetryOptions().getTryTimeout(), Mono.error((Supplier<? extends Throwable>) () -> {
                return new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, String.format("connectionId[%s] sessionName[%s] Timeout waiting for session to be active.", this.connectionId, str), this.handler.getErrorContext());
            })).doOnError(th -> {
                if ((th instanceof AmqpException) && ((AmqpException) th).getErrorCondition() == AmqpErrorCondition.TIMEOUT_ERROR) {
                    this.sessionMap.remove(str).dispose();
                }
            }).thenReturn(sessionSubscription.getSession());
        });
    }

    protected AmqpSession createSession(String str, Session session, SessionHandler sessionHandler) {
        return new ReactorSession(this, session, sessionHandler, str, this.reactorProvider, this.handlerProvider, getClaimsBasedSecurityNode(), this.tokenManagerProvider, this.messageSerializer, this.connectionOptions.getRetry());
    }

    @Override // com.azure.core.amqp.AmqpConnection
    public boolean removeSession(String str) {
        if (str == null) {
            return false;
        }
        SessionSubscription remove = this.sessionMap.remove(str);
        if (remove != null) {
            remove.dispose();
        }
        return remove != null;
    }

    @Override // reactor.core.Disposable
    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    @Override // reactor.core.Disposable
    public void dispose() {
        closeAsync().block(this.operationTimeout.plus(this.operationTimeout));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<Connection> getReactorConnection() {
        return this.connectionMono;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AmqpChannelProcessor<RequestResponseChannel> createRequestResponseChannel(String str, String str2, String str3) {
        Objects.requireNonNull(str3, "'entityPath' cannot be null.");
        Flux repeat = createSession(str).cast(ReactorSession.class).map(reactorSession -> {
            return new RequestResponseChannel(this, getId(), getFullyQualifiedNamespace(), str2, str3, reactorSession.session(), this.connectionOptions.getRetry(), this.handlerProvider, this.reactorProvider, this.messageSerializer, this.senderSettleMode, this.receiverSettleMode, this.handlerProvider.getMetricProvider(getFullyQualifiedNamespace(), str3));
        }).doOnNext(requestResponseChannel -> {
            this.logger.atInfo().addKeyValue(ClientConstants.ENTITY_PATH_KEY, str3).addKeyValue(ClientConstants.LINK_NAME_KEY, str2).log("Emitting new response channel.");
        }).repeat(() -> {
            return !isDisposed();
        });
        Map<String, Object> createContextWithConnectionId = AmqpLoggingUtils.createContextWithConnectionId(this.connectionId);
        createContextWithConnectionId.put(ClientConstants.ENTITY_PATH_KEY, str3);
        return (AmqpChannelProcessor) repeat.subscribeWith(new AmqpChannelProcessor(getFullyQualifiedNamespace(), requestResponseChannel2 -> {
            return requestResponseChannel2.getEndpointStates();
        }, this.retryPolicy, createContextWithConnectionId));
    }

    @Override // com.azure.core.amqp.AmqpConnection, com.azure.core.util.AsyncCloseable
    public Mono<Void> closeAsync() {
        if (!this.isDisposed.getAndSet(true)) {
            return closeAsync(new AmqpShutdownSignal(false, true, "Disposed by client."));
        }
        this.logger.verbose("Connection was already closed. Not disposing again.");
        return this.isClosedMono.asMono();
    }

    Mono<Void> closeAsync(AmqpShutdownSignal amqpShutdownSignal) {
        AmqpLoggingUtils.addShutdownSignal(this.logger.atInfo(), amqpShutdownSignal).log("Disposing of ReactorConnection.");
        Sinks.EmitResult tryEmitValue = this.shutdownSignalSink.tryEmitValue(amqpShutdownSignal);
        if (tryEmitValue.isFailure()) {
            AmqpLoggingUtils.addShutdownSignal(this.logger.atInfo(), amqpShutdownSignal).addKeyValue(ClientConstants.EMIT_RESULT_KEY, tryEmitValue).log("Unable to emit shutdown signal.");
        }
        return Mono.whenDelayError((Publisher<?>[]) new Publisher[]{(this.cbsChannelProcessor != null ? this.cbsChannelProcessor.flatMap(requestResponseChannel -> {
            return requestResponseChannel.closeAsync();
        }) : Mono.empty()).doFinally(signalType -> {
            this.logger.atVerbose().addKeyValue(ClientConstants.SIGNAL_TYPE_KEY, signalType).log("Closed CBS node.");
        }), Mono.when((Publisher<?>[]) new Publisher[]{Flux.fromStream(this.managementNodes.values().stream()).flatMap(amqpManagementNode -> {
            return amqpManagementNode.closeAsync();
        })}).doFinally(signalType2 -> {
            this.logger.atVerbose().addKeyValue(ClientConstants.SIGNAL_TYPE_KEY, signalType2).log("Closed management nodes.");
        })}).then(Mono.fromRunnable(() -> {
            this.logger.verbose("Scheduling closeConnection work.");
            ReactorDispatcher reactorDispatcher = this.reactorProvider.getReactorDispatcher();
            if (reactorDispatcher == null) {
                closeConnectionWork();
                return;
            }
            try {
                reactorDispatcher.invoke(() -> {
                    closeConnectionWork();
                });
            } catch (IOException e) {
                this.logger.warning("IOException while scheduling closeConnection work. Manually disposing.", e);
                closeConnectionWork();
            } catch (RejectedExecutionException e2) {
                this.logger.info("Could not schedule closeConnection work. Manually disposing.");
                closeConnectionWork();
            }
        }).doFinally(signalType3 -> {
            this.logger.atVerbose().addKeyValue(ClientConstants.SIGNAL_TYPE_KEY, signalType3).log("Closed reactor dispatcher.");
        })).then(this.isClosedMono.asMono());
    }

    private synchronized void closeConnectionWork() {
        if (this.connection == null) {
            this.isClosedMono.emitEmpty((signalType, emitResult) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atInfo(), signalType, emitResult).log("Unable to complete closeMono.");
                return false;
            });
            return;
        }
        this.connection.close();
        this.handler.close();
        ArrayList arrayList = new ArrayList();
        this.sessionMap.values().forEach(sessionSubscription -> {
            arrayList.add(sessionSubscription.isClosed());
        });
        this.subscriptions.add(Mono.when(arrayList).timeout(this.operationTimeout).onErrorResume(th -> {
            this.logger.info("Timed out waiting for all sessions to close.");
            return Mono.empty();
        }).then(this.executor != null ? Mono.defer(() -> {
            Mono<Void> closeAsync;
            synchronized (this) {
                this.logger.info("Closing executor.");
                closeAsync = this.executor.closeAsync();
            }
            return closeAsync;
        }) : Mono.empty()).then(Mono.fromRunnable(() -> {
            this.isClosedMono.emitEmpty((signalType2, emitResult2) -> {
                AmqpLoggingUtils.addSignalTypeAndResult(this.logger.atWarning(), signalType2, emitResult2).log("Unable to emit connection closed signal.");
                return false;
            });
            this.subscriptions.dispose();
        })).subscribe());
    }

    private synchronized ClaimsBasedSecurityNode getOrCreateCBSNode() {
        if (this.cbsChannel == null) {
            this.logger.info("Setting CBS channel.");
            this.cbsChannelProcessor = createRequestResponseChannel(CBS_SESSION_NAME, CBS_LINK_NAME, "$cbs");
            this.cbsChannel = new ClaimsBasedSecurityChannel(this.cbsChannelProcessor, this.connectionOptions.getTokenCredential(), this.connectionOptions.getAuthorizationType(), this.connectionOptions.getRetry());
        }
        return this.cbsChannel;
    }

    private synchronized Connection getOrCreateConnection() throws IOException {
        if (this.connection == null) {
            this.logger.atInfo().addKeyValue(ClientConstants.HOSTNAME_KEY, this.handler.getHostname()).addKeyValue("port", this.handler.getProtocolPort()).log("Creating and starting connection.");
            Reactor createReactor = this.reactorProvider.createReactor(this.connectionId, this.handler.getMaxFrameSize());
            this.connection = createReactor.connectionToHost(this.handler.getHostname(), this.handler.getProtocolPort(), this.handler);
            ReactorExceptionHandler reactorExceptionHandler = new ReactorExceptionHandler();
            Duration dividedBy = this.connectionOptions.getRetry().getTryTimeout().dividedBy(2L);
            this.executor = new ReactorExecutor(createReactor, Schedulers.newSingle("reactor-executor"), this.connectionId, reactorExceptionHandler, ClientConstants.SERVER_BUSY_WAIT_TIME.compareTo(dividedBy) < 0 ? ClientConstants.SERVER_BUSY_WAIT_TIME : dividedBy, this.connectionOptions.getFullyQualifiedNamespace());
            Mono defer = Mono.defer(() -> {
                Mono<Void> closeAsync;
                synchronized (this) {
                    closeAsync = this.executor.closeAsync();
                }
                return closeAsync;
            });
            this.reactorProvider.getReactorDispatcher().getShutdownSignal().flatMap(amqpShutdownSignal -> {
                reactorExceptionHandler.onConnectionShutdown(amqpShutdownSignal);
                return defer;
            }).onErrorResume(th -> {
                reactorExceptionHandler.onConnectionError(th);
                return defer;
            }).subscribe();
            this.executor.start();
        }
        return this.connection;
    }
}
