package io.zeebe.logstreams.impl;

import io.zeebe.logstreams.impl.LogStreamImpl;
import io.zeebe.logstreams.impl.log.index.LogBlockIndex;
import io.zeebe.logstreams.spi.LogStorage;
import io.zeebe.logstreams.spi.ReadableSnapshot;
import io.zeebe.logstreams.spi.SnapshotStorage;
import io.zeebe.logstreams.spi.SnapshotWriter;
import io.zeebe.util.allocation.AllocatedBuffer;
import io.zeebe.util.allocation.BufferAllocators;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.channel.ActorConditions;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.Position;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/logstreams/impl/LogBlockIndexAppender.class */
public class LogBlockIndexAppender extends Actor {
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    public static final float DEFAULT_DEVIATION = 0.1f;
    private Runnable currentRunnable;
    private final String name;
    private final LogStorage logStorage;
    private final LogBlockIndex blockIndex;
    private final ActorScheduler actorScheduler;
    private final int indexBlockSize;
    private final float deviation;
    private int bufferSize;
    private ByteBuffer ioBuffer;
    private AllocatedBuffer allocatedBuffer;
    private final Position commitPosition;
    private final ActorConditions onCommitPositionUpdatedConditions;
    private ActorCondition onCommitCondition;
    private final SnapshotStorage snapshotStorage;
    private final Duration snapshotInterval;
    private final Runnable runCurrentWork = this::runCurrentWork;
    private final Runnable readLogStorage = this::readLogStorage;
    private final Runnable addCurrentBlockToIndex = this::addCurrentBlockToIndex;
    private final Runnable createSnapshot = this::createSnapshot;
    private final CompleteEventsInBlockProcessor readResultProcessor = new CompleteEventsInBlockProcessor();
    private final AtomicBoolean isOpenend = new AtomicBoolean(false);
    private long nextAddress = -1;
    private int currentBlockSize = 0;
    private long currentBlockAddress = -1;
    private long currentBlockEventPosition = 0;
    private long lastBlockAddress = 0;
    private long lastBlockEventPosition = 0;
    private final UnsafeBuffer buffer = new UnsafeBuffer(0, 0);
    private long snapshotEventPosition = -1;

    public LogBlockIndexAppender(LogStreamImpl.LogStreamBuilder logStreamBuilder, Position position, ActorConditions actorConditions) {
        this.name = logStreamBuilder.getLogName() + ".index";
        this.logStorage = logStreamBuilder.getLogStorage();
        this.blockIndex = logStreamBuilder.getBlockIndex();
        this.actorScheduler = logStreamBuilder.getActorScheduler();
        this.commitPosition = position;
        this.onCommitPositionUpdatedConditions = actorConditions;
        this.deviation = logStreamBuilder.getDeviation();
        this.indexBlockSize = (int) (logStreamBuilder.getIndexBlockSize() * (1.0f - this.deviation));
        this.snapshotStorage = logStreamBuilder.getSnapshotStorage();
        this.snapshotInterval = logStreamBuilder.getSnapshotPeriod();
        this.bufferSize = logStreamBuilder.getReadBlockSize();
    }

    public String getName() {
        return this.name;
    }

    public void open() {
        openAsync().join();
    }

    public ActorFuture<Void> openAsync() {
        return this.isOpenend.compareAndSet(false, true) ? this.actorScheduler.submitActor(this, true) : CompletableActorFuture.completed((Object) null);
    }

    protected void onActorStarting() {
        this.allocatedBuffer = BufferAllocators.allocateDirect(this.bufferSize);
        this.ioBuffer = this.allocatedBuffer.getRawBuffer();
        this.buffer.wrap(this.ioBuffer);
        if (!this.logStorage.isOpen()) {
            this.logStorage.open();
        }
        recoverBlockIndex();
    }

    private void recoverBlockIndex() {
        try {
            ReadableSnapshot lastSnapshot = this.snapshotStorage.getLastSnapshot(this.name);
            if (lastSnapshot != null) {
                lastSnapshot.recoverFromSnapshot(this.blockIndex);
                long position = lastSnapshot.getPosition();
                long lookupBlockAddress = this.blockIndex.lookupBlockAddress(position);
                if (lookupBlockAddress >= this.logStorage.getFirstBlockAddress()) {
                    this.nextAddress = lookupBlockAddress;
                    this.lastBlockAddress = lookupBlockAddress;
                    this.lastBlockEventPosition = position;
                } else {
                    LOG.warn("Can't find address of snapshot position. Rebuild block index.");
                }
            }
            if (this.nextAddress == -1) {
                this.blockIndex.reset();
                this.nextAddress = this.logStorage.getFirstBlockAddress();
                this.lastBlockAddress = 0L;
            }
        } catch (Exception e) {
            this.isOpenend.set(false);
            LOG.error("Failed to recover block index.", e);
            throw new RuntimeException(e);
        }
    }

