package io.vertx.amqp.impl;

import io.vertx.amqp.AmqpClientOptions;
import io.vertx.amqp.AmqpConnection;
import io.vertx.amqp.AmqpReceiver;
import io.vertx.amqp.AmqpReceiverOptions;
import io.vertx.amqp.AmqpSender;
import io.vertx.amqp.AmqpSenderOptions;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonLinkOptions;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.impl.ProtonConnectionImpl;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.engine.EndpointState;

/* loaded from: input_file:io/vertx/amqp/impl/AmqpConnectionImpl.class */
/**
 * {@link AmqpConnection} implementation backed by a single {@link ProtonConnection}.
 *
 * <p>The underlying Proton connection is held in an {@link AtomicReference}; it is set once the
 * transport is connected and cleared when a disconnect or remote close is observed. Connection
 * set-up and {@link #close(Handler)} are always dispatched on the {@link Context} given to the
 * constructor; link creation runs inline when the caller is already on that context.
 *
 * <p>Several members are {@code public} only because of decompilation; the decompiler reports the
 * constructor, {@code runOnContext}, {@code runWithTrampoline}, {@code register} and
 * {@code unregister} as originally package-private.
 */
public class AmqpConnectionImpl implements AmqpConnection {
    /** Value advertised under {@link #PRODUCT_KEY} in the connection properties. */
    public static final String PRODUCT = "vertx-amqp-client";
    /** Connection property key under which {@link #PRODUCT} is advertised. */
    public static final Symbol PRODUCT_KEY = Symbol.valueOf("product");

    private final AmqpClientOptions options;
    private final Context context;
    /** Written and read under {@code synchronized (this)}. */
    private Handler<Throwable> exceptionHandler;
    private final AtomicBoolean closed = new AtomicBoolean();
    private final AtomicReference<ProtonConnection> connection = new AtomicReference<>();
    private final List<AmqpSender> senders = new CopyOnWriteArrayList<>();
    private final List<AmqpReceiver> receivers = new CopyOnWriteArrayList<>();
    private final Promise<Void> closePromise = Promise.promise();

    /**
     * Creates the connection wrapper and schedules the actual connect on {@code context}.
     *
     * @param context the context on which the connection is established
     * @param amqpClientOptions the client options (host, port, credentials, container id, ...)
     * @param protonClient the Proton client used to open the transport, must not be {@code null}
     * @param promise completed with this instance once the connection is open, or failed with the
     *     cause; must not be {@code null}
     * @throws NullPointerException if {@code protonClient} or {@code promise} is {@code null}
     */
    public AmqpConnectionImpl(Context context, AmqpClientOptions amqpClientOptions, ProtonClient protonClient, Promise<AmqpConnection> promise) {
        // Validate eagerly: a null promise discovered later on the context could not be reported anywhere.
        Objects.requireNonNull(protonClient, "proton cannot be `null`");
        Objects.requireNonNull(promise, "connection handler cannot be `null`");
        this.options = amqpClientOptions;
        this.context = context;
        // Call the context directly rather than the overridable runOnContext from a constructor.
        context.runOnContext(ignored -> connect(protonClient, promise));
    }

