package io.zeebe.logstreams.impl;

import io.zeebe.logstreams.impl.log.index.LogBlockIndex;
import io.zeebe.logstreams.impl.log.index.LogBlockIndexContext;
import io.zeebe.logstreams.spi.LogStorage;
import io.zeebe.util.allocation.AllocatedBuffer;
import io.zeebe.util.allocation.BufferAllocators;
import io.zeebe.util.metrics.Metric;
import io.zeebe.util.metrics.MetricsManager;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.channel.ActorConditions;
import io.zeebe.util.sched.future.ActorFuture;
import java.nio.ByteBuffer;
import java.time.Duration;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.Position;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/logstreams/impl/LogBlockIndexWriter.class */
/**
 * Actor that fills the {@link LogBlockIndex} of a log.
 *
 * <p>The writer reads the {@link LogStorage} chunk by chunk and accumulates the number of read
 * bytes. As soon as the accumulated size reaches the index block size (reduced by the configured
 * deviation) and the last read event is committed, the address and first event position of the
 * accumulated block are added to the block index. In addition, a snapshot of the block index is
 * written at a fixed rate.
 *
 * <p>The work is organised as a small state machine: {@code currentRunnable} always holds the
 * next step, and {@link #runCurrentWork()} submits it to the actor. The same step is also
 * re-submitted through the condition registered on the commit position update conditions.
 */
public class LogBlockIndexWriter extends Actor {
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;

    /** Default deviation value; it is not referenced inside this class. */
    public static final float DEFAULT_DEVIATION = 0.1f;

    /**
     * Read result handled as "illegal address" by {@link #readLogStorage()}.
     * NOTE(review): presumably mirrors the corresponding result constant of LogStorage - confirm.
     */
    private static final long READ_RESULT_INVALID_ADDRESS = -1L;

    /**
     * Read result handled as "buffer too small" by {@link #readLogStorage()}.
     * NOTE(review): presumably mirrors the corresponding result constant of LogStorage - confirm.
     */
    private static final long READ_RESULT_INSUFFICIENT_BUFFER_CAPACITY = -3L;

    /** Next step of the state machine; submitted by {@link #runCurrentWork()}. */
    private Runnable currentRunnable;
    private final String name;
    private final LogStorage logStorage;
    private final LogBlockIndex blockIndex;
    private final LogBlockIndexContext indexContext;
    private final MetricsManager metricsManager;
    /** Configured index block size multiplied by {@code (1 - deviation)}. */
    private final int indexBlockSize;
    private final float deviation;
    /** Current capacity of the read buffer; doubled by {@link #increaseBufferSize()}. */
    private int bufferSize;
    private ByteBuffer ioBuffer;
    private AllocatedBuffer allocatedBuffer;
    private final Position commitPosition;
    private final ActorConditions onCommitPositionUpdatedConditions;
    private ActorCondition onCommitCondition;
    private final Duration snapshotInterval;
    private Metric snapshotsCreated;
    // Pre-bound method references so that no new instance is created per submission.
    private final Runnable runCurrentWork = this::runCurrentWork;
    private final Runnable readLogStorage = this::readLogStorage;
    private final Runnable addCurrentBlockToIndex = this::addCurrentBlockToIndex;
    private final Runnable createSnapshot = this::createSnapshot;
    private final CompleteEventsInBlockProcessor completeEventsProcessor = new CompleteEventsInBlockProcessor();
    /** Address the next read starts at; {@code -1} until it has been determined. */
    private long nextAddress = -1;
    /** Number of bytes accumulated for the block that is currently being built. */
    private int currentBlockSize = 0;
    /** Address of the block that is currently being built; {@code -1} if there is none. */
    private long currentBlockAddress = -1;
    private long currentBlockEventPosition = 0;
    /** Address and position of the block that was added to the index most recently. */
    private long lastBlockAddress = 0;
    private long lastBlockEventPosition = 0;
    /** View on {@code ioBuffer}, used to read the position of the first event of a block. */
    private final UnsafeBuffer buffer = new UnsafeBuffer(0, 0);
    /** Position covered by the latest snapshot; {@code -1} if none is known. */
    private long snapshotEventPosition = -1;

