package io.micronaut.configuration.kafka.streams;

import io.micronaut.configuration.kafka.streams.event.AfterKafkaStreamsStart;
import io.micronaut.configuration.kafka.streams.event.BeforeKafkaStreamStart;
import io.micronaut.context.annotation.Context;
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Parameter;
import io.micronaut.context.annotation.Secondary;
import io.micronaut.context.event.ApplicationEventPublisher;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Singleton;
import java.io.Closeable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Factory
/* loaded from: input_file:io/micronaut/configuration/kafka/streams/KafkaStreamsFactory.class */
public class KafkaStreamsFactory implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaStreamsFactory.class);
    private static final String START_KAFKA_STREAMS_PROPERTY = "start-kafka-streams";
    private static final String UNCAUGHT_EXCEPTION_HANDLER_PROPERTY = "uncaught-exception-handler";
    private final Map<KafkaStreams, ConfiguredStreamBuilder> streams = new ConcurrentHashMap();
    private final ApplicationEventPublisher eventPublisher;

    public KafkaStreamsFactory(ApplicationEventPublisher applicationEventPublisher) {
        this.eventPublisher = applicationEventPublisher;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @EachBean(AbstractKafkaStreamsConfiguration.class)
    public ConfiguredStreamBuilder streamsBuilder(AbstractKafkaStreamsConfiguration<?, ?> abstractKafkaStreamsConfiguration) {
        return new ConfiguredStreamBuilder(abstractKafkaStreamsConfiguration.getConfig());
    }

    public Map<KafkaStreams, ConfiguredStreamBuilder> getStreams() {
        return this.streams;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @EachBean(ConfiguredStreamBuilder.class)
    @Context
    public KafkaStreams kafkaStreams(@Parameter String str, ConfiguredStreamBuilder configuredStreamBuilder, KafkaClientSupplier kafkaClientSupplier, KStream<?, ?>... kStreamArr) {
        Topology build = configuredStreamBuilder.build(configuredStreamBuilder.getConfiguration());
        KafkaStreams kafkaStreams = new KafkaStreams(build, configuredStreamBuilder.getConfiguration(), kafkaClientSupplier);
        Optional<StreamsUncaughtExceptionHandler> makeUncaughtExceptionHandler = makeUncaughtExceptionHandler(configuredStreamBuilder.getConfiguration());
        Objects.requireNonNull(kafkaStreams);
        makeUncaughtExceptionHandler.ifPresent(kafkaStreams::setUncaughtExceptionHandler);
        boolean parseBoolean = Boolean.parseBoolean(configuredStreamBuilder.getConfiguration().getProperty(START_KAFKA_STREAMS_PROPERTY, Boolean.TRUE.toString()));
        if (parseBoolean) {
            this.eventPublisher.publishEvent(new BeforeKafkaStreamStart(kafkaStreams, kStreamArr));
        }
        this.streams.put(kafkaStreams, configuredStreamBuilder);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Initializing Application {} with topology:\n{}", str, build.describe().toString());
        }
        if (parseBoolean) {
            kafkaStreams.start();
            this.eventPublisher.publishEvent(new AfterKafkaStreamsStart(kafkaStreams, kStreamArr));
        }
        return kafkaStreams;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Singleton
    public InteractiveQueryService interactiveQueryService() {
        return new InteractiveQueryService(this.streams.keySet());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Singleton
    @Secondary
    public KafkaClientSupplier kafkaClientSupplier() {
        return new DefaultKafkaClientSupplier();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    @PreDestroy
    public void close() {
        Iterator<KafkaStreams> it = this.streams.keySet().iterator();
        while (it.hasNext()) {
            try {
                it.next().close(Duration.ofSeconds(3L));
            } catch (Exception e) {
            }
        }
    }

    Optional<StreamsUncaughtExceptionHandler> makeUncaughtExceptionHandler(Properties properties) {
        return Optional.ofNullable(properties.getProperty(UNCAUGHT_EXCEPTION_HANDLER_PROPERTY)).filter(Predicate.not((v0) -> {
            return v0.isBlank();
        })).map(str -> {
            try {
                StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse valueOf = StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.valueOf(str.toUpperCase());
                return th -> {
                    if (LOG.isWarnEnabled()) {
                        LOG.warn("Responding with {} to unexpected exception thrown by kafka stream thread", valueOf, th);
                    }
                    return valueOf;
                };
            } catch (IllegalArgumentException e) {
                if (!LOG.isWarnEnabled()) {
                    return null;
                }
                LOG.warn("Ignoring illegal exception handler: {}. Please use one of: {}", str, Arrays.asList(StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.values()));
                return null;
            }
        });
    }
}
