package io.vertx.mqtt.impl;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.NetSocketInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetServer;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;

/* loaded from: input_file:io/vertx/mqtt/impl/MqttServerImpl.class */
public class MqttServerImpl implements MqttServer {
    private static final Logger log = LoggerFactory.getLogger(MqttServerImpl.class);
    private final NetServer server;
    private Handler<MqttEndpoint> endpointHandler;
    private Handler<Throwable> exceptionHandler;
    private MqttServerOptions options;

    public MqttServerImpl(Vertx vertx, MqttServerOptions mqttServerOptions) {
        this.server = vertx.createNetServer(mqttServerOptions);
        this.options = mqttServerOptions;
    }

    @Override // io.vertx.mqtt.MqttServer
    public Future<MqttServer> listen() {
        Promise promise = Promise.promise();
        listen((Handler<AsyncResult<MqttServer>>) promise);
        return promise.future();
    }

    @Override // io.vertx.mqtt.MqttServer
    public Future<MqttServer> listen(int i, String str) {
        Promise promise = Promise.promise();
        listen(i, str, promise);
        return promise.future();
    }

    @Override // io.vertx.mqtt.MqttServer
    public Future<MqttServer> listen(int i) {
        Promise promise = Promise.promise();
        listen(i, (Handler<AsyncResult<MqttServer>>) promise);
        return promise.future();
    }

    @Override // io.vertx.mqtt.MqttServer
    public MqttServer listen(int i, Handler<AsyncResult<MqttServer>> handler) {
        return listen(i, this.options.getHost(), handler);
    }

    @Override // io.vertx.mqtt.MqttServer
    public MqttServer listen(Handler<AsyncResult<MqttServer>> handler) {
        return listen(this.options.getPort(), handler);
    }

    @Override // io.vertx.mqtt.MqttServer
    public MqttServer listen(int i, String str, Handler<AsyncResult<MqttServer>> handler) {
        Handler<MqttEndpoint> handler2 = this.endpointHandler;
        Handler<Throwable> handler3 = this.exceptionHandler;
        this.server.connectHandler(netSocket -> {
            NetSocketInternal netSocketInternal = (NetSocketInternal) netSocket;
            initChannel(netSocketInternal.channelHandlerContext().pipeline());
            MqttServerConnection mqttServerConnection = new MqttServerConnection(netSocketInternal, this.options);
            netSocketInternal.messageHandler(obj -> {
                synchronized (mqttServerConnection) {
                    mqttServerConnection.handleMessage(obj);
                }
            });
            mqttServerConnection.init(handler2, handler3);
        });
        this.server.listen(i, str, asyncResult -> {
            handler.handle(asyncResult.map(this));
        });
        return this;
    }

    @Override // io.vertx.mqtt.MqttServer
    public synchronized MqttServer endpointHandler(Handler<MqttEndpoint> handler) {
        this.endpointHandler = handler;
        return this;
    }

    @Override // io.vertx.mqtt.MqttServer
    public synchronized MqttServer exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    @Override // io.vertx.mqtt.MqttServer
    public int actualPort() {
        return this.server.actualPort();
    }

    @Override // io.vertx.mqtt.MqttServer
    public Future<Void> close() {
        return this.server.close();
    }

    @Override // io.vertx.mqtt.MqttServer
    public void close(Handler<AsyncResult<Void>> handler) {
        this.server.close(handler);
    }

    private void initChannel(ChannelPipeline channelPipeline) {
        channelPipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);
        if (this.options.getMaxMessageSize() > 0) {
            channelPipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
        } else {
            channelPipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
        }
        channelPipeline.addBefore("handler", "idle", new IdleStateHandler(this.options.timeoutOnConnect(), 0, 0));
        channelPipeline.addBefore("handler", "timeoutOnConnect", new ChannelDuplexHandler() { // from class: io.vertx.mqtt.impl.MqttServerImpl.1
            public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                if ((obj instanceof IdleStateEvent) && ((IdleStateEvent) obj).state() == IdleState.READER_IDLE) {
                    channelHandlerContext.channel().close();
                }
            }
        });
    }
}
