package org.streampipes.messaging.kafka;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.commons.exceptions.SpRuntimeException;
import org.streampipes.messaging.EventConsumer;
import org.streampipes.messaging.InternalEventProcessor;
import org.streampipes.messaging.kafka.config.ConsumerConfigFactory;
import org.streampipes.model.grounding.KafkaTransportProtocol;
import org.streampipes.model.grounding.SimpleTopicDefinition;
import org.streampipes.model.grounding.WildcardTopicDefinition;

/* loaded from: input_file:BOOT-INF/lib/streampipes-messaging-kafka-0.63.0.jar:org/streampipes/messaging/kafka/SpKafkaConsumer.class */
public class SpKafkaConsumer implements EventConsumer<KafkaTransportProtocol>, Runnable, Serializable {
    private String topic;
    private InternalEventProcessor<byte[]> eventProcessor;
    private KafkaTransportProtocol protocol;
    private volatile boolean isRunning;
    private Boolean patternTopic = false;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SpKafkaConsumer.class);

    public SpKafkaConsumer() {
    }

    public SpKafkaConsumer(String str, String str2, InternalEventProcessor<byte[]> internalEventProcessor) {
        KafkaTransportProtocol kafkaTransportProtocol = new KafkaTransportProtocol();
        kafkaTransportProtocol.setKafkaPort(Integer.parseInt(str.split(":")[1]));
        kafkaTransportProtocol.setBrokerHostname(str.split(":")[0]);
        kafkaTransportProtocol.setTopicDefinition(new SimpleTopicDefinition(str2));
        try {
            connect2(kafkaTransportProtocol, internalEventProcessor);
        } catch (SpRuntimeException e) {
            e.printStackTrace();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // java.lang.Runnable
    public void run() {
        KafkaConsumer kafkaConsumer = new KafkaConsumer(getProperties());
        if (this.patternTopic.booleanValue()) {
            this.topic = replaceWildcardWithPatternFormat(this.topic);
            kafkaConsumer.subscribe(Pattern.compile(this.topic), new ConsumerRebalanceListener() { // from class: org.streampipes.messaging.kafka.SpKafkaConsumer.1
                @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                }

                @Override // org.apache.kafka.clients.consumer.ConsumerRebalanceListener
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                }
            });
        } else {
            kafkaConsumer.subscribe(Collections.singletonList(this.topic));
        }
        while (this.isRunning) {
            Iterator it = kafkaConsumer.poll(100L).iterator();
            while (it.hasNext()) {
                this.eventProcessor.onEvent(((ConsumerRecord) it.next()).value());
            }
        }
        LOG.info("Closing Kafka Consumer.");
        kafkaConsumer.close();
    }

    private String replaceWildcardWithPatternFormat(String str) {
        return str.replaceAll("\\.", "\\\\.").replaceAll("\\*", ".*");
    }

    private Properties getProperties() {
        return new ConsumerConfigFactory(this.protocol).makeProperties();
    }

    /* renamed from: connect, reason: avoid collision after fix types in other method */
    public void connect2(KafkaTransportProtocol kafkaTransportProtocol, InternalEventProcessor<byte[]> internalEventProcessor) throws SpRuntimeException {
        LOG.info("Kafka consumer: Connecting to " + kafkaTransportProtocol.getTopicDefinition().getActualTopicName());
        if (kafkaTransportProtocol.getTopicDefinition() instanceof WildcardTopicDefinition) {
            this.patternTopic = true;
        }
        this.eventProcessor = internalEventProcessor;
        this.protocol = kafkaTransportProtocol;
        this.topic = kafkaTransportProtocol.getTopicDefinition().getActualTopicName();
        this.isRunning = true;
        new Thread(this).start();
    }

    @Override // org.streampipes.messaging.EventConsumer
    public void disconnect() throws SpRuntimeException {
        LOG.info("Kafka consumer: Disconnecting from " + this.topic);
        this.isRunning = false;
    }

    @Override // org.streampipes.messaging.EventConsumer
    public Boolean isConnected() {
        return Boolean.valueOf(this.isRunning);
    }

    @Override // org.streampipes.messaging.EventConsumer
    public /* bridge */ /* synthetic */ void connect(KafkaTransportProtocol kafkaTransportProtocol, InternalEventProcessor internalEventProcessor) throws SpRuntimeException {
        connect2(kafkaTransportProtocol, (InternalEventProcessor<byte[]>) internalEventProcessor);
    }
}
