/*
 * Decompiled with CFR 0.152.
 */
package org.openremote.agent.protocol.mqtt;

import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttWebSocketConfigBuilder;
import com.hivemq.client.mqtt.exceptions.ConnectionClosedException;
import com.hivemq.client.mqtt.exceptions.ConnectionFailedException;
import com.hivemq.client.mqtt.lifecycle.MqttClientAutoReconnectBuilder;
import com.hivemq.client.mqtt.lifecycle.MqttDisconnectSource;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3ClientBuilder;
import com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3ConnAckException;
import com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3DisconnectException;
import com.hivemq.client.mqtt.mqtt3.lifecycle.Mqtt3ClientDisconnectedContext;
import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuthBuilder;
import com.hivemq.client.mqtt.mqtt3.message.connect.Mqtt3ConnectBuilder;
import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAck;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3PublishBuilder;
import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAck;
import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3UnsubscribeBuilder;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.openremote.agent.protocol.io.AbstractNettyIOClient;
import org.openremote.agent.protocol.io.IOClient;
import org.openremote.agent.protocol.mqtt.MQTTMessage;
import org.openremote.container.Container;
import org.openremote.container.util.UniqueIdentifierGenerator;
import org.openremote.model.asset.agent.ConnectionStatus;
import org.openremote.model.auth.UsernamePassword;
import org.openremote.model.syslog.SyslogCategory;

