package org.kinotic.continuum.internal.utils;

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.Validate;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.kinotic.continuum.api.config.ContinuumProperties;
import reactor.core.scheduler.Scheduler;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;

/* loaded from: input_file:org/kinotic/continuum/internal/utils/KafkaUtil.class */
/**
 * Static helpers for building reactive Kafka senders and the producer
 * configuration they use, driven by {@link ContinuumProperties}.
 */
public class KafkaUtil {
    /**
     * Creates a {@link KafkaSender} without supplying a custom scheduler.
     *
     * @param continuumProperties source of the Kafka bootstrap servers
     * @return a sender keyed by {@code String} with {@code byte[]} values
     */
    public static KafkaSender<String, byte[]> createKafkaSender(ContinuumProperties continuumProperties) {
        return createKafkaSender(continuumProperties, null);
    }

    /**
     * Creates a {@link KafkaSender} from the producer configuration returned by
     * {@link #kafkaProducerConfigs(ContinuumProperties)}.
     *
     * @param continuumProperties source of the Kafka bootstrap servers
     * @param scheduler           scheduler to set on the sender options; when {@code null}
     *                            the options are left as created
     * @return a sender keyed by {@code String} with {@code byte[]} values
     */
    public static KafkaSender<String, byte[]> createKafkaSender(ContinuumProperties continuumProperties, Scheduler scheduler) {
        SenderOptions<String, byte[]> senderOptions = SenderOptions.create(kafkaProducerConfigs(continuumProperties));
        if (scheduler != null) {
            // Keep the returned instance: in reactor-kafka versions where SenderOptions is
            // immutable the fluent setter returns a modified copy, so ignoring the result
            // would silently drop the scheduler.
            senderOptions = senderOptions.scheduler(scheduler);
        }
        return KafkaSender.create(senderOptions);
    }

    /**
     * Builds the Kafka producer configuration: bootstrap servers taken from the given
     * properties, a {@link StringSerializer} for keys, a {@link ByteArraySerializer} for
     * values, {@code acks=all} and {@code enable.idempotence=true}.
     *
     * @param continuumProperties source of the Kafka bootstrap servers
     * @return a new mutable map holding the producer configuration
     * @throws RuntimeException raised by {@link Validate#notEmpty} when the bootstrap
     *                          servers value is null or empty
     */
    public static Map<String, Object> kafkaProducerConfigs(ContinuumProperties continuumProperties) {
        Validate.notEmpty(continuumProperties.getKafkaBootstrapServers(), "Kafka bootstrap servers must be provided");
        Map<String, Object> producerConfigs = new HashMap<>();
        producerConfigs.put("bootstrap.servers", continuumProperties.getKafkaBootstrapServers());
        producerConfigs.put("key.serializer", StringSerializer.class);
        producerConfigs.put("value.serializer", ByteArraySerializer.class);
        producerConfigs.put("acks", "all");
        producerConfigs.put("enable.idempotence", "true");
        return producerConfigs;
    }
}
