package io.vertx.mqtt.impl;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubscribePayload;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.handler.codec.mqtt.MqttUnsubscribePayload;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.NetSocketInternal;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.MqttException;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubAckMessage;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:io/vertx/mqtt/impl/MqttClientImpl.class */
public class MqttClientImpl implements MqttClient {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(MqttClientImpl.class);
    // Largest packet identifier; nextMessageId() wraps back to 1 after this value (it uses the literal).
    private static final int MAX_MESSAGE_ID = 65535;
    // Inclusive bounds, in UTF-8 encoded bytes, that isValidStringSizeInUTF8() enforces (it uses the literals).
    private static final int MAX_TOPIC_LEN = 65535;
    private static final int MIN_TOPIC_LEN = 1;
    // Protocol name and level placed in the CONNECT variable header by doConnect().
    private static final String PROTOCOL_NAME = "MQTT";
    private static final int PROTOCOL_VERSION = 4;
    // Not referenced in the visible code; the constructor passes the literal 0 to setIdleTimeout.
    private static final int DEFAULT_IDLE_TIMEOUT = 0;
    // Client options supplied at construction; doConnect() may set a generated client id on them.
    private MqttClientOptions options;
    // Created in doConnect() once the socket is connected; also used as the monitor in every synchronized block.
    private MqttClientConnection connection;
    // Underlying Vert.x TCP client.
    private final NetClient client;
    // User callbacks, assigned by the fluent setters and invoked from the handle* methods.
    Handler<Integer> publishCompletionHandler;
    Handler<Integer> unsubscribeCompletionHandler;
    Handler<MqttPublishMessage> publishHandler;
    Handler<MqttSubAckMessage> subscribeCompletionHandler;
    Handler<AsyncResult<MqttConnAckMessage>> connectHandler;
    Handler<Void> pingrespHandler;
    Handler<Throwable> exceptionHandler;
    Handler<Void> closeHandler;
    // Last packet identifier handed out by nextMessageId().
    private int messageIdCounter;
    // Number of QoS 1/2 publishes awaiting acknowledgement; incremented in publish(), decremented in handlePuback()/handlePubcomp().
    private int countInflightQueue;
    // Set from the CONNACK return code in handleConnack(); cleared by cleanup().
    private boolean isConnected;
    // Outgoing QoS 1 PUBLISH packets by packet id, removed when PUBACK arrives.
    LinkedHashMap<Integer, MqttMessage> qos1outbound = new LinkedHashMap<>();
    // Outgoing QoS 2 packets by packet id (PUBLISH, later replaced by PUBREL), removed when PUBCOMP arrives.
    LinkedHashMap<Integer, MqttMessage> qos2outbound = new LinkedHashMap<>();
    // Incoming QoS 2 PUBLISH messages by packet id, held until the matching PUBREL arrives.
    LinkedHashMap<Integer, io.vertx.mqtt.messages.MqttMessage> qos2inbound = new LinkedHashMap<>();
    // Topic name: one or more characters, none of which is '#', '+' or NUL.
    private Pattern validTopicNamePattern = Pattern.compile("^[^#+\\u0000]+$");
    // Topic filter: checked by isValidTopicFilter() before a SUBSCRIBE is sent.
    private Pattern validTopicFilterPattern = Pattern.compile("^(#|((\\+(?![^/]))?([^#+]*(/\\+(?![^/]))?)*(/#)?))$");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.vertx.mqtt.impl.MqttClientImpl$2, reason: invalid class name */
    /* loaded from: input_file:io/vertx/mqtt/impl/MqttClientImpl$2.class */
    /*
     * Decompiled switch-map holder: translates MqttQoS ordinals into the case labels used by the
     * switch statements in publish() and handlePublish():
     *   AT_LEAST_ONCE -> 1, EXACTLY_ONCE -> 2, AT_MOST_ONCE -> 3; any other constant stays 0 (no case).
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS = new int[MqttQoS.values().length];

        static {
            // Each assignment is guarded separately so that a missing enum constant only skips its own entry.
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.AT_LEAST_ONCE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.EXACTLY_ONCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$io$netty$handler$codec$mqtt$MqttQoS[MqttQoS.AT_MOST_ONCE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public MqttClientImpl(Vertx vertx, MqttClientOptions mqttClientOptions) {
        NetClientOptions netClientOptions = new NetClientOptions(mqttClientOptions);
        netClientOptions.setIdleTimeout(0);
        this.client = vertx.createNetClient(netClientOptions);
        this.options = mqttClientOptions;
    }

    /**
     * Connects to the given port and host without the extra string argument of the four-argument overload.
     *
     * @param i remote port
     * @param str remote host
     * @param handler optional callback completed when the connection attempt fails or a CONNACK arrives
     * @return this client
     */
    @Override
    public MqttClient connect(int i, String str, Handler<AsyncResult<MqttConnAckMessage>> handler) {
        final String absent = null;
        doConnect(i, str, absent, handler);
        return this;
    }

