package io.zeebe.logstreams.impl;

import io.zeebe.dispatcher.BlockPeek;
import io.zeebe.dispatcher.Dispatcher;
import io.zeebe.dispatcher.Subscription;
import io.zeebe.logstreams.impl.LogStreamImpl;
import io.zeebe.logstreams.spi.LogStorage;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.SchedulingHints;
import io.zeebe.util.sched.channel.ActorConditions;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.MutableDirectBuffer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/logstreams/impl/LogStorageAppender.class */
/**
 * Actor that reads blocks from the write buffer through the {@code "log-appender"} subscription
 * and appends each peeked block to the configured {@link LogStorage}.
 *
 * <p>Once an append returns a negative value the appender flags itself as failed and, from then
 * on, marks every peeked block as failed instead of appending it. The failed flag is cleared
 * again when the actor is closing.
 */
public class LogStorageAppender extends Actor {
    public static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;

    /** Signalled after every successful append to the log storage. */
    private final ActorConditions onLogStorageAppendedConditions;

    /** Set by a successful {@link #openAsync()} claim, cleared by close. */
    private final AtomicBoolean isOpened = new AtomicBoolean(false);

    /** Set after the first append that returned a negative value. */
    private final AtomicBoolean isFailed = new AtomicBoolean(false);

    /** Reused holder for the block most recently peeked from the subscription. */
    private final BlockPeek blockPeek = new BlockPeek();

    /** Either {@code appendBlock} (normal operation) or {@code discardBlock} (after a failure). */
    private Runnable peekedBlockHandler;

    /** Pending result of the open request; only non-null between submit and actor start-up. */
    private CompletableActorFuture<Void> openFuture;

    private String name;
    private LogStorage logStorage;
    private ActorScheduler actorScheduler;
    private int maxAppendBlockSize;
    private Dispatcher writeBuffer;
    private Subscription writeBufferSubscription;

    /**
     * Creates an appender configured from the given builder.
     *
     * @param logStreamBuilder source of the log name, storage, scheduler, block size and write buffer
     * @param actorConditions conditions signalled after each successful append
     */
    public LogStorageAppender(LogStreamImpl.LogStreamBuilder logStreamBuilder, ActorConditions actorConditions) {
        wrap(logStreamBuilder);
        this.onLogStorageAppendedConditions = actorConditions;
    }

    /**
     * Copies the appender configuration out of the builder. The name becomes the log name with
     * the suffix {@code ".appender"}.
     *
     * @param logStreamBuilder builder to read the configuration from
     */
    public void wrap(LogStreamImpl.LogStreamBuilder logStreamBuilder) {
        this.name = logStreamBuilder.getLogName() + ".appender";
        this.logStorage = logStreamBuilder.getLogStorage();
        this.actorScheduler = logStreamBuilder.getActorScheduler();
        this.maxAppendBlockSize = logStreamBuilder.getMaxAppendBlockSize();
        this.writeBuffer = logStreamBuilder.getWriteBuffer();
    }

    /**
     * @return the name assigned in {@link #wrap(LogStreamImpl.LogStreamBuilder)}
     */
    public String getName() {
        return name;
    }

    /** Opens the appender and waits for {@link #openAsync()} to finish. */
    public void open() {
        openAsync().join();
    }

    /**
     * Submits this actor to the scheduler unless the appender is already marked as opened.
     *
     * @return a future completed once the write buffer subscription has been obtained (or failed
     *     to be obtained); an already completed future when the appender was opened before
     */
    public ActorFuture<Void> openAsync() {
        if (!isOpened.compareAndSet(false, true)) {
            // Somebody else already claimed the open; nothing to do.
            return CompletableActorFuture.completed(null);
        }

        final CompletableActorFuture<Void> pendingOpen = new CompletableActorFuture<>();
        // Must be assigned before the actor is submitted: onActorStarted completes it.
        openFuture = pendingOpen;
        actorScheduler.submitActor(this, true, SchedulingHints.ioBound((short) 0));
        return pendingOpen;
    }

    /**
     * Opens the log storage if necessary, then requests the {@code "log-appender"} subscription
     * and, once available, starts consuming it with {@code peek}.
     */
    protected void onActorStarted() {
        if (!logStorage.isOpen()) {
            logStorage.open();
        }

        actor.runOnCompletion(writeBuffer.getSubscriptionAsync("log-appender"), (subscription, failure) -> {
            if (failure != null) {
                // NOTE(review): the opened flag stays set on this path, so a later openAsync()
                // returns an already completed future - confirm this is intended.
                openFuture.completeExceptionally(failure);
                openFuture = null;
                return;
            }

            writeBufferSubscription = subscription;
            peekedBlockHandler = this::appendBlock;
            actor.consume(writeBufferSubscription, this::peek);
            openFuture.complete(null);
            openFuture = null;
        });
    }

    /**
     * Peeks up to {@code maxAppendBlockSize} bytes from the subscription and hands the block to
     * the current handler; yields when nothing was peeked.
     */
    private void peek() {
        final int peekedBytes = writeBufferSubscription.peekBlock(blockPeek, maxAppendBlockSize, true);
        if (peekedBytes > 0) {
            peekedBlockHandler.run();
        } else {
            actor.yield();
        }
    }

    /**
     * Appends the peeked block to the log storage. On a negative result the appender switches
     * into the failed state, logs the position read from the start of the block and discards
     * this and all following blocks.
     */
    private void appendBlock() {
        final ByteBuffer rawBuffer = blockPeek.getRawBuffer();
        final MutableDirectBuffer buffer = blockPeek.getBuffer();

        final long appendResult = logStorage.append(rawBuffer);
        if (appendResult >= 0) {
            blockPeek.markCompleted();
            onLogStorageAppendedConditions.signalConsumers();
            return;
        }

        isFailed.set(true);
        final long positionOfFirstEntry = LogEntryDescriptor.getPosition(buffer, 0);
        LOG.error("Failed to append log storage on position '{}'. Discard the following blocks.", positionOfFirstEntry);

        // From now on every peeked block is discarded instead of appended.
        peekedBlockHandler = this::discardBlock;
        discardBlock();
    }

    /** Marks the peeked block as failed and yields the actor. */
    private void discardBlock() {
        blockPeek.markFailed();
        actor.yield();
    }

    /** Closes the appender and waits for {@link #closeAsync()} to finish. */
    public void close() {
        closeAsync().join();
    }

    /**
     * Closes the actor if the appender is currently marked as opened.
     *
     * @return the actor's close future, or an already completed future when the appender was not
     *     opened
     */
    public ActorFuture<Void> closeAsync() {
        if (!isOpened.compareAndSet(true, false)) {
            return CompletableActorFuture.completed(null);
        }
        return actor.close();
    }

    /** Clears both the opened and the failed flag. */
    protected void onActorClosing() {
        isOpened.set(false);
        isFailed.set(false);
    }

    /**
     * @return {@code true} while the opened flag is set
     */
    public boolean isOpened() {
        return isOpened.get();
    }

    /**
     * @return {@code true} while the opened flag is not set
     */
    public boolean isClosed() {
        return !isOpened.get();
    }

    /**
     * @return {@code true} once an append has failed and the actor has not been closed since
     */
    public boolean isFailed() {
        return isFailed.get();
    }

    /**
     * @return the position of the write buffer subscription, or {@code -1} when no subscription
     *     has been obtained yet
     */
    public long getCurrentAppenderPosition() {
        if (writeBufferSubscription == null) {
            return -1L;
        }
        return writeBufferSubscription.getPosition();
    }
}
