package io.simplesource.saga.saga.internal;

import io.simplesource.saga.model.action.ActionStatus;
import io.simplesource.saga.model.action.SagaAction;
import io.simplesource.saga.model.messages.SagaStateTransition;
import io.simplesource.saga.model.saga.Saga;
import io.simplesource.saga.model.saga.SagaId;
import io.simplesource.saga.model.serdes.SagaSerdes;
import io.simplesource.saga.saga.internal.ActionTransition;
import io.simplesource.saga.saga.internal.SagaTopologyBuilder;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/simplesource/saga/saga/internal/TransitionStream.class */
public final class TransitionStream {
    TransitionStream() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <A> KStream<SagaId, Saga<A>> applyStateTransitions(SagaContext<A> sagaContext, SagaTopologyBuilder.DelayedRetryPublisher<A> delayedRetryPublisher, KStream<SagaId, SagaStateTransition<A>> kStream) {
        SagaSerdes<A> sagaSerdes = sagaContext.sSerdes;
        return kStream.groupByKey(Grouped.with(sagaSerdes.sagaId(), sagaSerdes.transition())).aggregate(() -> {
            return Saga.of(new HashMap());
        }, (sagaId, sagaStateTransition, saga) -> {
            ActionTransition.SagaWithRetry applyTransition = SagaTransition.applyTransition(sagaStateTransition, saga);
            sendRetries(delayedRetryPublisher, saga, applyTransition.retryActions);
            return applyTransition.saga;
        }, Materialized.with(sagaSerdes.sagaId(), sagaSerdes.state())).toStream();
    }

    private static <A> void sendRetries(SagaTopologyBuilder.DelayedRetryPublisher<A> delayedRetryPublisher, Saga<A> saga, List<ActionTransition.SagaWithRetry.Retry> list) {
        list.forEach(retry -> {
            delayedRetryPublisher.send(retry.actionType, retry.retryCount, saga.sagaId, SagaStateTransition.SagaActionStateChanged.of(saga.sagaId, retry.actionId, ActionStatus.RetryCompleted, ((SagaAction) saga.actions.get(retry.actionId)).error, Optional.empty(), retry.isUndo));
        });
    }
}
