package io.zeebe.logstreams.processor;

import io.zeebe.db.DbContext;
import io.zeebe.db.ZeebeDbTransaction;
import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LogStreamRecordWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.util.exception.RecoverableException;
import io.zeebe.util.retry.AbortableRetryStrategy;
import io.zeebe.util.retry.RecoverableRetryStrategy;
import io.zeebe.util.retry.RetryStrategy;
import io.zeebe.util.sched.ActorControl;
import java.time.Duration;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/logstreams/processor/ProcessingStateMachine.class */
public final class ProcessingStateMachine {
    public static final String ERROR_MESSAGE_WRITE_EVENT_ABORTED = "Expected to write one or more follow up events for event '{}' without errors, but exception was thrown.";
    private static final String ERROR_MESSAGE_ROLLBACK_ABORTED = "Expected to roll back the current transaction for event '{}' successfully, but exception was thrown.";
    private static final String ERROR_MESSAGE_EXECUTE_SIDE_EFFECT_ABORTED = "Expected to execute side effects for event '{}' successfully, but exception was thrown.";
    private static final String ERROR_MESSAGE_UPDATE_STATE_FAILED = "Expected to successfully update state for event '{}' with processor '{}', but caught an exception. Retry.";
    private static final String ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT = "Expected to find event processor for event '{}' with processor '{}', but caught an exception. Skip this event.";
    private static final String ERROR_MESSAGE_PROCESSING_FAILED_SKIP_EVENT = "Expected to successfully process event '{}' with processor '{}', but caught an exception. Skip this event.";
    private static final String ERROR_MESSAGE_PROCESSING_FAILED_RETRY_PROCESSING = "Expected to process event '{}' successfully on stream processor '{}', but caught recoverable exception. Retry processing.";
    private static final String LOG_ERROR_EVENT_COMMITTED = "Error event was committed, we continue with processing.";
    private static final String LOG_ERROR_EVENT_WRITTEN = "Error record was written at {}, we will continue with processing if event was committed. Current commit position is {}.";
    private final ActorControl actor;
    private final int producerId;
    private final String streamProcessorName;
    private final StreamProcessorMetrics metrics;
    private final StreamProcessor streamProcessor;
    private final EventFilter eventFilter;
    private final LogStream logStream;
    private final LogStreamReader logStreamReader;
    private final LogStreamRecordWriter logStreamWriter;
    private final DbContext dbContext;
    private final RetryStrategy writeRetryStrategy;
    private final RetryStrategy sideEffectsRetryStrategy;
    private final RetryStrategy updateStateRetryStrategy;
    private final BooleanSupplier shouldProcessNext;
    private final BooleanSupplier abortCondition;
    private LoggedEvent currentEvent;
    private EventProcessor eventProcessor;
    private ZeebeDbTransaction zeebeDbTransaction;
    private long eventPosition;
    private long lastSuccessfulProcessedEventPosition;
    private long lastWrittenEventPosition;
    private boolean onErrorHandling;
    private long errorRecordPosition;
    private static final Logger LOG = Loggers.PROCESSOR_LOGGER;
    private static final Duration PROCESSING_RETRY_DELAY = Duration.ofMillis(250);

    /* loaded from: input_file:io/zeebe/logstreams/processor/ProcessingStateMachine$ProcessingStateMachineBuilder.class */
    public static class ProcessingStateMachineBuilder {
        private StreamProcessorMetrics metrics;
        private StreamProcessor streamProcessor;
        private StreamProcessorContext streamProcessorContext;
        private DbContext dbContext;
        private BooleanSupplier shouldProcessNext;
        private BooleanSupplier abortCondition;

        public ProcessingStateMachineBuilder setMetrics(StreamProcessorMetrics streamProcessorMetrics) {
            this.metrics = streamProcessorMetrics;
            return this;
        }

        public ProcessingStateMachineBuilder setStreamProcessor(StreamProcessor streamProcessor) {
            this.streamProcessor = streamProcessor;
            return this;
        }