    /**
     * Connects the transport, installs the disconnect/close/open handlers and opens the AMQP
     * connection. {@code promise} is completed from the open handler, or failed if the transport
     * connect fails or a connection is already held.
     */
    private void connect(ProtonClient protonClient, Promise<AmqpConnection> promise) {
        protonClient.connect(this.options, this.options.getHost(), this.options.getPort(), this.options.getUsername(), this.options.getPassword(), connectResult -> {
            if (!connectResult.succeeded()) {
                promise.fail(connectResult.cause());
                return;
            }
            ProtonConnection protonConnection = connectResult.result();
            if (!this.connection.compareAndSet(null, protonConnection)) {
                // Do not leak the freshly connected transport when we already hold a connection.
                protonConnection.disconnect();
                promise.fail("Unable to connect - already holding a connection");
                return;
            }
            Map<Symbol, Object> properties = new HashMap<>();
            properties.put(PRODUCT_KEY, PRODUCT);
            if (this.options.getContainerId() != null) {
                protonConnection.setContainer(this.options.getContainerId());
            }
            if (this.options.getConnectionHostname() != null) {
                protonConnection.setHostname(this.options.getConnectionHostname());
            }
            protonConnection.setProperties(properties).disconnectHandler(disconnected -> {
                try {
                    onDisconnect();
                } finally {
                    // Must happen after onDisconnect(), which only notifies while not yet closed.
                    markClosed();
                }
            }).closeHandler(remoteClose -> {
                try {
                    onDisconnect();
                    protonConnection.close();
                    runOnContext(ignored -> {
                        if (!protonConnection.isDisconnected()) {
                            protonConnection.disconnect();
                        }
                    });
                } finally {
                    markClosed();
                }
            }).openHandler(openResult -> {
                if (openResult.succeeded()) {
                    this.closed.set(false);
                    promise.complete(this);
                } else {
                    markClosed();
                    promise.fail(openResult.cause());
                }
            });
            protonConnection.open();
        });
    }

    /** Flags this connection as closed and completes the close future if still pending. */
    private void markClosed() {
        this.closed.set(true);
        this.closePromise.tryComplete();
    }

    /**
     * Clears the held connection and, unless this connection is already flagged as closed,
     * reports the loss to the registered exception handler (if any).
     */
    private void onDisconnect() {
        ProtonConnection previous = this.connection.getAndSet(null);
        Handler<Throwable> handler;
        synchronized (this) {
            handler = this.exceptionHandler;
        }
        if (handler != null && !this.closed.get()) {
            handler.handle(new Exception(getErrorMessage(previous)));
        }
    }

    /**
     * Builds the disconnect message, appending the local condition description when present,
     * otherwise the remote one.
     *
     * @param protonConnection the connection that went away, may be {@code null}
     * @return the message, always starting with {@code "Connection disconnected"}
     */
    private String getErrorMessage(ProtonConnection protonConnection) {
        String message = "Connection disconnected";
        if (protonConnection == null) {
            return message;
        }
        if (protonConnection.getCondition() != null && protonConnection.getCondition().getDescription() != null) {
            return message + " - " + protonConnection.getCondition().getDescription();
        }
        if (protonConnection.getRemoteCondition() != null && protonConnection.getRemoteCondition().getDescription() != null) {
            return message + " - " + protonConnection.getRemoteCondition().getDescription();
        }
        return message;
    }

    /**
     * Schedules {@code handler} on this connection's context.
     *
     * @param handler the action to run
     */
    public void runOnContext(Handler<Void> handler) {
        this.context.runOnContext(handler);
    }

    /**
     * Runs {@code handler} immediately when the caller is already on this connection's context,
     * otherwise schedules it on that context.
     *
     * @param handler the action to run
     */
    public void runWithTrampoline(Handler<Void> handler) {
        if (Vertx.currentContext() == this.context) {
            handler.handle(null);
        } else {
            runOnContext(handler);
        }
    }

    /** Returns whether a connection is held and its local endpoint state is {@code ACTIVE}. */
    private boolean isLocalOpen() {
        ProtonConnection current = this.connection.get();
        // The endpoint state accessors are used via the implementation class, hence the downcast.
        return current != null && ((ProtonConnectionImpl) current).getLocalState() == EndpointState.ACTIVE;
    }

    /** Returns whether a connection is held and its remote endpoint state is {@code ACTIVE}. */
    private boolean isRemoteOpen() {
        ProtonConnection current = this.connection.get();
        return current != null && ((ProtonConnectionImpl) current).getRemoteState() == EndpointState.ACTIVE;
    }

