package org.eclipse.hono.adapter.mqtt;

import io.micrometer.core.instrument.Timer;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.tag.Tags;
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.json.JsonObject;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
import java.time.Duration;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.eclipse.hono.adapter.AbstractProtocolAdapterBase;
import org.eclipse.hono.adapter.AdapterConnectionsExceededException;
import org.eclipse.hono.adapter.AuthorizationException;
import org.eclipse.hono.adapter.ProtocolAdapterProperties;
import org.eclipse.hono.adapter.auth.device.AuthHandler;
import org.eclipse.hono.adapter.auth.device.ChainAuthHandler;
import org.eclipse.hono.adapter.auth.device.CredentialsApiAuthProvider;
import org.eclipse.hono.adapter.auth.device.DeviceCredentials;
import org.eclipse.hono.adapter.auth.device.jwt.JwtAuthProvider;
import org.eclipse.hono.adapter.auth.device.usernamepassword.UsernamePasswordAuthProvider;
import org.eclipse.hono.adapter.auth.device.x509.TenantServiceBasedX509Authentication;
import org.eclipse.hono.adapter.auth.device.x509.X509AuthProvider;
import org.eclipse.hono.adapter.limiting.ConnectionLimitManager;
import org.eclipse.hono.adapter.limiting.DefaultConnectionLimitManager;
import org.eclipse.hono.adapter.limiting.MemoryBasedConnectionLimitStrategy;
import org.eclipse.hono.adapter.mqtt.MqttContext;
import org.eclipse.hono.adapter.mqtt.MqttProtocolAdapterProperties;
import org.eclipse.hono.adapter.mqtt.Subscription;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandResponse;
import org.eclipse.hono.client.command.ProtocolAdapterCommandConsumer;
import org.eclipse.hono.notification.NotificationEventBusSupport;
import org.eclipse.hono.notification.deviceregistry.AllDevicesOfTenantDeletedNotification;
import org.eclipse.hono.notification.deviceregistry.DeviceChangeNotification;
import org.eclipse.hono.notification.deviceregistry.LifecycleChange;
import org.eclipse.hono.notification.deviceregistry.TenantChangeNotification;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.metric.MetricsTags;
import org.eclipse.hono.tracing.TenantTraceSamplingHelper;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Futures;
import org.eclipse.hono.util.Pair;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.ResourceIdentifier;
import org.eclipse.hono.util.TenantObject;

/* loaded from: input_file:org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.class */
public abstract class AbstractVertxBasedMqttProtocolAdapter<T extends MqttProtocolAdapterProperties> extends AbstractProtocolAdapterBase<T> {
    protected static final int MINIMAL_MEMORY_JVM = 100000000;
    protected static final int MINIMAL_MEMORY_SUBSTRATE = 35000000;
    protected static final int MEMORY_PER_CONNECTION = 20000;
    protected static final int MAX_MSG_SIZE_VARIABLE_HEADER_SIZE = 128;
    private static final String EVENT_SENDING_PUBACK = "sending PUBACK";
    private static final int IANA_MQTT_PORT = 1883;
    private static final int IANA_SECURE_MQTT_PORT = 8883;
    private static final String LOG_FIELD_TOPIC_FILTER = "filter";
    private MqttServer server;
    private MqttServer insecureServer;
    private AuthHandler<MqttConnectContext> authHandler;
    private final AtomicReference<Promise<Void>> stopResultPromiseRef = new AtomicReference<>();
    private MqttAdapterMetrics metrics = MqttAdapterMetrics.NOOP;
    private final Set<AbstractVertxBasedMqttProtocolAdapter<T>.MqttDeviceEndpoint> connectedAuthenticatedDeviceEndpoints = new HashSet();

    /* renamed from: org.eclipse.hono.adapter.mqtt.AbstractVertxBasedMqttProtocolAdapter$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType = new int[MetricsTags.EndpointType.values().length];

        static {
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.TELEMETRY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.EVENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[MetricsTags.EndpointType.COMMAND.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter$MqttDeviceEndpoint.class */
    public class MqttDeviceEndpoint {
        private final MqttEndpoint endpoint;
        private final DeviceUser authenticatedDevice;
        private final OptionalInt traceSamplingPriority;
        private final Map<Subscription.Key, Pair<CommandSubscription, ProtocolAdapterCommandConsumer>> commandSubscriptions = new ConcurrentHashMap();
        private final Map<Subscription.Key, ErrorSubscription> errorSubscriptions = new HashMap();
        private final PendingPubAcks pendingAcks;
        private Throwable protocolLevelException;

        public MqttDeviceEndpoint(MqttEndpoint mqttEndpoint, DeviceUser deviceUser, OptionalInt optionalInt) {
            this.pendingAcks = new PendingPubAcks(AbstractVertxBasedMqttProtocolAdapter.this.vertx);
            this.endpoint = (MqttEndpoint) Objects.requireNonNull(mqttEndpoint);
            this.authenticatedDevice = deviceUser;
            this.traceSamplingPriority = (OptionalInt) Objects.requireNonNull(optionalInt);
        }

        protected final DeviceUser getAuthenticatedDevice() {
            return this.authenticatedDevice;
        }

        protected final void registerHandlers() {
            this.endpoint.publishHandler(this::handlePublishedMessage);
            this.endpoint.publishAcknowledgeHandler(this::handlePubAck);
            this.endpoint.subscribeHandler(this::onSubscribe);
            this.endpoint.unsubscribeHandler(this::onUnsubscribe);
            this.endpoint.closeHandler(r3 -> {
                onClose();
            });
            this.endpoint.exceptionHandler(this::onProtocolLevelError);
        }

        private void onProtocolLevelError(Throwable th) {
            if (this.authenticatedDevice == null) {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug("protocol-level exception occurred [client ID: {}]", this.endpoint.clientIdentifier(), th);
            } else {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug("protocol-level exception occurred [tenant-id: {}, device-id: {}, client ID: {}]", new Object[]{this.authenticatedDevice.getTenantId(), this.authenticatedDevice.getDeviceId(), this.endpoint.clientIdentifier(), th});
            }
            this.protocolLevelException = th;
        }

