package io.reacted.drivers.channels.kafka;

import io.reacted.core.config.ChannelId;
import io.reacted.core.drivers.system.RemotingDriver;
import io.reacted.core.messages.Message;
import io.reacted.core.messages.reactors.DeliveryStatus;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.patterns.Try;
import io.reacted.patterns.UnChecked;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/reacted/drivers/channels/kafka/KafkaDriver.class */
public class KafkaDriver extends RemotingDriver<KafkaDriverConfig> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDriver.class);

    @Nullable
    private static final Message NO_VALID_PAYLOAD = null;

    @Nullable
    private static final byte[] NO_SERIALIZED_PAYLOAD = null;
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1);

    @Nullable
    private Consumer<Long, Message> kafkaConsumer;

    @Nullable
    private Producer<Long, Message> kafkaProducer;

    /* loaded from: input_file:io/reacted/drivers/channels/kafka/KafkaDriver$MessageDecoder.class */
    public static class MessageDecoder implements Deserializer<Message> {
        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        public Message m60deserialize(String str, byte[] bArr) {
            return (Message) Try.withResources(() -> {
                return new ObjectInputStream(new ByteArrayInputStream(bArr));
            }, (v0) -> {
                return v0.readObject();
            }).orElseGet(() -> {
                return KafkaDriver.NO_VALID_PAYLOAD;
            }, th -> {
                KafkaDriver.LOGGER.error("Unable to properly decode message", th);
            });
        }
    }

    /* loaded from: input_file:io/reacted/drivers/channels/kafka/KafkaDriver$MessageEncoder.class */
    public static class MessageEncoder implements Serializer<Message> {
        public byte[] serialize(String str, Message message) {
            return (byte[]) Try.withChainedResources(ByteArrayOutputStream::new, (v1) -> {
                return new ObjectOutputStream(v1);
            }, (byteArrayOutputStream, objectOutputStream) -> {
                objectOutputStream.writeObject(message);
                return byteArrayOutputStream.toByteArray();
            }).orElseGet(() -> {
                return KafkaDriver.NO_SERIALIZED_PAYLOAD;
            }, th -> {
                KafkaDriver.LOGGER.error("Unable to encode message", th);
            });
        }
    }

    public KafkaDriver(KafkaDriverConfig kafkaDriverConfig) {
        super(kafkaDriverConfig);
    }

    public void initDriverLoop(ReActorSystem reActorSystem) {
        this.kafkaConsumer = (Consumer) Objects.requireNonNull(createConsumer((KafkaDriverConfig) getDriverConfig()));
        this.kafkaProducer = (Producer) Objects.requireNonNull(createProducer((KafkaDriverConfig) getDriverConfig()));
    }

    public CompletionStage<Try<Void>> cleanDriverLoop() {
        return CompletableFuture.completedFuture(Try.ofRunnable(() -> {
            ((Producer) Objects.requireNonNull(this.kafkaProducer)).close();
            ((Consumer) Objects.requireNonNull(this.kafkaConsumer)).close();
        }));
    }

    public UnChecked.CheckedRunnable getDriverLoop() {
        return () -> {
            kafkaDriverLoop((Consumer) Objects.requireNonNull(this.kafkaConsumer), this, getLocalReActorSystem());
        };
    }

    public ChannelId getChannelId() {
        return ChannelId.ChannelType.KAFKA.forChannelName(((KafkaDriverConfig) getDriverConfig()).getChannelName());
    }

    public Properties getChannelProperties() {
        return ((KafkaDriverConfig) getDriverConfig()).getProperties();
    }

    public DeliveryStatus sendMessage(ReActorContext reActorContext, Message message) {
        try {
            ((Producer) Objects.requireNonNull(this.kafkaProducer)).send(new ProducerRecord(((KafkaDriverConfig) getDriverConfig()).getTopic(), message)).get();
            return DeliveryStatus.SENT;
        } catch (Exception e) {
            getLocalReActorSystem().logError("Error sending message {}", new Serializable[]{message.toString(), e});
            return DeliveryStatus.NOT_SENT;
        }
    }

    private static Consumer<Long, Message> createConsumer(KafkaDriverConfig kafkaDriverConfig) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaDriverConfig.getBootstrapEndpoint());
        properties.put("group.id", kafkaDriverConfig.getGroupId());
        properties.put("key.deserializer", LongDeserializer.class.getName());
        properties.put("value.deserializer", MessageDecoder.class.getName());
        properties.put("max.poll.records", Integer.valueOf(kafkaDriverConfig.getMaxPollRecords()));
        properties.put("enable.auto.commit", "false");
        properties.put("auto.offset.reset", "latest");
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(List.of(kafkaDriverConfig.getTopic()));
        return kafkaConsumer;
    }

    public static Producer<Long, Message> createProducer(KafkaDriverConfig kafkaDriverConfig) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", kafkaDriverConfig.getBootstrapEndpoint());
        properties.put("key.serializer", LongSerializer.class.getName());
        properties.put("value.serializer", MessageEncoder.class.getName());
        return new KafkaProducer(properties);
    }

    private static void kafkaDriverLoop(Consumer<Long, Message> consumer, KafkaDriver kafkaDriver, ReActorSystem reActorSystem) {
        while (!Thread.currentThread().isInterrupted()) {
            Try.of(() -> {
                return consumer.poll(POLL_TIMEOUT);
            }).recover(InterruptException.class, ConsumerRecords::empty).ifSuccessOrElse(consumerRecords -> {
                consumerRecords.forEach(consumerRecord -> {
                    kafkaDriver.offerMessage((Message) consumerRecord.value());
                });
            }, th -> {
                reActorSystem.logError("Unable to fetch messages from kafka", new Serializable[]{th});
            }).ifError(th2 -> {
                LOGGER.error("CRITIC! Error offering message", th2);
            });
        }
        Thread.currentThread().interrupt();
    }
}