    /**
     * Registers the handler notified when the connection is lost while not closed.
     *
     * @param handler the handler, may be {@code null} to unset
     * @return this connection
     */
    @Override // io.vertx.amqp.AmqpConnection
    public synchronized AmqpConnection exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /**
     * Closes the connection on its context.
     *
     * <p>If no connection is held, it is already flagged closed, or neither endpoint is active,
     * {@code handler} is notified with success right away. Otherwise the connection is flagged
     * closed, the Proton disconnect/close handlers are replaced, and {@code handler} receives the
     * outcome of the close (or a failure if the transport disconnects first).
     *
     * @param handler notified with the outcome, may be {@code null}
     * @return this connection
     */
    @Override // io.vertx.amqp.AmqpConnection
    public AmqpConnection close(Handler<AsyncResult<Void>> handler) {
        this.context.runOnContext(ignored -> {
            ProtonConnection protonConnection = this.connection.get();
            if (protonConnection == null || this.closed.get() || !(isLocalOpen() || isRemoteOpen())) {
                if (handler != null) {
                    handler.handle(Future.succeededFuture());
                }
                return;
            }
            markClosed();
            Promise<Void> done = Promise.promise();
            if (handler != null) {
                done.future().onComplete(handler);
            }
            if (protonConnection.isDisconnected()) {
                done.complete();
                return;
            }
            try {
                protonConnection.disconnectHandler(disconnected -> {
                    done.tryFail(getErrorMessage(disconnected));
                    this.closed.set(true);
                }).closeHandler(closeResult -> {
                    this.closed.set(true);
                    if (closeResult.succeeded()) {
                        done.tryComplete();
                    } else {
                        done.tryFail(closeResult.cause());
                    }
                    protonConnection.disconnect();
                }).close();
            } catch (Exception e) {
                // tryFail: a handler may already have settled the result before the exception surfaced.
                done.tryFail(e);
            }
        });
        return this;
    }

    /**
     * Removes a sender from the set tracked by this connection.
     *
     * @param amqpSender the sender to forget
     */
    public void unregister(AmqpSender amqpSender) {
        this.senders.remove(amqpSender);
    }

    /**
     * Removes a receiver from the set tracked by this connection.
     *
     * @param amqpReceiver the receiver to forget
     */
    public void unregister(AmqpReceiver amqpReceiver) {
        this.receivers.remove(amqpReceiver);
    }

    /**
     * Creates a receiver with the dynamic flag set and no address.
     *
     * @param handler notified with the receiver or the failure
     * @return this connection
     */
    @Override // io.vertx.amqp.AmqpConnection
    public AmqpConnection createDynamicReceiver(Handler<AsyncResult<AmqpReceiver>> handler) {
        return createReceiver(null, new AmqpReceiverOptions().setDynamic(true), handler);
    }

    /**
     * Creates a receiver on {@code str} using default receiver and link options.
     *
     * @param str the address, must not be {@code null}
     * @param handler notified with the receiver, or with a "Not connected" failure when no
     *     connection is held; must not be {@code null}
     * @return this connection
     * @throws NullPointerException if {@code str} or {@code handler} is {@code null}
     */
    @Override // io.vertx.amqp.AmqpConnection
    public AmqpConnection createReceiver(String str, Handler<AsyncResult<AmqpReceiver>> handler) {
        // Fail fast on the caller's thread instead of inside the context callback.
        Objects.requireNonNull(str, "The address must not be `null`");
        Objects.requireNonNull(handler, "The completion handler must not be `null`");
        ProtonLinkOptions linkOptions = new ProtonLinkOptions();
        runWithTrampoline(ignored -> {
            ProtonConnection protonConnection = this.connection.get();
            if (protonConnection == null) {
                handler.handle(Future.failedFuture("Not connected"));
            } else {
                new AmqpReceiverImpl(str, this, new AmqpReceiverOptions(), protonConnection.createReceiver(str, linkOptions), handler);
            }
        });
        return this;
    }