    /**
     * Connects to the given port and host, forwarding {@code str2} unchanged to {@code doConnect}
     * (which passes it as the third argument of {@code NetClient.connect}).
     *
     * @param i remote port
     * @param str remote host
     * @param str2 extra string argument forwarded to the TCP client
     * @param handler optional callback completed when the connection attempt fails or a CONNACK arrives
     * @return this client
     */
    @Override
    public MqttClient connect(int i, String str, String str2, Handler<AsyncResult<MqttConnAckMessage>> handler) {
        this.doConnect(i, str, str2, handler);
        return this;
    }

    /**
     * Opens the TCP connection and, once it is established, installs the MQTT codec on the channel,
     * wires the socket callbacks to this client and sends the CONNECT packet.
     *
     * <p>A TCP failure is reported straight to {@code handler}. On TCP success the handler is stored
     * in {@code connectHandler} and completed later by {@code handleConnack}.
     *
     * @param i remote port
     * @param str remote host
     * @param str2 forwarded as the third argument of {@code NetClient.connect}; may be {@code null}
     * @param handler optional connect callback
     */
    private void doConnect(int i, String str, String str2, Handler<AsyncResult<MqttConnAckMessage>> handler) {
        log.debug(String.format("Trying to connect with %s:%d", str, Integer.valueOf(i)));
        this.client.connect(i, str, str2, asyncResult -> {
            if (asyncResult.failed()) {
                log.error(String.format("Can't connect to %s:%d", str, Integer.valueOf(i)), asyncResult.cause());
                if (handler != null) {
                    handler.handle(Future.failedFuture(asyncResult.cause()));
                }
                return;
            }
            log.info(String.format("Connection with %s:%d established successfully", str, Integer.valueOf(i)));
            NetSocketInternal netSocketInternal = (NetSocketInternal) asyncResult.result();
            ChannelPipeline pipeline = netSocketInternal.channelHandlerContext().pipeline();
            this.connectHandler = handler;
            // Generate a client id only when requested and none was configured.
            if (this.options.isAutoGeneratedClientId() && (this.options.getClientId() == null || this.options.getClientId().isEmpty())) {
                this.options.setClientId(generateRandomClientId());
            }
            initChannel(pipeline);
            // The connection must exist before write() is called: write() synchronizes on it.
            this.connection = new MqttClientConnection(this, netSocketInternal, this.options);
            netSocketInternal.messageHandler(obj -> {
                this.connection.handleMessage(obj);
            });
            netSocketInternal.closeHandler(r3 -> {
                handleClosed();
            });
            netSocketInternal.exceptionHandler(this::handleException);
            write(buildConnectMessage());
        });
    }

    /**
     * Builds the CONNECT packet from the current options.
     *
     * <p>Will message and password are both encoded as UTF-8 so that the bytes on the wire do not
     * depend on the platform default charset.
     */
    private MqttMessage buildConnectMessage() {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttConnectVariableHeader variableHeader = new MqttConnectVariableHeader(
            PROTOCOL_NAME,
            PROTOCOL_VERSION,
            this.options.hasUsername(),
            this.options.hasPassword(),
            this.options.isWillRetain(),
            this.options.getWillQoS(),
            this.options.isWillFlag(),
            this.options.isCleanSession(),
            this.options.getKeepAliveTimeSeconds());
        MqttConnectPayload payload = new MqttConnectPayload(
            this.options.getClientId() == null ? "" : this.options.getClientId(),
            this.options.getWillTopic(),
            this.options.getWillMessage() != null ? this.options.getWillMessage().getBytes(StandardCharsets.UTF_8) : null,
            this.options.hasUsername() ? this.options.getUsername() : null,
            this.options.hasPassword() ? this.options.getPassword().getBytes(StandardCharsets.UTF_8) : null);
        return MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
    }

