package io.zeebe.logstreams.processor;

import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.logstreams.spi.ReadableSnapshot;
import io.zeebe.logstreams.spi.SnapshotStorage;
import io.zeebe.logstreams.spi.SnapshotWriter;
import io.zeebe.util.LangUtil;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.ActorPriority;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.ActorTask;
import io.zeebe.util.sched.SchedulingHints;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController.class */
public class StreamProcessorController extends Actor {
    private static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    private static final String ERROR_MESSAGE_RECOVER_FROM_SNAPSHOT_FAILED = "Stream processor '%s' failed to recover. Cannot find event with the snapshot position in target log stream.";
    private static final String ERROR_MESSAGE_REPROCESSING_NO_SOURCE_EVENT = "Stream processor '%s' failed to reprocess. Cannot find source event position: %d";
    private static final String ERROR_MESSAGE_REPROCESSING_FAILED = "Stream processor '%s' failed to reprocess event: %s";
    private static final String ERROR_MESSAGE_PROCESSING_FAILED = "Stream processor '{}' failed to process event. It stop processing further events.";
    private final StreamProcessor streamProcessor;
    private final StreamProcessorContext streamProcessorContext;
    private final LogStreamReader logStreamReader;
    private final LogStreamWriter logStreamWriter;
    private final SnapshotStorage snapshotStorage;
    private final Duration snapshotPeriod;
    private final ActorScheduler actorScheduler;
    private final EventFilter eventFilter;
    private final boolean isReadOnlyProcessor;
    private LoggedEvent currentEvent;
    private EventProcessor eventProcessor;
    private ActorCondition onCommitPositionUpdatedCondition;
    private StreamProcessorMetrics metrics;
    private final AtomicBoolean isOpened = new AtomicBoolean(false);
    private Phase phase = Phase.REPROCESSING;
    private final EventLifecycleContext eventLifecycleContext = new EventLifecycleContext();
    private final Runnable readNextEvent = this::readNextEvent;
    private long snapshotPosition = -1;
    private long lastSourceEventPosition = -1;
    private long eventPosition = -1;
    private long lastSuccessfulProcessedEventPosition = -1;
    private long lastWrittenEventPosition = -1;
    private boolean suspended = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/zeebe/logstreams/processor/StreamProcessorController$Phase.class */
    public enum Phase {
        REPROCESSING,
        PROCESSING,
        FAILED
    }

    public StreamProcessorController(StreamProcessorContext streamProcessorContext) {
        this.streamProcessorContext = streamProcessorContext;
        this.streamProcessorContext.setActorControl(this.actor);
        this.streamProcessorContext.setSuspendRunnable(this::suspend);
        this.streamProcessorContext.setResumeRunnable(this::resume);
        this.actorScheduler = streamProcessorContext.getActorScheduler();
        this.streamProcessor = streamProcessorContext.getStreamProcessor();
        this.logStreamReader = streamProcessorContext.getLogStreamReader();
        this.logStreamWriter = streamProcessorContext.getLogStreamWriter();
        this.snapshotStorage = streamProcessorContext.getSnapshotStorage();
        this.snapshotPeriod = streamProcessorContext.getSnapshotPeriod();
        this.eventFilter = streamProcessorContext.getEventFilter();
        this.isReadOnlyProcessor = streamProcessorContext.isReadOnlyProcessor();
    }

    public String getName() {
        return this.streamProcessorContext.getName();
    }

    public ActorFuture<Void> openAsync() {
        return this.isOpened.compareAndSet(false, true) ? this.actorScheduler.submitActor(this, true) : CompletableActorFuture.completed((Object) null);
    }

    protected void onActorStarting() {
        this.streamProcessor.getStateResource().reset();
        LogStream logStream = this.streamProcessorContext.getLogStream();
        this.metrics = new StreamProcessorMetrics(this.actorScheduler.getMetricsManager(), getName(), logStream.getTopicName().getStringWithoutLengthUtf8(0, logStream.getTopicName().capacity()), String.valueOf(logStream.getPartitionId()));
        this.logStreamReader.wrap(logStream);
        this.logStreamWriter.wrap(logStream);
        try {
            this.snapshotPosition = recoverFromSnapshot();
            this.lastSourceEventPosition = seekToLastSourceEvent();
            this.streamProcessor.onOpen(this.streamProcessorContext);
        } catch (Exception e) {
            onFailure();
            LangUtil.rethrowUnchecked(e);
        }
    }

    protected void onActorStarted() {
        try {
            if (this.lastSourceEventPosition > this.snapshotPosition) {
                reprocessNextEvent();
            } else {
                onRecovered();
            }
        } catch (RuntimeException e) {
            onFailure();
            throw e;
        }
    }

