package io.zeebe.logstreams.impl;

import io.zeebe.dispatcher.BlockPeek;
import io.zeebe.dispatcher.Subscription;
import io.zeebe.distributedlog.impl.DistributedLogstreamPartition;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.future.ActorFuture;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/logstreams/impl/LogStorageAppender.class */
public class LogStorageAppender extends Actor {
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    private byte[] bytesToAppend;
    private long commitPosition;
    private final String name;
    private final Subscription writeBufferSubscription;
    private final int maxAppendBlockSize;
    private final DistributedLogstreamPartition distributedLog;
    private final AtomicBoolean isFailed = new AtomicBoolean(false);
    private final BlockPeek blockPeek = new BlockPeek();
    private final Runnable peekedBlockHandler = this::appendBlock;

    public LogStorageAppender(String str, DistributedLogstreamPartition distributedLogstreamPartition, Subscription subscription, int i) {
        this.name = str;
        this.distributedLog = distributedLogstreamPartition;
        this.writeBufferSubscription = subscription;
        this.maxAppendBlockSize = i;
    }

    public String getName() {
        return this.name;
    }

    protected void onActorStarting() {
        this.actor.consume(this.writeBufferSubscription, this::peekBlock);
    }

    private void peekBlock() {
        if (this.writeBufferSubscription.peekBlock(this.blockPeek, this.maxAppendBlockSize, true) > 0) {
            this.peekedBlockHandler.run();
        } else {
            this.actor.yield();
        }
    }

    private void appendBlock() {
        ByteBuffer rawBuffer = this.blockPeek.getRawBuffer();
        this.bytesToAppend = new byte[rawBuffer.remaining()];
        rawBuffer.get(this.bytesToAppend);
        this.commitPosition = getLastEventPosition(this.bytesToAppend);
        this.actor.runUntilDone(this::tryWrite);
    }

    private void tryWrite() {
        this.distributedLog.asyncAppend(this.bytesToAppend, this.commitPosition);
        this.blockPeek.markCompleted();
        this.actor.done();
    }

    private long getLastEventPosition(byte[] bArr) {
        int i = 0;
        DirectBuffer unsafeBuffer = new UnsafeBuffer(0L, 0);
        unsafeBuffer.wrap(bArr);
        long j = -1;
        LoggedEventImpl loggedEventImpl = new LoggedEventImpl();
        int length = bArr.length;
        while (length - i > 0) {
            loggedEventImpl.wrap(unsafeBuffer, i);
            i += loggedEventImpl.getFragmentLength();
            j = loggedEventImpl.getPosition();
            length = bArr.length;
        }
        return j;
    }

    public ActorFuture<Void> close() {
        return this.actor.close();
    }

    public boolean isFailed() {
        return this.isFailed.get();
    }

    public long getCurrentAppenderPosition() {
        return this.writeBufferSubscription.getPosition();
    }
}
