package io.simplesource.saga.saga.app;

import io.simplesource.saga.model.config.StreamAppConfig;
import io.simplesource.saga.model.saga.RetryStrategy;
import io.simplesource.saga.model.specs.ActionSpec;
import io.simplesource.saga.model.specs.SagaSpec;
import io.simplesource.saga.saga.internal.SagaContext;
import io.simplesource.saga.saga.internal.SagaTopologyBuilder;
import io.simplesource.saga.shared.app.StreamAppUtils;
import io.simplesource.saga.shared.kafka.KafkaPublisher;
import io.simplesource.saga.shared.properties.PropertiesBuilder;
import io.simplesource.saga.shared.topics.TopicConfig;
import io.simplesource.saga.shared.topics.TopicConfigBuilder;
import io.simplesource.saga.shared.topics.TopicCreation;
import io.simplesource.saga.shared.topics.TopicTypes;
import io.simplesource.saga.shared.topics.TopicUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/simplesource/saga/saga/app/SagaApp.class */
public final class SagaApp<A> {
    private static Logger logger = LoggerFactory.getLogger(SagaApp.class);
    private final SagaSpec<A> sagaSpec;
    private final ActionSpec<A> actionSpec;
    private final Map<String, RetryStrategy> retryStrategyOverride = new HashMap();
    private RetryStrategy defaultRetryStrategy = RetryStrategy.failFast();
    private final Map<String, TopicConfigBuilder.BuildSteps> buildFuncMap = new HashMap();
    private final TopicConfigBuilder.BuildSteps sagaTopicBuildSteps;
    private final PropertiesBuilder.BuildSteps propertiesBuildSteps;
    private ScheduledExecutorService executor;

    private SagaApp(SagaSpec<A> sagaSpec, ActionSpec<A> actionSpec, TopicConfigBuilder.BuildSteps buildSteps, PropertiesBuilder.BuildSteps buildSteps2) {
        this.sagaSpec = sagaSpec;
        this.actionSpec = actionSpec;
        this.sagaTopicBuildSteps = buildSteps;
        this.propertiesBuildSteps = buildSteps2;
    }

    public static <A> SagaApp<A> of(SagaSpec<A> sagaSpec, ActionSpec<A> actionSpec, TopicConfigBuilder.BuildSteps buildSteps, PropertiesBuilder.BuildSteps buildSteps2) {
        return new SagaApp<>(sagaSpec, actionSpec, buildSteps, buildSteps2);
    }

    public static <A> SagaApp<A> of(SagaSpec<A> sagaSpec, ActionSpec<A> actionSpec, TopicConfigBuilder.BuildSteps buildSteps) {
        return new SagaApp<>(sagaSpec, actionSpec, buildSteps, propertiesBuilder -> {
            return propertiesBuilder;
        });
    }

    public static <A> SagaApp<A> of(SagaSpec<A> sagaSpec, ActionSpec<A> actionSpec) {
        return of(sagaSpec, actionSpec, topicConfigBuilder -> {
            return topicConfigBuilder;
        }, propertiesBuilder -> {
            return propertiesBuilder;
        });
    }

    public SagaApp<A> withAction(String str, TopicConfigBuilder.BuildSteps buildSteps) {
        registerAction(str.toLowerCase(), buildSteps);
        return this;
    }

    public SagaApp<A> withActions(Collection<String> collection, TopicConfigBuilder.BuildSteps buildSteps) {
        collection.forEach(str -> {
            registerAction(str.toLowerCase(), buildSteps);
        });
        return this;
    }

    public SagaApp<A> withActions(String... strArr) {
        return withActions(Arrays.asList(strArr), topicConfigBuilder -> {
            return topicConfigBuilder;
        });
    }

    public SagaApp<A> withActions(TopicConfigBuilder.BuildSteps buildSteps, String... strArr) {
        return withActions(Arrays.asList(strArr), buildSteps);
    }