    private long recoverFromSnapshot() throws Exception {
        long j = -1;
        ReadableSnapshot lastSnapshot = this.snapshotStorage.getLastSnapshot(this.streamProcessorContext.getName());
        if (lastSnapshot != null) {
            lastSnapshot.recoverFromSnapshot(this.streamProcessor.getStateResource());
            j = lastSnapshot.getPosition();
            if (!this.logStreamReader.seek(j) || !this.logStreamReader.hasNext()) {
                throw new IllegalStateException(String.format(ERROR_MESSAGE_RECOVER_FROM_SNAPSHOT_FAILED, getName()));
            }
            this.logStreamReader.seek(j + 1);
        }
        return j;
    }

    private long seekToLastSourceEvent() {
        long j = -1;
        if (!this.isReadOnlyProcessor && this.logStreamReader.hasNext()) {
            j = this.snapshotPosition;
            while (this.logStreamReader.hasNext()) {
                LoggedEvent next = this.logStreamReader.next();
                if (next.getProducerId() == this.streamProcessorContext.getId()) {
                    long sourceEventPosition = next.getSourceEventPosition();
                    if (sourceEventPosition > 0 && sourceEventPosition > j) {
                        j = sourceEventPosition;
                    }
                }
            }
            this.logStreamReader.seek(this.snapshotPosition + 1);
        }
        return j;
    }

    private void reprocessNextEvent() {
        try {
            if (!this.logStreamReader.hasNext()) {
                throw new IllegalStateException(String.format(ERROR_MESSAGE_REPROCESSING_NO_SOURCE_EVENT, getName(), Long.valueOf(this.lastSourceEventPosition)));
            }
            this.currentEvent = this.logStreamReader.next();
            if (this.currentEvent.getPosition() > this.lastSourceEventPosition) {
                throw new IllegalStateException(String.format(ERROR_MESSAGE_REPROCESSING_NO_SOURCE_EVENT, getName(), Long.valueOf(this.lastSourceEventPosition)));
            }
            reprocessEvent(this.currentEvent);
        } catch (RuntimeException e) {
            onFailure();
            throw e;
        }
    }

    private void reprocessEvent(LoggedEvent loggedEvent) {
        if (this.eventFilter != null && !this.eventFilter.applies(loggedEvent)) {
            onRecordReprocessed(loggedEvent);
            return;
        }
        try {
            EventProcessor onEvent = this.streamProcessor.onEvent(loggedEvent);
            if (onEvent != null) {
                this.eventLifecycleContext.reset();
                onEvent.processEvent(this.eventLifecycleContext);
                if (this.eventLifecycleContext.hasFuture()) {
                    this.actor.runOnCompletion(this.eventLifecycleContext.getFuture(), (obj, th) -> {
                        if (th == null) {
                            onEvent.updateState();
                            onRecordReprocessed(loggedEvent);
                        } else {
                            LOG.error("Exception during async reprocessing", th);
                            onFailure();
                        }
                    });
                } else {
                    onEvent.updateState();
                    onRecordReprocessed(loggedEvent);
                }
            } else {
                onRecordReprocessed(loggedEvent);
            }
        } catch (Exception e) {
            throw new RuntimeException(String.format(ERROR_MESSAGE_REPROCESSING_FAILED, getName(), loggedEvent), e);
        }
    }

    private void onRecordReprocessed(LoggedEvent loggedEvent) {
        if (loggedEvent.getPosition() == this.lastSourceEventPosition) {
            onRecovered();
        } else {
            this.actor.submit(this::reprocessNextEvent);
        }
    }

    private void onRecovered() {
        this.phase = Phase.PROCESSING;
        this.onCommitPositionUpdatedCondition = this.actor.onCondition(getName() + "-on-commit-position-updated", this.readNextEvent);
        this.streamProcessorContext.logStream.registerOnCommitPositionUpdatedCondition(this.onCommitPositionUpdatedCondition);
        this.actor.runAtFixedRate(this.snapshotPeriod, this::createSnapshot);
        this.streamProcessor.onRecovered();
        this.actor.submit(this.readNextEvent);
    }

    private void readNextEvent() {
        if (isOpened() && !isSuspended() && this.logStreamReader.hasNext() && this.eventProcessor == null) {
            this.currentEvent = this.logStreamReader.next();
            if (this.eventFilter == null || this.eventFilter.applies(this.currentEvent)) {
                processEvent(this.currentEvent);
            } else {
                this.actor.submit(this.readNextEvent);
                this.metrics.incrementEventsSkippedCount();
            }
        }
    }

    private void processEvent(LoggedEvent loggedEvent) {
        this.eventProcessor = this.streamProcessor.onEvent(loggedEvent);
        if (this.eventProcessor == null) {
            this.actor.submit(this.readNextEvent);
            this.metrics.incrementEventsSkippedCount();
            return;
        }
        try {
            this.metrics.incrementEventsProcessedCount();
            this.eventLifecycleContext.reset();
            this.eventProcessor.processEvent(this.eventLifecycleContext);
            if (this.eventLifecycleContext.hasFuture()) {
                this.actor.runOnCompletion(this.eventLifecycleContext.getFuture(), (obj, th) -> {
                    if (th == null) {
                        this.actor.runUntilDone(this::executeSideEffects);
                    } else {
                        LOG.error(ERROR_MESSAGE_PROCESSING_FAILED, getName(), th);
                        onFailure();
                    }
                });
            } else {
                this.actor.runUntilDone(this::executeSideEffects);
            }
        } catch (Exception e) {
            LOG.error(ERROR_MESSAGE_PROCESSING_FAILED, getName(), e);
            onFailure();
        }
    }