    /**
     * Disconnects without a completion callback.
     *
     * @return this client
     */
    @Override
    public MqttClient disconnect() {
        final Handler<AsyncResult<Void>> noCallback = null;
        return disconnect(noCallback);
    }

    /**
     * Sends a DISCONNECT packet, completes the optional handler successfully and then closes the
     * connection.
     *
     * @param handler optional callback, completed before the connection is closed
     * @return this client
     */
    @Override
    public MqttClient disconnect(Handler<AsyncResult<Void>> handler) {
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessage disconnectPacket = MqttMessageFactory.newMessage(header, null, null);
        write(disconnectPacket);
        if (handler != null) {
            Future<Void> done = Future.succeededFuture();
            handler.handle(done);
        }
        this.connection.close();
        return this;
    }

    /**
     * Publishes a message without a callback; see the six-argument overload.
     *
     * @return this client
     */
    @Override
    public MqttClient publish(String str, Buffer buffer, MqttQoS mqttQoS, boolean z, boolean z2) {
        final Handler<AsyncResult<Integer>> noCallback = null;
        return publish(str, buffer, mqttQoS, z, z2, noCallback);
    }

    /**
     * Sends a PUBLISH packet and, for QoS 1 and 2, records it as in flight until it is acknowledged.
     *
     * <p>Nothing is sent, and the optional handler is failed with an {@link MqttException}, when the
     * in-flight limit from the options is already reached or the topic name is not valid. On success
     * the handler receives the packet identifier as soon as the packet has been passed to
     * {@code write}; the acknowledgement is reported separately through the publish completion
     * handler.
     *
     * @param str topic name
     * @param buffer payload; its bytes are copied into the outgoing packet
     * @param mqttQoS quality of service level
     * @param z value passed in the DUP position of the fixed header
     * @param z2 value passed in the RETAIN position of the fixed header
     * @param handler optional callback
     * @return this client
     */
    @Override
    public MqttClient publish(String str, Buffer buffer, MqttQoS mqttQoS, boolean z, boolean z2, Handler<AsyncResult<Integer>> handler) {
        if (this.countInflightQueue >= this.options.getMaxInflightQueue()) {
            String format = String.format("Attempt to exceed the limit of %d inflight messages", Integer.valueOf(this.options.getMaxInflightQueue()));
            log.error(format);
            MqttException mqttException = new MqttException(MqttException.MQTT_INFLIGHT_QUEUE_FULL, format);
            if (handler != null) {
                handler.handle(Future.failedFuture(mqttException));
            }
            return this;
        }
        if (!isValidTopicName(str)) {
            String format2 = String.format("Invalid Topic Name - %s. It mustn't contains wildcards: # and +. Also it can't contains U+0000(NULL) chars", str);
            log.error(format2);
            // NOTE(review): 0 is presumably the invalid-topic-name code; its named constant is not visible here.
            MqttException mqttException2 = new MqttException(0, format2);
            if (handler != null) {
                handler.handle(Future.failedFuture(mqttException2));
            }
            return this;
        }
        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, z, mqttQoS, z2, 0);
        MqttPublishVariableHeader mqttPublishVariableHeader = new MqttPublishVariableHeader(str, nextMessageId());
        MqttMessage newMessage = MqttMessageFactory.newMessage(mqttFixedHeader, mqttPublishVariableHeader, Unpooled.copiedBuffer(buffer.getBytes()));
        // Switch on the enum itself rather than on a hand-maintained ordinal table.
        switch (mqttQoS) {
            case AT_LEAST_ONCE:
                this.qos1outbound.put(Integer.valueOf(mqttPublishVariableHeader.messageId()), newMessage);
                this.countInflightQueue++;
                break;
            case EXACTLY_ONCE:
                this.qos2outbound.put(Integer.valueOf(mqttPublishVariableHeader.messageId()), newMessage);
                this.countInflightQueue++;
                break;
            default:
                // QoS 0 (and any other level) is not tracked.
                break;
        }
        write(newMessage);
        if (handler != null) {
            handler.handle(Future.succeededFuture(Integer.valueOf(mqttPublishVariableHeader.messageId())));
        }
        return this;
    }

    /**
     * Sets the callback that {@code handlePuback} and {@code handlePubcomp} invoke with the packet id
     * of an acknowledged outgoing publish.
     *
     * @param handler callback, or {@code null} to clear it
     * @return this client
     */
    @Override
    public MqttClient publishCompletionHandler(Handler<Integer> handler) {
        publishCompletionHandler = handler;
        return this;
    }

    /**
     * Sets the callback that receives incoming PUBLISH messages (invoked from {@code handlePublish}
     * and {@code handlePubrel}).
     *
     * @param handler callback, or {@code null} to clear it
     * @return this client
     */
    @Override
    public MqttClient publishHandler(Handler<MqttPublishMessage> handler) {
        publishHandler = handler;
        return this;
    }

    /**
     * Sets the callback that {@code handleSuback} invokes with each SUBACK message.
     *
     * @param handler callback, or {@code null} to clear it
     * @return this client
     */
    @Override
    public MqttClient subscribeCompletionHandler(Handler<MqttSubAckMessage> handler) {
        subscribeCompletionHandler = handler;
        return this;
    }

    /**
     * Subscribes to a single topic filter without a callback.
     *
     * @param str topic filter
     * @param i requested QoS level as an integer
     * @return this client
     */
    @Override
    public MqttClient subscribe(String str, int i) {
        final Handler<AsyncResult<Integer>> noCallback = null;
        return subscribe(str, i, noCallback);
    }

    /**
     * Subscribes to a single topic filter by delegating to the map-based overload with a
     * one-entry map.
     *
     * @param str topic filter
     * @param i requested QoS level as an integer
     * @param handler optional callback
     * @return this client
     */
    @Override
    public MqttClient subscribe(String str, int i, Handler<AsyncResult<Integer>> handler) {
        Map<String, Integer> singleSubscription = Collections.singletonMap(str, Integer.valueOf(i));
        return subscribe(singleSubscription, handler);
    }

    /**
     * Subscribes to several topic filters without a callback.
     *
     * @param map topic filter to requested QoS level
     * @return this client
     */
    @Override
    public MqttClient subscribe(Map<String, Integer> map) {
        final Handler<AsyncResult<Integer>> noCallback = null;
        return subscribe(map, noCallback);
    }

    /**
     * Sends one SUBSCRIBE packet covering every entry of {@code map}.
     *
     * <p>If any key fails {@code isValidTopicFilter}, nothing is sent and the optional handler is
     * failed with an {@link MqttException} listing the offending entries. Otherwise the handler
     * receives the packet identifier once the packet has been passed to {@code write}; the SUBACK is
     * reported separately through the subscribe completion handler.
     *
     * @param map topic filter to requested QoS level (converted with {@code MqttQoS.valueOf(int)})
     * @param handler optional callback
     * @return this client
     */
    @Override
    public MqttClient subscribe(Map<String, Integer> map, Handler<AsyncResult<Integer>> handler) {
        Map<String, Integer> invalidFilters = map.entrySet().stream()
            .filter(entry -> !isValidTopicFilter(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        if (!invalidFilters.isEmpty()) {
            String format = String.format("Invalid Topic Filters: %s", invalidFilters);
            log.error(format);
            // NOTE(review): 1 is presumably the invalid-topic-filter code; its named constant is not visible here.
            MqttException mqttException = new MqttException(1, format);
            if (handler != null) {
                handler.handle(Future.failedFuture(mqttException));
            }
            return this;
        }
        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.SUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        MqttMessageIdVariableHeader from = MqttMessageIdVariableHeader.from(nextMessageId());
        List<MqttTopicSubscription> subscriptions = map.entrySet().stream()
            .map(entry -> new MqttTopicSubscription(entry.getKey(), MqttQoS.valueOf(entry.getValue().intValue())))
            .collect(Collectors.toList());
        write(MqttMessageFactory.newMessage(mqttFixedHeader, from, new MqttSubscribePayload(subscriptions)));
        if (handler != null) {
            handler.handle(Future.succeededFuture(Integer.valueOf(from.messageId())));
        }
        return this;
    }

    /**
     * Sets the callback that {@code handleUnsuback} invokes with the packet id of each UNSUBACK.
     *
     * @param handler callback, or {@code null} to clear it
     * @return this client
     */
    @Override
    public MqttClient unsubscribeCompletionHandler(Handler<Integer> handler) {
        unsubscribeCompletionHandler = handler;
        return this;
    }

    /**
     * Sends an UNSUBSCRIBE packet for a single topic and, if a handler is given, completes it with
     * the packet identifier once the packet has been passed to {@code write}.
     *
     * @param str topic to unsubscribe from
     * @param handler optional callback
     * @return this client
     */
    @Override
    public MqttClient unsubscribe(String str, Handler<AsyncResult<Integer>> handler) {
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBSCRIBE, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        MqttMessageIdVariableHeader packetId = MqttMessageIdVariableHeader.from(nextMessageId());
        List<String> topics = Stream.of(str).collect(Collectors.toList());
        MqttUnsubscribePayload payload = new MqttUnsubscribePayload(topics);
        write(MqttMessageFactory.newMessage(header, packetId, payload));
        if (handler == null) {
            return this;
        }
        handler.handle(Future.succeededFuture(Integer.valueOf(packetId.messageId())));
        return this;
    }

    /**
     * Unsubscribes from a single topic without a callback.
     *
     * @param str topic to unsubscribe from
     * @return this client
     */
    @Override
    public MqttClient unsubscribe(String str) {
        final Handler<AsyncResult<Integer>> noCallback = null;
        return unsubscribe(str, noCallback);
    }

    /**
     * Sets the callback that {@code handlePingresp} invokes.
     *
     * @param handler callback, or {@code null} to clear it
     * @return this client
     */
    @Override
    public MqttClient pingResponseHandler(Handler<Void> handler) {
        pingrespHandler = handler;
        return this;
    }

    /**
     * Sets the callback that {@code handleException} invokes with the reported throwable.
     *
     * @param handler callback, or {@code null} to clear it
     * @return this client
     */
    @Override
    public MqttClient exceptionHandler(Handler<Throwable> handler) {
        exceptionHandler = handler;
        return this;
    }

    /**
     * Sets the callback that {@code handleClosed} invokes when the socket closes while the client
     * was marked as connected.
     *
     * @param handler callback, or {@code null} to clear it
     * @return this client
     */
    @Override
    public MqttClient closeHandler(Handler<Void> handler) {
        closeHandler = handler;
        return this;
    }

    /**
     * Sends a PINGREQ packet.
     *
     * @return this client
     */
    @Override
    public MqttClient ping() {
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessage pingRequest = MqttMessageFactory.newMessage(header, null, null);
        write(pingRequest);
        return this;
    }

    /**
     * @return the client identifier currently held by the options
     */
    @Override
    public String clientId() {
        String configuredId = options.getClientId();
        return configuredId;
    }

    /**
     * @return the connected flag, set from the last CONNACK and cleared when the socket closes
     */
    @Override
    public boolean isConnected() {
        return isConnected;
    }

    /**
     * Sends a PUBACK for the given packet identifier.
     *
     * @param i packet identifier being acknowledged
     */
    void publishAcknowledge(int i) {
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessageIdVariableHeader packetId = MqttMessageIdVariableHeader.from(i);
        write(MqttMessageFactory.newMessage(header, packetId, null));
    }

    /**
     * Stores an incoming QoS 2 PUBLISH in {@code qos2inbound} and answers it with a PUBREC.
     *
     * @param mqttPublishMessage the received PUBLISH message
     */
    void publishReceived(MqttPublishMessage mqttPublishMessage) {
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessageIdVariableHeader packetId = MqttMessageIdVariableHeader.from(mqttPublishMessage.messageId());
        MqttMessage pubrec = MqttMessageFactory.newMessage(header, packetId, null);
        // Remember the message first; it is handed to the publish handler when PUBREL arrives.
        this.qos2inbound.put(Integer.valueOf(mqttPublishMessage.messageId()), mqttPublishMessage);
        write(pubrec);
    }

    /**
     * Sends a PUBCOMP for the given packet identifier.
     *
     * @param i packet identifier being completed
     */
    void publishComplete(int i) {
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessageIdVariableHeader packetId = MqttMessageIdVariableHeader.from(i);
        write(MqttMessageFactory.newMessage(header, packetId, null));
    }

    /**
     * Sends a PUBREL for the given packet identifier and stores it in {@code qos2outbound} under
     * that identifier (replacing whatever was stored there) until PUBCOMP arrives.
     *
     * @param i packet identifier being released
     */
    void publishRelease(int i) {
        MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        MqttMessageIdVariableHeader packetId = MqttMessageIdVariableHeader.from(i);
        MqttMessage pubrel = MqttMessageFactory.newMessage(header, packetId, null);
        this.qos2outbound.put(Integer.valueOf(i), pubrel);
        write(pubrel);
    }

    /**
     * Installs the MQTT handlers in front of the pipeline entry named "handler".
     *
     * <p>The encoder and decoder are always added; the decoder is size-limited when the options give
     * a positive maximum message size. When automatic keep-alive is enabled with a non-zero interval,
     * a writer-idle detector and a handler that sends PINGREQ on writer idleness are added as well.
     *
     * @param channelPipeline pipeline of the freshly connected channel
     */
    private void initChannel(ChannelPipeline channelPipeline) {
        final String anchor = "handler";
        channelPipeline.addBefore(anchor, "mqttEncoder", MqttEncoder.INSTANCE);

        int maxMessageSize = this.options.getMaxMessageSize();
        MqttDecoder decoder = maxMessageSize > 0 ? new MqttDecoder(maxMessageSize) : new MqttDecoder();
        channelPipeline.addBefore(anchor, "mqttDecoder", decoder);

        if (this.options.isAutoKeepAlive() && this.options.getKeepAliveTimeSeconds() != 0) {
            IdleStateHandler writerIdleDetector = new IdleStateHandler(0, this.options.getKeepAliveTimeSeconds(), 0);
            channelPipeline.addBefore(anchor, "idle", writerIdleDetector);
            channelPipeline.addBefore(anchor, "keepAliveHandler", new ChannelDuplexHandler() {
                @Override
                public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                    if (!(obj instanceof IdleStateEvent)) {
                        return;
                    }
                    IdleStateEvent idleEvent = (IdleStateEvent) obj;
                    if (idleEvent.state() == IdleState.WRITER_IDLE) {
                        MqttClientImpl.this.ping();
                    }
                }
            });
        }
    }

    /**
     * Advances and returns the packet identifier counter, cycling through 1..65535.
     *
     * @return the next packet identifier
     */
    private int nextMessageId() {
        if (this.messageIdCounter % MAX_MESSAGE_ID == 0) {
            // Covers both the initial value 0 and the wrap-around at 65535.
            this.messageIdCounter = 1;
        } else {
            this.messageIdCounter = this.messageIdCounter + 1;
        }
        return this.messageIdCounter;
    }

    /**
     * Logs the packet at debug level and writes it to the connection, holding the connection's
     * monitor for the duration.
     *
     * @param mqttMessage packet to send
     * @return this client
     */
    public MqttClientImpl write(MqttMessage mqttMessage) {
        synchronized (this.connection) {
            String trace = String.format("Sending packet %s", mqttMessage);
            log.debug(trace);
            this.connection.writeMessage(mqttMessage);
        }
        return this;
    }

    /**
     * Reacts to the socket being closed: resets the connected flag and, if the client had been
     * connected, notifies the close handler.
     */
    void handleClosed() {
        synchronized (this.connection) {
            // Capture the flag before cleanup() clears it.
            final boolean wasConnected = this.isConnected;
            cleanup();
            if (wasConnected && this.closeHandler != null) {
                this.closeHandler.handle(null);
            }
        }
    }

    /**
     * Notifies the ping response handler, if one is set.
     */
    public void handlePingresp() {
        synchronized (this.connection) {
            if (this.pingrespHandler == null) {
                return;
            }
            this.pingrespHandler.handle(null);
        }
    }

    /**
     * Notifies the unsubscribe completion handler, if one is set, with the acknowledged packet id.
     *
     * @param i packet identifier carried by the UNSUBACK
     */
    public void handleUnsuback(int i) {
        synchronized (this.connection) {
            if (this.unsubscribeCompletionHandler == null) {
                return;
            }
            this.unsubscribeCompletionHandler.handle(Integer.valueOf(i));
        }
    }

    /**
     * Completes an outgoing QoS 1 publish: drops it from {@code qos1outbound}, frees one in-flight
     * slot and notifies the publish completion handler. An unknown packet id is only logged.
     *
     * @param i packet identifier carried by the PUBACK
     */
    public void handlePuback(int i) {
        synchronized (this.connection) {
            MqttMessage acknowledged = this.qos1outbound.remove(Integer.valueOf(i));
            if (acknowledged == null) {
                log.warn("Received PUBACK packet without having related PUBLISH packet in storage");
                return;
            }
            this.countInflightQueue -= 1;
            if (this.publishCompletionHandler == null) {
                return;
            }
            this.publishCompletionHandler.handle(Integer.valueOf(i));
        }
    }

    /**
     * Completes an outgoing QoS 2 publish: drops its entry from {@code qos2outbound}, frees one
     * in-flight slot and notifies the publish completion handler. An unknown packet id is only logged.
     *
     * @param i packet identifier carried by the PUBCOMP
     */
    public void handlePubcomp(int i) {
        synchronized (this.connection) {
            MqttMessage released = this.qos2outbound.remove(Integer.valueOf(i));
            if (released == null) {
                log.warn("Received PUBCOMP packet without having related PUBREL packet in storage");
                return;
            }
            this.countInflightQueue -= 1;
            if (this.publishCompletionHandler == null) {
                return;
            }
            this.publishCompletionHandler.handle(Integer.valueOf(i));
        }
    }

    /**
     * Answers a PUBREC by sending the matching PUBREL.
     *
     * @param i packet identifier carried by the PUBREC
     */
    public void handlePubrec(int i) {
        synchronized (this.connection) {
            this.publishRelease(i);
        }
    }

    /**
     * Notifies the subscribe completion handler, if one is set, with the received SUBACK.
     *
     * @param mqttSubAckMessage the received SUBACK message
     */
    public void handleSuback(MqttSubAckMessage mqttSubAckMessage) {
        synchronized (this.connection) {
            if (this.subscribeCompletionHandler == null) {
                return;
            }
            this.subscribeCompletionHandler.handle(mqttSubAckMessage);
        }
    }

    /**
     * Processes an incoming PUBLISH according to its QoS level.
     *
     * <ul>
     *   <li>QoS 0: the message is passed to the publish handler.</li>
     *   <li>QoS 1: a PUBACK is sent first, then the message is passed to the publish handler.</li>
     *   <li>QoS 2: the message is stored and a PUBREC is sent; delivery to the publish handler
     *       happens in {@code handlePubrel}.</li>
     * </ul>
     * Any other QoS value is ignored.
     *
     * @param mqttPublishMessage the received PUBLISH message
     */
    public void handlePublish(MqttPublishMessage mqttPublishMessage) {
        synchronized (this.connection) {
            // Switch on the enum itself rather than on a hand-maintained ordinal table.
            switch (mqttPublishMessage.qosLevel()) {
                case AT_LEAST_ONCE:
                    publishAcknowledge(mqttPublishMessage.messageId());
                    if (this.publishHandler != null) {
                        this.publishHandler.handle(mqttPublishMessage);
                    }
                    break;
                case EXACTLY_ONCE:
                    publishReceived(mqttPublishMessage);
                    break;
                case AT_MOST_ONCE:
                    if (this.publishHandler != null) {
                        this.publishHandler.handle(mqttPublishMessage);
                    }
                    break;
                default:
                    break;
            }
        }
    }

    /**
     * Finishes the receiving side of a QoS 2 exchange: delivers the stored PUBLISH to the publish
     * handler, sends PUBCOMP and then forgets the stored message.
     *
     * <p>Removing the entry keeps {@code qos2inbound} from growing with every QoS 2 message received
     * and prevents a repeated PUBREL for the same id from delivering the message a second time.
     * A PUBREL for an unknown id is only logged.
     *
     * @param i packet identifier carried by the PUBREL
     */
    public void handlePubrel(int i) {
        synchronized (this.connection) {
            Integer packetId = Integer.valueOf(i);
            io.vertx.mqtt.messages.MqttMessage mqttMessage = this.qos2inbound.get(packetId);
            if (mqttMessage == null) {
                log.warn("Received PUBREL packet without having related PUBREC packet in storage");
                return;
            }
            if (this.publishHandler != null) {
                this.publishHandler.handle((MqttPublishMessage) mqttMessage);
            }
            publishComplete(i);
            // Only drop the message once it has been delivered and PUBCOMP has been written.
            this.qos2inbound.remove(packetId);
        }
    }

    /**
     * Records the outcome of the CONNACK in the connected flag and completes the connect handler:
     * successfully when the connection was accepted, otherwise with an
     * {@link MqttConnectionException} carrying the return code.
     *
     * @param mqttConnAckMessage the received CONNACK message
     */
    public void handleConnack(MqttConnAckMessage mqttConnAckMessage) {
        synchronized (this.connection) {
            final boolean accepted = mqttConnAckMessage.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED;
            this.isConnected = accepted;
            if (this.connectHandler == null) {
                return;
            }
            if (accepted) {
                this.connectHandler.handle(Future.succeededFuture(mqttConnAckMessage));
                return;
            }
            MqttConnectionException refusal = new MqttConnectionException(mqttConnAckMessage.code());
            log.error(String.format("Connection refused by the server - code: %s", mqttConnAckMessage.code()));
            this.connectHandler.handle(Future.failedFuture(refusal));
        }
    }

    /**
     * Passes a socket-level error to the exception handler, if one is set.
     *
     * @param th the reported error
     */
    void handleException(Throwable th) {
        synchronized (this.connection) {
            if (this.exceptionHandler == null) {
                return;
            }
            this.exceptionHandler.handle(th);
        }
    }

    /**
     * @return a fresh random UUID in its string form, used as a generated client id
     */
    private String generateRandomClientId() {
        UUID randomId = UUID.randomUUID();
        return randomId.toString();
    }

    /**
     * Checks a topic name: its UTF-8 size must be acceptable and it must satisfy
     * {@code validTopicNamePattern}.
     *
     * @param str candidate topic name
     * @return {@code true} when both checks pass
     */
    private boolean isValidTopicName(String str) {
        return isValidStringSizeInUTF8(str) && this.validTopicNamePattern.matcher(str).find();
    }

    /**
     * Checks a topic filter: its UTF-8 size must be acceptable, it must not contain the null
     * character (U+0000), and the whole string must satisfy {@code validTopicFilterPattern}.
     *
     * <p>{@code matches()} is used instead of {@code find()} because the pattern's trailing
     * {@code $} also matches just before a final line terminator, which would let a filter such as
     * "#" followed by a newline through even though the wildcard is not the last character.
     *
     * @param str candidate topic filter
     * @return {@code true} when all checks pass
     */
    private boolean isValidTopicFilter(String str) {
        if (!isValidStringSizeInUTF8(str)) {
            return false;
        }
        // The filter pattern itself does not exclude the null character, so reject it explicitly.
        if (str.indexOf('\0') >= 0) {
            return false;
        }
        return this.validTopicFilterPattern.matcher(str).matches();
    }

    /**
     * Checks that the UTF-8 encoding of the string is between {@code MIN_TOPIC_LEN} and
     * {@code MAX_TOPIC_LEN} bytes long, inclusive.
     *
     * <p>The charset constant is used so no checked exception has to be handled, and a
     * {@code null} string is reported as invalid instead of raising a NullPointerException.
     *
     * @param str string to measure; may be {@code null}
     * @return {@code true} when the encoded length is within bounds
     */
    private boolean isValidStringSizeInUTF8(String str) {
        if (str == null) {
            return false;
        }
        int encodedLength = str.getBytes(StandardCharsets.UTF_8).length;
        return encodedLength >= MIN_TOPIC_LEN && encodedLength <= MAX_TOPIC_LEN;
    }

    /**
     * Resets per-connection state; currently this only clears the connected flag.
     */
    private void cleanup() {
        isConnected = false;
    }
}
