package io.zeebe.broker.system.partitions.impl.steps;

import io.zeebe.broker.system.partitions.PartitionContext;
import io.zeebe.broker.system.partitions.PartitionStep;
import io.zeebe.engine.processing.streamprocessor.StreamProcessor;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;

/* loaded from: input_file:io/zeebe/broker/system/partitions/impl/steps/StreamProcessorPartitionStep.class */
public class StreamProcessorPartitionStep implements PartitionStep {
    @Override // io.zeebe.broker.system.partitions.PartitionStep
    public ActorFuture<Void> open(PartitionContext partitionContext) {
        StreamProcessor createStreamProcessor = createStreamProcessor(partitionContext);
        ActorFuture openAsync = createStreamProcessor.openAsync(!partitionContext.shouldProcess());
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        openAsync.onComplete((r7, th) -> {
            if (th != null) {
                completableActorFuture.completeExceptionally(th);
                return;
            }
            partitionContext.setStreamProcessor(createStreamProcessor);
            if (partitionContext.shouldProcess()) {
                createStreamProcessor.resumeProcessing();
            } else {
                createStreamProcessor.pauseProcessing();
            }
            partitionContext.getComponentHealthMonitor().registerComponent(createStreamProcessor.getName(), createStreamProcessor);
            completableActorFuture.complete((Object) null);
        });
        return completableActorFuture;
    }

    @Override // io.zeebe.broker.system.partitions.PartitionStep
    public ActorFuture<Void> close(PartitionContext partitionContext) {
        partitionContext.getComponentHealthMonitor().removeComponent(partitionContext.getStreamProcessor().getName());
        ActorFuture<Void> closeAsync = partitionContext.getStreamProcessor().closeAsync();
        partitionContext.setStreamProcessor(null);
        return closeAsync;
    }

    @Override // io.zeebe.broker.system.partitions.PartitionStep
    public String getName() {
        return "StreamProcessor";
    }

    private StreamProcessor createStreamProcessor(PartitionContext partitionContext) {
        return StreamProcessor.builder().logStream(partitionContext.getLogStream()).actorScheduler(partitionContext.getScheduler()).zeebeDb(partitionContext.getZeebeDb()).nodeId(partitionContext.getNodeId()).commandResponseWriter(partitionContext.getCommandApiService().newCommandResponseWriter()).detectReprocessingInconsistency(partitionContext.getBrokerCfg().getExperimental().isDetectReprocessingInconsistency()).onProcessedListener(partitionContext.getCommandApiService().getOnProcessedListener(partitionContext.getPartitionId())).streamProcessorFactory(processingContext -> {
            return partitionContext.getTypedRecordProcessorsFactory().createTypedStreamProcessor(processingContext.getActor(), processingContext.getZeebeState(), processingContext);
        }).build();
    }
}
