package io.zeebe.logstreams.processor;

import io.zeebe.db.DbContext;
import io.zeebe.db.TransactionOperation;
import io.zeebe.db.ZeebeDbTransaction;
import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.util.retry.EndlessRetryStrategy;
import io.zeebe.util.retry.RetryStrategy;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.function.BooleanSupplier;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/logstreams/processor/ReProcessingStateMachine.class */
public final class ReProcessingStateMachine {
    private static final Logger LOG;
    private static final String ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT = "Expected to find event processor for event '{}' with processor '{}', but caught an exception. Skip this event.";
    private static final String ERROR_MESSAGE_REPROCESSING_NO_SOURCE_EVENT = "Expected to find last source event position '%d', but last position was '%d'. Failed to reprocess on processor '%s'";
    private static final String ERROR_MESSAGE_REPROCESSING_NO_NEXT_EVENT = "Expected to find last source event position '%d', but found no next event. Failed to reprocess on processor '%s'";
    private static final String LOG_STMT_REPROCESSING_FINISHED = "Processor {} finished reprocessing at event position {}";
    private static final String LOG_STMT_FAILED_ON_PROCESSING = "Event {} failed on processing last time, will call #onError to update workflow instance blacklist.";
    private final boolean isReadOnlyProcessor;
    private final int producerId;
    private final ActorControl actor;
    private final String streamProcessorName;
    private final StreamProcessor streamProcessor;
    private final EventFilter eventFilter;
    private final LogStreamReader logStreamReader;
    private final DbContext dbContext;
    private final RetryStrategy updateStateRetryStrategy;
    private final RetryStrategy processRetryStrategy;
    private final BooleanSupplier abortCondition;
    private final Set<Long> failedEventPositions;
    private long lastSourceEventPosition;
    private ActorFuture<Void> recoveryFuture;
    private LoggedEvent currentEvent;
    private EventProcessor eventProcessor;
    private ZeebeDbTransaction zeebeDbTransaction;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/zeebe/logstreams/processor/ReProcessingStateMachine$ReprocessingStateMachineBuilder.class */
    public static class ReprocessingStateMachineBuilder {
        private StreamProcessorContext streamProcessorContext;
        private DbContext dbContext;
        private StreamProcessor streamProcessor;
        private BooleanSupplier abortCondition;

        public ReprocessingStateMachineBuilder setStreamProcessor(StreamProcessor streamProcessor) {
            this.streamProcessor = streamProcessor;
            return this;
        }

        public ReprocessingStateMachineBuilder setStreamProcessorContext(StreamProcessorContext streamProcessorContext) {
            this.streamProcessorContext = streamProcessorContext;
            return this;
        }

        public ReprocessingStateMachineBuilder setDbContext(DbContext dbContext) {
            this.dbContext = dbContext;
            return this;
        }

        public ReprocessingStateMachineBuilder setAbortCondition(BooleanSupplier booleanSupplier) {
            this.abortCondition = booleanSupplier;
            return this;
        }

        public ReProcessingStateMachine build() {
            Objects.requireNonNull(this.streamProcessorContext);
            Objects.requireNonNull(this.streamProcessor);
            Objects.requireNonNull(this.dbContext);
            Objects.requireNonNull(this.abortCondition);
            return new ReProcessingStateMachine(this.streamProcessorContext, this.streamProcessor, this.dbContext, this.abortCondition);
        }
    }

    public static ReprocessingStateMachineBuilder builder() {
        return new ReprocessingStateMachineBuilder();
    }

