package io.simplesource.saga.saga.internal;

import io.simplesource.data.NonEmptyList;
import io.simplesource.data.Result;
import io.simplesource.kafka.internal.util.Tuple2;
import io.simplesource.saga.model.messages.SagaRequest;
import io.simplesource.saga.model.messages.SagaResponse;
import io.simplesource.saga.model.messages.SagaStateTransition;
import io.simplesource.saga.model.saga.Saga;
import io.simplesource.saga.model.saga.SagaError;
import io.simplesource.saga.model.saga.SagaId;
import io.simplesource.saga.model.serdes.SagaSerdes;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Predicate;

/* loaded from: input_file:io/simplesource/saga/saga/internal/SetupStream.class */
final class SetupStream {
    SetupStream() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <A> Tuple2<KStream<SagaId, SagaRequest<A>>, KStream<SagaId, SagaResponse>> validateSagaRequests(SagaContext<A> sagaContext, KStream<SagaId, SagaRequest<A>> kStream) {
        Set<String> keySet = sagaContext.actionTopicNamers.keySet();
        KStream[] branch = kStream.mapValues((sagaId, sagaRequest) -> {
            return Tuple2.of(sagaRequest, (List) sagaRequest.initialState.actions.values().stream().map(sagaAction -> {
                String lowerCase = sagaAction.command.actionType.toLowerCase();
                if (keySet.contains(lowerCase)) {
                    return null;
                }
                return SagaError.of(SagaError.Reason.InvalidSaga, String.format("Unknown action type '%s'", lowerCase));
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).collect(Collectors.toList()));
        }).branch(new Predicate[]{(sagaId2, tuple2) -> {
            return ((List) tuple2.v2()).isEmpty();
        }, (sagaId3, tuple22) -> {
            return !((List) tuple22.v2()).isEmpty();
        }});
        return Tuple2.of(branch[0].mapValues((v0) -> {
            return v0.v1();
        }), branch[1].mapValues((v0) -> {
            return v0.v2();
        }).mapValues((sagaId4, list) -> {
            return SagaResponse.of(sagaId4, Result.failure((NonEmptyList) NonEmptyList.fromList(list).get()));
        }));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <A> KStream<SagaId, SagaStateTransition<A>> addInitialState(SagaContext<A> sagaContext, KStream<SagaId, SagaRequest<A>> kStream, KTable<SagaId, Saga<A>> kTable) {
        SagaSerdes<A> sagaSerdes = sagaContext.sSerdes;
        return kStream.leftJoin(kTable, (sagaRequest, saga) -> {
            return Tuple2.of(sagaRequest, Boolean.valueOf(saga == null));
        }, Joined.with(sagaSerdes.sagaId(), sagaSerdes.request(), sagaSerdes.state())).filter((sagaId, tuple2) -> {
            return ((Boolean) tuple2.v2()).booleanValue();
        }).mapValues((sagaId2, tuple22) -> {
            return SagaStateTransition.SetInitialState.of(((SagaRequest) tuple22.v1()).initialState);
        });
    }
}