    private void executeSideEffects() {
        try {
            if (this.eventProcessor.executeSideEffects()) {
                this.actor.done();
                this.actor.runUntilDone(this::writeEvent);
            } else if (isOpened()) {
                this.actor.yield();
            } else {
                this.actor.done();
            }
        } catch (Exception e) {
            this.actor.done();
            LOG.error(ERROR_MESSAGE_PROCESSING_FAILED, getName(), e);
            onFailure();
        }
    }

    private void writeEvent() {
        try {
            this.logStreamWriter.producerId(this.streamProcessorContext.getId()).sourceRecordPosition(this.currentEvent.getPosition());
            this.eventPosition = this.eventProcessor.writeEvent(this.logStreamWriter);
            if (this.eventPosition >= 0) {
                this.actor.done();
                this.metrics.incrementEventsWrittenCount();
                updateState();
            } else if (isOpened()) {
                this.actor.yield();
            } else {
                this.actor.done();
            }
        } catch (Exception e) {
            this.actor.done();
            LOG.error(ERROR_MESSAGE_PROCESSING_FAILED, getName(), e);
            onFailure();
        }
    }

    private void updateState() {
        try {
            this.eventProcessor.updateState();
            this.lastSuccessfulProcessedEventPosition = this.currentEvent.getPosition();
            if (this.eventPosition > 0) {
                this.lastWrittenEventPosition = this.eventPosition;
            }
            this.eventProcessor = null;
            this.actor.submit(this.readNextEvent);
        } catch (Exception e) {
            LOG.error(ERROR_MESSAGE_PROCESSING_FAILED, getName(), e);
            onFailure();
        }
    }

    private void createSnapshot() {
        if (this.actor.getLifecyclePhase() != ActorTask.ActorLifecyclePhase.STARTED) {
            doCreateSnapshot();
        } else {
            this.actor.setSchedulingHints(SchedulingHints.ioBound((short) 0));
            this.actor.submit(this::doCreateSnapshot);
        }
    }

    private void doCreateSnapshot() {
        if (this.currentEvent != null) {
            long commitPosition = this.streamProcessorContext.getLogStream().getCommitPosition();
            if (!(this.lastSuccessfulProcessedEventPosition <= this.snapshotPosition) && commitPosition >= this.lastWrittenEventPosition) {
                writeSnapshot(this.lastSuccessfulProcessedEventPosition);
            }
        }
        this.actor.setSchedulingHints(SchedulingHints.cpuBound(ActorPriority.REGULAR));
    }

    private void writeSnapshot(long j) {
        SnapshotWriter snapshotWriter = null;
        try {
            long currentTimeMillis = System.currentTimeMillis();
            String name = this.streamProcessorContext.getName();
            LOG.info("Write snapshot for stream processor {} at event position {}.", name, Long.valueOf(j));
            snapshotWriter = this.snapshotStorage.createSnapshot(name, j);
            long writeSnapshot = snapshotWriter.writeSnapshot(this.streamProcessor.getStateResource());
            snapshotWriter.commit();
            long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
            LOG.info("Creation of snapshot {} took {} ms.", name, Long.valueOf(currentTimeMillis2));
            this.metrics.recordSnapshotSize(writeSnapshot);
            this.metrics.recordSnapshotCreationTime(currentTimeMillis2);
            this.snapshotPosition = j;
        } catch (Exception e) {
            LOG.error("Stream processor '{}' failed. Can not write snapshot.", getName(), e);
            if (snapshotWriter != null) {
                snapshotWriter.abort();
            }
        }
    }

    public ActorFuture<Void> closeAsync() {
        return this.isOpened.compareAndSet(true, false) ? this.actor.close() : CompletableActorFuture.completed((Object) null);
    }

    protected void onActorClosing() {
        this.metrics.close();
        if (!isFailed()) {
            createSnapshot();
            this.streamProcessor.onClose();
        }
        this.streamProcessorContext.getLogStreamReader().close();
        this.streamProcessorContext.logStream.removeOnCommitPositionUpdatedCondition(this.onCommitPositionUpdatedCondition);
        this.onCommitPositionUpdatedCondition = null;
    }

    private void onFailure() {
        this.phase = Phase.FAILED;
        this.isOpened.set(false);
        this.actor.close();
    }

    public boolean isOpened() {
        return this.isOpened.get();
    }

    public boolean isFailed() {
        return this.phase == Phase.FAILED;
    }

    public boolean isSuspended() {
        return this.suspended;
    }

    private void suspend() {
        this.suspended = true;
    }

    private void resume() {
        this.suspended = false;
        if (this.phase == Phase.PROCESSING) {
            this.actor.submit(this.readNextEvent);
        }
    }
}