        protected final void handlePublishedMessage(MqttPublishMessage mqttPublishMessage) {
            Objects.requireNonNull(mqttPublishMessage);
            Span newChildSpan = newChildSpan((SpanContext) Optional.ofNullable(mqttPublishMessage.topicName()).flatMap(str -> {
                return Optional.ofNullable(PropertyBag.fromTopic(mqttPublishMessage.topicName()));
            }).map(propertyBag -> {
                Tracer tracer = AbstractVertxBasedMqttProtocolAdapter.this.tracer;
                Objects.requireNonNull(propertyBag);
                return TracingHelper.extractSpanContext(tracer, propertyBag::getPropertiesIterator);
            }).orElse(null), "PUBLISH");
            newChildSpan.setTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), mqttPublishMessage.topicName());
            newChildSpan.setTag(TracingHelper.TAG_QOS.getKey(), mqttPublishMessage.qosLevel().toString());
            this.traceSamplingPriority.ifPresent(i -> {
                TracingHelper.setTraceSamplingPriority(newChildSpan, i);
            });
            MqttContext fromPublishPacket = MqttContext.fromPublishPacket(mqttPublishMessage, this.endpoint, newChildSpan, this.authenticatedDevice);
            fromPublishPacket.setTimer(AbstractVertxBasedMqttProtocolAdapter.this.getMetrics().startTimer());
            (this.authenticatedDevice == null ? applyTraceSamplingPriorityForTopicTenant(fromPublishPacket.topic(), newChildSpan) : Future.succeededFuture()).compose(r5 -> {
                return checkTopic(fromPublishPacket);
            }).compose(r52 -> {
                return checkExpiration(fromPublishPacket);
            }).compose(r53 -> {
                return AbstractVertxBasedMqttProtocolAdapter.this.onPublishedMessage(fromPublishPacket);
            }).onSuccess(r7 -> {
                Tags.HTTP_STATUS.set(newChildSpan, 202);
                AbstractVertxBasedMqttProtocolAdapter.this.onMessageSent(fromPublishPacket);
                newChildSpan.finish();
            }).onFailure(th -> {
                handlePublishedMessageError(fromPublishPacket, th, newChildSpan);
            });
        }

        private void handlePublishedMessageError(MqttContext mqttContext, Throwable th, Span span) {
            ErrorSubscription errorSubscription = getErrorSubscription(mqttContext);
            Future future = (Future) Optional.ofNullable(errorSubscription).map(errorSubscription2 -> {
                return publishError(errorSubscription, mqttContext, th, span.context());
            }).orElseGet(Future::succeededFuture);
            Tags.HTTP_STATUS.set(span, Integer.valueOf(ServiceInvocationException.extractStatusCode(th)));
            TracingHelper.logError(span, th);
            if (!(th instanceof ClientErrorException)) {
                AbstractVertxBasedMqttProtocolAdapter.this.onMessageUndeliverable(mqttContext);
            }
            future.compose(r10 -> {
                return AbstractVertxBasedMqttProtocolAdapter.this.isTerminalError(th, mqttContext.deviceId(), this.authenticatedDevice, span.context());
            }).onComplete(asyncResult -> {
                boolean booleanValue = asyncResult.succeeded() ? ((Boolean) asyncResult.result()).booleanValue() : false;
                MqttContext.ErrorHandlingMode errorHandlingMode = mqttContext.getErrorHandlingMode(errorSubscription != null);
                if (errorHandlingMode == MqttContext.ErrorHandlingMode.DISCONNECT || booleanValue) {
                    if (mqttContext.deviceEndpoint().isConnected()) {
                        span.log("closing connection to device");
                        mqttContext.deviceEndpoint().close();
                    }
                } else if (mqttContext.isAtLeastOnce()) {
                    if (errorHandlingMode == MqttContext.ErrorHandlingMode.SKIP_ACK) {
                        span.log("skipped sending PUBACK");
                    } else if (mqttContext.deviceEndpoint().isConnected()) {
                        span.log(AbstractVertxBasedMqttProtocolAdapter.EVENT_SENDING_PUBACK);
                        mqttContext.acknowledge();
                    }
                }
                span.finish();
            });
        }

        protected final Future<Void> applyTraceSamplingPriorityForTopicTenant(ResourceIdentifier resourceIdentifier, Span span) {
            Objects.requireNonNull(span);
            return (resourceIdentifier == null || resourceIdentifier.getTenantId() == null) ? Future.succeededFuture() : AbstractVertxBasedMqttProtocolAdapter.this.getTenantConfiguration(resourceIdentifier.getTenantId(), span.context()).map(tenantObject -> {
                TracingHelper.setDeviceTags(span, tenantObject.getTenantId(), (String) null);
                TenantTraceSamplingHelper.applyTraceSamplingPriority(tenantObject, (String) null, span);
                return (Void) null;
            }).recover(th -> {
                return Future.succeededFuture();
            });
        }

        private Future<Void> checkTopic(MqttContext mqttContext) {
            return mqttContext.topic() == null ? Future.failedFuture(new ClientErrorException(400, "malformed topic name")) : Future.succeededFuture();
        }

        private boolean disconnectOnExpired() {
            if (this.authenticatedDevice == null || !this.authenticatedDevice.expired()) {
                return false;
            }
            this.endpoint.close();
            return true;
        }

        private Future<Void> checkExpiration(MqttContext mqttContext) {
            return (mqttContext.authenticatedDevice() == null || !mqttContext.authenticatedDevice().expired()) ? Future.succeededFuture() : Future.failedFuture(new MqttConnectionException(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED));
        }

        private ErrorSubscription getErrorSubscription(MqttContext mqttContext) {
            ErrorSubscription errorSubscription = null;
            if (mqttContext.tenant() != null && mqttContext.deviceId() != null) {
                errorSubscription = this.errorSubscriptions.get(ErrorSubscription.getKey(mqttContext.tenant(), mqttContext.deviceId()));
            }
            if (errorSubscription == null && mqttContext.authenticatedDevice() != null) {
                errorSubscription = this.errorSubscriptions.get(ErrorSubscription.getKey(mqttContext.tenant(), mqttContext.authenticatedDevice().getDeviceId()));
            }
            return errorSubscription;
        }

        public final void handlePubAck(Integer num) {
            Objects.requireNonNull(num);
            if (disconnectOnExpired()) {
                return;
            }
            this.pendingAcks.handlePubAck(num);
        }

        protected final void onSubscribe(MqttSubscribeMessage mqttSubscribeMessage) {
            Objects.requireNonNull(mqttSubscribeMessage);
            if (disconnectOnExpired()) {
                return;
            }
            HashMap hashMap = new HashMap();
            ArrayDeque arrayDeque = new ArrayDeque(mqttSubscribeMessage.topicSubscriptions().size());
            Span newSpan = newSpan("SUBSCRIBE");
            new ArrayDeque(mqttSubscribeMessage.topicSubscriptions()).descendingIterator().forEachRemaining(mqttTopicSubscription -> {
                Future<Subscription> registerSubscription;
                Subscription fromTopic = CommandSubscription.hasCommandEndpointPrefix(mqttTopicSubscription.topicName()) ? CommandSubscription.fromTopic(mqttTopicSubscription, this.authenticatedDevice) : ErrorSubscription.fromTopic(mqttTopicSubscription, this.authenticatedDevice);
                if (fromTopic == null) {
                    TracingHelper.logError(newSpan, String.format("unsupported topic filter [%s]", mqttTopicSubscription.topicName()));
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("cannot create subscription [filter: {}, requested QoS: {}]: unsupported topic filter", mqttTopicSubscription.topicName(), mqttTopicSubscription.qualityOfService());
                    registerSubscription = Future.failedFuture(new IllegalArgumentException("unsupported topic filter"));
                } else if (hashMap.containsKey(fromTopic.getKey())) {
                    HashMap hashMap2 = new HashMap(3);
                    hashMap2.put("event", "ignoring duplicate subscription");
                    hashMap2.put(AbstractVertxBasedMqttProtocolAdapter.LOG_FIELD_TOPIC_FILTER, fromTopic.getTopic());
                    hashMap2.put("requested QoS", fromTopic.getQos());
                    newSpan.log(hashMap2);
                    registerSubscription = (Future) hashMap.get(fromTopic.getKey());
                } else {
                    registerSubscription = registerSubscription(fromTopic, newSpan);
                    hashMap.put(fromTopic.getKey(), registerSubscription);
                }
                arrayDeque.addFirst(registerSubscription);
            });
            CompositeFuture.join(new ArrayList(arrayDeque)).onComplete(asyncResult -> {
                if (this.endpoint.isConnected()) {
                    this.endpoint.subscribeAcknowledge(mqttSubscribeMessage.messageId(), (List) arrayDeque.stream().map(future -> {
                        return future.failed() ? MqttQoS.FAILURE : ((Subscription) future.result()).getQos();
                    }).collect(Collectors.toList()));
                } else {
                    TracingHelper.logError(newSpan, "skipped sending command subscription notification - endpoint not connected anymore");
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("skipped sending command subscription notification - endpoint not connected anymore [tenant-id: {}, device-id: {}]", Optional.ofNullable(this.authenticatedDevice).map((v0) -> {
                        return v0.getTenantId();
                    }).orElse(""), Optional.ofNullable(this.authenticatedDevice).map((v0) -> {
                        return v0.getDeviceId();
                    }).orElse(""));
                }
                newSpan.finish();
            });
        }

        private Future<Subscription> registerSubscription(Subscription subscription, Span span) {
            return subscription instanceof CommandSubscription ? registerCommandSubscription((CommandSubscription) subscription, span) : registerErrorSubscription((ErrorSubscription) subscription, span);
        }

        private Future<Subscription> registerCommandSubscription(CommandSubscription commandSubscription, Span span) {
            if (!MqttQoS.EXACTLY_ONCE.equals(commandSubscription.getQos())) {
                return createCommandConsumer(commandSubscription, span).map(protocolAdapterCommandConsumer -> {
                    commandSubscription.logSubscribeSuccess(span);
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("created subscription [tenant: {}, device: {}, filter: {}, QoS: {}]", new Object[]{commandSubscription.getTenant(), commandSubscription.getDeviceId(), commandSubscription.getTopic(), commandSubscription.getQos()});
                    Pair<CommandSubscription, ProtocolAdapterCommandConsumer> pair = this.commandSubscriptions.get(commandSubscription.getKey());
                    if (pair != null) {
                        span.log(String.format("subscription replaces previous subscription [QoS %s, filter %s]", ((CommandSubscription) pair.one()).getQos(), ((CommandSubscription) pair.one()).getTopic()));
                        AbstractVertxBasedMqttProtocolAdapter.this.log.debug("previous subscription [QoS {}, filter {}] is getting replaced", ((CommandSubscription) pair.one()).getQos(), ((CommandSubscription) pair.one()).getTopic());
                    }
                    this.commandSubscriptions.put(commandSubscription.getKey(), Pair.of(commandSubscription, protocolAdapterCommandConsumer));
                    return commandSubscription;
                }).recover(th -> {
                    commandSubscription.logSubscribeFailure(span, th);
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("cannot create subscription [tenant: {}, device: {}, filter: {}, requested QoS: {}]", new Object[]{commandSubscription.getTenant(), commandSubscription.getDeviceId(), commandSubscription.getTopic(), commandSubscription.getQos(), th});
                    return Future.failedFuture(th);
                });
            }
            TracingHelper.logError(span, String.format("topic filter [%s] with unsupported QoS 2", commandSubscription.getTopic()));
            return Future.failedFuture(new IllegalArgumentException("QoS 2 not supported for command subscription"));
        }

        private Future<Subscription> registerErrorSubscription(ErrorSubscription errorSubscription, Span span) {
            if (MqttQoS.AT_MOST_ONCE.equals(errorSubscription.getQos())) {
                return Future.succeededFuture().compose(obj -> {
                    return errorSubscription.isGatewaySubscriptionForSpecificDevice() ? AbstractVertxBasedMqttProtocolAdapter.this.getRegistrationAssertion(this.authenticatedDevice.getTenantId(), errorSubscription.getDeviceId(), this.authenticatedDevice, span.context()) : Future.succeededFuture();
                }).recover(th -> {
                    errorSubscription.logSubscribeFailure(span, th);
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("cannot create subscription [tenant: {}, device: {}, filter: {}, requested QoS: {}]", new Object[]{errorSubscription.getTenant(), errorSubscription.getDeviceId(), errorSubscription.getTopic(), errorSubscription.getQos(), th});
                    return Future.failedFuture(th);
                }).compose(registrationAssertion -> {
                    this.errorSubscriptions.put(errorSubscription.getKey(), errorSubscription);
                    errorSubscription.logSubscribeSuccess(span);
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("created subscription [tenant: {}, device: {}, filter: {}, QoS: {}]", new Object[]{errorSubscription.getTenant(), errorSubscription.getDeviceId(), errorSubscription.getTopic(), errorSubscription.getQos()});
                    return Future.succeededFuture(errorSubscription);
                });
            }
            TracingHelper.logError(span, String.format("topic filter [%s] with unsupported QoS %d", errorSubscription.getTopic(), Integer.valueOf(errorSubscription.getQos().value())));
            return Future.failedFuture(new IllegalArgumentException(String.format("QoS %d not supported for error subscription", Integer.valueOf(errorSubscription.getQos().value()))));
        }

        protected final Future<Void> publishError(ErrorSubscription errorSubscription, MqttContext mqttContext, Throwable th, SpanContext spanContext) {
            Objects.requireNonNull(errorSubscription);
            Objects.requireNonNull(mqttContext);
            Objects.requireNonNull(th);
            Span newChildSpan = newChildSpan(spanContext, "publish error to device");
            int extractStatusCode = ServiceInvocationException.extractStatusCode(th);
            String str = (String) Optional.ofNullable(mqttContext.correlationId()).orElse("-1");
            String errorPublishTopic = errorSubscription.getErrorPublishTopic(mqttContext, extractStatusCode);
            Tags.MESSAGE_BUS_DESTINATION.set(newChildSpan, errorPublishTopic);
            TracingHelper.TAG_QOS.set(newChildSpan, errorSubscription.getQos().name());
            JsonObject jsonObject = new JsonObject();
            jsonObject.put("code", Integer.valueOf(extractStatusCode));
            jsonObject.put("message", ServiceInvocationException.getErrorMessageForExternalClient(th));
            jsonObject.put("timestamp", ZonedDateTime.now(ZoneOffset.UTC).truncatedTo(ChronoUnit.MILLIS).format(DateTimeFormatter.ISO_INSTANT));
            jsonObject.put("correlation-id", str);
            String format = (this.authenticatedDevice == null || this.authenticatedDevice.getDeviceId().equals(mqttContext.deviceId())) ? String.format("device [%s]", mqttContext.deviceId()) : String.format("gateway [%s], device [%s]", this.authenticatedDevice.getDeviceId(), mqttContext.deviceId());
            return publish(errorPublishTopic, jsonObject.toBuffer(), errorSubscription.getQos()).onSuccess(num -> {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug("published error message [packet-id: {}] to {} [tenant-id: {}, MQTT client-id: {}, QoS: {}, topic: {}]", new Object[]{num, format, errorSubscription.getTenant(), this.endpoint.clientIdentifier(), errorSubscription.getQos(), errorPublishTopic});
                newChildSpan.log(errorSubscription.getQos().value() > 0 ? "published error message, packet-id: " + num : "published error message");
            }).onFailure(th2 -> {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug("error publishing error message to {} [tenant-id: {}, MQTT client-id: {}, QoS: {}, topic: {}]", new Object[]{format, errorSubscription.getTenant(), this.endpoint.clientIdentifier(), errorSubscription.getQos(), errorPublishTopic, th2});
                TracingHelper.logError(newChildSpan, "failed to publish error message", th2);
            }).map(num2 -> {
                return (Void) null;
            }).onComplete(asyncResult -> {
                newChildSpan.finish();
            });
        }

        private Future<ProtocolAdapterCommandConsumer> createCommandConsumer(CommandSubscription commandSubscription, Span span) {
            Function function = commandContext -> {
                Tags.COMPONENT.set(commandContext.getTracingSpan(), AbstractVertxBasedMqttProtocolAdapter.this.getTypeName());
                TracingHelper.TAG_CLIENT_ID.set(commandContext.getTracingSpan(), this.endpoint.clientIdentifier());
                Timer.Sample startTimer = AbstractVertxBasedMqttProtocolAdapter.this.metrics.startTimer();
                AbstractVertxBasedMqttProtocolAdapter.addMicrometerSample(commandContext, startTimer);
                Command command = commandContext.getCommand();
                Future tenantConfiguration = AbstractVertxBasedMqttProtocolAdapter.this.getTenantConfiguration(commandSubscription.getTenant(), commandContext.getTracingContext());
                return tenantConfiguration.compose(tenantObject -> {
                    return !command.isValid() ? Future.failedFuture(new ClientErrorException(400, "malformed command message")) : AbstractVertxBasedMqttProtocolAdapter.this.checkMessageLimit(tenantObject, command.getPayloadSize(), commandContext.getTracingContext());
                }).compose(r11 -> {
                    if (commandSubscription.isGatewaySubscriptionForSpecificDevice()) {
                        return AbstractVertxBasedMqttProtocolAdapter.this.getRegistrationAssertion(this.authenticatedDevice.getTenantId(), commandSubscription.getDeviceId(), this.authenticatedDevice, commandContext.getTracingContext());
                    }
                    if (commandSubscription.isGatewaySubscriptionForAllDevices() || !command.isTargetedAtGateway()) {
                        return Future.succeededFuture();
                    }
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("Mapped target gateway not subscribed to commands for devices behind that gateway [tenant: {}, gateway-id: {}, device-id: {}]", new Object[]{command.getTenant(), command.getGatewayId(), command.getDeviceId()});
                    return Future.failedFuture(new NoConsumerException("Mapped target gateway not subscribed to commands for devices behind that gateway"));
                }).onFailure(th -> {
                    if (th instanceof ClientErrorException) {
                        commandContext.reject(th);
                    } else {
                        commandContext.release(th);
                    }
                    AbstractVertxBasedMqttProtocolAdapter.this.metrics.reportCommand(command.isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, commandSubscription.getTenant(), (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.from(th), command.getPayloadSize(), startTimer);
                }).compose(registrationAssertion -> {
                    return onCommandReceived((TenantObject) tenantConfiguration.result(), commandSubscription, commandContext);
                });
            };
            Future future = (Future) Optional.ofNullable(this.authenticatedDevice).map(deviceUser -> {
                return AbstractVertxBasedMqttProtocolAdapter.this.getRegistrationAssertion(this.authenticatedDevice.getTenantId(), commandSubscription.getDeviceId(), this.authenticatedDevice, span.context());
            }).orElseGet(Future::succeededFuture);
            return commandSubscription.isGatewaySubscriptionForSpecificDevice() ? future.compose(registrationAssertion -> {
                return AbstractVertxBasedMqttProtocolAdapter.this.getCommandConsumerFactory().createCommandConsumer(commandSubscription.getTenant(), commandSubscription.getDeviceId(), commandSubscription.getAuthenticatedDeviceId(), true, function, (Duration) null, span.context());
            }) : future.compose(registrationAssertion2 -> {
                return AbstractVertxBasedMqttProtocolAdapter.this.getCommandConsumerFactory().createCommandConsumer(commandSubscription.getTenant(), commandSubscription.getDeviceId(), true, function, (Duration) null, span.context());
            });
        }

        protected final Future<Void> onCommandReceived(TenantObject tenantObject, CommandSubscription commandSubscription, CommandContext commandContext) {
            Objects.requireNonNull(tenantObject);
            Objects.requireNonNull(commandSubscription);
            Objects.requireNonNull(commandContext);
            Command command = commandContext.getCommand();
            String commandPublishTopic = commandSubscription.getCommandPublishTopic(command);
            Tags.MESSAGE_BUS_DESTINATION.set(commandContext.getTracingSpan(), commandPublishTopic);
            TracingHelper.TAG_QOS.set(commandContext.getTracingSpan(), commandSubscription.getQos().name());
            String format = command.isTargetedAtGateway() ? String.format("gateway [%s], device [%s]", command.getGatewayId(), command.getDeviceId()) : String.format("device [%s]", command.getDeviceId());
            return AbstractVertxBasedMqttProtocolAdapter.this.getCommandPayload(commandContext).map(buffer -> {
                return (Buffer) Optional.ofNullable(buffer).orElseGet(Buffer::buffer);
            }).onSuccess(buffer2 -> {
                publish(commandPublishTopic, buffer2, commandSubscription.getQos()).onSuccess(num -> {
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("published command [packet-id: {}] to {} [tenant-id: {}, MQTT client-id: {}, QoS: {}, topic: {}]", new Object[]{num, format, commandSubscription.getTenant(), this.endpoint.clientIdentifier(), commandSubscription.getQos(), commandPublishTopic});
                    commandContext.getTracingSpan().log(commandSubscription.getQos().value() > 0 ? "published command, packet-id: " + num : "published command");
                    afterCommandPublished(num, commandContext, tenantObject, commandSubscription);
                }).onFailure(th -> {
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("error publishing command to {} [tenant-id: {}, MQTT client-id: {}, QoS: {}, topic: {}]", new Object[]{format, commandSubscription.getTenant(), this.endpoint.clientIdentifier(), commandSubscription.getQos(), commandPublishTopic, th});
                    TracingHelper.logError(commandContext.getTracingSpan(), "failed to publish command", th);
                    reportPublishedCommand(tenantObject, commandSubscription, commandContext, MetricsTags.ProcessingOutcome.from(th));
                    commandContext.release(th);
                });
            }).onFailure(th -> {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug("error mapping command [tenant-id: {}, MQTT client-id: {}, QoS: {}]", new Object[]{commandSubscription.getTenant(), this.endpoint.clientIdentifier(), commandSubscription.getQos(), th});
                TracingHelper.logError(commandContext.getTracingSpan(), "failed to map command", th);
                reportPublishedCommand(tenantObject, commandSubscription, commandContext, MetricsTags.ProcessingOutcome.from(th));
                commandContext.release(th);
            }).mapEmpty();
        }

        private Future<Integer> publish(String str, Buffer buffer, MqttQoS mqttQoS) {
            Promise promise = Promise.promise();
            try {
                this.endpoint.publish(str, buffer, mqttQoS, false, false, promise);
            } catch (Exception e) {
                promise.fail(!this.endpoint.isConnected() ? new ServerErrorException(503, "connection to device already closed") : new ServerErrorException(500, e));
            }
            return promise.future();
        }

        private void afterCommandPublished(Integer num, CommandContext commandContext, TenantObject tenantObject, CommandSubscription commandSubscription) {
            if (!MqttQoS.AT_LEAST_ONCE.equals(commandSubscription.getQos())) {
                reportPublishedCommand(tenantObject, commandSubscription, commandContext, MetricsTags.ProcessingOutcome.FORWARDED);
                commandContext.accept();
            } else {
                this.pendingAcks.add(num, num2 -> {
                    reportPublishedCommand(tenantObject, commandSubscription, commandContext, MetricsTags.ProcessingOutcome.FORWARDED);
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("received PUBACK [packet-id: {}] for command [tenant-id: {}, device-id: {}, MQTT client-id: {}]", new Object[]{num2, commandSubscription.getTenant(), commandSubscription.getDeviceId(), this.endpoint.clientIdentifier()});
                    commandContext.getTracingSpan().log("received PUBACK from device");
                    commandContext.accept();
                }, r12 -> {
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("did not receive PUBACK [packet-id: {}] for command [tenant-id: {}, device-id: {}, MQTT client-id: {}]", new Object[]{num, commandSubscription.getTenant(), commandSubscription.getDeviceId(), this.endpoint.clientIdentifier()});
                    commandContext.release(new ServerErrorException(503, "did not receive PUBACK from device"));
                    reportPublishedCommand(tenantObject, commandSubscription, commandContext, MetricsTags.ProcessingOutcome.UNDELIVERABLE);
                }, ((MqttProtocolAdapterProperties) AbstractVertxBasedMqttProtocolAdapter.this.getConfig()).getSendMessageToDeviceTimeout());
            }
        }

        private void reportPublishedCommand(TenantObject tenantObject, CommandSubscription commandSubscription, CommandContext commandContext, MetricsTags.ProcessingOutcome processingOutcome) {
            AbstractVertxBasedMqttProtocolAdapter.this.metrics.reportCommand(commandContext.getCommand().isOneWay() ? MetricsTags.Direction.ONE_WAY : MetricsTags.Direction.REQUEST, commandSubscription.getTenant(), tenantObject, processingOutcome, commandContext.getCommand().getPayloadSize(), AbstractVertxBasedMqttProtocolAdapter.getMicrometerSample(commandContext));
        }

        protected final void onUnsubscribe(MqttUnsubscribeMessage mqttUnsubscribeMessage) {
            Objects.requireNonNull(mqttUnsubscribeMessage);
            if (disconnectOnExpired()) {
                return;
            }
            Span newSpan = newSpan("UNSUBSCRIBE");
            ArrayList arrayList = new ArrayList(mqttUnsubscribeMessage.topics().size());
            mqttUnsubscribeMessage.topics().forEach(str -> {
                AtomicReference atomicReference = new AtomicReference();
                if (CommandSubscription.hasCommandEndpointPrefix(str)) {
                    Optional ofNullable = Optional.ofNullable(CommandSubscription.getKey(str, this.authenticatedDevice));
                    Map<Subscription.Key, Pair<CommandSubscription, ProtocolAdapterCommandConsumer>> map = this.commandSubscriptions;
                    Objects.requireNonNull(map);
                    ofNullable.map((v1) -> {
                        return r1.remove(v1);
                    }).ifPresent(pair -> {
                        CommandSubscription commandSubscription = (CommandSubscription) pair.one();
                        atomicReference.set(commandSubscription);
                        commandSubscription.logUnsubscribe(newSpan);
                        arrayList.add(closeCommandConsumer(pair, newSpan, true));
                    });
                } else if (ErrorSubscription.hasErrorEndpointPrefix(str)) {
                    Optional ofNullable2 = Optional.ofNullable(ErrorSubscription.getKey(str, (Device) this.authenticatedDevice));
                    Map<Subscription.Key, ErrorSubscription> map2 = this.errorSubscriptions;
                    Objects.requireNonNull(map2);
                    ofNullable2.map((v1) -> {
                        return r1.remove(v1);
                    }).ifPresent(errorSubscription -> {
                        atomicReference.set(errorSubscription);
                        errorSubscription.logUnsubscribe(newSpan);
                    });
                }
                if (atomicReference.get() != null) {
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("removed subscription with topic [{}] for device [tenant-id: {}, device-id: {}]", new Object[]{str, ((Subscription) atomicReference.get()).getTenant(), ((Subscription) atomicReference.get()).getDeviceId()});
                } else {
                    TracingHelper.logError(newSpan, String.format("no subscription found for topic filter [%s]", str));
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("cannot unsubscribe - no subscription found for topic filter [{}]", str);
                }
            });
            if (this.endpoint.isConnected()) {
                this.endpoint.unsubscribeAcknowledge(mqttUnsubscribeMessage.messageId());
            }
            CompositeFuture.join(arrayList).onComplete(asyncResult -> {
                newSpan.finish();
            });
        }

        private Future<Void> closeCommandConsumer(Pair<CommandSubscription, ProtocolAdapterCommandConsumer> pair, Span span, boolean z) {
            CommandSubscription commandSubscription = (CommandSubscription) pair.one();
            return ((ProtocolAdapterCommandConsumer) pair.two()).close(z, span.context()).recover(th -> {
                TracingHelper.logError(span, th);
                if (ServiceInvocationException.extractStatusCode(th) != 412) {
                    return Future.succeededFuture();
                }
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug("command consumer wasn't active anymore [tenant: {}, device-id: {}]", commandSubscription.getTenant(), commandSubscription.getDeviceId());
                span.log("command consumer wasn't active anymore");
                return Future.failedFuture(th);
            });
        }

        private boolean stopCalled() {
            return AbstractVertxBasedMqttProtocolAdapter.this.stopResultPromiseRef.get() != null;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public final void close(String str, boolean z) {
            Objects.requireNonNull(str);
            Span newSpan = newSpan("close device connection");
            if (this.endpoint.isConnected()) {
                this.endpoint.closeHandler(r1 -> {
                });
            }
            onCloseInternal(newSpan, str, z, true).onComplete(asyncResult -> {
                newSpan.finish();
            });
        }

        protected final void onClose() {
            Span newSpan;
            String str;
            boolean z;
            if (this.protocolLevelException != null) {
                newSpan = newSpan("close connection due to protocol error");
                TracingHelper.logError(newSpan, (String) null, this.protocolLevelException, true);
                str = "protocol error: " + this.protocolLevelException.toString();
                z = true;
            } else if (stopCalled()) {
                newSpan = newSpan("close device connection (server shutdown)");
                str = "server shutdown";
                z = false;
            } else {
                newSpan = newSpan("CLOSE");
                str = null;
                z = true;
            }
            Span span = newSpan;
            onCloseInternal(newSpan, str, true, z).onComplete(asyncResult -> {
                span.finish();
            });
        }

        private Future<Void> onCloseInternal(Span span, String str, boolean z, boolean z2) {
            AbstractVertxBasedMqttProtocolAdapter.this.onBeforeEndpointClose(this);
            AbstractVertxBasedMqttProtocolAdapter.this.onClose(this.endpoint);
            CompositeFuture removeAllCommandSubscriptions = removeAllCommandSubscriptions(span, z, z2);
            if (z) {
                AbstractVertxBasedMqttProtocolAdapter.this.sendDisconnectedEvent(this.endpoint.clientIdentifier(), this.authenticatedDevice, span.context());
            }
            if (this.authenticatedDevice == null) {
                AbstractVertxBasedMqttProtocolAdapter.this.metrics.decrementUnauthenticatedConnections();
            } else {
                AbstractVertxBasedMqttProtocolAdapter.this.metrics.decrementConnections(this.authenticatedDevice.getTenantId());
            }
            String str2 = str != null ? "; reason: " + str : "";
            if (this.endpoint.isConnected()) {
                HashMap hashMap = new HashMap(2);
                hashMap.put("event", "closing device connection");
                Optional.ofNullable(str).ifPresent(str3 -> {
                    hashMap.put("reason", str3);
                });
                span.log(hashMap);
                if (this.authenticatedDevice == null) {
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("closing connection to anonymous device [client ID: {}]{}", this.endpoint.clientIdentifier(), str2);
                } else {
                    AbstractVertxBasedMqttProtocolAdapter.this.log.debug("closing connection to device [tenant-id: {}, device-id: {}, client ID: {}]{}", new Object[]{this.authenticatedDevice.getTenantId(), this.authenticatedDevice.getDeviceId(), this.endpoint.clientIdentifier(), str2});
                }
                this.endpoint.close();
            } else if (this.authenticatedDevice == null) {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug("connection to anonymous device closed [client ID: {}]{}", this.endpoint.clientIdentifier(), str2);
            } else {
                AbstractVertxBasedMqttProtocolAdapter.this.log.debug("connection to device closed [tenant-id: {}, device-id: {}, client ID: {}]{}", new Object[]{this.authenticatedDevice.getTenantId(), this.authenticatedDevice.getDeviceId(), this.endpoint.clientIdentifier(), str2});
            }
            return removeAllCommandSubscriptions.mapEmpty();
        }

        private CompositeFuture removeAllCommandSubscriptions(Span span, boolean z, boolean z2) {
            ArrayList arrayList = new ArrayList(this.commandSubscriptions.size());
            Iterator<Pair<CommandSubscription, ProtocolAdapterCommandConsumer>> it = this.commandSubscriptions.values().iterator();
            while (it.hasNext()) {
                Pair<CommandSubscription, ProtocolAdapterCommandConsumer> next = it.next();
                ((CommandSubscription) next.one()).logUnsubscribe(span);
                if (z2) {
                    arrayList.add(closeCommandConsumer(next, span, z));
                } else {
                    arrayList.add(Future.succeededFuture());
                }
                it.remove();
            }
            return CompositeFuture.join(arrayList);
        }

        private Span newSpan(String str) {
            Span newChildSpan = newChildSpan(null, str);
            this.traceSamplingPriority.ifPresent(i -> {
                TracingHelper.setTraceSamplingPriority(newChildSpan, i);
            });
            return newChildSpan;
        }

        private Span newChildSpan(SpanContext spanContext, String str) {
            Span start = TracingHelper.buildChildSpan(AbstractVertxBasedMqttProtocolAdapter.this.tracer, spanContext, str, AbstractVertxBasedMqttProtocolAdapter.this.getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "server").withTag(TracingHelper.TAG_CLIENT_ID.getKey(), this.endpoint.clientIdentifier()).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), this.authenticatedDevice != null).start();
            if (this.authenticatedDevice != null) {
                TracingHelper.setDeviceTags(start, this.authenticatedDevice.getTenantId(), this.authenticatedDevice.getDeviceId());
            }
            return start;
        }
    }

    public final void setAuthHandler(AuthHandler<MqttConnectContext> authHandler) {
        this.authHandler = (AuthHandler) Objects.requireNonNull(authHandler);
    }

    public int getPortDefaultValue() {
        return IANA_SECURE_MQTT_PORT;
    }

    public int getInsecurePortDefaultValue() {
        return IANA_MQTT_PORT;
    }

    protected final int getActualPort() {
        if (this.server != null) {
            return this.server.actualPort();
        }
        return -1;
    }

    protected final int getActualInsecurePort() {
        if (this.insecureServer != null) {
            return this.insecureServer.actualPort();
        }
        return -1;
    }

    public final void setMetrics(MqttAdapterMetrics mqttAdapterMetrics) {
        Optional.ofNullable(mqttAdapterMetrics).ifPresent(mqttAdapterMetrics2 -> {
            this.log.info("reporting metrics using [{}]", mqttAdapterMetrics.getClass().getName());
        });
        this.metrics = mqttAdapterMetrics;
    }

    protected final MqttAdapterMetrics getMetrics() {
        return this.metrics;
    }

    protected AuthHandler<MqttConnectContext> createAuthHandler() {
        return new ChainAuthHandler(this::handleBeforeCredentialsValidation).append(new X509AuthHandler(new TenantServiceBasedX509Authentication(getTenantClient(), this.tracer), new X509AuthProvider(getCredentialsClient(), this.tracer))).append(new JwtAuthHandler(new JwtAuthProvider(getCredentialsClient(), this.tracer))).append(new ConnectPacketAuthHandler(new UsernamePasswordAuthProvider(getCredentialsClient(), this.tracer)));
    }

    protected Future<Void> handleBeforeCredentialsValidation(DeviceCredentials deviceCredentials, MqttConnectContext mqttConnectContext) {
        String tenantId = deviceCredentials.getTenantId();
        Span tracingSpan = mqttConnectContext.getTracingSpan();
        String authId = deviceCredentials.getAuthId();
        return getTenantConfiguration(tenantId, tracingSpan.context()).recover(th -> {
            return Future.failedFuture(CredentialsApiAuthProvider.mapNotFoundToBadCredentialsException(th));
        }).map(tenantObject -> {
            TracingHelper.setDeviceTags(tracingSpan, tenantId, (String) null, authId);
            mqttConnectContext.setTraceSamplingPriority(TenantTraceSamplingHelper.applyTraceSamplingPriority(tenantObject, authId, tracingSpan));
            return tenantObject;
        }).compose(this::isAdapterEnabled).mapEmpty();
    }

    public void setMqttSecureServer(MqttServer mqttServer) {
        Objects.requireNonNull(mqttServer);
        if (mqttServer.actualPort() > 0) {
            throw new IllegalStateException("MQTT server must not be started already");
        }
        this.server = mqttServer;
    }

    public void setMqttInsecureServer(MqttServer mqttServer) {
        Objects.requireNonNull(mqttServer);
        if (mqttServer.actualPort() > 0) {
            throw new IllegalStateException("MQTT server must not be started already");
        }
        this.insecureServer = mqttServer;
    }

    private Future<Void> bindSecureMqttServer() {
        if (!isSecurePortEnabled()) {
            return Future.succeededFuture();
        }
        MqttServerOptions mqttServerOptions = new MqttServerOptions();
        mqttServerOptions.setHost(((MqttProtocolAdapterProperties) getConfig()).getBindAddress()).setPort(determineSecurePort()).setMaxMessageSize(((MqttProtocolAdapterProperties) getConfig()).getMaxPayloadSize() + MAX_MSG_SIZE_VARIABLE_HEADER_SIZE);
        addTlsKeyCertOptions(mqttServerOptions);
        addTlsTrustOptions(mqttServerOptions);
        return bindMqttServer(mqttServerOptions, this.server).map(mqttServer -> {
            this.server = mqttServer;
            return (Void) null;
        });
    }

    private Future<Void> bindInsecureMqttServer() {
        if (!isInsecurePortEnabled()) {
            return Future.succeededFuture();
        }
        MqttServerOptions mqttServerOptions = new MqttServerOptions();
        mqttServerOptions.setHost(((MqttProtocolAdapterProperties) getConfig()).getInsecurePortBindAddress()).setPort(determineInsecurePort()).setMaxMessageSize(((MqttProtocolAdapterProperties) getConfig()).getMaxPayloadSize() + MAX_MSG_SIZE_VARIABLE_HEADER_SIZE);
        return bindMqttServer(mqttServerOptions, this.insecureServer).map(mqttServer -> {
            this.insecureServer = mqttServer;
            return (Void) null;
        });
    }

    private Future<MqttServer> bindMqttServer(MqttServerOptions mqttServerOptions, MqttServer mqttServer) {
        Promise promise = Promise.promise();
        MqttServer create = mqttServer == null ? MqttServer.create(this.vertx, mqttServerOptions) : mqttServer;
        create.endpointHandler(this::handleEndpointConnection).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                this.log.info("MQTT server running on {}:{}", ((MqttProtocolAdapterProperties) getConfig()).getBindAddress(), Integer.valueOf(create.actualPort()));
                promise.complete(create);
            } else {
                this.log.error("error while starting up MQTT server", asyncResult.cause());
                promise.fail(asyncResult.cause());
            }
        });
        return promise.future();
    }

    protected final void doStart(Promise<Void> promise) {
        registerDeviceAndTenantChangeNotificationConsumers();
        this.log.info("limiting size of inbound message payload to {} bytes", Integer.valueOf(((MqttProtocolAdapterProperties) getConfig()).getMaxPayloadSize()));
        if (!((MqttProtocolAdapterProperties) getConfig()).isAuthenticationRequired()) {
            this.log.warn("authentication of devices turned off");
        }
        setConnectionLimitManager((ConnectionLimitManager) Optional.ofNullable(getConnectionLimitManager()).orElseGet(this::createConnectionLimitManager));
        checkPortConfiguration().compose(r4 -> {
            return CompositeFuture.all(bindSecureMqttServer(), bindInsecureMqttServer());
        }).compose(compositeFuture -> {
            if (this.authHandler == null) {
                this.authHandler = createAuthHandler();
            }
            return Future.succeededFuture((Void) null);
        }).onComplete(promise);
    }

    private void registerDeviceAndTenantChangeNotificationConsumers() {
        NotificationEventBusSupport.registerConsumer(this.vertx, DeviceChangeNotification.TYPE, deviceChangeNotification -> {
            if (LifecycleChange.DELETE.equals(deviceChangeNotification.getChange()) || (LifecycleChange.UPDATE.equals(deviceChangeNotification.getChange()) && !deviceChangeNotification.isDeviceEnabled())) {
                closeDeviceConnectionsOnDeviceOrTenantChange(device -> {
                    return device.getTenantId().equals(deviceChangeNotification.getTenantId()) && device.getDeviceId().equals(deviceChangeNotification.getDeviceId());
                }, LifecycleChange.DELETE.equals(deviceChangeNotification.getChange()) ? "device deleted" : "device disabled");
            }
        });
        NotificationEventBusSupport.registerConsumer(this.vertx, AllDevicesOfTenantDeletedNotification.TYPE, allDevicesOfTenantDeletedNotification -> {
            closeDeviceConnectionsOnDeviceOrTenantChange(device -> {
                return device.getTenantId().equals(allDevicesOfTenantDeletedNotification.getTenantId());
            }, "all devices of tenant deleted");
        });
        NotificationEventBusSupport.registerConsumer(this.vertx, TenantChangeNotification.TYPE, tenantChangeNotification -> {
            if (LifecycleChange.DELETE.equals(tenantChangeNotification.getChange()) || (LifecycleChange.UPDATE.equals(tenantChangeNotification.getChange()) && !tenantChangeNotification.isTenantEnabled())) {
                closeDeviceConnectionsOnDeviceOrTenantChange(device -> {
                    return device.getTenantId().equals(tenantChangeNotification.getTenantId());
                }, LifecycleChange.DELETE.equals(tenantChangeNotification.getChange()) ? "tenant deleted" : "tenant disabled");
            }
        });
    }

    private void closeDeviceConnectionsOnDeviceOrTenantChange(Predicate<Device> predicate, String str) {
        ((List) this.connectedAuthenticatedDeviceEndpoints.stream().filter(mqttDeviceEndpoint -> {
            return predicate.test(mqttDeviceEndpoint.getAuthenticatedDevice());
        }).collect(Collectors.toList())).forEach(mqttDeviceEndpoint2 -> {
            Futures.onCurrentContextCompletionHandler(asyncResult -> {
                mqttDeviceEndpoint2.close(str, false);
            }).handle((Object) null);
        });
    }

    private ConnectionLimitManager createConnectionLimitManager() {
        return new DefaultConnectionLimitManager(MemoryBasedConnectionLimitStrategy.forParams(((MqttProtocolAdapterProperties) getConfig()).isSubstrateVm() ? 35000000L : 100000000L, 20000L, ((MqttProtocolAdapterProperties) getConfig()).getGcHeapPercentage()), () -> {
            return Integer.valueOf(this.metrics.getNumberOfConnections());
        }, (ProtocolAdapterProperties) getConfig());
    }

    protected final void doStop(Promise<Void> promise) {
        if (!this.stopResultPromiseRef.compareAndSet(null, promise)) {
            this.stopResultPromiseRef.get().future().onComplete(promise);
            this.log.trace("stop already called");
            return;
        }
        Promise promise2 = Promise.promise();
        if (this.server != null) {
            this.log.info("closing secure MQTT server ...");
            this.server.close(promise2);
        } else {
            promise2.complete();
        }
        Promise promise3 = Promise.promise();
        if (this.insecureServer != null) {
            this.log.info("closing insecure MQTT server ...");
            this.insecureServer.close(promise3);
        } else {
            promise3.complete();
        }
        CompositeFuture.all(promise2.future(), promise3.future()).map(compositeFuture -> {
            return (Void) null;
        }).onComplete(asyncResult -> {
            this.log.info("MQTT server(s) closed");
        }).onComplete(promise);
    }

    final void handleEndpointConnection(MqttEndpoint mqttEndpoint) {
        this.log.debug("connection request from client [client-id: {}]", mqttEndpoint.clientIdentifier());
        String str = (String) Optional.ofNullable(mqttEndpoint.sslSession()).map((v0) -> {
            return v0.getCipherSuite();
        }).orElse(null);
        Span start = this.tracer.buildSpan("CONNECT").ignoreActiveSpan().withTag(Tags.SPAN_KIND.getKey(), "server").withTag(Tags.COMPONENT.getKey(), getTypeName()).withTag(TracingHelper.TAG_CLIENT_ID.getKey(), mqttEndpoint.clientIdentifier()).start();
        if (!mqttEndpoint.isCleanSession()) {
            start.log("ignoring client's intent to resume existing session");
        }
        if (mqttEndpoint.will() != null) {
            start.log("ignoring client's last will");
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        mqttEndpoint.closeHandler(r8 -> {
            this.log.debug("client closed connection before CONNACK got sent to client [client-id: {}]", mqttEndpoint.clientIdentifier());
            TracingHelper.logError(start, "client closed connection");
            atomicBoolean.set(true);
        });
        handleConnectionRequest(mqttEndpoint, atomicBoolean, start).compose(deviceUser -> {
            return handleConnectionRequestResult(mqttEndpoint, deviceUser, atomicBoolean, start);
        }).onSuccess(deviceUser2 -> {
            mqttEndpoint.accept(false);
            start.log("connection accepted");
            this.metrics.reportConnectionAttempt(MetricsTags.ConnectionAttemptOutcome.SUCCEEDED, (String) Optional.ofNullable(deviceUser2).map((v0) -> {
                return v0.getTenantId();
            }).orElse(null), str);
        }).onFailure(th -> {
            if (!atomicBoolean.get()) {
                this.log.debug("rejecting connection request from client [clientId: {}], cause:", mqttEndpoint.clientIdentifier(), th);
                rejectConnectionRequest(mqttEndpoint, getConnectReturnCode(th), start);
                TracingHelper.logError(start, th);
            }
            this.metrics.reportConnectionAttempt(AbstractProtocolAdapterBase.getOutcome(th), th instanceof ServiceInvocationException ? ((ServiceInvocationException) th).getTenant() : null, str);
        }).onComplete(asyncResult -> {
            start.finish();
        });
    }

    private Future<DeviceUser> handleConnectionRequest(MqttEndpoint mqttEndpoint, AtomicBoolean atomicBoolean, Span span) {
        if (getConnectionLimitManager() == null || !getConnectionLimitManager().isLimitExceeded()) {
            return ((MqttProtocolAdapterProperties) getConfig()).isAuthenticationRequired() ? handleEndpointConnectionWithAuthentication(mqttEndpoint, atomicBoolean, span) : handleEndpointConnectionWithoutAuthentication(mqttEndpoint);
        }
        span.log("adapter's connection limit exceeded");
        return Future.failedFuture(new AdapterConnectionsExceededException((String) null, "adapter's connection limit is exceeded", (Throwable) null));
    }

    private Future<DeviceUser> handleConnectionRequestResult(MqttEndpoint mqttEndpoint, DeviceUser deviceUser, AtomicBoolean atomicBoolean, Span span) {
        TracingHelper.TAG_AUTHENTICATED.set(span, Boolean.valueOf(deviceUser != null));
        if (deviceUser != null) {
            TracingHelper.setDeviceTags(span, deviceUser.getTenantId(), deviceUser.getDeviceId());
        }
        Promise promise = Promise.promise();
        if (atomicBoolean.get()) {
            this.log.debug("abort handling connection request, connection already closed [clientId: {}]", mqttEndpoint.clientIdentifier());
            span.log("abort connection request processing, connection already closed");
            promise.fail("connection already closed");
        } else {
            sendConnectedEvent(mqttEndpoint.clientIdentifier(), deviceUser, span.context()).map(deviceUser).recover(th -> {
                this.log.warn("failed to send connection event for client [clientId: {}]", mqttEndpoint.clientIdentifier(), th);
                return Future.failedFuture(new ServerErrorException(503, "failed to send connection event", th));
            }).onComplete(promise);
        }
        return promise.future();
    }

    private void rejectConnectionRequest(MqttEndpoint mqttEndpoint, MqttConnectReturnCode mqttConnectReturnCode, Span span) {
        try {
            mqttEndpoint.reject(mqttConnectReturnCode);
            span.log("connection request rejected");
        } catch (IllegalStateException e) {
            if ("MQTT endpoint is closed".equals(e.getMessage())) {
                this.log.debug("skipped rejecting connection request, connection already closed [clientId: {}]", mqttEndpoint.clientIdentifier());
                span.log("skipped rejecting connection request, connection already closed");
            } else {
                this.log.debug("could not reject connection request from client [clientId: {}]: {}", mqttEndpoint.clientIdentifier(), e.toString());
                TracingHelper.logError(span, "could not reject connection request from client", e);
            }
        }
    }

    private Future<DeviceUser> handleEndpointConnectionWithoutAuthentication(MqttEndpoint mqttEndpoint) {
        registerEndpointHandlers(mqttEndpoint, null, OptionalInt.empty());
        this.metrics.incrementUnauthenticatedConnections();
        this.log.debug("unauthenticated device [clientId: {}] connected", mqttEndpoint.clientIdentifier());
        return Future.succeededFuture((Object) null);
    }

    private Future<DeviceUser> handleEndpointConnectionWithAuthentication(MqttEndpoint mqttEndpoint, AtomicBoolean atomicBoolean, Span span) {
        MqttConnectContext fromConnectPacket = MqttConnectContext.fromConnectPacket(mqttEndpoint, span);
        Future authenticateDevice = this.authHandler.authenticateDevice(fromConnectPacket);
        return authenticateDevice.compose(deviceUser -> {
            return CompositeFuture.all(getTenantConfiguration(deviceUser.getTenantId(), span.context()).compose(tenantObject -> {
                return CompositeFuture.all(isAdapterEnabled(tenantObject), checkConnectionLimit(tenantObject, span.context()));
            }), checkDeviceRegistration(deviceUser, span.context())).map(deviceUser);
        }).compose(deviceUser2 -> {
            if (atomicBoolean.get()) {
                return Future.failedFuture("connection already closed");
            }
            registerEndpointHandlers(mqttEndpoint, deviceUser2, fromConnectPacket.getTraceSamplingPriority());
            this.metrics.incrementConnections(deviceUser2.getTenantId());
            return Future.succeededFuture(deviceUser2);
        }).recover(th -> {
            if (authenticateDevice.failed()) {
                this.log.debug("device authentication or early stage checks failed", th);
            } else {
                this.log.debug("cannot establish connection with device [tenant-id: {}, device-id: {}]", new Object[]{((DeviceUser) authenticateDevice.result()).getTenantId(), ((DeviceUser) authenticateDevice.result()).getDeviceId(), th});
            }
            return Future.failedFuture(th);
        });
    }

    public final Future<Void> uploadMessage(MqttContext mqttContext) {
        Objects.requireNonNull(mqttContext);
        verifyTenantAndDeviceContextIsSet(mqttContext);
        switch (AnonymousClass1.$SwitchMap$org$eclipse$hono$service$metric$MetricsTags$EndpointType[mqttContext.endpoint().ordinal()]) {
            case 1:
                return uploadTelemetryMessage(mqttContext);
            case 2:
                return uploadEventMessage(mqttContext);
            case 3:
                return uploadCommandResponseMessage(mqttContext);
            default:
                return Future.failedFuture(new ClientErrorException(400, "unsupported endpoint"));
        }
    }

    private void verifyTenantAndDeviceContextIsSet(MqttContext mqttContext) {
        if (mqttContext.tenant() == null || mqttContext.deviceId() == null) {
            throw new IllegalArgumentException("context does not contain tenant/device information");
        }
    }

    public final Future<Void> uploadTelemetryMessage(MqttContext mqttContext) {
        Objects.requireNonNull(mqttContext);
        if (mqttContext.endpoint() != MetricsTags.EndpointType.TELEMETRY) {
            throw new IllegalArgumentException("context does not contain telemetry message but " + mqttContext.endpoint().getCanonicalName());
        }
        verifyTenantAndDeviceContextIsSet(mqttContext);
        Buffer payload = mqttContext.payload();
        MetricsTags.QoS from = MetricsTags.QoS.from(mqttContext.qosLevel().value());
        Future tenantConfiguration = getTenantConfiguration(mqttContext.tenant(), mqttContext.getTracingContext());
        return tenantConfiguration.compose(tenantObject -> {
            return uploadMessage(mqttContext, tenantObject, mqttContext.deviceId(), payload, mqttContext.endpoint());
        }).compose(r14 -> {
            this.metrics.reportTelemetry(mqttContext.endpoint(), mqttContext.tenant(), (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.FORWARDED, from, payload.length(), mqttContext.getTimer());
            return Future.succeededFuture();
        }).recover(th -> {
            this.metrics.reportTelemetry(mqttContext.endpoint(), mqttContext.tenant(), (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.from(th), from, payload.length(), mqttContext.getTimer());
            return Future.failedFuture(th);
        });
    }

    public final Future<Void> uploadEventMessage(MqttContext mqttContext) {
        Objects.requireNonNull(mqttContext);
        if (mqttContext.endpoint() != MetricsTags.EndpointType.EVENT) {
            throw new IllegalArgumentException("context does not contain event but " + mqttContext.endpoint().getCanonicalName());
        }
        verifyTenantAndDeviceContextIsSet(mqttContext);
        Buffer payload = mqttContext.payload();
        MetricsTags.QoS from = MetricsTags.QoS.from(mqttContext.qosLevel().value());
        Future tenantConfiguration = getTenantConfiguration(mqttContext.tenant(), mqttContext.getTracingContext());
        return tenantConfiguration.compose(tenantObject -> {
            return uploadMessage(mqttContext, tenantObject, mqttContext.deviceId(), payload, mqttContext.endpoint());
        }).compose(r14 -> {
            this.metrics.reportTelemetry(mqttContext.endpoint(), mqttContext.tenant(), (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.FORWARDED, from, payload.length(), mqttContext.getTimer());
            return Future.succeededFuture();
        }).recover(th -> {
            this.metrics.reportTelemetry(mqttContext.endpoint(), mqttContext.tenant(), (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.from(th), from, payload.length(), mqttContext.getTimer());
            return Future.failedFuture(th);
        });
    }

    public final Future<Void> uploadCommandResponseMessage(MqttContext mqttContext) {
        Future failedFuture;
        Objects.requireNonNull(mqttContext);
        if (mqttContext.endpoint() != MetricsTags.EndpointType.COMMAND) {
            throw new IllegalArgumentException("context does not contain command response message but " + mqttContext.endpoint().getCanonicalName());
        }
        if (mqttContext.topic() == null) {
            throw new IllegalArgumentException("context does not contain topic");
        }
        verifyTenantAndDeviceContextIsSet(mqttContext);
        String[] resourcePath = mqttContext.topic().getResourcePath();
        String tenant = mqttContext.tenant();
        String deviceId = mqttContext.deviceId();
        Integer num = null;
        String str = null;
        if (resourcePath.length <= 5) {
            failedFuture = Future.failedFuture(new ClientErrorException(400, "command response topic has too few segments"));
        } else {
            try {
                num = Integer.valueOf(Integer.parseInt(resourcePath[5]));
            } catch (NumberFormatException e) {
                this.log.trace("got invalid status code [{}] [tenant-id: {}, device-id: {}]", new Object[]{resourcePath[5], tenant, deviceId});
            }
            if (num != null) {
                str = resourcePath[4];
                CommandResponse fromRequestId = CommandResponse.fromRequestId(str, tenant, deviceId, mqttContext.payload(), mqttContext.contentType(), num);
                failedFuture = fromRequestId != null ? Future.succeededFuture(fromRequestId) : Future.failedFuture(new ClientErrorException(400, "command response topic contains invalid data"));
            } else {
                failedFuture = Future.failedFuture(new ClientErrorException(400, "invalid status code"));
            }
        }
        Span start = TracingHelper.buildChildSpan(this.tracer, mqttContext.getTracingContext(), "upload Command response", getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag(TracingHelper.TAG_TENANT_ID, tenant).withTag(TracingHelper.TAG_DEVICE_ID, deviceId).withTag("hono-cmd-status", num).withTag("hono-cmd-req-id", str).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), mqttContext.authenticatedDevice() != null).start();
        int length = mqttContext.payload().length();
        Future tenantConfiguration = getTenantConfiguration(tenant, mqttContext.getTracingContext());
        Future future = failedFuture;
        return CompositeFuture.all(tenantConfiguration, failedFuture).compose(compositeFuture -> {
            Future registrationAssertion = getRegistrationAssertion(tenant, deviceId, mqttContext.authenticatedDevice(), start.context());
            return CompositeFuture.all(registrationAssertion, CompositeFuture.all(isAdapterEnabled((TenantObject) tenantConfiguration.result()), checkMessageLimit((TenantObject) tenantConfiguration.result(), length, start.context())).mapEmpty()).compose(compositeFuture -> {
                return sendCommandResponse((TenantObject) tenantConfiguration.result(), (RegistrationAssertion) registrationAssertion.result(), (CommandResponse) future.result(), start.context());
            });
        }).compose(r15 -> {
            this.log.trace("successfully forwarded command response from device [tenant-id: {}, device-id: {}]", tenant, deviceId);
            this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, tenant, (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.FORWARDED, length, mqttContext.getTimer());
            if (mqttContext.isAtLeastOnce() && mqttContext.deviceEndpoint().isConnected()) {
                start.log(EVENT_SENDING_PUBACK);
                mqttContext.acknowledge();
            }
            start.finish();
            return Future.succeededFuture();
        }).recover(th -> {
            TracingHelper.logError(start, th);
            start.finish();
            this.metrics.reportCommand(MetricsTags.Direction.RESPONSE, tenant, (TenantObject) tenantConfiguration.result(), MetricsTags.ProcessingOutcome.from(th), length, mqttContext.getTimer());
            return Future.failedFuture(th);
        });
    }

    private Future<Void> uploadMessage(MqttContext mqttContext, TenantObject tenantObject, String str, Buffer buffer, MetricsTags.EndpointType endpointType) {
        if (!isPayloadOfIndicatedType(buffer, mqttContext.contentType())) {
            return Future.failedFuture(new ClientErrorException(400, String.format("Content-Type %s does not match payload", mqttContext.contentType())));
        }
        Span start = TracingHelper.buildChildSpan(this.tracer, mqttContext.getTracingContext(), "upload " + endpointType, getTypeName()).withTag(Tags.SPAN_KIND.getKey(), "client").withTag(TracingHelper.TAG_TENANT_ID, tenantObject.getTenantId()).withTag(TracingHelper.TAG_DEVICE_ID, str).withTag(TracingHelper.TAG_AUTHENTICATED.getKey(), mqttContext.authenticatedDevice() != null).start();
        Future registrationAssertion = getRegistrationAssertion(tenantObject.getTenantId(), str, mqttContext.authenticatedDevice(), start.context());
        return CompositeFuture.all(registrationAssertion, CompositeFuture.all(isAdapterEnabled(tenantObject), checkMessageLimit(tenantObject, buffer.length(), start.context())).map(tenantObject)).compose(compositeFuture -> {
            Map<String, Object> downstreamMessageProperties = getDownstreamMessageProperties(mqttContext);
            Optional.ofNullable(mqttContext.getRequestedQos()).map((v0) -> {
                return v0.ordinal();
            }).ifPresent(num -> {
                downstreamMessageProperties.put("qos", num);
            });
            addRetainAnnotation(mqttContext, downstreamMessageProperties, start);
            customizeDownstreamMessageProperties(downstreamMessageProperties, mqttContext);
            return endpointType == MetricsTags.EndpointType.EVENT ? getEventSender(tenantObject).sendEvent(tenantObject, (RegistrationAssertion) registrationAssertion.result(), mqttContext.contentType(), buffer, downstreamMessageProperties, start.context()) : getTelemetrySender(tenantObject).sendTelemetry(tenantObject, (RegistrationAssertion) registrationAssertion.result(), mqttContext.getRequestedQos(), mqttContext.contentType(), buffer, downstreamMessageProperties, start.context());
        }).map(r12 -> {
            this.log.trace("successfully processed message [topic: {}, QoS: {}] from device [tenantId: {}, deviceId: {}]", new Object[]{mqttContext.getOrigAddress(), mqttContext.qosLevel(), tenantObject.getTenantId(), str});
            if (mqttContext.isAtLeastOnce() && mqttContext.deviceEndpoint().isConnected()) {
                start.log(EVENT_SENDING_PUBACK);
                mqttContext.acknowledge();
            }
            start.finish();
            return r12;
        }).recover(th -> {
            if (th instanceof ClientErrorException) {
                ClientErrorException clientErrorException = (ClientErrorException) th;
                this.log.debug("cannot process message [endpoint: {}] from device [tenantId: {}, deviceId: {}]: {} - {}", new Object[]{endpointType, tenantObject.getTenantId(), str, Integer.valueOf(clientErrorException.getErrorCode()), clientErrorException.getMessage()});
            } else {
                this.log.debug("cannot process message [endpoint: {}] from device [tenantId: {}, deviceId: {}]", new Object[]{endpointType, tenantObject.getTenantId(), str, th});
            }
            TracingHelper.logError(start, th);
            start.finish();
            return Future.failedFuture(th);
        });
    }

    private void onBeforeEndpointClose(AbstractVertxBasedMqttProtocolAdapter<T>.MqttDeviceEndpoint mqttDeviceEndpoint) {
        if (mqttDeviceEndpoint.getAuthenticatedDevice() != null) {
            this.connectedAuthenticatedDeviceEndpoints.remove(mqttDeviceEndpoint);
        }
    }

    protected void onClose(MqttEndpoint mqttEndpoint) {
    }

    protected abstract Future<Void> onPublishedMessage(MqttContext mqttContext);

    protected void customizeDownstreamMessageProperties(Map<String, Object> map, MqttContext mqttContext) {
    }

    protected void onMessageSent(MqttContext mqttContext) {
    }

    protected void onMessageUndeliverable(MqttContext mqttContext) {
    }

    private static void addRetainAnnotation(MqttContext mqttContext, Map<String, Object> map, Span span) {
        if (mqttContext.isRetain()) {
            span.log("device wants to retain message");
            map.put("x-opt-retain", Boolean.TRUE);
        }
    }

    private void registerEndpointHandlers(MqttEndpoint mqttEndpoint, DeviceUser deviceUser, OptionalInt optionalInt) {
        createMqttDeviceEndpoint(mqttEndpoint, deviceUser, optionalInt).registerHandlers();
    }

    final AbstractVertxBasedMqttProtocolAdapter<T>.MqttDeviceEndpoint createMqttDeviceEndpoint(MqttEndpoint mqttEndpoint, DeviceUser deviceUser, OptionalInt optionalInt) {
        AbstractVertxBasedMqttProtocolAdapter<T>.MqttDeviceEndpoint mqttDeviceEndpoint = new MqttDeviceEndpoint(mqttEndpoint, deviceUser, optionalInt);
        if (deviceUser != null) {
            this.connectedAuthenticatedDeviceEndpoints.add(mqttDeviceEndpoint);
        }
        return mqttDeviceEndpoint;
    }

    private static MqttConnectReturnCode getConnectReturnCode(Throwable th) {
        if (th instanceof MqttConnectionException) {
            return ((MqttConnectionException) th).code();
        }
        if (th instanceof AuthorizationException) {
            return th instanceof AdapterConnectionsExceededException ? MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE : MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
        }
        if (!(th instanceof ServiceInvocationException)) {
            return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
        }
        switch (((ServiceInvocationException) th).getErrorCode()) {
            case 401:
            case 404:
                return MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
            case 503:
                return MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
            default:
                return MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED;
        }
    }

    protected Future<Buffer> getCommandPayload(CommandContext commandContext) {
        return Future.succeededFuture(commandContext.getCommand().getPayload());
    }
}
