package io.simplesource.saga.saga.internal;

import io.simplesource.kafka.internal.util.Tuple2;
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

/* loaded from: input_file:io/simplesource/saga/saga/internal/ResultDistributor.class */
final class ResultDistributor {
    ResultDistributor() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K, V> KStream<K, String> resultTopicMapStream(DistributorContext<K, V> distributorContext, StreamsBuilder streamsBuilder) {
        return streamsBuilder.stream(distributorContext.topicNameMapTopic, Consumed.with(distributorContext.serdes.key, Serdes.String()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K, V> void distribute(DistributorContext<K, V> distributorContext, KStream<K, V> kStream, KStream<K, String> kStream2) {
        DistributorSerdes<K, V> distributorSerdes = distributorContext.serdes;
        Duration ofSeconds = Duration.ofSeconds(distributorContext.responseWindowSpec.retentionInSeconds());
        kStream.selectKey((obj, obj2) -> {
            return distributorContext.idMapper.apply(obj2);
        }).join(kStream2, (v0, v1) -> {
            return Tuple2.of(v0, v1);
        }, JoinWindows.of(ofSeconds).until((ofSeconds.toMillis() * 2) + 1), Joined.with(distributorSerdes.key, distributorSerdes.value, Serdes.String())).map((obj3, tuple2) -> {
            return KeyValue.pair(String.format("%s:%s", tuple2.v2(), distributorContext.keyToUuid.apply(obj3).toString()), tuple2.v1());
        }).to((str, obj4, recordContext) -> {
            return str.substring(0, str.length() - 37);
        }, Produced.with(Serdes.String(), distributorSerdes.value));
    }
}
