package org.kinotic.continuum.gateway.internal.endpoints.mqtt;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.mqtt.MqttServer;
import java.util.Objects;
import org.kinotic.continuum.gateway.api.config.ContinuumGatewayProperties;
import org.kinotic.continuum.gateway.internal.endpoints.EndpointConnectionHandler;
import org.kinotic.continuum.gateway.internal.endpoints.Services;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/kinotic/continuum/gateway/internal/endpoints/mqtt/MqttServerVerticle.class */
public class MqttServerVerticle extends AbstractVerticle {
    private static final Logger log = LoggerFactory.getLogger(MqttServerVerticle.class);

    @Autowired
    private ContinuumGatewayProperties gatewayProperties;

    @Autowired
    private Services services;
    private MqttServer mqttServer;

    public void start(Future<Void> future) {
        this.mqttServer = MqttServer.create(this.vertx, this.gatewayProperties.getMqtt());
        this.mqttServer.endpointHandler(mqttEndpoint -> {
            MqttHandler mqttHandler = new MqttHandler(new EndpointConnectionHandler(this.services), new MqttClientConnection(mqttEndpoint));
            mqttEndpoint.subscriptionAutoAck(false);
            mqttEndpoint.publishAutoAck(false);
            mqttEndpoint.autoKeepAlive(true);
            mqttHandler.authenticate(mqttEndpoint.auth().getUsername(), mqttEndpoint.auth().getPassword()).future().setHandler(asyncResult -> {
                if (!asyncResult.succeeded()) {
                    mqttEndpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
                    return;
                }
                mqttEndpoint.accept();
                Objects.requireNonNull(mqttHandler);
                mqttEndpoint.publishHandler(mqttHandler::publish);
                Objects.requireNonNull(mqttHandler);
                mqttEndpoint.publishAcknowledgeHandler(mqttHandler::publishAcknowledge);
                Objects.requireNonNull(mqttHandler);
                mqttEndpoint.publishReceivedHandler(mqttHandler::publishReceived);
                Objects.requireNonNull(mqttHandler);
                mqttEndpoint.publishCompletionHandler(mqttHandler::publishCompletion);
                Objects.requireNonNull(mqttHandler);
                mqttEndpoint.subscribeHandler(mqttHandler::subscribe);
                Objects.requireNonNull(mqttHandler);
                mqttEndpoint.unsubscribeHandler(mqttHandler::unsubscribe);
                Objects.requireNonNull(mqttHandler);
                mqttEndpoint.exceptionHandler(mqttHandler::exception);
                mqttEndpoint.closeHandler(r3 -> {
                    mqttHandler.close();
                });
            });
        }).exceptionHandler(th -> {
            log.error("MQTT server Exception before completing Client Connection", th);
        }).listen(asyncResult -> {
            if (asyncResult.succeeded()) {
                log.info("MQTT Server Listening on port " + ((MqttServer) asyncResult.result()).actualPort());
            } else {
                log.error("Error starting MQTT Server", asyncResult.cause());
                System.out.println("Error on starting the server");
            }
        });
    }

    public void stop(Future<Void> future) {
        if (this.mqttServer != null) {
            this.mqttServer.close();
        }
    }
}