    public SagaApp<A> withExecutor(ScheduledExecutorService scheduledExecutorService) {
        this.executor = scheduledExecutorService;
        return this;
    }

    public SagaApp<A> withRetryStrategy(RetryStrategy retryStrategy) {
        this.defaultRetryStrategy = retryStrategy;
        return this;
    }

    public SagaApp<A> withRetryStrategy(String str, RetryStrategy retryStrategy) {
        this.retryStrategyOverride.put(str.toLowerCase(), retryStrategy);
        return this;
    }

    public void run(StreamAppConfig streamAppConfig) {
        PropertiesBuilder.BuildSteps withNextStep = this.propertiesBuildSteps.withNextStep(propertiesBuilder -> {
            return propertiesBuilder.withStreamAppConfig(streamAppConfig);
        });
        Properties build = withNextStep.build();
        Properties build2 = withNextStep.withInitialStep((v0) -> {
            return v0.withDefaultStreamProps();
        }).build();
        Topology buildTopology = buildTopology(list -> {
            StreamAppUtils.createMissingTopics(build, list);
        }, withNextStep.withInitialStep((v0) -> {
            return v0.withDefaultProducerProps();
        }).build());
        logger.info("Topology description {}", buildTopology.describe());
        StreamAppUtils.runStreamApp(build2, buildTopology);
    }

    private Topology buildTopology(Consumer<List<TopicCreation>> consumer, Properties properties) {
        KafkaPublisher kafkaPublisher = new KafkaPublisher(new KafkaProducer(properties, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer()), this.sagaSpec.serdes.sagaId(), this.sagaSpec.serdes.transition());
        Objects.requireNonNull(kafkaPublisher);
        return buildTopology(consumer, (v1, v2, v3) -> {
            r2.send(v1, v2, v3);
        });
    }

    Topology buildTopology(Consumer<List<TopicCreation>> consumer, RetryPublisher<A> retryPublisher) {
        ArrayList arrayList = new ArrayList();
        TopicConfig build = this.sagaTopicBuildSteps.withInitialStep(topicConfigBuilder -> {
            return topicConfigBuilder.withTopicBaseName("saga_coordinator");
        }).build(TopicTypes.SagaTopic.all, Collections.singletonMap("retention.ms", "-1"), Collections.singletonMap("saga_state", Collections.singletonMap("cleanup.policy", "compact")));
        arrayList.addAll(build.allTopics());
        Map<String, RetryStrategy> retryStrategyMap = getRetryStrategyMap();
        HashMap hashMap = new HashMap();
        this.buildFuncMap.forEach((str, buildSteps) -> {
            TopicConfig build2 = buildSteps.withInitialStep(topicConfigBuilder2 -> {
                return topicConfigBuilder2.withTopicBaseName(TopicUtils.actionTopicBaseName(str));
            }).build(TopicTypes.ActionTopic.all);
            arrayList.addAll(TopicCreation.allTopics(build2));
            hashMap.put(str, build2.namer);
        });
        consumer.accept(arrayList);
        SagaContext sagaContext = new SagaContext(this.sagaSpec, this.actionSpec, build.namer, hashMap, retryStrategyMap, retryPublisher, this.executor != null ? this.executor : Executors.newScheduledThreadPool(1));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        SagaTopologyBuilder.addSubTopology(sagaContext, streamsBuilder);
        return streamsBuilder.build();
    }

    private void registerAction(String str, TopicConfigBuilder.BuildSteps buildSteps) {
        this.buildFuncMap.put(str, buildSteps);
    }

    private Map<String, RetryStrategy> getRetryStrategyMap() {
        HashMap hashMap = new HashMap();
        this.buildFuncMap.keySet().forEach(str -> {
            hashMap.put(str, this.retryStrategyOverride.getOrDefault(str, this.defaultRetryStrategy));
        });
        return hashMap;
    }
}
