package org.streampipes.connect.adapter.generic.protocol.stream;

import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Message;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;
import org.streampipes.messaging.InternalEventProcessor;

/* loaded from: input_file:org/streampipes/connect/adapter/generic/protocol/stream/MqttConsumer.class */
/**
 * Runnable that connects to an MQTT broker, subscribes to a single topic with
 * {@link QoS#AT_LEAST_ONCE} and forwards the payload of every received message
 * to an {@link InternalEventProcessor}.
 *
 * <p>The consumer runs until {@link #close()} is called or, when a limit was
 * configured, until that many messages have been processed. {@link #close()}
 * and {@link #getMessageCount()} may be called from a thread other than the
 * one executing {@link #run()}; the fields they touch are {@code volatile}.
 */
public class MqttConsumer implements Runnable {
    /** Sentinel for {@code maxElementsToReceive} meaning "no limit". */
    private static final int UNLIMITED = -1;

    private final String broker;
    private final String topic;
    private final InternalEventProcessor<byte[]> consumer;
    private final int maxElementsToReceive;
    private final boolean authenticatedConnection;
    private final String username;
    private final String password;

    // Written by close() and read by the receive loop, possibly on different threads.
    private volatile boolean running;
    // Incremented only by the thread executing run(); volatile so readers see fresh values.
    private volatile int messageCount;

    /**
     * Creates an unauthenticated consumer without a message limit.
     *
     * @param broker   broker URI passed to {@code MQTT.setHost(String)}
     * @param topic    topic to subscribe to
     * @param consumer receiver of every message payload
     */
    public MqttConsumer(String broker, String topic, InternalEventProcessor<byte[]> consumer) {
        this(broker, topic, consumer, UNLIMITED, false, null, null);
    }

    /**
     * Creates an unauthenticated consumer that stops after a fixed number of messages.
     *
     * @param broker               broker URI passed to {@code MQTT.setHost(String)}
     * @param topic                topic to subscribe to
     * @param consumer             receiver of every message payload
     * @param maxElementsToReceive number of messages after which {@link #run()} returns;
     *                             {@code -1} disables the limit
     */
    public MqttConsumer(String broker, String topic, InternalEventProcessor<byte[]> consumer,
                        int maxElementsToReceive) {
        this(broker, topic, consumer, maxElementsToReceive, false, null, null);
    }

    /**
     * Creates a consumer without a message limit that authenticates with user name and password.
     *
     * @param broker   broker URI passed to {@code MQTT.setHost(String)}
     * @param topic    topic to subscribe to
     * @param username user name sent to the broker
     * @param password password sent to the broker
     * @param consumer receiver of every message payload
     */
    public MqttConsumer(String broker, String topic, String username, String password,
                        InternalEventProcessor<byte[]> consumer) {
        this(broker, topic, consumer, UNLIMITED, true, username, password);
    }

    /** Canonical constructor; every public constructor delegates here. */
    private MqttConsumer(String broker, String topic, InternalEventProcessor<byte[]> consumer,
                         int maxElementsToReceive, boolean authenticatedConnection,
                         String username, String password) {
        this.broker = broker;
        this.topic = topic;
        this.consumer = consumer;
        this.maxElementsToReceive = maxElementsToReceive;
        this.authenticatedConnection = authenticatedConnection;
        this.username = username;
        this.password = password;
        this.messageCount = 0;
    }

    /**
     * Connects, subscribes and forwards messages until stopped or until the
     * configured limit is reached. Any exception ends the loop and is printed
     * to standard error; the connection is disconnected in every case once it
     * has been established.
     */
    @Override // java.lang.Runnable
    public void run() {
        this.running = true;
        MQTT mqtt = new MQTT();
        BlockingConnection connection = null;
        boolean connected = false;
        try {
            mqtt.setHost(this.broker);
            if (this.authenticatedConnection) {
                mqtt.setUserName(this.username);
                mqtt.setPassword(this.password);
            }
            connection = mqtt.blockingConnection();
            connection.connect();
            connected = true;
            connection.subscribe(new Topic[]{new Topic(this.topic, QoS.AT_LEAST_ONCE)});
            while (this.running && hasCapacity()) {
                Message message = connection.receive();
                this.consumer.onEvent(message.getPayload());
                // Acknowledge only after the payload was handed over successfully.
                message.ack();
                this.messageCount++;
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt status for whoever owns this thread.
                Thread.currentThread().interrupt();
            }
            e.printStackTrace();
        } finally {
            if (connected) {
                disconnectQuietly(connection);
            }
        }
    }

    /** Returns whether another message may be received under the configured limit. */
    private boolean hasCapacity() {
        // Strictly less-than: with "<=" the loop would wait for one message more than requested.
        return this.maxElementsToReceive == UNLIMITED || this.messageCount < this.maxElementsToReceive;
    }

    /** Disconnects and reports, but does not propagate, any failure while doing so. */
    private static void disconnectQuietly(BlockingConnection connection) {
        try {
            connection.disconnect();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Requests the receive loop to stop. The flag is checked between messages,
     * so a {@link #run()} that is currently waiting in {@code receive()} ends
     * only after that call returns.
     */
    public void close() {
        this.running = false;
    }

    /**
     * Returns the number of messages processed so far.
     *
     * @return count of messages that were forwarded and acknowledged
     */
    public Integer getMessageCount() {
        return Integer.valueOf(this.messageCount);
    }
}
