/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.kafkastreamsprocessor.impl;

import io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration;
import io.quarkiverse.kafkastreamsprocessor.api.configuration.ConfigurationCustomizer;
import io.quarkiverse.kafkastreamsprocessor.api.configuration.store.GlobalStoreProcessorSupplier;
import io.quarkiverse.kafkastreamsprocessor.api.decorator.producer.ProducerOnSendInterceptor;
import io.quarkiverse.kafkastreamsprocessor.impl.KStreamProcessorSupplier;
import io.quarkiverse.kafkastreamsprocessor.impl.KafkaClientSupplierDecorator;
import io.quarkiverse.kafkastreamsprocessor.impl.configuration.DefaultTopologySerdesConfiguration;
import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl;
import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TypeUtils;
import io.quarkiverse.kafkastreamsprocessor.impl.configuration.store.DefaultGlobalStateStoreProcessor;
import io.quarkiverse.kafkastreamsprocessor.spi.SinkToTopicMappingBuilder;
import io.quarkiverse.kafkastreamsprocessor.spi.SourceToTopicsMappingBuilder;
import io.quarkiverse.kafkastreamsprocessor.spi.properties.GlobalStateStoreConfig;
import io.quarkiverse.kafkastreamsprocessor.spi.properties.KStreamsProcessorConfig;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.Produces;
import jakarta.enterprise.inject.spi.BeanManager;
import jakarta.inject.Inject;
import java.util.Map;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;

/**
 * CDI producer of the beans needed to run the Kafka Streams processor: the
 * {@link KafkaClientSupplier}, the {@link TopologyConfigurationImpl} and the
 * {@link Topology} itself.
 */
@ApplicationScoped
public class TopologyProducer {
    /** Name under which the processor node is registered in the topology. */
    public static final String PROCESSOR_NAME = "Processor";

    /** Name of the dead-letter-queue sink node, added only when a DLQ topic is configured. */
    public static final String DLQ_SINK_NAME = "DLQ";

    private final KStreamsProcessorConfig kStreamsProcessorConfig;
    private final Instance<ConfigurationCustomizer> configCustomizers;
    private final SourceToTopicsMappingBuilder sourceToTopicsMappingBuilder;
    private final SinkToTopicMappingBuilder sinkToTopicMappingBuilder;
    private final Instance<ProducerOnSendInterceptor> interceptors;

    /**
     * Injection constructor.
     *
     * @param kStreamsProcessorConfig      extension configuration (DLQ topic, global store topics)
     * @param configCustomizer             customizers applied to the topology configuration
     * @param sourceToTopicsMappingBuilder provides the source-name to topics mapping
     * @param sinkToTopicMappingBuilder    provides the sink-name to topic mapping
     * @param interceptors                 interceptors handed to the Kafka client supplier decorator
     */
    @Inject
    public TopologyProducer(KStreamsProcessorConfig kStreamsProcessorConfig, Instance<ConfigurationCustomizer> configCustomizer, SourceToTopicsMappingBuilder sourceToTopicsMappingBuilder, SinkToTopicMappingBuilder sinkToTopicMappingBuilder, Instance<ProducerOnSendInterceptor> interceptors) {
        this.kStreamsProcessorConfig = kStreamsProcessorConfig;
        this.configCustomizers = configCustomizer;
        this.sourceToTopicsMappingBuilder = sourceToTopicsMappingBuilder;
        this.sinkToTopicMappingBuilder = sinkToTopicMappingBuilder;
        this.interceptors = interceptors;
    }

    /**
     * Produces the {@link KafkaClientSupplier}, a {@link KafkaClientSupplierDecorator}
     * built from the injected producer interceptors.
     *
     * @return a new client supplier decorator
     */
    @Produces
    public KafkaClientSupplier kafkaClientSupplier() {
        return new KafkaClientSupplierDecorator(this.interceptors);
    }

    /**
     * Produces the topology configuration: it is first initialized from the processor's
     * key and payload types, then filled by the default serdes configuration, and
     * finally handed to every injected {@link ConfigurationCustomizer}.
     *
     * @param beanManager          used to resolve the processor type
     * @param defaultConfiguration default configuration applied before the customizers
     * @return the resulting configuration
     * @throws IllegalArgumentException if the key or payload type of the processor cannot be determined
     */
    @Produces
    public TopologyConfigurationImpl configuration(BeanManager beanManager, DefaultTopologySerdesConfiguration defaultConfiguration) {
        TopologyConfigurationImpl configuration = TopologyProducer.initializeConfiguration(beanManager);
        defaultConfiguration.apply(configuration);
        // Customizers run after the defaults so they see (and may replace) them.
        this.configCustomizers.forEach(customizer -> customizer.fillConfiguration((Configuration) configuration));
        return configuration;
    }

