package org.eclipse.hono.service.amqp;

import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.ext.healthchecks.HealthCheckHandler;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonLink;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;
import io.vertx.proton.ProtonSession;
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
import java.lang.ref.WeakReference;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.Source;
import org.eclipse.hono.auth.Activity;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.AbstractServiceBase;
import org.eclipse.hono.service.auth.AuthorizationService;
import org.eclipse.hono.service.auth.ClaimsBasedAuthorizationService;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;

/* loaded from: input_file:org/eclipse/hono/service/amqp/AmqpServiceBase.class */
public abstract class AmqpServiceBase<T extends ServiceConfigProperties> extends AbstractServiceBase<T> {
    /** Endpoints registered with this server, keyed by endpoint name. */
    private final Map<String, AmqpEndpoint> endpoints = new HashMap<>();
    /** Server bound to the secure port; assigned when the secure server is started. */
    private ProtonServer server;
    /** Server bound to the insecure port; assigned when the insecure server is started. */
    private ProtonServer insecureServer;
    /** Factory handed to every Proton server created by this class; may be {@code null} if none was injected. */
    private ProtonSaslAuthenticatorFactory saslAuthenticatorFactory;
    /** Service used to authorize link establishment; a claims based default is created on start if unset. */
    private AuthorizationService authorizationService;

    /**
     * Gets the name of this service.
     * <p>
     * The name is used as the prefix of the AMQP container name that is set on
     * connections accepted by this server.
     *
     * @return The service name.
     */
    protected abstract String getServiceName();

    /**
     * Sets the configuration properties of this service.
     * <p>
     * Injected by Spring using the bean qualified as {@code amqp}; the value is
     * simply forwarded to {@code setSpecificConfig}.
     *
     * @param t The configuration properties.
     */
    @Qualifier("amqp")
    @Autowired
    public void setConfig(T t) {
        this.setSpecificConfig(t);
    }

    /**
     * Gets the default value of the secure port.
     *
     * @return {@code 5671}.
     */
    @Override
    public int getPortDefaultValue() {
        final int defaultSecurePort = 5671;
        return defaultSecurePort;
    }

    /**
     * Gets the default value of the insecure port.
     *
     * @return {@code 5672}.
     */
    @Override
    public int getInsecurePortDefaultValue() {
        final int defaultInsecurePort = 5672;
        return defaultInsecurePort;
    }

    /**
     * Registers all of the given endpoints with this server.
     * <p>
     * Each element is passed to {@link #addEndpoint(AmqpEndpoint)} in iteration order.
     *
     * @param list The endpoints to register.
     * @throws NullPointerException if the list is {@code null}.
     */
    @Autowired(required = false)
    public final void addEndpoints(List<AmqpEndpoint> list) {
        Objects.requireNonNull(list);
        for (final AmqpEndpoint candidate : list) {
            addEndpoint(candidate);
        }
    }

    /**
     * Registers an endpoint under its name.
     * <p>
     * If an endpoint with the same name has already been registered, the existing
     * registration is kept and a warning is logged.
     *
     * @param amqpEndpoint The endpoint to register.
     */
    public final void addEndpoint(AmqpEndpoint amqpEndpoint) {
        final AmqpEndpoint alreadyRegistered = this.endpoints.putIfAbsent(amqpEndpoint.getName(), amqpEndpoint);
        if (alreadyRegistered == null) {
            this.LOG.debug("registering endpoint [{}]", amqpEndpoint.getName());
        } else {
            this.LOG.warn("multiple endpoints defined with name [{}]", amqpEndpoint.getName());
        }
    }

    /**
     * Gets the endpoint registered for the endpoint part of a resource identifier.
     *
     * @param resourceIdentifier The resource to look up the endpoint for.
     * @return The endpoint, or {@code null} if none is registered under that name.
     */
    protected final AmqpEndpoint getEndpoint(ResourceIdentifier resourceIdentifier) {
        final String endpointName = resourceIdentifier.getEndpoint();
        return getEndpoint(endpointName);
    }