    /**
     * Creates a receiver on {@code str} configured from {@code amqpReceiverOptions}.
     *
     * <p>The dynamic flag and link name are taken from the options (defaults when {@code null}).
     * QoS, capabilities and durability are applied only when options were supplied by the caller.
     *
     * @param str the address; {@link #createDynamicReceiver(Handler)} passes {@code null}
     * @param amqpReceiverOptions the receiver options, may be {@code null} for defaults
     * @param handler notified with the receiver, or with a "Not connected" failure when no
     *     connection is held; must not be {@code null}
     * @return this connection
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    @Override // io.vertx.amqp.AmqpConnection
    public AmqpConnection createReceiver(String str, AmqpReceiverOptions amqpReceiverOptions, Handler<AsyncResult<AmqpReceiver>> handler) {
        Objects.requireNonNull(handler, "The completion handler must not be `null`");
        AmqpReceiverOptions effectiveOptions = amqpReceiverOptions == null ? new AmqpReceiverOptions() : amqpReceiverOptions;
        ProtonLinkOptions linkOptions = new ProtonLinkOptions();
        linkOptions.setDynamic(effectiveOptions.isDynamic()).setLinkName(effectiveOptions.getLinkName());
        runWithTrampoline(ignored -> {
            ProtonConnection protonConnection = this.connection.get();
            if (protonConnection == null) {
                handler.handle(Future.failedFuture("Not connected"));
                return;
            }
            ProtonReceiver receiver = protonConnection.createReceiver(str, linkOptions);
            if (amqpReceiverOptions != null) {
                if (amqpReceiverOptions.getQos() != null) {
                    // Locale.ROOT keeps the enum-name lookup independent of the default locale.
                    receiver.setQoS(ProtonQoS.valueOf(amqpReceiverOptions.getQos().toUpperCase(Locale.ROOT)));
                }
                configureTheSource(effectiveOptions, receiver);
            }
            new AmqpReceiverImpl(str, this, effectiveOptions, receiver, handler);
        });
        return this;
    }

    /**
     * Applies capabilities and durability from the options to the receiver's source terminus.
     * A durable receiver gets expiry policy {@code NEVER} and durability {@code UNSETTLED_STATE}.
     */
    private void configureTheSource(AmqpReceiverOptions amqpReceiverOptions, ProtonReceiver protonReceiver) {
        // NOTE(review): cast kept explicit in case the link API declares a more general terminus type - confirm.
        Source source = (Source) protonReceiver.getSource();
        List<String> capabilities = amqpReceiverOptions.getCapabilities();
        if (capabilities != null && !capabilities.isEmpty()) {
            source.setCapabilities(capabilities.stream().map(Symbol::valueOf).toArray(Symbol[]::new));
        }
        if (amqpReceiverOptions.isDurable()) {
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
        }
    }

    /**
     * Creates a sender on {@code str} using default sender options.
     *
     * @param str the address, must not be {@code null}
     * @param handler notified with the sender or the failure; must not be {@code null}
     * @return this connection
     * @throws NullPointerException if {@code str} or {@code handler} is {@code null}
     */
    @Override // io.vertx.amqp.AmqpConnection
    public AmqpConnection createSender(String str, Handler<AsyncResult<AmqpSender>> handler) {
        Objects.requireNonNull(str, "The address must be set");
        return createSender(str, new AmqpSenderOptions(), handler);
    }

