package io.axual.streams.proxy.wrapped;

import io.axual.common.annotation.InterfaceStability;
import io.axual.streams.proxy.generic.factory.OptimizedTopologyFactory;
import io.axual.streams.proxy.generic.proxy.StreamsProxy;
import java.lang.Thread;
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.state.StreamsMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
/* loaded from: input_file:io/axual/streams/proxy/wrapped/WrappedStreams.class */
public class WrappedStreams implements StreamsProxy {
    private static final Logger LOG = LoggerFactory.getLogger(WrappedStreams.class);
    private final WrappedStreamsConfig config;
    private final KafkaStreams kafkaStreams;

    public WrappedStreams(Map<String, Object> map) {
        this.config = new WrappedStreamsConfig(map);
        Properties properties = new Properties();
        properties.putAll(this.config.getDownstreamConfigs());
        Topology create = OptimizedTopologyFactory.Wrapper.ensureOptimizedTopologyFactory(this.config.getTopologyFactory()).create(new WrappedStreamsBuilder(), properties);
        LOG.info("{}", create.describe());
        this.kafkaStreams = new KafkaStreams(create, properties, this.config.getClientSupplier());
        if (this.config.getUncaughtExceptionHandlerFactory() != null) {
            setUncaughtExceptionHandler(this.config.getUncaughtExceptionHandlerFactory().create(this));
        }
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public void setStateListener(KafkaStreams.StateListener stateListener) {
        this.kafkaStreams.setStateListener(stateListener);
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public KafkaStreams.State state() {
        return this.kafkaStreams.state();
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public Map<MetricName, ? extends Metric> metrics() {
        return this.kafkaStreams.metrics();
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public void start() {
        this.kafkaStreams.start();
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public void stop() {
        close();
    }

    public Map<String, Object> getConfigs() {
        return this.config.getConfigs();
    }

    public Object getConfig(String str) {
        return this.config.getConfigs().get(str);
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams, java.lang.AutoCloseable
    public void close() {
        this.kafkaStreams.close();
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public void close(Duration duration) {
        this.kafkaStreams.close(duration);
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public void cleanUp() {
        this.kafkaStreams.cleanUp();
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
        this.kafkaStreams.setUncaughtExceptionHandler(streamsUncaughtExceptionHandler);
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    @Deprecated
    public void setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        this.kafkaStreams.setUncaughtExceptionHandler(uncaughtExceptionHandler);
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    @Deprecated
    public Collection<StreamsMetadata> allMetadata() {
        return this.kafkaStreams.allMetadata();
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    @Deprecated
    public Collection<StreamsMetadata> allMetadataForStore(String str) {
        return this.kafkaStreams.allMetadataForStore(str);
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    @Deprecated
    public Set<ThreadMetadata> localThreadsMetadata() {
        return this.kafkaStreams.localThreadsMetadata();
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public <K> KeyQueryMetadata queryMetadataForKey(String str, K k, Serializer<K> serializer) {
        return this.kafkaStreams.queryMetadataForKey(str, k, serializer);
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public <K> KeyQueryMetadata queryMetadataForKey(String str, K k, StreamPartitioner<? super K, ?> streamPartitioner) {
        return this.kafkaStreams.queryMetadataForKey(str, k, streamPartitioner);
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public <T> T store(StoreQueryParameters<T> storeQueryParameters) {
        return (T) this.kafkaStreams.store(storeQueryParameters);
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public Optional<String> addStreamThread() {
        return this.kafkaStreams.addStreamThread();
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public Optional<String> removeStreamThread() {
        return this.kafkaStreams.removeStreamThread();
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public Optional<String> removeStreamThread(Duration duration) {
        return this.kafkaStreams.removeStreamThread(duration);
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public Collection<org.apache.kafka.streams.StreamsMetadata> metadataForAllStreamsClients() {
        return this.kafkaStreams.metadataForAllStreamsClients();
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public Collection<org.apache.kafka.streams.StreamsMetadata> streamsMetadataForStore(String str) {
        return this.kafkaStreams.streamsMetadataForStore(str);
    }

    @Override // io.axual.streams.proxy.generic.streams.Streams
    public Set<org.apache.kafka.streams.ThreadMetadata> metadataForLocalThreads() {
        return this.kafkaStreams.metadataForLocalThreads();
    }
}