    /**
     * Creates the writer and allocates its direct read buffer.
     *
     * @param str the name of the writer, returned by {@link #getName()} and used as metric label
     * @param logStreamBuilder source of the commit position, the commit conditions, the deviation,
     *     the index block size, the snapshot period and the read block size
     * @param logStorage the storage the log is read from
     * @param logBlockIndex the index the blocks are added to
     * @param metricsManager used to create the snapshot counter when the actor starts
     */
    public LogBlockIndexWriter(String str, LogStreamBuilder logStreamBuilder, LogStorage logStorage, LogBlockIndex logBlockIndex, MetricsManager metricsManager) {
        this.name = str;
        this.logStorage = logStorage;
        this.blockIndex = logBlockIndex;
        this.metricsManager = metricsManager;
        this.commitPosition = logStreamBuilder.getCommitPosition();
        this.onCommitPositionUpdatedConditions = logStreamBuilder.getOnCommitPositionUpdatedConditions();
        this.deviation = logStreamBuilder.getDeviation();
        // A block is indexed as soon as it reaches the configured size minus the deviation.
        this.indexBlockSize = (int) (logStreamBuilder.getIndexBlockSize() * (1.0f - this.deviation));
        this.snapshotInterval = logStreamBuilder.getSnapshotPeriod();
        this.bufferSize = logStreamBuilder.getReadBlockSize();
        this.allocatedBuffer = BufferAllocators.allocateDirect(this.bufferSize);
        this.ioBuffer = this.allocatedBuffer.getRawBuffer();
        this.buffer.wrap(this.ioBuffer);
        this.indexContext = logBlockIndex.createLogBlockIndexContext();
    }

    /** @return the name given at construction time */
    public String getName() {
        return this.name;
    }

    /**
     * Creates the snapshot counter and determines the address to resume reading from.
     *
     * <p>If the address looked up for the last position of the block index is not before the first
     * block address of the storage, reading resumes there. Otherwise a warning is logged and
     * reading starts at the first block address of the storage.
     *
     * @throws RuntimeException if the recovery fails; the original exception is kept as cause
     */
    protected void onActorStarting() {
        this.snapshotsCreated = this.metricsManager.newMetric("logstream_blockidx_snapshots").type("counter").label("logName", getName()).create();
        try {
            final long lastIndexedPosition = this.blockIndex.getLastPosition();
            final long recoveredAddress = this.blockIndex.lookupBlockAddress(this.indexContext, lastIndexedPosition);
            if (recoveredAddress >= this.logStorage.getFirstBlockAddress()) {
                this.nextAddress = recoveredAddress;
                this.lastBlockAddress = recoveredAddress;
                this.lastBlockEventPosition = lastIndexedPosition;
                this.snapshotEventPosition = lastIndexedPosition;
            } else {
                LOG.warn("Can't find address of snapshot position. Rebuilding block index.");
            }
            if (this.nextAddress == -1) {
                // Nothing recovered: start at the beginning of the storage.
                this.nextAddress = this.logStorage.getFirstBlockAddress();
                this.lastBlockAddress = 0L;
            }
        } catch (Exception e) {
            LOG.error("Failed to recover block index", e);
            throw new RuntimeException("Failed to recover block index", e);
        }
    }

    /**
     * Registers the commit condition, schedules the periodic snapshot and prepares the first step.
     *
     * <p>If no positive start address is known yet, the first step is deferred: it is only stored
     * as {@code currentRunnable} and not submitted here, so it runs when {@code runCurrentWork} is
     * invoked through the registered commit condition. Otherwise reading is submitted right away.
     */
    protected void onActorStarted() {
        this.onCommitCondition = this.actor.onCondition("log-index-on-commit", this.runCurrentWork);
        this.onCommitPositionUpdatedConditions.registerConsumer(this.onCommitCondition);
        this.actor.runAtFixedRate(this.snapshotInterval, this.createSnapshot);
        if (this.nextAddress <= 0) {
            this.currentRunnable = () -> {
                // Look up the first block address again before switching to regular reading.
                this.nextAddress = this.logStorage.getFirstBlockAddress();
                this.currentRunnable = this.readLogStorage;
                runCurrentWork();
            };
        } else {
            this.currentRunnable = this.readLogStorage;
            runCurrentWork();
        }
    }

    /** Submits the current step of the state machine to the actor. */
    private void runCurrentWork() {
        this.actor.submit(this.currentRunnable);
    }

    /**
     * Reads the next chunk from the log storage into the read buffer.
     *
     * <ul>
     *   <li>result greater than the read address: the chunk is added to the current block and the
     *       result becomes the next read address
     *   <li>{@link #READ_RESULT_INSUFFICIENT_BUFFER_CAPACITY}: the buffer is doubled and the read
     *       is re-submitted
     *   <li>{@link #READ_RESULT_INVALID_ADDRESS}: a warning is logged, reading is reset to the
     *       address of the last indexed block and the current block is discarded
     *   <li>any other result: nothing changes and nothing is re-submitted from here
     * </ul>
     */
    private void readLogStorage() {
        this.ioBuffer.clear();
        final long readAddress = this.nextAddress;
        final long readResult = this.logStorage.read(this.ioBuffer, readAddress, this.completeEventsProcessor);
        if (readResult > readAddress) {
            this.nextAddress = readResult;
            addToCurrentBlock(readAddress, this.ioBuffer.position());
        } else if (readResult == READ_RESULT_INSUFFICIENT_BUFFER_CAPACITY) {
            increaseBufferSize();
            runCurrentWork();
        } else if (readResult == READ_RESULT_INVALID_ADDRESS) {
            LOG.warn("Can't read from illegal address: {}", readAddress);
            this.nextAddress = this.lastBlockAddress;
            resetCurrentBlock();
        }
    }