    /**
     * Creates a sender on {@code str} configured from {@code amqpSenderOptions}.
     *
     * @param str the address; may be {@code null} only when the options request a dynamic link
     * @param amqpSenderOptions the sender options, may be {@code null} for a plain sender
     * @param handler notified with the sender, or with a "Not connected" failure when no
     *     connection is held; must not be {@code null}
     * @return this connection
     * @throws IllegalArgumentException if {@code str} is {@code null} and the link is not dynamic
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    @Override // io.vertx.amqp.AmqpConnection
    public AmqpConnection createSender(String str, AmqpSenderOptions amqpSenderOptions, Handler<AsyncResult<AmqpSender>> handler) {
        // Null options cannot request a dynamic link, so a null address is invalid in that case too.
        if (str == null && (amqpSenderOptions == null || !amqpSenderOptions.isDynamic())) {
            throw new IllegalArgumentException("Address must be set if the link is not dynamic");
        }
        Objects.requireNonNull(handler, "The completion handler must be set");
        runWithTrampoline(ignored -> {
            ProtonConnection protonConnection = this.connection.get();
            if (protonConnection == null) {
                handler.handle(Future.failedFuture("Not connected"));
                return;
            }
            ProtonSender sender;
            if (amqpSenderOptions != null) {
                ProtonLinkOptions linkOptions = new ProtonLinkOptions();
                linkOptions.setLinkName(amqpSenderOptions.getLinkName());
                linkOptions.setDynamic(amqpSenderOptions.isDynamic());
                sender = protonConnection.createSender(str, linkOptions);
                sender.setAutoDrained(amqpSenderOptions.isAutoDrained());
                configureTheTarget(amqpSenderOptions, sender);
            } else {
                sender = protonConnection.createSender(str);
            }
            AmqpSenderImpl.create(sender, this, handler);
        });
        return this;
    }

    /** Applies the capabilities from the options, if any, to the sender's target terminus. */
    private void configureTheTarget(AmqpSenderOptions amqpSenderOptions, ProtonSender protonSender) {
        // NOTE(review): cast kept explicit in case the link API declares a more general terminus type - confirm.
        Target target = (Target) protonSender.getTarget();
        List<String> capabilities = amqpSenderOptions.getCapabilities();
        if (capabilities == null || capabilities.isEmpty()) {
            return;
        }
        target.setCapabilities(capabilities.stream().map(Symbol::valueOf).toArray(Symbol[]::new));
    }

    /**
     * Creates a sender without an address (anonymous sender).
     *
     * @param handler notified with the sender, or with a "Not connected" failure when no
     *     connection is held; must not be {@code null}
     * @return this connection
     * @throws NullPointerException if {@code handler} is {@code null}
     */
    @Override // io.vertx.amqp.AmqpConnection
    public AmqpConnection createAnonymousSender(Handler<AsyncResult<AmqpSender>> handler) {
        Objects.requireNonNull(handler, "The completion handler must be set");
        runWithTrampoline(ignored -> {
            ProtonConnection protonConnection = this.connection.get();
            if (protonConnection == null) {
                handler.handle(Future.failedFuture("Not connected"));
            } else {
                AmqpSenderImpl.create(protonConnection.createSender((String) null), this, handler);
            }
        });
        return this;
    }

    /**
     * Reports whether the connection is disconnected.
     *
     * @return {@code true} when no connection is held, otherwise the Proton connection's own
     *     disconnected state
     */
    @Override // io.vertx.amqp.AmqpConnection
    public boolean isDisconnected() {
        ProtonConnection protonConnection = this.connection.get();
        return protonConnection == null || protonConnection.isDisconnected();
    }

    /**
     * Returns the future completed when this connection is flagged as closed.
     *
     * @return the close future
     */
    public Future<Void> closeFuture() {
        return this.closePromise.future();
    }

    /**
     * Returns the currently held Proton connection.
     *
     * @return the connection, or {@code null} when none is held
     */
    public ProtonConnection unwrap() {
        return this.connection.get();
    }

    /**
     * Returns the client options this connection was created with.
     *
     * @return the options
     */
    public AmqpClientOptions options() {
        return this.options;
    }

    /**
     * Adds a sender to the set tracked by this connection.
     *
     * @param amqpSenderImpl the sender to track
     */
    public void register(AmqpSenderImpl amqpSenderImpl) {
        this.senders.add(amqpSenderImpl);
    }

    /**
     * Adds a receiver to the set tracked by this connection.
     *
     * @param amqpReceiverImpl the receiver to track
     */
    public void register(AmqpReceiverImpl amqpReceiverImpl) {
        this.receivers.add(amqpReceiverImpl);
    }
}