    /**
     * Gets the endpoint registered under a given name.
     *
     * @param str The endpoint name.
     * @return The endpoint, or {@code null} if none is registered under that name.
     */
    protected final AmqpEndpoint getEndpoint(String str) {
        final AmqpEndpoint registered = this.endpoints.get(str);
        return registered;
    }

    /**
     * Gets the endpoints registered with this server.
     *
     * @return The values view of the endpoint registry.
     */
    protected final Iterable<AmqpEndpoint> endpoints() {
        final Iterable<AmqpEndpoint> registered = this.endpoints.values();
        return registered;
    }

    /**
     * Sets the factory used for creating SASL authenticators for the servers
     * created by this class.
     *
     * @param protonSaslAuthenticatorFactory The factory.
     * @throws NullPointerException if the factory is {@code null}.
     */
    @Autowired(required = false)
    public void setSaslAuthenticatorFactory(ProtonSaslAuthenticatorFactory protonSaslAuthenticatorFactory) {
        Objects.requireNonNull(protonSaslAuthenticatorFactory);
        this.saslAuthenticatorFactory = protonSaslAuthenticatorFactory;
    }

    /**
     * Sets the service used for authorizing clients' requests to establish links.
     * <p>
     * If no service (or {@code null}) is set, a {@code ClaimsBasedAuthorizationService}
     * is created when the server is started.
     *
     * @param authorizationService The service.
     */
    public final void setAuthorizationService(AuthorizationService authorizationService) {
        final AuthorizationService service = authorizationService;
        this.authorizationService = service;
    }

    /**
     * Gets the service used for authorizing clients' requests to establish links.
     *
     * @return The service, or {@code null} if none has been set yet.
     */
    protected final AuthorizationService getAuthorizationService() {
        final AuthorizationService service = this.authorizationService;
        return service;
    }

    /**
     * Starts this service.
     * <p>
     * Falls back to a {@code ClaimsBasedAuthorizationService} if no authorization
     * service has been set, then sequentially runs the pre-start hook, checks the
     * port configuration, starts the endpoints, the secure server and finally the
     * insecure server. Each step is only run if the previous one succeeded.
     *
     * @return A future indicating the outcome of the startup sequence.
     */
    @Override
    public Future<Void> startInternal() {
        if (this.authorizationService == null) {
            this.authorizationService = new ClaimsBasedAuthorizationService();
        }
        return preStartServers()
                .compose(preStarted -> checkPortConfiguration())
                .compose(portsChecked -> startEndpoints())
                .compose(endpointsStarted -> startSecureServer())
                .compose(secureServerStarted -> startInsecureServer());
    }