public abstract class AbstractMQTT_IOClient<S>
implements IOClient<MQTTMessage<S>> {
    /** Logger for this client, obtained for the {@link SyslogCategory#PROTOCOL} category. */
    public static final Logger LOG = SyslogCategory.getLogger((SyslogCategory)SyslogCategory.PROTOCOL, AbstractMQTT_IOClient.class);
    /** MQTT client identifier handed to the client builder and appended to {@link #getClientUri()}. */
    protected String clientId;
    /** Broker host used when no websocket URI is supplied. */
    protected String host;
    /** Broker port used when no websocket URI is supplied. */
    protected int port;
    /** When {@code true} the client builder is configured with the default SSL config. */
    protected boolean secure;
    /** Clean session flag; consulted after connecting (re-subscription) and after disconnecting (consumer removal). */
    protected boolean cleanSession;
    /** Optional simple-auth credentials; {@code null} means no credentials are sent. */
    protected UsernamePassword usernamePassword;
    /** Optional websocket endpoint; when non-null it replaces {@link #host}/{@link #port} as the server address. */
    protected URI websocketURI;
    /** Underlying HiveMQ client; {@code null} when building it failed in the constructor. */
    protected Mqtt3AsyncClient client;
    /** Listeners notified (via {@link #executorService}) whenever the connection status changes. */
    protected final Set<Consumer<ConnectionStatus>> connectionStatusConsumers = new CopyOnWriteArraySet<Consumer<ConnectionStatus>>();
    /** Message consumers keyed by the topic filter they were registered for. */
    protected final Map<String, Set<Consumer<MQTTMessage<S>>>> topicConsumerMap = new HashMap<String, Set<Consumer<MQTTMessage<S>>>>();
    /** Executor used for status notifications, subscribe-failure callbacks and re-subscription after connect. */
    protected ScheduledExecutorService executorService;
    /** Last known connection status; starts as {@link ConnectionStatus#DISCONNECTED}. */
    protected ConnectionStatus connectionStatus = ConnectionStatus.DISCONNECTED;
    /** {@code true} until {@link #connect()} is called and again once {@link #disconnect()} is requested; the disconnected listener uses it to cancel reconnects. */
    protected boolean disconnected = true;
    /** Optional callback invoked with the topic when a broker subscription attempt fails. */
    protected Consumer<String> topicSubscribeFailureConsumer;

    /**
     * Creates a client with a generated client identifier.
     * <p>
     * Delegates to the full constructor, passing {@link UniqueIdentifierGenerator#generateId()} as the client ID.
     */
    protected AbstractMQTT_IOClient(String host, int port, boolean secure, boolean cleanSession, UsernamePassword usernamePassword, URI websocketURI) {
        this(UniqueIdentifierGenerator.generateId(), host, port, secure, cleanSession, usernamePassword, websocketURI);
    }

    /**
     * Creates a client with an explicit client identifier and builds the underlying HiveMQ MQTT 3 async client.
     * <p>
     * Only the client object is built here; no connection is attempted until {@link #connect()} is called.
     * If building the client throws, the failure is logged, {@link #client} is left {@code null} and the
     * connection status is set to {@link ConnectionStatus#ERROR}.
     *
     * @param clientId MQTT client identifier
     * @param host broker host, used when {@code websocketURI} is {@code null}
     * @param port broker port, used when {@code websocketURI} is {@code null}
     * @param secure whether to configure SSL with the default config
     * @param cleanSession clean session flag stored on this instance
     * @param usernamePassword simple-auth credentials, or {@code null} for none
     * @param websocketURI websocket endpoint; when non-null its host, port, path and query are used as the server address
     */
    protected AbstractMQTT_IOClient(String clientId, String host, int port, boolean secure, boolean cleanSession, UsernamePassword usernamePassword, URI websocketURI) {
        this.clientId = clientId;
        this.host = host;
        this.port = port;
        this.secure = secure;
        this.cleanSession = cleanSession;
        this.usernamePassword = usernamePassword;
        this.websocketURI = websocketURI;
        this.executorService = Container.EXECUTOR_SERVICE;
        Mqtt3ClientBuilder builder = (Mqtt3ClientBuilder)((MqttClientAutoReconnectBuilder.Nested)((MqttClientAutoReconnectBuilder.Nested)((Mqtt3ClientBuilder)((Mqtt3ClientBuilder)((Mqtt3ClientBuilder)MqttClient.builder().useMqttVersion3().identifier(clientId)).addConnectedListener(context -> {
            LOG.info("Client is connected to the broker '" + this.getClientUri() + "'");
            this.onConnectionStatusChanged(ConnectionStatus.CONNECTED);
        })).addDisconnectedListener(context -> {
            boolean userClosed = context.getSource() == MqttDisconnectSource.USER;
            if (this.disconnected) {
                // disconnect() was requested on this client, so cancel the automatic reconnect
                context.getReconnector().reconnect(false);
            } else if (this.usernamePassword != null) {
                // Re-apply the credentials for the reconnect attempt. The charset is explicit so the
                // password bytes do not depend on the platform default encoding.
                ((Mqtt3ConnectBuilder.Nested)((Mqtt3SimpleAuthBuilder.Nested.Complete)((Mqtt3SimpleAuthBuilder.Nested.Complete)((Mqtt3ClientDisconnectedContext)context).getReconnector().connectWith().simpleAuth().username(usernamePassword.getUsername())).password(usernamePassword.getPassword().getBytes(StandardCharsets.UTF_8))).applySimpleAuth()).applyConnect();
            }
            if (context.getCause() instanceof Mqtt3DisconnectException) {
                LOG.info("Client disconnect '" + this.getClientUri() + "': initiator=" + context.getSource());
            } else if (context.getCause() instanceof Mqtt3ConnAckException) {
                LOG.info("Connection rejected by the broker '" + this.getClientUri() + "': reasonCode=" + ((Mqtt3ConnAckException)context.getCause()).getMqttMessage().getReturnCode() + ", initiator=" + context.getSource());
            } else if (context.getCause() instanceof ConnectionClosedException) {
                LOG.info("Connection closed by " + context.getSource() + " '" + this.getClientUri() + "': initiator=" + context.getSource());
            } else if (context.getCause() instanceof ConnectionFailedException) {
                LOG.log(Level.INFO, "Connection failed '" + this.getClientUri() + "': initiator=" + context.getSource(), context.getCause());
            }
            // A user-initiated close is final; anything else leaves the client waiting for a reconnect
            this.onConnectionStatusChanged(userClosed ? ConnectionStatus.DISCONNECTED : ConnectionStatus.WAITING);
        })).automaticReconnect().initialDelay(AbstractNettyIOClient.RECONNECT_DELAY_INITIAL_MILLIS, TimeUnit.MILLISECONDS)).maxDelay(AbstractNettyIOClient.RECONNECT_DELAY_MAX_MILLIS, TimeUnit.MILLISECONDS)).applyAutomaticReconnect();
        if (secure) {
            builder = (Mqtt3ClientBuilder)builder.sslWithDefaultConfig();
        }
        // A websocket URI, when present, takes precedence over the plain host/port pair
        builder = websocketURI != null ? (Mqtt3ClientBuilder)((MqttWebSocketConfigBuilder.Nested)((MqttWebSocketConfigBuilder.Nested)((Mqtt3ClientBuilder)((Mqtt3ClientBuilder)builder.serverHost(websocketURI.getHost())).serverPort(websocketURI.getPort())).webSocketConfig().serverPath(websocketURI.getPath())).queryString(websocketURI.getQuery())).applyWebSocketConfig() : (Mqtt3ClientBuilder)((Mqtt3ClientBuilder)builder.serverHost(host)).serverPort(port);
        try {
            this.client = builder.buildAsync();
        }
        catch (Exception e) {
            LOG.log(Level.WARNING, "Invalid MQTT client config for client '" + this.getClientUri() + "'", e);
            this.client = null;
            this.connectionStatus = ConnectionStatus.ERROR;
        }
    }

    /**
     * Publishes a message to the broker asynchronously.
     * <p>
     * The payload is converted with {@link #messageToBytes(Object)}. Nothing is sent when the underlying
     * client could not be built. The outcome of the publish is only logged.
     *
     * @param message topic and payload to publish
     */
    @Override
    public void sendMessage(MQTTMessage<S> message) {
        if (this.client == null) {
            LOG.info("Cannot send message as client is invalid  '" + this.getClientUri() + "'");
            return;
        }
        Mqtt3PublishBuilder.Send.Complete publishBuilder = (Mqtt3PublishBuilder.Send.Complete)this.client.publishWith().topic(message.topic);
        publishBuilder = (Mqtt3PublishBuilder.Send.Complete)publishBuilder.payload(this.messageToBytes(message.payload));
        CompletableFuture pendingPublish = (CompletableFuture)publishBuilder.send();
        pendingPublish.whenComplete((result, error) -> {
            if (error == null) {
                LOG.finer("Published message to MQTT broker '" + this.getClientUri() + "'");
                return;
            }
            LOG.log(Level.INFO, "Failed to publish to MQTT broker '" + this.getClientUri() + "'", (Throwable)error);
        });
    }

    /**
     * Registers a consumer for the {@code "#"} topic filter.
     *
     * @param messageConsumer consumer to register
     */
    @Override
    public void addMessageConsumer(Consumer<MQTTMessage<S>> messageConsumer) {
        this.addMessageConsumer("#", messageConsumer);
    }

    /**
     * Registers a consumer for the given topic filter.
     * <p>
     * When the topic has no consumers yet, a client subscription is attempted first; if that fails the
     * topic entry is removed again and the consumer is not registered.
     *
     * @param topic topic filter to consume from
     * @param messageConsumer consumer to register
     * @return {@code true} if the consumer was registered; {@code false} if the client is invalid or the subscription failed
     */
    public synchronized boolean addMessageConsumer(String topic, Consumer<MQTTMessage<S>> messageConsumer) {
        if (this.client == null) {
            return false;
        }
        Set<Consumer<MQTTMessage<S>>> topicConsumers = this.topicConsumerMap.computeIfAbsent(topic, key -> new CopyOnWriteArraySet<>());
        boolean firstConsumerForTopic = topicConsumers.isEmpty();
        if (firstConsumerForTopic && !this.doClientSubscription(topic, topicConsumers)) {
            this.topicConsumerMap.remove(topic);
            return false;
        }
        topicConsumers.add(messageConsumer);
        return true;
    }

    /**
     * Sets the callback that receives the topic whenever a subscription attempt fails.
     *
     * @param topicSubscribeFailureConsumer callback to store; may be {@code null}, in which case failures are not reported
     */
    public void setTopicSubscribeFailureConsumer(Consumer<String> topicSubscribeFailureConsumer) {
        this.topicSubscribeFailureConsumer = topicSubscribeFailureConsumer;
    }

    /**
     * Reports a failed subscription to the registered failure callback, if there is one.
     *
     * @param topic topic filter whose subscription failed
     */
    protected void onSubscribeFailed(String topic) {
        if (this.topicSubscribeFailureConsumer == null) {
            return;
        }
        this.topicSubscribeFailureConsumer.accept(topic);
    }

    /**
     * Subscribes the underlying client to a topic filter and routes incoming publishes to the given consumers.
     * <p>
     * When the client is not currently connected nothing is sent and {@code true} is returned. Otherwise the
     * call blocks until the subscribe request completes. On failure the subscribe-failure callback is scheduled
     * on the executor and {@code false} is returned.
     *
     * @param topic topic filter to subscribe to
     * @param consumers consumers that receive every message delivered for this subscription
     * @return {@code true} if subscribed or not connected; {@code false} if the subscribe request failed
     */
    protected synchronized boolean doClientSubscription(String topic, Set<Consumer<MQTTMessage<S>>> consumers) {
        Consumer<MQTTMessage<S>> messageConsumer = message -> {
            // Drop messages that arrive after the topic has been removed from the map
            if (!this.topicConsumerMap.containsKey(topic)) {
                return;
            }
            consumers.forEach(consumer -> {
                try {
                    consumer.accept(message);
                }
                catch (Exception e) {
                    // One failing consumer must not prevent delivery to the others
                    LOG.log(Level.WARNING, "Message consumer threw an exception", e);
                }
            });
        };
        if (this.connectionStatus != ConnectionStatus.CONNECTED) {
            return true;
        }
        try {
            ((Mqtt3AsyncClient.Mqtt3SubscribeAndCallbackBuilder.Start.Complete)this.client.subscribeWith().topicFilter(topic)).callback(publish -> {
                try {
                    String topicStr = publish.getTopic().toString();
                    S payload = this.messageFromBytes(publish.getPayloadAsBytes());
                    MQTTMessage<S> message = new MQTTMessage<S>(topicStr, payload);
                    messageConsumer.accept(message);
                }
                catch (Exception e) {
                    LOG.log(Level.WARNING, "Failed to process published message on client '" + this.getClientUri() + "'", e);
                }
            }).send().get();
            LOG.fine("Subscribed to topic '" + topic + "' on client '" + this.getClientUri() + "'");
            return true;
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up the stack can still observe it
                Thread.currentThread().interrupt();
            }
            LOG.log(Level.WARNING, "Failed to subscribe to topic '" + topic + "' on client '" + this.getClientUri() + "'", e);
            this.executorService.execute(() -> this.onSubscribeFailed(topic));
            return false;
        }
    }

    /**
     * Removes a consumer previously registered for the {@code "#"} topic filter.
     *
     * @param messageConsumer consumer to remove
     */
    @Override
    public void removeMessageConsumer(Consumer<MQTTMessage<S>> messageConsumer) {
        this.removeMessageConsumer("#", messageConsumer);
    }

    /**
     * Removes a consumer from the given topic filter.
     * <p>
     * When the removed consumer was the last one for the topic, the client subscription is removed and the
     * topic entry is dropped from the map.
     *
     * @param topic topic filter the consumer was registered for
     * @param messageConsumer consumer to remove
     */
    public synchronized void removeMessageConsumer(String topic, Consumer<MQTTMessage<S>> messageConsumer) {
        Set<Consumer<MQTTMessage<S>>> registered = this.topicConsumerMap.get(topic);
        if (registered == null) {
            return;
        }
        boolean wasRemoved = registered.remove(messageConsumer);
        if (wasRemoved && registered.isEmpty()) {
            this.removeSubscription(topic);
            this.topicConsumerMap.remove(topic);
        }
    }

    /**
     * Drops every registered message consumer and removes the client subscription for each topic that had any.
     */
    @Override
    public synchronized void removeAllMessageConsumers() {
        // Copy the keys first: the map is cleared before the subscriptions are removed
        Set<String> subscribedTopics = new HashSet<>(this.topicConsumerMap.keySet());
        this.topicConsumerMap.clear();
        for (String subscribedTopic : subscribedTopics) {
            this.removeSubscription(subscribedTopic);
        }
    }

    /**
     * Sends an asynchronous unsubscribe request for the topic filter; the outcome is only logged.
     * Does nothing when the underlying client could not be built.
     *
     * @param topic topic filter to unsubscribe from
     */
    protected void removeSubscription(String topic) {
        if (this.client == null) {
            return;
        }
        Mqtt3UnsubscribeBuilder.Send.Complete unsubscribeBuilder = (Mqtt3UnsubscribeBuilder.Send.Complete)this.client.unsubscribeWith().topicFilter(topic);
        CompletableFuture pendingUnsubscribe = (CompletableFuture)unsubscribeBuilder.send();
        pendingUnsubscribe.whenComplete((ack, error) -> {
            if (error == null) {
                LOG.fine("Unsubscribed from topic '" + topic + "' on client '" + this.getClientUri() + "'");
                return;
            }
            LOG.log(Level.WARNING, "Failed to unsubscribe to topic '" + topic + "' on client '" + this.getClientUri() + "'", (Throwable)error);
        });
    }

    /**
     * Registers a listener that is notified of connection status changes.
     *
     * @param connectionStatusConsumer listener to add
     */
    @Override
    public void addConnectionStatusConsumer(Consumer<ConnectionStatus> connectionStatusConsumer) {
        this.connectionStatusConsumers.add(connectionStatusConsumer);
    }

    /**
     * Unregisters a previously added connection status listener.
     *
     * @param connectionStatusConsumer listener to remove
     */
    @Override
    public void removeConnectionStatusConsumer(Consumer<ConnectionStatus> connectionStatusConsumer) {
        this.connectionStatusConsumers.remove(connectionStatusConsumer);
    }

    /**
     * Unregisters all connection status listeners.
     */
    @Override
    public void removeAllConnectionStatusConsumers() {
        this.connectionStatusConsumers.clear();
    }

    /**
     * Returns the last connection status recorded by this client.
     *
     * @return current value of {@link #connectionStatus}
     */
    @Override
    public ConnectionStatus getConnectionStatus() {
        return this.connectionStatus;
    }

    /**
     * Starts an asynchronous connection attempt to the broker.
     * <p>
     * The call is ignored unless the current status is {@link ConnectionStatus#DISCONNECTED}. The status is
     * moved to {@link ConnectionStatus#CONNECTING} and a CONNECT is sent with the configured credentials, if
     * any. When the connection succeeds, this instance is not configured for a clean session and the broker
     * reports no existing session, all known topics are re-subscribed on the executor; topics whose
     * subscription fails are dropped from the consumer map.
     */
    @Override
    public void connect() {
        synchronized (this) {
            if (this.getConnectionStatus() != ConnectionStatus.DISCONNECTED) {
                LOG.finer("Must be disconnected and not in error before calling connect: " + this.getClientUri());
                return;
            }
            LOG.fine("Connecting MQTT Client: " + this.getClientUri());
            this.onConnectionStatusChanged(ConnectionStatus.CONNECTING);
        }
        this.disconnected = false;
        LOG.info("Establishing connection: " + this.getClientUri());
        // NOTE(review): the CONNECT always requests a clean session regardless of this.cleanSession - confirm this is intended
        Mqtt3ConnectBuilder.Send<CompletableFuture<Mqtt3ConnAck>> connectBuilder = this.client.connectWith().cleanSession(true).keepAlive(5);
        if (this.usernamePassword != null) {
            // Explicit charset so the password bytes do not depend on the platform default encoding
            connectBuilder = connectBuilder.simpleAuth()
                .username(this.usernamePassword.getUsername())
                .password(this.usernamePassword.getPassword().getBytes(StandardCharsets.UTF_8))
                .applySimpleAuth();
        }
        connectBuilder.send().whenComplete((connAck, throwable) -> {
            if (throwable != null) {
                LOG.log(Level.INFO, "Connection failed:" + this.getClientUri(), throwable);
                return;
            }
            if (!this.cleanSession && !connAck.isSessionPresent()) {
                // The broker holds no session for this client, so restore every known subscription
                this.executorService.execute(() -> {
                    synchronized (this) {
                        // Iterate over a copy: failed topics are removed from the live map inside the loop
                        Map<String, Set<Consumer<MQTTMessage<S>>>> snapshot = new HashMap<>(this.topicConsumerMap);
                        snapshot.forEach((topic, consumers) -> {
                            if (!this.doClientSubscription(topic, consumers)) {
                                this.topicConsumerMap.remove(topic);
                            }
                        });
                    }
                });
            }
        });
    }

    /**
     * Records a new connection status and notifies all status listeners on the executor.
     * Nothing happens when the status is unchanged. An exception thrown by one listener is logged and does
     * not stop the remaining listeners from being notified.
     *
     * @param connectionStatus the new status
     */
    protected void onConnectionStatusChanged(ConnectionStatus connectionStatus) {
        if (this.connectionStatus != connectionStatus) {
            this.connectionStatus = connectionStatus;
            this.executorService.submit(() -> {
                for (Consumer<ConnectionStatus> listener : this.connectionStatusConsumers) {
                    try {
                        listener.accept(connectionStatus);
                    }
                    catch (Exception e) {
                        LOG.log(Level.WARNING, "Connection status change handler threw an exception: " + this.getClientUri(), e);
                    }
                }
            });
        }
    }

    /**
     * Requests an asynchronous disconnect from the broker.
     * <p>
     * The call is ignored when the client could not be built or the status is already
     * {@link ConnectionStatus#DISCONNECTED}. Once the disconnect completes (successfully or not) the status is
     * set to {@link ConnectionStatus#DISCONNECTED} and, for clean-session clients, all message consumers are
     * removed. A failed disconnect is logged together with its cause.
     */
    @Override
    public void disconnect() {
        if (this.client == null) {
            return;
        }
        synchronized (this) {
            if (this.connectionStatus == ConnectionStatus.DISCONNECTED) {
                LOG.finest("Already disconnected: " + this.getClientUri());
                return;
            }
            LOG.finest("Disconnecting IO client: " + this.getClientUri());
            this.onConnectionStatusChanged(ConnectionStatus.DISCONNECTING);
        }
        // Tells the disconnected listener not to schedule a reconnect
        this.disconnected = true;
        this.client.disconnect().whenComplete((unused, throwable) -> {
            this.onConnectionStatusChanged(ConnectionStatus.DISCONNECTED);
            if (this.cleanSession) {
                this.removeAllMessageConsumers();
            }
            if (throwable != null) {
                // Keep the cause and the client URI so the failure can be diagnosed
                LOG.log(Level.INFO, "Failed to disconnect: " + this.getClientUri(), throwable);
            }
        });
    }

    /**
     * Builds a descriptive URI for this client, used in log messages.
     *
     * @return {@code mqtt_<websocketURI>?clientId=<id>} when a websocket URI is set, otherwise
     *         {@code mqtt[s]://<host>:<port>/?clientId=<id>}
     */
    @Override
    public String getClientUri() {
        String clientIdQuery = "?clientId=" + this.clientId;
        if (this.websocketURI == null) {
            String scheme = this.secure ? "mqtts://" : "mqtt://";
            return scheme + this.host + ":" + this.port + "/" + clientIdQuery;
        }
        return "mqtt_" + this.websocketURI + clientIdQuery;
    }

    /**
     * Converts an outgoing message payload to the bytes that are published to the broker.
     *
     * @param var1 payload to convert
     * @return bytes used as the publish payload
     */
    public abstract byte[] messageToBytes(S var1);

    /**
     * Converts the payload bytes of a received publish into a message payload.
     *
     * @param var1 payload bytes of the received publish
     * @return payload delivered to the message consumers
     */
    public abstract S messageFromBytes(byte[] var1);
}

