package org.streampipes.messaging.kafka;

import java.io.Serializable;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.messaging.EventProducer;
import org.streampipes.model.grounding.KafkaTransportProtocol;

/* loaded from: input_file:org/streampipes/messaging/kafka/SpKafkaProducer.class */
public class SpKafkaProducer implements EventProducer<KafkaTransportProtocol>, Serializable {
    private String brokerUrl;
    private String topic;
    private Producer<String, byte[]> producer;
    private Boolean connected;
    private static final Logger LOG = LoggerFactory.getLogger(SpKafkaProducer.class);

    public SpKafkaProducer() {
    }

    public SpKafkaProducer(String str, String str2) {
        this.brokerUrl = str;
        this.topic = str2;
        this.producer = new KafkaProducer(getProperties());
    }

    public void publish(String str) {
        publish(str.getBytes());
    }

    public void publish(byte[] bArr) {
        this.producer.send(new ProducerRecord(this.topic, bArr));
    }

    private Properties getProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerUrl);
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("message.max.bytes", 5000012);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return properties;
    }

    public void connect(KafkaTransportProtocol kafkaTransportProtocol) {
        LOG.info("Kafka producer: Connecting to " + kafkaTransportProtocol.getTopicDefinition().getActualTopicName());
        this.brokerUrl = kafkaTransportProtocol.getBrokerHostname() + ":" + kafkaTransportProtocol.getKafkaPort();
        this.topic = kafkaTransportProtocol.getTopicDefinition().getActualTopicName();
        this.producer = new KafkaProducer(getProperties());
        this.connected = true;
    }

    public void disconnect() {
        LOG.info("Kafka producer: Disconnecting from " + this.topic);
        this.producer.close();
        this.connected = false;
    }

    public Boolean isConnected() {
        return Boolean.valueOf(this.connected != null && this.connected.booleanValue());
    }
}