    /**
     * Closes a connection whose client's access token has expired.
     * <p>
     * Does nothing if the connection is already disconnected or if no client
     * principal is associated with it. Otherwise the connection is closed with an
     * <em>unauthorized-access</em> condition, disconnected, and all endpoints are
     * notified about the closed connection.
     *
     * @param protonConnection The connection to close.
     */
    protected final void closeExpiredConnection(ProtonConnection protonConnection) {
        if (protonConnection.isDisconnected()) {
            return;
        }
        final HonoUser principal = Constants.getClientPrincipal(protonConnection);
        if (principal == null) {
            return;
        }
        this.LOG.debug("client's [{}] access token has expired, closing connection", principal.getName());
        // unregister the handlers before closing; presumably so that the locally
        // initiated close does not trigger the remote close/disconnect handling as well
        protonConnection.disconnectHandler(null);
        protonConnection.closeHandler(null);
        protonConnection.setCondition(ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS, "access token expired"));
        protonConnection.close();
        protonConnection.disconnect();
        publishConnectionClosedEvent(protonConnection);
    }

    /**
     * Hook that is invoked before the port configuration is checked and before any
     * endpoint or server is started.
     * <p>
     * This default implementation returns an already succeeded future.
     *
     * @return A future indicating the outcome of the hook.
     */
    protected Future<Void> preStartServers() {
        final Future<Void> nothingToDo = Future.succeededFuture();
        return nothingToDo;
    }

    /**
     * Starts all registered endpoints.
     *
     * @return A future that succeeds once all endpoints have been started and
     *         fails with the reported cause if starting the endpoints fails.
     */
    private Future<Void> startEndpoints() {
        final List<Future> startAttempts = new ArrayList<>(this.endpoints.size());
        for (final AmqpEndpoint endpoint : this.endpoints.values()) {
            this.LOG.info("starting endpoint [name: {}, class: {}]", endpoint.getName(), endpoint.getClass().getName());
            startAttempts.add(endpoint.start());
        }
        final Future<Void> result = Future.future();
        CompositeFuture.all(startAttempts).setHandler(outcome -> {
            if (!outcome.succeeded()) {
                result.fail(outcome.cause());
            } else {
                result.complete();
            }
        });
        return result;
    }

    /**
     * Stops all registered endpoints.
     *
     * @return A future that succeeds once all endpoints have been stopped and
     *         fails with the reported cause if stopping the endpoints fails.
     */
    private Future<Void> stopEndpoints() {
        final List<Future> stopAttempts = new ArrayList<>(this.endpoints.size());
        for (final AmqpEndpoint endpoint : this.endpoints.values()) {
            this.LOG.info("stopping endpoint [name: {}, class: {}]", endpoint.getName(), endpoint.getClass().getName());
            stopAttempts.add(endpoint.stop());
        }
        final Future<Void> result = Future.future();
        CompositeFuture.all(stopAttempts).setHandler(outcome -> {
            if (!outcome.succeeded()) {
                result.fail(outcome.cause());
            } else {
                result.complete();
            }
        });
        return result;
    }

    /**
     * Starts the server listening on the insecure port, if that port is enabled.
     *
     * @return A future that succeeds if the insecure port is disabled or the server
     *         has been bound successfully, and fails with the bind error otherwise.
     */
    private Future<Void> startInsecureServer() {
        if (isInsecurePortEnabled()) {
            final int port = determineInsecurePort();
            final Future<Void> result = Future.future();
            this.insecureServer = createProtonServer(createInsecureServerOptions())
                    .connectHandler(this::onRemoteConnectionOpenInsecurePort)
                    .listen(port, ((ServiceConfigProperties) getConfig()).getInsecurePortBindAddress(), bindAttempt -> {
                        if (bindAttempt.succeeded()) {
                            if (getInsecurePort() != getInsecurePortDefaultValue()) {
                                this.LOG.warn("server listens on non-standard insecure port [{}:{}], default is {}",
                                        getInsecurePortBindAddress(), getInsecurePort(), getInsecurePortDefaultValue());
                            } else {
                                this.LOG.info("server listens on standard insecure port [{}:{}]", getInsecurePortBindAddress(), getInsecurePort());
                            }
                            result.complete();
                        } else {
                            this.LOG.error("cannot bind to insecure port", bindAttempt.cause());
                            result.fail(bindAttempt.cause());
                        }
                    });
            return result;
        }
        this.LOG.info("insecure port is not enabled");
        return Future.succeededFuture();
    }

    /**
     * Starts the server listening on the secure port, if that port is enabled.
     *
     * @return A future that succeeds if the secure port is disabled or the server
     *         has been bound successfully, and fails with the bind error otherwise.
     */
    private Future<Void> startSecureServer() {
        if (isSecurePortEnabled()) {
            final int port = determineSecurePort();
            final Future<Void> result = Future.future();
            this.server = createProtonServer(createServerOptions())
                    .connectHandler(this::onRemoteConnectionOpen)
                    .listen(port, ((ServiceConfigProperties) getConfig()).getBindAddress(), bindAttempt -> {
                        if (bindAttempt.succeeded()) {
                            if (getPort() != getPortDefaultValue()) {
                                this.LOG.warn("server listens on non-standard secure port [{}:{}], default is {}",
                                        getBindAddress(), getPort(), getPortDefaultValue());
                            } else {
                                this.LOG.info("server listens on standard secure port [{}:{}]", getBindAddress(), getPort());
                            }
                            result.complete();
                        } else {
                            this.LOG.error("cannot bind to secure port", bindAttempt.cause());
                            result.fail(bindAttempt.cause());
                        }
                    });
            return result;
        }
        this.LOG.info("secure port is not enabled");
        return Future.succeededFuture();
    }

    /**
     * Creates a Proton server using the given options and this service's SASL
     * authenticator factory.
     *
     * @param protonServerOptions The options to create the server with.
     * @return The (not yet listening) server.
     */
    private ProtonServer createProtonServer(ProtonServerOptions protonServerOptions) {
        final ProtonServer protonServer = ProtonServer.create(this.vertx, protonServerOptions);
        return protonServer.saslAuthenticatorFactory(this.saslAuthenticatorFactory);
    }

    /**
     * Creates the options for the secure server.
     * <p>
     * The options are the same as for the insecure server with TLS key/certificate
     * and trust options added on top.
     *
     * @return The options.
     */
    protected ProtonServerOptions createServerOptions() {
        final ProtonServerOptions secureOptions = createInsecureServerOptions();
        addTlsKeyCertOptions(secureOptions);
        addTlsTrustOptions(secureOptions);
        return secureOptions;
    }

    /**
     * Creates the options for the insecure server.
     * <p>
     * Sets the heartbeat to 60000 (presumably milliseconds; confirm against the
     * {@code ProtonServerOptions} documentation), the send and receive buffer
     * sizes to 16384 and enables network activity logging if configured.
     *
     * @return The options.
     */
    protected ProtonServerOptions createInsecureServerOptions() {
        final ProtonServerOptions serverOptions = new ProtonServerOptions();
        serverOptions.setHeartbeat(60000);
        serverOptions.setReceiveBufferSize(16384);
        serverOptions.setSendBufferSize(16384);
        final boolean logNetworkActivity = ((ServiceConfigProperties) getConfig()).isNetworkDebugLoggingEnabled();
        serverOptions.setLogActivity(logNetworkActivity);
        return serverOptions;
    }

    /**
     * Stops this service.
     * <p>
     * Both servers are closed first; the endpoints are stopped only after both
     * servers have been closed successfully.
     *
     * @return A future indicating the outcome of the shutdown sequence.
     */
    @Override
    public final Future<Void> stopInternal() {
        final CompositeFuture serversStopped = CompositeFuture.all(stopServer(), stopInsecureServer());
        return serversStopped.compose(stopped -> stopEndpoints());
    }

    /**
     * Closes the secure server, if it has been created.
     *
     * @return A future that is completed with the outcome of closing the server,
     *         or succeeds immediately if there is no secure server.
     */
    private Future<Void> stopServer() {
        final Future<Void> result = Future.future();
        if (this.server == null) {
            result.complete();
            return result;
        }
        this.LOG.info("stopping secure AMQP server [{}:{}]", getBindAddress(), getActualPort());
        this.server.close(result.completer());
        return result;
    }

    /**
     * Closes the insecure server, if it has been created.
     *
     * @return A future that is completed with the outcome of closing the server,
     *         or succeeds immediately if there is no insecure server.
     */
    private Future<Void> stopInsecureServer() {
        final Future<Void> result = Future.future();
        if (this.insecureServer == null) {
            result.complete();
            return result;
        }
        this.LOG.info("stopping insecure AMQP server [{}:{}]", getInsecurePortBindAddress(), getActualInsecurePort());
        this.insecureServer.close(result.completer());
        return result;
    }

    /**
     * Gets the port reported by the secure server.
     *
     * @return The server's actual port, or {@code -1} if the secure server has not
     *         been created.
     */
    @Override
    protected final int getActualPort() {
        return this.server == null ? -1 : this.server.actualPort();
    }

    /**
     * Gets the port reported by the insecure server.
     *
     * @return The server's actual port, or {@code -1} if the insecure server has
     *         not been created.
     */
    @Override
    protected final int getActualInsecurePort() {
        return this.insecureServer == null ? -1 : this.insecureServer.actualPort();
    }

    /**
     * Handles a new connection accepted on the secure port.
     * <p>
     * Sets the local container name to
     * <em>serviceName</em>-<em>bindAddress</em>:<em>port</em> and registers the
     * connection's event handlers.
     *
     * @param protonConnection The newly accepted connection.
     */
    protected void onRemoteConnectionOpen(ProtonConnection protonConnection) {
        final String containerName = String.format("%s-%s:%d", getServiceName(), getBindAddress(), getPort());
        protonConnection.setContainer(containerName);
        setRemoteConnectionOpenHandler(protonConnection);
    }

    /**
     * Handles a new connection accepted on the insecure port.
     * <p>
     * Sets the local container name to
     * <em>serviceName</em>-<em>insecureBindAddress</em>:<em>insecurePort</em> and
     * registers the connection's event handlers.
     *
     * @param protonConnection The newly accepted connection.
     */
    protected void onRemoteConnectionOpenInsecurePort(ProtonConnection protonConnection) {
        final String containerName = String.format("%s-%s:%d", getServiceName(), getInsecurePortBindAddress(), getInsecurePort());
        protonConnection.setContainer(containerName);
        setRemoteConnectionOpenHandler(protonConnection);
    }

    /**
     * Closes a link that a client wants to establish for an address that no
     * endpoint is registered for.
     * <p>
     * The link is closed with a <em>not-found</em> error condition.
     *
     * @param protonConnection The connection the link belongs to.
     * @param protonLink The link to close.
     * @param resourceIdentifier The address the client has requested.
     */
    protected final void handleUnknownEndpoint(ProtonConnection protonConnection, ProtonLink<?> protonLink, ResourceIdentifier resourceIdentifier) {
        this.LOG.info("client [container: {}] wants to establish link for unknown endpoint [address: {}]", protonConnection.getRemoteContainer(), resourceIdentifier);
        final String description = String.format("no endpoint registered for address %s", resourceIdentifier);
        protonLink.setCondition(ProtonHelper.condition(AmqpError.NOT_FOUND, description));
        protonLink.close();
    }

    /**
     * Creates a resource identifier from an address.
     * <p>
     * If this service is configured for single tenant mode, the address is parsed
     * assuming the default tenant.
     *
     * @param str The address to parse.
     * @return The resource identifier.
     */
    protected final ResourceIdentifier getResourceIdentifier(String str) {
        if (((ServiceConfigProperties) getConfig()).isSingleTenant()) {
            return ResourceIdentifier.fromStringAssumingDefaultTenant(str);
        }
        return ResourceIdentifier.fromString(str);
    }

    /**
     * Handles a client's request to open a link for sending messages to this server.
     * <p>
     * The link is closed with an error condition if
     * <ul>
     * <li>the client has not provided a target at all (<em>not-found</em>),</li>
     * <li>the target has no address, i.e. the client asks for an anonymous relay
     * (<em>not-allowed</em>),</li>
     * <li>the target address is not a valid resource identifier or no endpoint is
     * registered for it (<em>not-found</em>), or</li>
     * <li>the client is not authorized to write to the target address
     * (<em>unauthorized-access</em>).</li>
     * </ul>
     * Otherwise the link is handed over to the endpoint registered for the address.
     *
     * @param protonConnection The connection the link is being opened on.
     * @param protonReceiver The receiver link representing the client's sender.
     */
    protected void handleReceiverOpen(ProtonConnection protonConnection, ProtonReceiver protonReceiver) {
        if (protonReceiver.getRemoteTarget() == null) {
            // the peer's attach frame carries no target at all; dereferencing it below would fail
            this.LOG.debug("client [container: {}] wants to open a link without a target, closing link ...", protonConnection.getRemoteContainer());
            protonReceiver.setCondition(ProtonHelper.condition(AmqpError.NOT_FOUND, "no such address"));
            protonReceiver.close();
            return;
        }
        if (protonReceiver.getRemoteTarget().getAddress() == null) {
            this.LOG.debug("client [container: {}] wants to open an anonymous link for sending messages to arbitrary addresses, closing link ...", protonConnection.getRemoteContainer());
            protonReceiver.setCondition(ProtonHelper.condition(AmqpError.NOT_ALLOWED, "anonymous relay not supported"));
            protonReceiver.close();
            return;
        }
        this.LOG.debug("client [container: {}] wants to open a link [address: {}] for sending messages", protonConnection.getRemoteContainer(), protonReceiver.getRemoteTarget());
        try {
            final ResourceIdentifier resourceIdentifier = getResourceIdentifier(protonReceiver.getRemoteTarget().getAddress());
            final AmqpEndpoint endpoint = getEndpoint(resourceIdentifier);
            if (endpoint == null) {
                handleUnknownEndpoint(protonConnection, protonReceiver, resourceIdentifier);
            } else {
                final HonoUser clientPrincipal = Constants.getClientPrincipal(protonConnection);
                getAuthorizationService().isAuthorized(clientPrincipal, resourceIdentifier, Activity.WRITE).setHandler(authAttempt -> {
                    // a failed check and a missing (null) result are both treated as "not authorized"
                    if (authAttempt.succeeded() && Boolean.TRUE.equals(authAttempt.result())) {
                        Constants.copyProperties(protonConnection, protonReceiver);
                        // mirror the terminus information provided by the client
                        protonReceiver.setSource(protonReceiver.getRemoteSource());
                        protonReceiver.setTarget(protonReceiver.getRemoteTarget());
                        endpoint.onLinkAttach(protonConnection, protonReceiver, resourceIdentifier);
                    } else {
                        this.LOG.debug("subject [{}] is not authorized to WRITE to [{}]", clientPrincipal.getName(), resourceIdentifier);
                        protonReceiver.setCondition(ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS.toString(), "unauthorized"));
                        protonReceiver.close();
                    }
                });
            }
        } catch (IllegalArgumentException e) {
            this.LOG.debug("client has provided invalid resource identifier as target address", e);
            protonReceiver.setCondition(ProtonHelper.condition(AmqpError.NOT_FOUND, "no such address"));
            protonReceiver.close();
        }
    }

    /**
     * Handles a client's request to open a link for receiving messages from this server.
     * <p>
     * The link is closed with an error condition if
     * <ul>
     * <li>the client has not provided a source or the source has no address
     * (<em>not-found</em>),</li>
     * <li>the source address is not a valid resource identifier or no endpoint is
     * registered for it (<em>not-found</em>), or</li>
     * <li>the client is not authorized to read from the source address
     * (<em>unauthorized-access</em>).</li>
     * </ul>
     * Otherwise the link is handed over to the endpoint registered for the address.
     *
     * @param protonConnection The connection the link is being opened on.
     * @param protonSender The sender link representing the client's receiver.
     */
    protected void handleSenderOpen(ProtonConnection protonConnection, ProtonSender protonSender) {
        final Source remoteSource = protonSender.getRemoteSource();
        this.LOG.debug("client [container: {}] wants to open a link [address: {}] for receiving messages", protonConnection.getRemoteContainer(), remoteSource);
        if (remoteSource == null || remoteSource.getAddress() == null) {
            // without a source address there is nothing to resolve an endpoint for;
            // reject the link instead of failing on the missing value further down
            this.LOG.debug("client [container: {}] has not provided a source address, closing link ...", protonConnection.getRemoteContainer());
            protonSender.setCondition(ProtonHelper.condition(AmqpError.NOT_FOUND, "no such address"));
            protonSender.close();
            return;
        }
        try {
            final ResourceIdentifier resourceIdentifier = getResourceIdentifier(remoteSource.getAddress());
            final AmqpEndpoint endpoint = getEndpoint(resourceIdentifier);
            if (endpoint == null) {
                handleUnknownEndpoint(protonConnection, protonSender, resourceIdentifier);
            } else {
                final HonoUser clientPrincipal = Constants.getClientPrincipal(protonConnection);
                getAuthorizationService().isAuthorized(clientPrincipal, resourceIdentifier, Activity.READ).setHandler(authAttempt -> {
                    // a failed check and a missing (null) result are both treated as "not authorized"
                    if (authAttempt.succeeded() && Boolean.TRUE.equals(authAttempt.result())) {
                        Constants.copyProperties(protonConnection, protonSender);
                        // mirror the terminus information provided by the client
                        protonSender.setSource(protonSender.getRemoteSource());
                        protonSender.setTarget(protonSender.getRemoteTarget());
                        endpoint.onLinkAttach(protonConnection, protonSender, resourceIdentifier);
                    } else {
                        this.LOG.debug("subject [{}] is not authorized to READ from [{}]", clientPrincipal.getName(), resourceIdentifier);
                        protonSender.setCondition(ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS.toString(), "unauthorized"));
                        protonSender.close();
                    }
                });
            }
        } catch (IllegalArgumentException e) {
            this.LOG.debug("client has provided invalid resource identifier as source address", e);
            protonSender.setCondition(ProtonHelper.condition(AmqpError.NOT_FOUND, "no such address"));
            protonSender.close();
        }
    }

    /**
     * Registers the handlers for the events that can occur on a newly accepted
     * connection: session, receiver and sender open requests, disconnect, remote
     * close and the peer's open frame.
     * <p>
     * Sessions and links get a default close handler before being passed to the
     * corresponding {@code handle...Open} method. An open frame that carries an
     * error is ignored.
     *
     * @param protonConnection The connection to register the handlers on.
     */
    protected void setRemoteConnectionOpenHandler(ProtonConnection protonConnection) {
        protonConnection.sessionOpenHandler(session -> {
            HonoProtonHelper.setDefaultCloseHandler(session);
            handleSessionOpen(protonConnection, session);
        });
        protonConnection.receiverOpenHandler(receiver -> {
            HonoProtonHelper.setDefaultCloseHandler(receiver);
            handleReceiverOpen(protonConnection, receiver);
        });
        protonConnection.senderOpenHandler(sender -> {
            HonoProtonHelper.setDefaultCloseHandler(sender);
            handleSenderOpen(protonConnection, sender);
        });
        protonConnection.disconnectHandler(this::handleRemoteDisconnect);
        protonConnection.closeHandler(remoteClose -> handleRemoteConnectionClose(protonConnection, remoteClose));
        protonConnection.openHandler(remoteOpen -> {
            if (remoteOpen.failed()) {
                this.LOG.debug("ignoring peer's open frame containing error", remoteOpen.cause());
                return;
            }
            processRemoteOpen(remoteOpen.result());
        });
    }

    /**
     * Processes the open frame received from a peer.
     * <p>
     * Tags the connection with a random ID (stored under the {@code CONNECTION_ID}
     * attachment key), processes the peer's desired capabilities, schedules the
     * connection to be closed when the client's access token expires and finally
     * opens the connection locally.
     * <p>
     * If the token has already expired (or expires within the next millisecond),
     * the connection is scheduled to be closed after the minimum timer delay of
     * 1 ms instead of failing to schedule the timer.
     *
     * @param protonConnection The connection being opened by the peer.
     */
    protected void processRemoteOpen(ProtonConnection protonConnection) {
        final HonoUser clientPrincipal = Constants.getClientPrincipal(protonConnection);
        this.LOG.debug("client [container: {}, user: {}] connected", protonConnection.getRemoteContainer(), clientPrincipal.getName());
        protonConnection.attachments().set("CONNECTION_ID", String.class, UUID.randomUUID().toString());
        processDesiredCapabilities(protonConnection, protonConnection.getRemoteDesiredCapabilities());
        final Duration untilExpiration = Duration.between(Instant.now(), clientPrincipal.getExpirationTime());
        // Vert.x rejects timer delays below 1 ms with an IllegalArgumentException,
        // so clamp the delay for tokens that have expired already
        final long delayMillis = Math.max(1L, untilExpiration.toMillis());
        // only keep a weak reference so that the pending timer does not prevent
        // an already released connection from being garbage collected
        final WeakReference<ProtonConnection> connectionRef = new WeakReference<>(protonConnection);
        this.vertx.setTimer(delayMillis, timerId -> {
            // dereference once: the referent may be cleared between two separate get() calls
            final ProtonConnection expiredConnection = connectionRef.get();
            if (expiredConnection != null) {
                closeExpiredConnection(expiredConnection);
            }
        });
        protonConnection.open();
    }

    /**
     * Hook for processing the capabilities that a peer has indicated as
     * <em>desired</em> in its open frame.
     * <p>
     * Invoked while the peer's open frame is being processed, before the
     * connection is opened locally. This default implementation does nothing.
     *
     * @param protonConnection The connection being opened by the peer.
     * @param symbolArr The peer's desired capabilities as reported by the connection.
     */
    protected void processDesiredCapabilities(ProtonConnection protonConnection, Symbol[] symbolArr) {
    }

    /**
     * Handles a client's request to open a session by opening the session locally.
     *
     * @param protonConnection The connection the session belongs to.
     * @param protonSession The session to open.
     */
    protected void handleSessionOpen(ProtonConnection protonConnection, ProtonSession protonSession) {
        final String remoteContainer = protonConnection.getRemoteContainer();
        this.LOG.debug("opening new session with client [container: {}]", remoteContainer);
        protonSession.open();
    }

    /**
     * Notifies all registered endpoints that a connection has been closed.
     *
     * @param protonConnection The connection that has been closed.
     */
    protected void publishConnectionClosedEvent(ProtonConnection protonConnection) {
        this.endpoints.values().forEach(endpoint -> endpoint.onConnectionClosed(protonConnection));
    }

    /**
     * Handles the peer closing the connection.
     * <p>
     * Logs the event, closes and disconnects the connection locally and notifies
     * all endpoints about the closed connection.
     *
     * @param protonConnection The connection being closed.
     * @param asyncResult The outcome reported with the peer's close frame.
     */
    protected void handleRemoteConnectionClose(ProtonConnection protonConnection, AsyncResult<ProtonConnection> asyncResult) {
        if (!asyncResult.succeeded()) {
            this.LOG.debug("client [container: {}] closed connection with error", protonConnection.getRemoteContainer(), asyncResult.cause());
        } else {
            this.LOG.debug("client [container: {}] closed connection", protonConnection.getRemoteContainer());
        }
        protonConnection.close();
        protonConnection.disconnect();
        publishConnectionClosedEvent(protonConnection);
    }

    /**
     * Handles the peer disconnecting.
     * <p>
     * Logs the event, disconnects the connection locally and notifies all
     * endpoints about the closed connection.
     *
     * @param protonConnection The connection that has been disconnected.
     */
    protected void handleRemoteDisconnect(ProtonConnection protonConnection) {
        final String remoteContainer = protonConnection.getRemoteContainer();
        this.LOG.debug("client [container: {}] disconnected", remoteContainer);
        protonConnection.disconnect();
        publishConnectionClosedEvent(protonConnection);
    }

    /**
     * Lets every registered endpoint register its readiness checks.
     *
     * @param healthCheckHandler The handler to register the checks with.
     */
    @Override
    public void registerReadinessChecks(HealthCheckHandler healthCheckHandler) {
        for (final AmqpEndpoint endpoint : endpoints()) {
            endpoint.registerReadinessChecks(healthCheckHandler);
        }
    }

    /**
     * Lets every registered endpoint register its liveness checks.
     *
     * @param healthCheckHandler The handler to register the checks with.
     */
    @Override
    public void registerLivenessChecks(HealthCheckHandler healthCheckHandler) {
        for (final AmqpEndpoint endpoint : endpoints()) {
            endpoint.registerLivenessChecks(healthCheckHandler);
        }
    }
}