    protected void onActorStarted() {
        this.onCommitCondition = this.actor.onCondition("log-index-on-commit", this.runCurrentWork);
        this.onCommitPositionUpdatedConditions.registerConsumer(this.onCommitCondition);
        this.actor.runAtFixedRate(this.snapshotInterval, this.createSnapshot);
        if (this.nextAddress <= 0) {
            this.currentRunnable = () -> {
                this.nextAddress = this.logStorage.getFirstBlockAddress();
                this.currentRunnable = this.readLogStorage;
                runCurrentWork();
            };
        } else {
            this.currentRunnable = this.readLogStorage;
            runCurrentWork();
        }
    }

    private void runCurrentWork() {
        this.actor.submit(this.currentRunnable);
    }

    private void readLogStorage() {
        long j = this.nextAddress;
        long read = this.logStorage.read(this.ioBuffer, j, this.readResultProcessor);
        if (read > j) {
            addToCurrentBlock(j);
            this.nextAddress = read;
            runCurrentWork();
        } else if (read == -3) {
            increaseBufferSize();
            runCurrentWork();
        } else if (read == -1) {
            LOG.warn("Can't read from illegal address: {}", Long.valueOf(j));
            this.nextAddress = this.lastBlockAddress;
            resetCurrentBlock();
        }
    }

    private void addToCurrentBlock(long j) {
        if (this.currentBlockAddress == -1) {
            this.currentBlockAddress = j;
            this.currentBlockEventPosition = LogEntryDescriptor.getPosition(this.buffer, 0);
        }
        this.currentBlockSize += this.ioBuffer.position();
        if (this.currentBlockSize >= this.indexBlockSize) {
            addCurrentBlockToIndex();
        } else {
            this.ioBuffer.clear();
        }
    }

    private void addCurrentBlockToIndex() {
        if (!isCurrentBlockCommitted()) {
            this.currentRunnable = this.addCurrentBlockToIndex;
            return;
        }
        if (this.currentBlockAddress > this.lastBlockAddress) {
            LOG.trace("Add block to index with position {} and address {}.", Long.valueOf(this.currentBlockEventPosition), Long.valueOf(this.currentBlockAddress));
            this.blockIndex.addBlock(this.currentBlockEventPosition, this.currentBlockAddress);
            this.lastBlockAddress = this.currentBlockAddress;
            this.lastBlockEventPosition = this.currentBlockEventPosition;
        }
        resetCurrentBlock();
        this.currentRunnable = this.readLogStorage;
    }

    private boolean isCurrentBlockCommitted() {
        return this.commitPosition.get() >= this.readResultProcessor.getLastReadEventPosition();
    }

    private void resetCurrentBlock() {
        this.currentBlockAddress = -1L;
        this.currentBlockEventPosition = 0L;
        this.currentBlockSize = 0;
        this.ioBuffer.clear();
    }

    private void increaseBufferSize() {
        this.bufferSize *= 2;
        this.allocatedBuffer.close();
        this.allocatedBuffer = BufferAllocators.allocateDirect(this.bufferSize);
        this.ioBuffer = this.allocatedBuffer.getRawBuffer();
        this.buffer.wrap(this.ioBuffer);
    }

    private void createSnapshot() {
        SnapshotWriter snapshotWriter = null;
        try {
            if (this.lastBlockEventPosition > this.snapshotEventPosition) {
                this.logStorage.flush();
                snapshotWriter = this.snapshotStorage.createSnapshot(this.name, this.lastBlockEventPosition);
                snapshotWriter.writeSnapshot(this.blockIndex);
                snapshotWriter.commit();
                this.snapshotEventPosition = this.lastBlockEventPosition;
                LOG.trace("Created snapshot of block index {}.", this.name);
            }
        } catch (Exception e) {
            LOG.warn("Failed to create snapshot of block index {}", this.name, e);
            if (snapshotWriter != null) {
                snapshotWriter.abort();
            }
        }
    }

    public void close() {
        closeAsync().join();
    }

    public ActorFuture<Void> closeAsync() {
        return this.actor.close();
    }

    protected void onActorClosing() {
        resetCurrentBlock();
        this.allocatedBuffer.close();
        this.onCommitPositionUpdatedConditions.removeConsumer(this.onCommitCondition);
        this.onCommitCondition = null;
        this.isOpenend.set(false);
    }

    public boolean isClosed() {
        return !this.isOpenend.get();
    }

    public boolean isOpened() {
        return this.isOpenend.get();
    }
}
