/*
 * Decompiled with CFR 0.152.
 */
package net.randomsync.robotframework.mqtt;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.robotframework.javalib.annotation.ArgumentNames;
import org.robotframework.javalib.annotation.RobotKeyword;
import org.robotframework.javalib.annotation.RobotKeywordOverload;
import org.robotframework.javalib.annotation.RobotKeywords;
import org.robotframework.javalib.library.AnnotationLibrary;

@RobotKeywords
public class MQTTLibrary
extends AnnotationLibrary {
    public static final String KEYWORD_PATTERN = "net/randomsync/robotframework/mqtt/**/*.class";
    public static final String ROBOT_LIBRARY_SCOPE = "GLOBAL";
    public static final String ROBOT_LIBRARY_VERSION = "1.0.0";
    public static final String ROBOT_LIBRARY_DOC_FORMAT = "HTML";
    private MqttClient client;

    public MQTTLibrary() {
        super(KEYWORD_PATTERN);
    }

    @RobotKeyword(value="Connect to MQTT Broker")
    @ArgumentNames(value={"broker", "clientId"})
    public void connectToMQTTBroker(String broker, String clientId) throws MqttException {
        this.client = new MqttClient(broker, clientId);
        System.out.println("*INFO:" + System.currentTimeMillis() + "* connecting to broker");
        this.client.connect();
        System.out.println("*INFO:" + System.currentTimeMillis() + "* connected");
    }

    @RobotKeywordOverload
    @ArgumentNames(value={"topic", "message"})
    public void publishToMQTTSynchronously(String topic, Object message) throws MqttException {
        this.publishToMQTTSynchronously(topic, message, 0, false);
    }

    @RobotKeyword(value="Publish to MQTT Synchronously")
    @ArgumentNames(value={"topic", "message", "qos=0", "retained=false"})
    public void publishToMQTTSynchronously(String topic, Object message, int qos, boolean retained) throws MqttException {
        MqttMessage msg = message instanceof String ? new MqttMessage(message.toString().getBytes()) : new MqttMessage((byte[])message);
        msg.setQos(qos);
        msg.setRetained(retained);
        System.out.println("*INFO:" + System.currentTimeMillis() + "* publishing message");
        this.client.publish(topic, msg);
        System.out.println("*INFO:" + System.currentTimeMillis() + "* published");
    }

    @RobotKeyword(value="Disconnect from MQTT Broker")
    public void disconnectFromMQTTBroker() {
        if (this.client != null) {
            try {
                this.client.disconnect();
            }
            catch (MqttException e) {
                throw new RuntimeException(e.getLocalizedMessage());
            }
        }
    }

    @RobotKeyword(value="Subscribe to MQTT Broker and validate that it received a specific message")
    @ArgumentNames(value={"broker", "clientId", "topic", "expectedPayload", "timeout"})
    public void subscribeToMQTTAndValidate(String broker, String clientId, String topic, String expectedPayload, long timeout) {
        MqttClient client = null;
        try {
            boolean validated;
            block11: {
                MemoryPersistence persistence = new MemoryPersistence();
                client = new MqttClient(broker, clientId, (MqttClientPersistence)persistence);
                MqttConnectOptions connOpts = new MqttConnectOptions();
                connOpts.setCleanSession(false);
                MQTTResponseHandler handler = new MQTTResponseHandler();
                client.setCallback((MqttCallback)handler);
                System.out.println("*INFO:" + System.currentTimeMillis() + "* Connecting to broker: " + broker);
                client.connect(connOpts);
                System.out.println("*INFO:" + System.currentTimeMillis() + "* Subscribing to topic: " + topic);
                client.subscribe(topic);
                System.out.println("*INFO:" + System.currentTimeMillis() + "* Subscribed to topic: " + topic);
                System.out.println("*INFO:" + System.currentTimeMillis() + "* Waiting for message to arrive");
                validated = false;
                long endTime = System.currentTimeMillis() + timeout;
                do {
                    MqttMessage message;
                    if ((message = handler.getNextMessage(timeout)) == null) continue;
                    byte[] payload = message.getPayload();
                    String payloadStr = new String(payload);
                    if (!expectedPayload.isEmpty() && !payloadStr.matches(expectedPayload)) continue;
                    validated = true;
                    break block11;
                } while ((timeout = endTime - System.currentTimeMillis()) > 0L);
                System.out.println("*DEBUG:" + System.currentTimeMillis() + "* timeout: " + timeout);
            }
            if (!validated) {
                throw new RuntimeException("The expected payload didn't arrive in the topic");
            }
        }
        catch (MqttException e) {
            throw new RuntimeException(e.getLocalizedMessage());
        }
        finally {
            try {
                client.disconnect();
            }
            catch (MqttException e) {}
        }
    }

    public String getKeywordDocumentation(String keywordName) {
        if (keywordName.equals("__intro__")) {
            return "Keyword Library for MQTT";
        }
        if (keywordName.equals("__init__")) {
            return "MQTT Library can be imported directly";
        }
        return super.getKeywordDocumentation(keywordName);
    }

    class MQTTResponseHandler
    implements MqttCallback {
        List<MqttMessage> messages = new ArrayList<MqttMessage>();

        MQTTResponseHandler() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public MqttMessage getNextMessage(long timeout) {
            List<MqttMessage> list = this.messages;
            synchronized (list) {
                if (this.messages.size() == 0) {
                    try {
                        this.messages.wait(timeout);
                    }
                    catch (InterruptedException e) {
                        System.out.println("*ERROR:" + System.currentTimeMillis() + "* " + e.getLocalizedMessage());
                    }
                }
            }
            if (this.messages.size() == 0) {
                return null;
            }
            return this.messages.remove(0);
        }

        public void connectionLost(Throwable cause) {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("*INFO:" + System.currentTimeMillis() + "* Message arrived");
            List<MqttMessage> list = this.messages;
            synchronized (list) {
                this.messages.add(message);
                this.messages.notifyAll();
            }
        }

        public void deliveryComplete(IMqttDeliveryToken token) {
        }
    }
}