    /**
     * Creates a configuration holding the key and payload types extracted from the
     * processor class.
     *
     * @param beanManager used to resolve the processor type
     * @return a configuration initialized with the key and payload types
     * @throws IllegalArgumentException if either type is {@code null} or {@code Object}
     */
    private static TopologyConfigurationImpl initializeConfiguration(BeanManager beanManager) {
        Class<?> processorType = TypeUtils.reifiedProcessorType(beanManager);
        Class<?> keyType = TypeUtils.extractKeyType(processorType);
        if (keyType == null || Object.class.equals(keyType)) {
            throw new IllegalArgumentException("Could not determine key type of Processor class " + processorType.getName());
        }
        Class<?> payloadType = TypeUtils.extractPayloadType(processorType);
        if (payloadType == null || Object.class.equals(payloadType)) {
            throw new IllegalArgumentException("Could not determine payload type of Processor class " + processorType.getName());
        }
        return new TopologyConfigurationImpl(keyType, payloadType);
    }

    /**
     * Produces the {@link Topology}: one source node per entry of the source mapping, a
     * single processor node named {@link #PROCESSOR_NAME} fed by all sources, one sink
     * node per entry of the sink mapping, an optional {@link #DLQ_SINK_NAME} sink, the
     * configured state stores and the configured global stores.
     *
     * @param configuration            serdes and store configuration
     * @param kStreamProcessorSupplier supplier of the processor node
     * @return the assembled topology
     * @throws IllegalStateException if a global store has no matching entry in the global stores configuration
     */
    @Produces
    public Topology topology(TopologyConfigurationImpl configuration, KStreamProcessorSupplier<?, ?, ?, ?> kStreamProcessorSupplier) {
        Map<String, String[]> sourceToTopicMapping = this.sourceToTopicsMappingBuilder.sourceToTopicsMapping();
        Map<String, String> sinkToTopicMapping = this.sinkToTopicMappingBuilder.sinkToTopicMapping();
        Topology topology = new Topology();

        sourceToTopicMapping.forEach((source, topics) -> topology.addSource(source, configuration.getSourceKeySerde().deserializer(), configuration.getSourceValueSerde().deserializer(), topics));
        topology.addProcessor(PROCESSOR_NAME, kStreamProcessorSupplier, sourceToTopicMapping.keySet().toArray(new String[0]));
        sinkToTopicMapping.forEach((sink, topic) -> topology.addSink(sink, topic, configuration.getSinkKeySerializer(), configuration.getSinkValueSerializer(), PROCESSOR_NAME));

        // The DLQ sink uses the *source* serdes: it is written with the serializers
        // matching the incoming key/value rather than the sink serializers.
        this.kStreamsProcessorConfig.dlq().topic().ifPresent(dlqTopic -> topology.addSink(DLQ_SINK_NAME, dlqTopic, configuration.getSourceKeySerde().serializer(), configuration.getSourceValueSerde().serializer(), PROCESSOR_NAME));

        configuration.getStoreConfigurations().forEach(storeConfiguration -> topology.addStateStore(storeConfiguration.getStoreBuilder(), PROCESSOR_NAME));
        this.addGlobalStores(configuration, topology);
        return topology;
    }

    /**
     * Registers every configured global store on the topology. The topic of each store
     * is looked up by store name in the global stores configuration; when the store
     * configuration carries no processor supplier, a
     * {@link DefaultGlobalStateStoreProcessor} is used.
     *
     * @param configuration configuration listing the global stores
     * @param topology      topology to add the global stores to
     * @throws IllegalStateException if a store name has no entry in the global stores configuration
     */
    private void addGlobalStores(TopologyConfigurationImpl configuration, Topology topology) {
        configuration.getGlobalStoreConfigurations().forEach(config -> {
            String storeName = config.getStoreBuilder().name();
            GlobalStateStoreConfig storeConfig = (GlobalStateStoreConfig) this.kStreamsProcessorConfig.globalStores().get(storeName);
            if (storeConfig == null) {
                // Fail with the store name instead of an anonymous NullPointerException.
                throw new IllegalStateException("No configuration found for global state store '" + storeName + "': cannot determine its topic");
            }
            String topicName = storeConfig.topic();
            // Raw type kept on purpose: the generic parameters of the store configuration
            // are not known here, and addGlobalStore ties them to the deserializers.
            GlobalStoreProcessorSupplier processorSupplier = config.getGlobalStoreProcessorSupplier();
            if (processorSupplier == null) {
                processorSupplier = () -> new DefaultGlobalStateStoreProcessor(storeName);
            }
            // The topic name doubles as the source node name, the store name as the processor node name.
            topology.addGlobalStore(config.getStoreBuilder(), topicName, config.getKeyDeserializer(), config.getValueDeserializer(), topicName, storeName, (ProcessorSupplier) processorSupplier);
        });
    }
}

