package io.muenchendigital.digiwf.task.polyflow.kafka;

import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.config.EventProcessingConfigurer;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.eventhandling.PropagatingErrorHandler;
import org.axonframework.extensions.kafka.KafkaProperties;
import org.axonframework.extensions.kafka.autoconfig.KafkaAutoConfiguration;
import org.axonframework.extensions.kafka.eventhandling.DefaultKafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaEventPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.KafkaPublisher;
import org.axonframework.extensions.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.serialization.Serializer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Profile;

@AutoConfigureBefore({KafkaAutoConfiguration.class})
@EnableConfigurationProperties({PolyflowAxonKafkaProperties.class})
@Configuration
@Profile({"!disable-polyflow-kafka"})
/* loaded from: input_file:io/muenchendigital/digiwf/task/polyflow/kafka/PolyflowAxonKafkaConfiguration.class */
public class PolyflowAxonKafkaConfiguration {
    @ConditionalOnMissingBean
    @Bean
    public KafkaTopicRouter kafkaTopicRouter(PolyflowAxonKafkaProperties polyflowAxonKafkaProperties) {
        return cls -> {
            return (String) polyflowAxonKafkaProperties.getTopics().stream().filter(payloadTypeToTopic -> {
                return payloadTypeToTopic.getPayloadType().isAssignableFrom(cls);
            }).findFirst().map((v0) -> {
                return v0.getTopic();
            }).orElse(null);
        };
    }

    @Bean
    @Primary
    public KafkaMessageConverter<String, byte[]> routingKafkaMessageConverter(@Qualifier("eventSerializer") Serializer serializer, final KafkaTopicRouter kafkaTopicRouter) {
        final DefaultKafkaMessageConverter build = DefaultKafkaMessageConverter.builder().serializer(serializer).build();
        return new KafkaMessageConverter<String, byte[]>() { // from class: io.muenchendigital.digiwf.task.polyflow.kafka.PolyflowAxonKafkaConfiguration.1
            public ProducerRecord<String, byte[]> createKafkaMessage(EventMessage<?> eventMessage, String str) {
                String str2 = kafkaTopicRouter.topicForPayloadType(eventMessage.getPayloadType());
                return build.createKafkaMessage(eventMessage, str2 == null ? str : str2);
            }

            public Optional<EventMessage<?>> readKafkaMessage(ConsumerRecord<String, byte[]> consumerRecord) {
                return build.readKafkaMessage(consumerRecord);
            }
        };
    }

    @Bean
    public KafkaEventPublisher<String, byte[]> routingKafkaEventPublisher(KafkaPublisher<String, byte[]> kafkaPublisher, KafkaProperties kafkaProperties, EventProcessingConfigurer eventProcessingConfigurer, KafkaTopicRouter kafkaTopicRouter) {
        RoutingKafkaEventPublisher m7build = RoutingKafkaEventPublisher.builder().m8kafkaPublisher((KafkaPublisher) kafkaPublisher).kafkaTopicRouter(kafkaTopicRouter).m7build();
        eventProcessingConfigurer.registerEventHandler(configuration -> {
            return m7build;
        }).registerListenerInvocationErrorHandler("__axon-kafka-event-publishing-group", configuration2 -> {
            return PropagatingErrorHandler.instance();
        }).assignHandlerTypesMatching("__axon-kafka-event-publishing-group", cls -> {
            return cls.isAssignableFrom(KafkaEventPublisher.class);
        });
        KafkaProperties.EventProcessorMode eventProcessorMode = kafkaProperties.getProducer().getEventProcessorMode();
        if (eventProcessorMode == KafkaProperties.EventProcessorMode.SUBSCRIBING) {
            eventProcessingConfigurer.registerSubscribingEventProcessor("__axon-kafka-event-publishing-group");
        } else if (eventProcessorMode == KafkaProperties.EventProcessorMode.TRACKING) {
            eventProcessingConfigurer.registerTrackingEventProcessor("__axon-kafka-event-publishing-group");
        } else {
            if (eventProcessorMode != KafkaProperties.EventProcessorMode.POOLED_STREAMING) {
                throw new AxonConfigurationException("Unknown Event Processor Mode [" + eventProcessorMode + "] detected");
            }
            eventProcessingConfigurer.registerPooledStreamingEventProcessor("__axon-kafka-event-publishing-group");
        }
        return m7build;
    }

    @Bean(destroyMethod = "shutDown")
    public KafkaPublisher<String, byte[]> kafkaAcknowledgingPublisher(ProducerFactory<String, byte[]> producerFactory, KafkaMessageConverter<String, byte[]> kafkaMessageConverter, org.axonframework.config.Configuration configuration, KafkaProperties kafkaProperties, Serializer serializer) {
        return KafkaPublisher.builder().producerFactory(producerFactory).messageConverter(kafkaMessageConverter).messageMonitor(configuration.messageMonitor(KafkaPublisher.class, "kafkaPublisher")).topicResolver(eventMessage -> {
            return Optional.of(kafkaProperties.getDefaultTopic());
        }).serializer(serializer).publisherAckTimeout(Long.parseLong((String) kafkaProperties.getProducer().getProperties().getOrDefault("delivery.timeout.ms", "30000")) + 1000).build();
    }
}