        public ProcessingStateMachineBuilder setStreamProcessorContext(StreamProcessorContext streamProcessorContext) {
            this.streamProcessorContext = streamProcessorContext;
            return this;
        }

        public ProcessingStateMachineBuilder setDbContext(DbContext dbContext) {
            this.dbContext = dbContext;
            return this;
        }

        public ProcessingStateMachineBuilder setShouldProcessNext(BooleanSupplier booleanSupplier) {
            this.shouldProcessNext = booleanSupplier;
            return this;
        }

        public ProcessingStateMachineBuilder setAbortCondition(BooleanSupplier booleanSupplier) {
            this.abortCondition = booleanSupplier;
            return this;
        }

        public ProcessingStateMachine build() {
            Objects.requireNonNull(this.streamProcessorContext);
            Objects.requireNonNull(this.metrics);
            Objects.requireNonNull(this.streamProcessor);
            Objects.requireNonNull(this.dbContext);
            Objects.requireNonNull(this.shouldProcessNext);
            Objects.requireNonNull(this.abortCondition);
            return new ProcessingStateMachine(this.streamProcessorContext, this.metrics, this.streamProcessor, this.dbContext, this.shouldProcessNext, this.abortCondition);
        }
    }

    public static ProcessingStateMachineBuilder builder() {
        return new ProcessingStateMachineBuilder();
    }

    private ProcessingStateMachine(StreamProcessorContext streamProcessorContext, StreamProcessorMetrics streamProcessorMetrics, StreamProcessor streamProcessor, DbContext dbContext, BooleanSupplier booleanSupplier, BooleanSupplier booleanSupplier2) {
        this.eventPosition = -1L;
        this.lastSuccessfulProcessedEventPosition = -1L;
        this.lastWrittenEventPosition = -1L;
        this.errorRecordPosition = -1L;
        this.actor = streamProcessorContext.getActorControl();
        this.producerId = streamProcessorContext.getId();
        this.streamProcessorName = streamProcessorContext.getName();
        this.eventFilter = streamProcessorContext.getEventFilter();
        this.logStreamReader = streamProcessorContext.getLogStreamReader();
        this.logStreamWriter = streamProcessorContext.logStreamWriter;
        this.logStream = streamProcessorContext.getLogStream();
        this.metrics = streamProcessorMetrics;
        this.streamProcessor = streamProcessor;
        this.dbContext = dbContext;
        this.writeRetryStrategy = new AbortableRetryStrategy(this.actor);
        this.sideEffectsRetryStrategy = new AbortableRetryStrategy(this.actor);
        this.updateStateRetryStrategy = new RecoverableRetryStrategy(this.actor);
        this.shouldProcessNext = booleanSupplier;
        this.abortCondition = booleanSupplier2;
    }

