package io.muenchendigital.digiwf.task.service.infra.ingress;

import io.micrometer.core.instrument.MeterRegistry;
import java.util.Collections;
import org.axonframework.extensions.kafka.KafkaProperties;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.consumer.ConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.DefaultConsumerFactory;
import org.axonframework.extensions.kafka.eventhandling.consumer.Fetcher;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.KafkaEventMessage;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.SortedKafkaMessageBuffer;
import org.axonframework.extensions.kafka.eventhandling.consumer.streamable.StreamableKafkaMessageSource;
import org.axonframework.serialization.Serializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that registers the Kafka consumer factories and the streamable Kafka
 * message sources used for the "polyflowTask" and "polyflowData" ingress.
 *
 * <p>The configuration is active when {@code polyflow.axon.kafka.enabled} is {@code true} or is
 * not set at all. The message source beans are additionally only registered when
 * {@code axon.kafka.consumer.event-processor-mode} is {@code TRACKING}.
 */
@EnableConfigurationProperties({AxonKafkaExtendedProperties.class})
@Configuration
@ConditionalOnProperty(value = {"polyflow.axon.kafka.enabled"}, havingValue = "true", matchIfMissing = true)
public class AxonKafkaIngressConfiguration {

    /** Value of the {@code HOSTNAME} property, falling back to {@code localhost}; used as client id suffix. */
    @Value("${HOSTNAME:localhost}")
    private String hostname;

    /**
     * Creates the consumer factory qualified as {@code polyflowTask}.
     *
     * @param kafkaProperties Axon Kafka properties; its client id is set to
     *                        {@code "polyflow-task-" + hostname} before the consumer properties are built
     * @return a consumer factory built from the resulting consumer properties
     */
    @Bean
    @Qualifier("polyflowTask")
    public ConsumerFactory<String, byte[]> kafkaConsumerFactoryPolyflowTask(KafkaProperties kafkaProperties) {
        return createConsumerFactory(kafkaProperties, "polyflow-task-");
    }

    /**
     * Creates the consumer factory qualified as {@code polyflowData}.
     *
     * @param kafkaProperties Axon Kafka properties; its client id is set to
     *                        {@code "polyflow-data-" + hostname} before the consumer properties are built
     * @return a consumer factory built from the resulting consumer properties
     */
    @Bean
    @Qualifier("polyflowData")
    public ConsumerFactory<String, byte[]> kafkaConsumerFactoryPolyflowData(KafkaProperties kafkaProperties) {
        return createConsumerFactory(kafkaProperties, "polyflow-data-");
    }

    /**
     * Creates the streamable message source reading the data entries topic
     * ({@link AxonKafkaExtendedProperties#getTopicDataEntries()}).
     *
     * @param kafkaProperties             supplies the fetcher buffer size for the message buffer
     * @param axonKafkaExtendedProperties supplies the topic name
     * @param consumerFactory             the {@code polyflowData} consumer factory
     * @param fetcher                     fetcher handed to the message source builder
     * @param serializer                  the {@code eventSerializer} bean
     * @param kafkaMessageConverter       converter handed to the message source builder
     * @param meterRegistry               registry passed to the {@code MetricsBindingConsumerFactory} wrapper
     * @return the configured message source
     */
    @ConditionalOnProperty(value = {"axon.kafka.consumer.event-processor-mode"}, havingValue = "TRACKING")
    @Bean({"kafkaMessageSourcePolyflowData"})
    public StreamableKafkaMessageSource<String, byte[]> kafkaMessageSourcePolyflowData(KafkaProperties kafkaProperties, AxonKafkaExtendedProperties axonKafkaExtendedProperties, @Qualifier("polyflowData") ConsumerFactory<String, byte[]> consumerFactory, Fetcher<String, byte[], KafkaEventMessage> fetcher, @Qualifier("eventSerializer") Serializer serializer, KafkaMessageConverter<String, byte[]> kafkaMessageConverter, MeterRegistry meterRegistry) {
        return createMessageSource(axonKafkaExtendedProperties.getTopicDataEntries(), kafkaProperties, consumerFactory, fetcher, serializer, kafkaMessageConverter, meterRegistry);
    }

    /**
     * Creates the streamable message source reading the tasks topic
     * ({@link AxonKafkaExtendedProperties#getTopicTasks()}).
     *
     * @param kafkaProperties             supplies the fetcher buffer size for the message buffer
     * @param axonKafkaExtendedProperties supplies the topic name
     * @param consumerFactory             the {@code polyflowTask} consumer factory
     * @param fetcher                     fetcher handed to the message source builder
     * @param serializer                  the {@code eventSerializer} bean
     * @param kafkaMessageConverter       converter handed to the message source builder
     * @param meterRegistry               registry passed to the {@code MetricsBindingConsumerFactory} wrapper
     * @return the configured message source
     */
    @ConditionalOnProperty(value = {"axon.kafka.consumer.event-processor-mode"}, havingValue = "TRACKING")
    @Bean({"kafkaMessageSourcePolyflowTask"})
    public StreamableKafkaMessageSource<String, byte[]> kafkaMessageSourcePolyflowTask(KafkaProperties kafkaProperties, AxonKafkaExtendedProperties axonKafkaExtendedProperties, @Qualifier("polyflowTask") ConsumerFactory<String, byte[]> consumerFactory, Fetcher<String, byte[], KafkaEventMessage> fetcher, @Qualifier("eventSerializer") Serializer serializer, KafkaMessageConverter<String, byte[]> kafkaMessageConverter, MeterRegistry meterRegistry) {
        return createMessageSource(axonKafkaExtendedProperties.getTopicTasks(), kafkaProperties, consumerFactory, fetcher, serializer, kafkaMessageConverter, meterRegistry);
    }

    /** Sets the client id to {@code clientIdPrefix + hostname} and builds a consumer factory from the properties. */
    private ConsumerFactory<String, byte[]> createConsumerFactory(KafkaProperties kafkaProperties, String clientIdPrefix) {
        // NOTE(review): this mutates the injected KafkaProperties instance. If that bean is shared,
        // other users see whichever client id was set last - confirm this is acceptable.
        kafkaProperties.setClientId(clientIdPrefix + this.hostname);
        return new DefaultConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    /** Builds a streamable message source for a single topic, using the metrics-binding consumer factory wrapper. */
    private StreamableKafkaMessageSource<String, byte[]> createMessageSource(String topic, KafkaProperties kafkaProperties, ConsumerFactory<String, byte[]> consumerFactory, Fetcher<String, byte[], KafkaEventMessage> fetcher, Serializer serializer, KafkaMessageConverter<String, byte[]> kafkaMessageConverter, MeterRegistry meterRegistry) {
        return StreamableKafkaMessageSource.<String, byte[]>builder()
                .topics(Collections.singletonList(topic))
                // The wrapper must be the only consumerFactory(...) call: a later call on the
                // builder would replace it and leave the meter registry unused.
                .consumerFactory(new MetricsBindingConsumerFactory(meterRegistry, consumerFactory))
                .serializer(serializer)
                .fetcher(fetcher)
                .messageConverter(kafkaMessageConverter)
                // The buffer size is read each time the supplier is invoked, not when the bean is created.
                .bufferFactory(() -> new SortedKafkaMessageBuffer<KafkaEventMessage>(kafkaProperties.getFetcher().getBufferSize()))
                .build();
    }
}
