package org.streampipes.manager.runtime;

import com.google.common.base.Charsets;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.StringEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.config.backend.BackendConfig;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.messaging.jms.ActiveMQConsumer;
import org.streampipes.model.SpDataStream;
import org.streampipes.model.grounding.KafkaTransportProtocol;

/* loaded from: input_file:org/streampipes/manager/runtime/PipelineElementRuntimeInfoFetcher.class */
public class PipelineElementRuntimeInfoFetcher {
    Logger logger = LoggerFactory.getLogger(JsonParser.class);
    private SpDataStream spDataStream;
    private static Set<String> consumerInstances = new HashSet();
    private static final String CONSUMER_GROUP_ID = "streampipes-backend-listener-group-";
    private static final String KAFKA_REST_CONTENT_TYPE = "application/vnd.kafka.v2+json";
    private static final String KAFKA_REST_SUBSCRIPTION_CONTENT_TYPE = "application/vnd.kafka.json.v2+json";
    private static final String OFFSET_FIELD_NAME = "offset";
    private static final String VALUE_FIELD_NAME = "value";

    public PipelineElementRuntimeInfoFetcher(SpDataStream spDataStream) {
        this.spDataStream = spDataStream;
    }

    public String getCurrentData() throws SpRuntimeException {
        return this.spDataStream.getEventGrounding().getTransportProtocol() instanceof KafkaTransportProtocol ? getLatestEventFromKafka() : getLatestEventFromJms();
    }

    private String getLatestEventFromJms() throws SpRuntimeException {
        final String[] strArr = {null};
        final ActiveMQConsumer activeMQConsumer = new ActiveMQConsumer();
        activeMQConsumer.connect(this.spDataStream.getEventGrounding().getTransportProtocol(), new InternalEventProcessor<byte[]>() { // from class: org.streampipes.manager.runtime.PipelineElementRuntimeInfoFetcher.1
            public void onEvent(byte[] bArr) {
                strArr[0] = new String(bArr);
                try {
                    activeMQConsumer.disconnect();
                } catch (SpRuntimeException e) {
                    e.printStackTrace();
                }
            }
        });
        while (strArr[0] == null) {
            try {
                Thread.sleep(300L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return strArr[0];
    }

    private String getLatestEventFromKafka() throws SpRuntimeException {
        return getLatestSubscription(getKafkaRestUrl(), getOutputTopic());
    }

    private String getLatestSubscription(String str, String str2) throws SpRuntimeException {
        String str3 = getConsumerInstanceUrl(str, getConsumerInstanceId(str2), str2) + "/records";
        try {
            if (!consumerInstances.contains(getConsumerInstanceId(str2))) {
                createSubscription(str, str2);
                consumerInstances.add(getConsumerInstanceId(str2));
            }
            return extractPayload(Request.Get(str3).addHeader("Accept", KAFKA_REST_SUBSCRIPTION_CONTENT_TYPE).execute().returnContent().asString());
        } catch (IOException | SpRuntimeException e) {
            if (!e.getMessage().equals("")) {
                this.logger.error("Could not get any sample data from Kafka", e);
            }
            consumerInstances.remove(getConsumerInstanceId(str2));
            throw new SpRuntimeException(e.getMessage());
        }
    }

    private void createSubscription(String str, String str2) throws IOException, SpRuntimeException {
        String consumerInstanceId = getConsumerInstanceId(str2);
        createConsumer(str, consumerInstanceId, str2);
        if (subscribeConsumer(str, consumerInstanceId, str2).intValue() != 204) {
            throw new SpRuntimeException("Could not read message form Kafka-REST: " + str);
        }
    }

    private Integer subscribeConsumer(String str, String str2, String str3) throws IOException {
        return Integer.valueOf(Request.Post(getConsumerInstanceUrl(str, str2, str3) + "/subscription").addHeader("Content-Type", KAFKA_REST_CONTENT_TYPE).addHeader("Accept", KAFKA_REST_CONTENT_TYPE).body(new StringEntity(makeSubscribeConsumerBody(str3), Charsets.UTF_8)).execute().returnResponse().getStatusLine().getStatusCode());
    }

    private String getConsumerInstanceUrl(String str, String str2, String str3) {
        return str + "/consumers/" + getConsumerGroupId(str3) + "/instances/" + str2;
    }

    private String getConsumerGroupId(String str) {
        return CONSUMER_GROUP_ID + str;
    }

    private String makeSubscribeConsumerBody(String str) {
        return "{\"topics\":[\"" + str + "\"]}";
    }

    private Integer createConsumer(String str, String str2, String str3) throws IOException {
        return Integer.valueOf(Request.Post(str + "/consumers/" + getConsumerGroupId(str3)).addHeader("Content-Type", KAFKA_REST_CONTENT_TYPE).addHeader("Accept", KAFKA_REST_CONTENT_TYPE).body(new StringEntity(makeCreateConsumerBody(str2), Charsets.UTF_8)).execute().returnResponse().getStatusLine().getStatusCode());
    }

    private String makeCreateConsumerBody(String str) {
        return "{\"name\": \"" + str + "\", \"format\": \"json\", \"auto.offset.reset\": \"latest\"}";
    }

    private String getConsumerInstanceId(String str) {
        return "streampipes-backend-listener-group--" + str;
    }

    private String getOutputTopic() {
        return this.spDataStream.getEventGrounding().getTransportProtocol().getTopicDefinition().getActualTopicName();
    }

    private String getKafkaRestUrl() {
        return BackendConfig.INSTANCE.getKafkaRestUrl();
    }

    private String extractPayload(String str) {
        JsonElement parse = new JsonParser().parse(str);
        if (!parse.isJsonArray()) {
            return "{}";
        }
        JsonArray asJsonArray = parse.getAsJsonArray();
        if (asJsonArray.size() <= 0) {
            return "{}";
        }
        JsonObject asJsonObject = asJsonArray.get(0).getAsJsonObject();
        Long valueOf = Long.valueOf(asJsonObject.get(OFFSET_FIELD_NAME).getAsLong());
        for (int i = 1; i < asJsonArray.size(); i++) {
            JsonObject asJsonObject2 = asJsonArray.get(i).getAsJsonObject();
            if (Long.valueOf(asJsonObject2.get(OFFSET_FIELD_NAME).getAsLong()).longValue() > valueOf.longValue()) {
                asJsonObject = asJsonObject2;
            }
        }
        return asJsonObject.get(VALUE_FIELD_NAME).getAsJsonObject().toString();
    }
}