    private void skipRecord() {
        this.actor.submit(this::readNextEvent);
        this.metrics.incrementEventsSkippedCount();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void readNextEvent() {
        if (this.shouldProcessNext.getAsBoolean() && this.logStreamReader.hasNext() && this.eventProcessor == null && this.logStream.getCommitPosition() >= this.errorRecordPosition) {
            if (this.onErrorHandling) {
                LOG.info(LOG_ERROR_EVENT_COMMITTED);
                this.onErrorHandling = false;
            }
            this.currentEvent = this.logStreamReader.next();
            if (this.eventFilter == null || this.eventFilter.applies(this.currentEvent)) {
                processEvent(this.currentEvent);
            } else {
                skipRecord();
            }
        }
    }

    private void processEvent(LoggedEvent loggedEvent) {
        try {
            this.eventProcessor = this.streamProcessor.onEvent(loggedEvent);
            if (this.eventProcessor == null) {
                skipRecord();
                return;
            }
            try {
                this.zeebeDbTransaction = this.dbContext.getCurrentTransaction();
                ZeebeDbTransaction zeebeDbTransaction = this.zeebeDbTransaction;
                EventProcessor eventProcessor = this.eventProcessor;
                eventProcessor.getClass();
                zeebeDbTransaction.run(eventProcessor::processEvent);
                this.metrics.incrementEventsProcessedCount();
                writeEvent();
            } catch (Exception e) {
                LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_SKIP_EVENT, new Object[]{loggedEvent, this.streamProcessorName, e});
                onError(e, this::writeEvent);
            } catch (RecoverableException e2) {
                LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_RETRY_PROCESSING, new Object[]{loggedEvent, this.streamProcessorName, e2});
                this.actor.runDelayed(PROCESSING_RETRY_DELAY, () -> {
                    processEvent(this.currentEvent);
                });
            }
        } catch (Exception e3) {
            LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, new Object[]{loggedEvent, this.streamProcessorName, e3});
            skipRecord();
        }
    }

    private void onError(Throwable th, Runnable runnable) {
        this.actor.runOnCompletion(this.updateStateRetryStrategy.runWithRetry(() -> {
            this.zeebeDbTransaction.rollback();
            return true;
        }, this.abortCondition), (bool, th2) -> {
            if (th2 != null) {
                LOG.error(ERROR_MESSAGE_ROLLBACK_ABORTED, this.currentEvent, th2);
            }
            try {
                this.zeebeDbTransaction = this.dbContext.getCurrentTransaction();
                this.zeebeDbTransaction.run(() -> {
                    this.eventProcessor.onError(th);
                });
                this.onErrorHandling = true;
                runnable.run();
            } catch (Exception e) {
                onError(e, runnable);
            }
        });
    }

    private void writeEvent() {
        this.logStreamWriter.producerId(this.producerId).sourceRecordPosition(this.currentEvent.getPosition());
        this.actor.runOnCompletion(this.writeRetryStrategy.runWithRetry(() -> {
            this.eventPosition = this.eventProcessor.writeEvent(this.logStreamWriter);
            return this.eventPosition >= 0;
        }, this.abortCondition), (bool, th) -> {
            if (th != null) {
                LOG.error(ERROR_MESSAGE_WRITE_EVENT_ABORTED, this.currentEvent, th);
                onError(th, this::writeEvent);
            } else {
                this.metrics.incrementEventsWrittenCount();
                updateState();
            }
        });
    }

    private void updateState() {
        this.actor.runOnCompletion(this.updateStateRetryStrategy.runWithRetry(() -> {
            this.zeebeDbTransaction.commit();
            return true;
        }, this.abortCondition), (bool, th) -> {
            if (th != null) {
                LOG.error(ERROR_MESSAGE_UPDATE_STATE_FAILED, new Object[]{this.currentEvent, this.streamProcessorName, th});
                onError(th, this::updateState);
                return;
            }
            if (this.onErrorHandling) {
                this.errorRecordPosition = this.eventPosition;
                LOG.info(LOG_ERROR_EVENT_WRITTEN, Long.valueOf(this.errorRecordPosition), Long.valueOf(this.logStream.getCommitPosition()));
            }
            this.lastSuccessfulProcessedEventPosition = this.currentEvent.getPosition();
            this.lastWrittenEventPosition = this.eventPosition;
            executeSideEffects();
        });
    }

    private void executeSideEffects() {
        RetryStrategy retryStrategy = this.sideEffectsRetryStrategy;
        EventProcessor eventProcessor = this.eventProcessor;
        eventProcessor.getClass();
        this.actor.runOnCompletion(retryStrategy.runWithRetry(eventProcessor::executeSideEffects, this.abortCondition), (bool, th) -> {
            if (th != null) {
                LOG.error(ERROR_MESSAGE_EXECUTE_SIDE_EFFECT_ABORTED, this.currentEvent, th);
            }
            this.eventProcessor = null;
            this.actor.submit(this::readNextEvent);
        });
    }

    public long getLastSuccessfulProcessedEventPosition() {
        return this.lastSuccessfulProcessedEventPosition;
    }

    public long getLastWrittenEventPosition() {
        return this.lastWrittenEventPosition;
    }
}
