/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.kafkastreamsprocessor.impl.configuration;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.daniel.shuy.kafka.protobuf.serde.KafkaProtobufSerde;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import io.quarkiverse.kafkastreamsprocessor.api.configuration.Configuration;
import io.quarkiverse.kafkastreamsprocessor.api.serdes.JacksonSerde;
import io.quarkiverse.kafkastreamsprocessor.impl.IntrospectionSerializer;
import io.quarkiverse.kafkastreamsprocessor.impl.StringIntrospectionSerializer;
import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl;
import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TypeUtils;
import jakarta.enterprise.context.Dependent;
import jakarta.inject.Inject;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

/**
 * Fills in default serdes and serializers on a topology configuration, touching only the slots that are still
 * {@code null} when {@link #apply(Configuration)} is called.
 *
 * <ul>
 * <li>Sink key serializer: {@link StringIntrospectionSerializer}.</li>
 * <li>Sink value serializer: {@link IntrospectionSerializer} built with the injected {@link ObjectMapper}.</li>
 * <li>Source key serde: {@link KafkaProtobufSerde} when the processor key type is a {@link MessageLite},
 * {@link Serdes#String()} otherwise.</li>
 * <li>Source value serde: {@link KafkaProtobufSerde} when the processor payload type is a {@link MessageLite},
 * {@link JacksonSerde} built with the injected {@link ObjectMapper} otherwise.</li>
 * </ul>
 */
@Dependent
public class DefaultTopologySerdesConfiguration {
    /** Mapper handed to the JSON-based default serializer and serde. */
    private final ObjectMapper objectMapper;

    /**
     * Creates the bean.
     *
     * @param objectMapper mapper passed to the {@link IntrospectionSerializer} and {@link JacksonSerde} defaults
     */
    @Inject
    public DefaultTopologySerdesConfiguration(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    /**
     * Applies the defaults to the given configuration: sink serializers first, then source serdes.
     *
     * @param topologyConfiguration configuration to complete; it is cast to {@link TopologyConfigurationImpl},
     *        so any other implementation results in a {@link ClassCastException}
     */
    public void apply(Configuration topologyConfiguration) {
        TopologyConfigurationImpl target = (TopologyConfigurationImpl) topologyConfiguration;
        applySinkDefaults(target);
        applySourceDefaults(target);
    }

    /** Sets the source key serde, then the source value serde, each only if currently unset. */
    private void applySourceDefaults(TopologyConfigurationImpl target) {
        applyDefaultSourceKeySerde(target);
        applyDefaultSourceValueSerde(target);
    }

    /** Chooses a protobuf serde for {@link MessageLite} key types and a String serde for anything else. */
    private void applyDefaultSourceKeySerde(TopologyConfigurationImpl target) {
        if (target.getSourceKeySerde() != null) {
            // An explicit key serde is already present; leave it alone.
            return;
        }
        if (!MessageLite.class.isAssignableFrom(target.getProcessorKeyType())) {
            target.setSourceKeySerde(Serdes.String());
            return;
        }
        Parser<MessageLite> keyParser = TypeUtils.createParserFromType(target.getProcessorKeyType());
        target.setSourceKeySerde((Serde<?>) new KafkaProtobufSerde(keyParser));
    }

    /** Chooses a protobuf serde for {@link MessageLite} payload types and a Jackson serde for anything else. */
    private void applyDefaultSourceValueSerde(TopologyConfigurationImpl target) {
        if (target.getSourceValueSerde() != null) {
            // An explicit value serde is already present; leave it alone.
            return;
        }
        if (!MessageLite.class.isAssignableFrom(target.getProcessorPayloadType())) {
            target.setSourceValueSerde((Serde<?>) new JacksonSerde(target.getProcessorPayloadType(), objectMapper));
            return;
        }
        Parser<MessageLite> payloadParser = TypeUtils.createParserFromType(target.getProcessorPayloadType());
        target.setSourceValueSerde((Serde<?>) new KafkaProtobufSerde(payloadParser));
    }

    /** Sets the sink key serializer, then the sink value serializer, each only if currently unset. */
    private void applySinkDefaults(TopologyConfigurationImpl target) {
        boolean keySerializerMissing = target.getSinkKeySerializer() == null;
        if (keySerializerMissing) {
            target.setSinkKeySerializer(new StringIntrospectionSerializer());
        }
        boolean valueSerializerMissing = target.getSinkValueSerializer() == null;
        if (valueSerializerMissing) {
            target.setSinkValueSerializer(new IntrospectionSerializer(objectMapper));
        }
    }
}

