package org.streampipes.connect.adapter.generic.protocol.stream;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.streampipes.connect.SendToPipeline;
import org.streampipes.connect.adapter.generic.format.Format;
import org.streampipes.connect.adapter.generic.format.Parser;
import org.streampipes.connect.adapter.generic.pipeline.AdapterPipeline;
import org.streampipes.connect.adapter.generic.protocol.Protocol;
import org.streampipes.connect.adapter.generic.sdk.ParameterExtractor;
import org.streampipes.connect.exception.ParseException;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.model.AdapterType;
import org.streampipes.model.connect.grounding.ProtocolDescription;
import org.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
import org.streampipes.sdk.helpers.AdapterSourceType;
import org.streampipes.sdk.helpers.Labels;

/* loaded from: input_file:org/streampipes/connect/adapter/generic/protocol/stream/MqttProtocol.class */
public class MqttProtocol extends BrokerProtocol {
    public static final String ID = "https://streampipes.org/vocabulary/v1/protocol/stream/mqtt";
    private Thread thread;
    private MqttConsumer mqttConsumer;

    /* loaded from: input_file:org/streampipes/connect/adapter/generic/protocol/stream/MqttProtocol$EventProcessor.class */
    private class EventProcessor implements InternalEventProcessor<byte[]> {
        private SendToPipeline stk;

        public EventProcessor(SendToPipeline sendToPipeline) {
            this.stk = sendToPipeline;
        }

        @Override // org.streampipes.messaging.InternalEventProcessor
        public void onEvent(byte[] bArr) {
            try {
                MqttProtocol.this.parser.parse(IOUtils.toInputStream(new String(bArr), "UTF-8"), this.stk);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (ParseException e2) {
                e2.printStackTrace();
            }
        }
    }

    public MqttProtocol() {
    }

    public MqttProtocol(Parser parser, Format format, String str, String str2) {
        super(parser, format, str, str2);
    }

    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public Protocol getInstance(ProtocolDescription protocolDescription, Parser parser, Format format) {
        ParameterExtractor parameterExtractor = new ParameterExtractor(protocolDescription.getConfig());
        return new MqttProtocol(parser, format, parameterExtractor.singleValue("broker_url"), parameterExtractor.singleValue("topic"));
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public ProtocolDescription declareModel() {
        return (ProtocolDescription) ((ProtocolDescriptionBuilder) ProtocolDescriptionBuilder.create(ID, "MQTT", "Consumes messages from a broker using the MQTT protocol").iconUrl("mqtt.png")).category(AdapterType.Generic, AdapterType.Manufacturing).sourceType(AdapterSourceType.STREAM).requiredTextParameter(Labels.from("broker_url", "Broker URL", "Example: tcp://test-server.com:1883 (Protocol required. Port required)")).requiredTextParameter(Labels.from("topic", "Topic", "Example: test/topic")).build();
    }

    @Override // org.streampipes.connect.adapter.generic.protocol.stream.BrokerProtocol
    protected List<byte[]> getNByteElements(int i) throws ParseException {
        ArrayList arrayList = new ArrayList();
        arrayList.getClass();
        MqttConsumer mqttConsumer = new MqttConsumer(this.brokerUrl, this.topic, (v1) -> {
            r0.add(v1);
        });
        new Thread(mqttConsumer).start();
        while (mqttConsumer.getMessageCount().intValue() < i) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return arrayList;
    }

    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public void run(AdapterPipeline adapterPipeline) {
        this.mqttConsumer = new MqttConsumer(this.brokerUrl, this.topic, new EventProcessor(new SendToPipeline(this.format, adapterPipeline)));
        this.thread = new Thread(this.mqttConsumer);
        this.thread.start();
    }

    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public void stop() {
        this.mqttConsumer.close();
    }

    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public String getId() {
        return ID;
    }
}