    private ReProcessingStateMachine(StreamProcessorContext streamProcessorContext, StreamProcessor streamProcessor, DbContext dbContext, BooleanSupplier booleanSupplier) {
        this.failedEventPositions = new HashSet();
        this.actor = streamProcessorContext.getActorControl();
        this.streamProcessorName = streamProcessorContext.getName();
        this.eventFilter = streamProcessorContext.getEventFilter();
        this.logStreamReader = streamProcessorContext.getLogStreamReader();
        this.isReadOnlyProcessor = streamProcessorContext.isReadOnlyProcessor();
        this.producerId = streamProcessorContext.getId();
        this.streamProcessor = streamProcessor;
        this.dbContext = dbContext;
        this.updateStateRetryStrategy = new EndlessRetryStrategy(this.actor);
        this.processRetryStrategy = new EndlessRetryStrategy(this.actor);
        this.abortCondition = booleanSupplier;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ActorFuture<Void> startRecover(long j) {
        this.recoveryFuture = new CompletableActorFuture();
        long position = this.logStreamReader.getPosition();
        LOG.info("Start scanning the log for error events.");
        this.lastSourceEventPosition = scanLog(j);
        LOG.info("Finished scanning the log for error events.");
        if (this.lastSourceEventPosition > j) {
            LOG.info("Processor {} starts reprocessing, until last source event position {}", this.streamProcessorName, Long.valueOf(this.lastSourceEventPosition));
            this.logStreamReader.seek(position);
            reprocessNextEvent();
        } else {
            this.recoveryFuture.complete((Object) null);
        }
        return this.recoveryFuture;
    }

    private long scanLog(long j) {
        long j2 = -1;
        if (!this.isReadOnlyProcessor && this.logStreamReader.hasNext()) {
            j2 = j;
            while (this.logStreamReader.hasNext()) {
                LoggedEvent next = this.logStreamReader.next();
                long failedPosition = this.streamProcessor.getFailedPosition(next);
                if (failedPosition >= 0) {
                    LOG.debug("Found error-prone event {} on reprocessing, will add position {} to the blacklist.", next, Long.valueOf(failedPosition));
                    this.failedEventPositions.add(Long.valueOf(failedPosition));
                }
                if (next.getProducerId() == this.producerId) {
                    long sourceEventPosition = next.getSourceEventPosition();
                    if (sourceEventPosition > 0 && sourceEventPosition > j2) {
                        j2 = sourceEventPosition;
                    }
                }
            }
            this.logStreamReader.seek(j + 1);
        }
        return j2;
    }

    private void readNextEvent() {
        if (!this.logStreamReader.hasNext()) {
            throw new IllegalStateException(String.format(ERROR_MESSAGE_REPROCESSING_NO_NEXT_EVENT, Long.valueOf(this.lastSourceEventPosition), this.streamProcessorName));
        }
        this.currentEvent = this.logStreamReader.next();
        if (this.currentEvent.getPosition() > this.lastSourceEventPosition) {
            throw new IllegalStateException(String.format(ERROR_MESSAGE_REPROCESSING_NO_SOURCE_EVENT, Long.valueOf(this.lastSourceEventPosition), Long.valueOf(this.currentEvent.getPosition()), this.streamProcessorName));
        }
    }

    private void reprocessNextEvent() {
        try {
            readNextEvent();
            if (this.eventFilter == null || this.eventFilter.applies(this.currentEvent)) {
                reprocessEvent(this.currentEvent);
            } else {
                onRecordReprocessed(this.currentEvent);
            }
        } catch (RuntimeException e) {
            this.recoveryFuture.completeExceptionally(e);
        }
    }

    private void reprocessEvent(LoggedEvent loggedEvent) {
        try {
            this.eventProcessor = this.streamProcessor.onEvent(loggedEvent);
        } catch (Exception e) {
            LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, new Object[]{loggedEvent, this.streamProcessorName, e});
        }
        if (this.eventProcessor == null) {
            onRecordReprocessed(loggedEvent);
        } else {
            processUntilDone(loggedEvent);
        }
    }

    private void processUntilDone(LoggedEvent loggedEvent) {
        TransactionOperation chooseOperationForEvent = chooseOperationForEvent(loggedEvent);
        this.actor.runOnCompletion(this.processRetryStrategy.runWithRetry(() -> {
            if (this.zeebeDbTransaction != null) {
                this.zeebeDbTransaction.rollback();
            }
            this.zeebeDbTransaction = this.dbContext.getCurrentTransaction();
            this.zeebeDbTransaction.run(chooseOperationForEvent);
            return true;
        }, this.abortCondition), (bool, th) -> {
            if (!$assertionsDisabled && th != null) {
                throw new AssertionError("On reprocessing there shouldn't be any exception thrown.");
            }
            updateStateUntilDone();
        });
    }

    private TransactionOperation chooseOperationForEvent(LoggedEvent loggedEvent) {
        TransactionOperation transactionOperation;
        if (this.failedEventPositions.contains(Long.valueOf(loggedEvent.getPosition()))) {
            LOG.info(LOG_STMT_FAILED_ON_PROCESSING, loggedEvent);
            transactionOperation = () -> {
                this.eventProcessor.onError(new Exception("Failed on last processing."));
            };
        } else {
            EventProcessor eventProcessor = this.eventProcessor;
            eventProcessor.getClass();
            transactionOperation = eventProcessor::processEvent;
        }
        return transactionOperation;
    }

    private void updateStateUntilDone() {
        this.actor.runOnCompletion(this.updateStateRetryStrategy.runWithRetry(() -> {
            this.zeebeDbTransaction.commit();
            this.zeebeDbTransaction = null;
            return true;
        }, this.abortCondition), (bool, th) -> {
            if (!$assertionsDisabled && th != null) {
                throw new AssertionError("On reprocessing there shouldn't be any exception thrown.");
            }
            onRecordReprocessed(this.currentEvent);
        });
    }

    private void onRecordReprocessed(LoggedEvent loggedEvent) {
        if (loggedEvent.getPosition() != this.lastSourceEventPosition) {
            this.actor.submit(this::reprocessNextEvent);
        } else {
            LOG.info(LOG_STMT_REPROCESSING_FINISHED, this.streamProcessorName, Long.valueOf(loggedEvent.getPosition()));
            onRecovered();
        }
    }

    private void onRecovered() {
        this.recoveryFuture.complete((Object) null);
        this.failedEventPositions.clear();
    }

    static {
        $assertionsDisabled = !ReProcessingStateMachine.class.desiredAssertionStatus();
        LOG = Loggers.PROCESSOR_LOGGER;
    }
}
