package org.kinotic.continuum.internal.config;

import java.util.HashMap;
import org.apache.commons.lang3.Validate;
import org.kinotic.continuum.api.config.ContinuumProperties;
import org.kinotic.continuum.internal.utils.KafkaUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaSender;

/**
 * Spring configuration that declares the Kafka-related beans used by Continuum:
 * a {@link KafkaAdmin}, a reactive {@link KafkaSender} and a {@link KafkaTemplate}.
 *
 * <p>All beans are configured from the injected {@link ContinuumProperties}.
 */
@Configuration
public class ContinuumKafkaConfig {

    /** Kafka client property key under which the bootstrap servers are registered. */
    private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";

    @Autowired
    private ContinuumProperties continuumProperties;

    /**
     * Creates the {@link KafkaAdmin} bean, configured only with the bootstrap servers
     * taken from {@link ContinuumProperties}.
     *
     * @return a {@link KafkaAdmin} pointing at the configured bootstrap servers
     * @throws NullPointerException     if the bootstrap servers value is {@code null}
     *                                  (raised by {@link Validate#notEmpty})
     * @throws IllegalArgumentException if the bootstrap servers value is empty
     *                                  (raised by {@link Validate#notEmpty})
     */
    @Bean
    public KafkaAdmin admin() {
        // Fail fast with a descriptive message rather than letting the Kafka client
        // reject a missing configuration value later on.
        Validate.notEmpty(continuumProperties.getKafkaBootstrapServers(),
                          "Kafka bootstrap servers must be provided");

        HashMap<String, Object> adminConfigs = new HashMap<>();
        adminConfigs.put(BOOTSTRAP_SERVERS_KEY, continuumProperties.getKafkaBootstrapServers());
        return new KafkaAdmin(adminConfigs);
    }

    /**
     * Builds a producer factory from the producer settings computed by
     * {@link KafkaUtil#kafkaProducerConfigs}.
     *
     * <p>This method is not annotated with {@code @Bean}, so the returned factory is
     * not registered in the application context.
     * NOTE(review): because of that, Spring will not invoke any lifecycle callbacks on
     * the factory at shutdown - confirm whether it should be exposed as a bean.
     *
     * @return a new producer factory for {@code String} keys and {@code byte[]} values
     */
    private ProducerFactory<String, byte[]> kafkaProducerFactory() {
        return new DefaultKafkaProducerFactory<>(KafkaUtil.kafkaProducerConfigs(continuumProperties));
    }

    /**
     * Creates the reactive {@link KafkaSender} bean via {@link KafkaUtil#createKafkaSender},
     * handing it a newly created parallel scheduler named {@code kafkaSenderParallel}.
     *
     * <p>{@code destroyMethod = "close"} makes Spring call {@code close()} on the sender
     * when the bean is destroyed.
     * NOTE(review): the scheduler created here is not retained by this class; whether it
     * is disposed when the sender closes depends on {@code KafkaUtil.createKafkaSender}
     * - confirm.
     *
     * @return the sender for {@code String} keys and {@code byte[]} values
     */
    @Bean(destroyMethod = "close")
    public KafkaSender<String, byte[]> kafkaSender() {
        return KafkaUtil.createKafkaSender(continuumProperties,
                                           Schedulers.newParallel("kafkaSenderParallel"));
    }

    /**
     * Creates the {@link KafkaTemplate} bean on top of a producer factory obtained from
     * {@link #kafkaProducerFactory()}.
     *
     * @return a template for {@code String} keys and {@code byte[]} values
     */
    @Bean
    public KafkaTemplate<String, byte[]> kafkaTemplate() {
        return new KafkaTemplate<>(kafkaProducerFactory());
    }
}