    /**
     * Adds a freshly read chunk to the block that is currently being built.
     *
     * @param chunkAddress the address the chunk was read from
     * @param chunkLength the number of bytes that were read
     */
    private void addToCurrentBlock(long chunkAddress, int chunkLength) {
        if (this.currentBlockAddress == -1) {
            // First chunk of a new block: remember where it starts and its first event position.
            this.currentBlockAddress = chunkAddress;
            this.currentBlockEventPosition = LogEntryDescriptor.getPosition(this.buffer, 0);
        }
        this.currentBlockSize += chunkLength;
        if (this.currentBlockSize >= this.indexBlockSize) {
            addCurrentBlockToIndex();
        } else {
            runCurrentWork();
        }
    }

    /**
     * Adds the current block to the index once its last read event is committed.
     *
     * <p>While the block is not committed, this method stays the current step and is submitted
     * again. A committed block is only added if its address is greater than the address of the
     * last indexed block; in either case the current block is reset and reading continues.
     */
    private void addCurrentBlockToIndex() {
        if (isCurrentBlockCommitted()) {
            if (this.currentBlockAddress > this.lastBlockAddress) {
                LOG.trace("Add block to index with position {} and address {}.", this.currentBlockEventPosition, this.currentBlockAddress);
                this.blockIndex.addBlock(this.indexContext, this.currentBlockEventPosition, this.currentBlockAddress);
                this.lastBlockAddress = this.currentBlockAddress;
                this.lastBlockEventPosition = this.currentBlockEventPosition;
            }
            resetCurrentBlock();
            this.currentRunnable = this.readLogStorage;
        } else {
            this.currentRunnable = this.addCurrentBlockToIndex;
        }
        runCurrentWork();
    }

    /** @return {@code true} if the commit position is not behind the last read event position */
    private boolean isCurrentBlockCommitted() {
        return this.commitPosition.getVolatile() >= this.completeEventsProcessor.getLastReadEventPosition();
    }

    /** Discards the block that is currently being built. */
    private void resetCurrentBlock() {
        this.currentBlockAddress = -1L;
        this.currentBlockEventPosition = 0L;
        this.currentBlockSize = 0;
    }

    /** Replaces the read buffer by a new direct buffer of twice the size. */
    private void increaseBufferSize() {
        this.bufferSize *= 2;
        // The old buffer is released before the bigger one is allocated.
        this.allocatedBuffer.close();
        this.allocatedBuffer = BufferAllocators.allocateDirect(this.bufferSize);
        this.ioBuffer = this.allocatedBuffer.getRawBuffer();
        this.buffer.wrap(this.ioBuffer);
    }

    /**
     * Writes a snapshot of the block index if a block was indexed since the last snapshot.
     *
     * <p>The snapshot position is only advanced after the snapshot has been written. A failed
     * attempt is logged and therefore retried on the next scheduled run instead of being skipped
     * until another block is indexed.
     */
    private void createSnapshot() {
        try {
            final long positionToSnapshot = this.lastBlockEventPosition;
            if (positionToSnapshot > 0 && positionToSnapshot > this.snapshotEventPosition) {
                // Flush first; presumably so the snapshot never refers to unflushed data - confirm.
                this.logStorage.flush();
                this.blockIndex.writeSnapshot(positionToSnapshot);
                this.snapshotEventPosition = positionToSnapshot;
                LOG.trace("Created snapshot of block index {}.", this.name);
                this.snapshotsCreated.incrementOrdered();
            }
        } catch (Exception e) {
            LOG.warn("Failed to create snapshot of block index {}", this.name, e);
        }
    }

    /** @return the future returned by closing the underlying actor */
    public ActorFuture<Void> closeAsync() {
        return this.actor.close();
    }

    /** Discards the current block and releases the buffer, the commit condition and the metric. */
    protected void onActorClosing() {
        resetCurrentBlock();
        this.allocatedBuffer.close();
        this.onCommitPositionUpdatedConditions.removeConsumer(this.onCommitCondition);
        this.snapshotsCreated.close();
    }

    /** @return the counter of created snapshots; {@code null} before the actor is starting */
    public Metric getSnapshotsCreated() {
        return this.snapshotsCreated;
    }
}
