package io.zeebe.logstreams.impl.log;

import io.zeebe.dispatcher.Dispatcher;
import io.zeebe.dispatcher.Dispatchers;
import io.zeebe.logstreams.impl.Loggers;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamBatchWriter;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LogStreamRecordWriter;
import io.zeebe.logstreams.log.LogStreamWriter;
import io.zeebe.logstreams.spi.LogStorage;
import io.zeebe.util.health.FailureListener;
import io.zeebe.util.health.HealthStatus;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.channel.ActorConditions;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/logstreams/impl/log/LogStreamImpl.class */
public final class LogStreamImpl extends Actor implements LogStream, FailureListener {
    // Same value (-1) the constructor assigns to commitPosition as a literal; the constant itself is
    // not referenced anywhere else in this class.
    // NOTE(review): presumably the "no position yet" marker — confirm.
    private static final long INVALID_ADDRESS = -1;
    // Shared logger of the logstreams module.
    private static final Logger LOG = Loggers.LOGSTREAMS_LOGGER;
    // Name of the subscription opened on the write buffer in openAppender().
    private static final String APPENDER_SUBSCRIPTION_NAME = "appender";
    // Consumers registered here are signalled by internalSetCommitPosition() whenever the commit
    // position advances.
    private final ActorConditions onCommitPositionUpdatedConditions;
    // Returned by getLogName(); also used in log messages and in the write buffer's name.
    private final String logName;
    // Returned by getPartitionId(); passed to writers, the appender and used in actor names.
    private final int partitionId;
    // Passed as maxFragmentLength to the write buffer and as a constructor argument to the appender.
    private final int maxFrameLength;
    // Used to build the write buffer and to submit the appender actor in openAppender().
    private final ActorScheduler actorScheduler;
    // Every reader created by this stream (including `reader`); all are closed in onActorClosing().
    private final List<LogStreamReader> readers;
    // Reader created in the constructor; its seekToEnd() result seeds the commit position.
    private final LogStreamReaderImpl reader;
    // Opened in the constructor, closed in onActorClosing(); backs all readers and the appender.
    private final LogStorage logStorage;
    // Returned by closeAsync(); completed (normally or with closeError) in onActorClosed().
    private final CompletableActorFuture<Void> closeFuture = new CompletableActorFuture<>();
    // Used together with a role prefix to build actor names via buildActorName().
    private final int nodeId;
    // Set when openAppender() starts; reset to null by closeAppender() when an appender existed.
    private ActorFuture<LogStorageAppender> appenderFuture;
    // Dispatcher created in openAppender(); detached (set to null) by closeAppender().
    private Dispatcher writeBuffer;
    // Assigned in openAppender() once the subscription is open; detached by closeAppender().
    private LogStorageAppender appender;
    // Only ever increased, see internalSetCommitPosition().
    private long commitPosition;
    // Error (possibly null) reported when closing the appender; forwarded to closeFuture.
    private Throwable closeError;
    // Returned by getName(); built in the constructor from nodeId and "LogStream-" + partitionId.
    private final String actorName;
    // Single listener; addFailureListener() overwrites any previously registered one.
    private FailureListener failureListener;

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Factory abstraction over the concrete writer constructors, so that {@code createWriter} can
     * build record and batch writers through the same code path.
     *
     * @param <T> the writer type produced
     */
    @FunctionalInterface
    /* loaded from: input_file:io/zeebe/logstreams/impl/log/LogStreamImpl$WriterCreator.class */
    public interface WriterCreator<T extends LogStreamWriter> {
        /**
         * Builds a writer.
         *
         * @param partitionId the partition id of the owning log stream
         * @param writeBuffer the dispatcher the log stream uses as its write buffer
         * @return the new writer
         */
        T create(int partitionId, Dispatcher writeBuffer);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the log stream, opens the given storage and seeds the commit position with the
     * position returned by {@code seekToEnd()} of a freshly created reader.
     *
     * @param actorScheduler scheduler used later to build the write buffer and submit the appender
     * @param actorConditions conditions signalled whenever the commit position advances
     * @param str the log name
     * @param i the partition id
     * @param i2 the node id, used when building actor names
     * @param i3 the maximum frame length handed to the write buffer and the appender
     * @param logStorage the storage that is opened here and read by every reader of this stream
     * @throws UncheckedIOException if an {@link IOException} is raised while opening the storage or
     *     setting up the initial reader
     */
    public LogStreamImpl(ActorScheduler actorScheduler, ActorConditions actorConditions, String str, int i, int i2, int i3, LogStorage logStorage) {
        this.actorScheduler = actorScheduler;
        this.onCommitPositionUpdatedConditions = actorConditions;
        this.logName = str;
        this.partitionId = i;
        this.nodeId = i2;
        this.actorName = buildActorName(i2, "LogStream-" + i);
        this.maxFrameLength = i3;
        this.logStorage = logStorage;
        try {
            logStorage.open();
            // Start below any real position so the first internalSetCommitPosition() can advance it.
            this.commitPosition = INVALID_ADDRESS;
            this.readers = new ArrayList<>();
            this.reader = new LogStreamReaderImpl(logStorage);
            this.readers.add(this.reader);
            internalSetCommitPosition(this.reader.seekToEnd());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * @return the partition id this log stream was created with
     */
    @Override // io.zeebe.logstreams.log.LogStream
    public int getPartitionId() {
        return partitionId;
    }

    /**
     * @return the log name this log stream was created with
     */
    @Override // io.zeebe.logstreams.log.LogStream
    public String getLogName() {
        return logName;
    }

    /**
     * Reads the commit position through {@code actor.call}.
     *
     * @return a future yielding the commit position at the time the call is executed
     */
    @Override // io.zeebe.logstreams.log.LogStream
    public ActorFuture<Long> getCommitPositionAsync() {
        return actor.call(() -> commitPosition);
    }

    /**
     * Submits an update of the commit position through {@code actor.call}. The update only takes
     * effect if the given position is greater than the current one.
     *
     * @param j the candidate commit position
     */
    @Override // io.zeebe.logstreams.log.LogStream
    public void setCommitPosition(long j) {
        actor.call(() -> internalSetCommitPosition(j));
    }

    /**
     * Creates a new reader on the underlying storage and remembers it so that it is closed together
     * with the log stream.
     *
     * @return a future yielding the new reader
     */
    @Override // io.zeebe.logstreams.log.LogStream
    public ActorFuture<LogStreamReader> newLogStreamReader() {
        return actor.call(() -> {
            final LogStreamReaderImpl createdReader = new LogStreamReaderImpl(logStorage);
            readers.add(createdReader);
            return createdReader;
        });
    }

    /**
     * Creates a record writer on top of the write buffer, opening the appender first if that has
     * not happened yet.
     *
     * @return a future yielding the writer; already completed exceptionally with a
     *     {@link RuntimeException} when the actor is closed at the time of the call
     */
    @Override // io.zeebe.logstreams.log.LogStream
    public ActorFuture<LogStreamRecordWriter> newLogStreamRecordWriter() {
        if (actor.isClosed()) {
            return CompletableActorFuture.completedExceptionally(new RuntimeException("Actor is closed"));
        }
        // Typed future instead of a raw one, so the compiler checks what createWriter completes it with.
        final CompletableActorFuture<LogStreamRecordWriter> writerFuture = new CompletableActorFuture<>();
        actor.run(() -> createWriter(writerFuture, LogStreamWriterImpl::new));
        return writerFuture;
    }

    /**
     * Creates a batch writer on top of the write buffer, opening the appender first if that has not
     * happened yet.
     *
     * @return a future yielding the writer; already completed exceptionally with a
     *     {@link RuntimeException} when the actor is closed at the time of the call
     */
    @Override // io.zeebe.logstreams.log.LogStream
    public ActorFuture<LogStreamBatchWriter> newLogStreamBatchWriter() {
        if (actor.isClosed()) {
            return CompletableActorFuture.completedExceptionally(new RuntimeException("Actor is closed"));
        }
        // Typed future instead of a raw one, so the compiler checks what createWriter completes it with.
        final CompletableActorFuture<LogStreamBatchWriter> writerFuture = new CompletableActorFuture<>();
        actor.run(() -> createWriter(writerFuture, LogStreamBatchWriterImpl::new));
        return writerFuture;
    }

    /**
     * Registers the condition as a consumer of the commit-position conditions; the registration is
     * submitted through {@code actor.call}. Registered consumers are signalled whenever the commit
     * position advances.
     *
     * @param actorCondition the condition to register
     */
    @Override // io.zeebe.logstreams.log.LogStream
    public void registerOnCommitPositionUpdatedCondition(ActorCondition actorCondition) {
        final Runnable registration = () -> {
            onCommitPositionUpdatedConditions.registerConsumer(actorCondition);
        };
        actor.call(registration);
    }

    /**
     * Removes the condition from the consumers of the commit-position conditions; the removal is
     * submitted through {@code actor.call}.
     *
     * @param actorCondition the condition to remove
     */
    @Override // io.zeebe.logstreams.log.LogStream
    public void removeOnCommitPositionUpdatedCondition(ActorCondition actorCondition) {
        final Runnable removal = () -> {
            onCommitPositionUpdatedConditions.removeConsumer(actorCondition);
        };
        actor.call(removal);
    }

    /**
     * @return the actor name built in the constructor from the node id and
     *     {@code "LogStream-" + partitionId}
     */
    public String getName() {
        return actorName;
    }

    /**
     * Closes every reader created by this log stream and then the log storage, logging both steps.
     */
    protected void onActorClosing() {
        LOG.info("On closing logstream {} close {} readers", logName, readers.size());
        readers.forEach(LogStreamReader::close);

        LOG.info("Close log storage with name {}", logName);
        logStorage.close();
    }

    /**
     * Completes the close future: exceptionally when closing the appender reported an error,
     * normally otherwise.
     */
    protected void onActorClosed() {
        if (closeError == null) {
            // closeFuture is a CompletableActorFuture<Void>, so the only valid value is an untyped null.
            closeFuture.complete(null);
        } else {
            closeFuture.completeExceptionally(closeError);
        }
    }

    /**
     * Closes the log stream by calling {@link #closeAsync()} and joining on the returned future.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        final ActorFuture<Void> closed = closeAsync();
        closed.join();
    }

    /**
     * Starts closing the log stream unless the actor is already closed: the appender is closed
     * first, any error from that is remembered as the close error, and then the actor itself is
     * closed.
     *
     * @return the close future, which is completed in {@code onActorClosed()}
     */
    public ActorFuture<Void> closeAsync() {
        if (!actor.isClosed()) {
            actor.run(() -> {
                closeAppender().onComplete((ignored, error) -> {
                    closeError = error;
                    actor.close();
                });
            });
        }
        return closeFuture;
    }

    /**
     * Advances the commit position and signals the registered consumers. Positions that are not
     * greater than the current commit position are ignored.
     *
     * @param newCommitPosition the candidate commit position
     */
    private void internalSetCommitPosition(long newCommitPosition) {
        if (newCommitPosition <= commitPosition) {
            return;
        }
        commitPosition = newCommitPosition;
        onCommitPositionUpdatedConditions.signalConsumers();
    }

    /**
     * Completes the given future with a new writer. When the appender already exists the writer is
     * created immediately; otherwise creation is deferred until the appender has been opened.
     *
     * @param writerFuture the future to complete with the writer (or with the open error)
     * @param creator factory for the concrete writer type
     * @param <T> the writer type
     */
    private <T extends LogStreamWriter> void createWriter(CompletableActorFuture<T> writerFuture, WriterCreator<T> creator) {
        if (appender != null) {
            writerFuture.complete(creator.create(partitionId, writeBuffer));
            return;
        }
        // openAppender() returns the already existing appenderFuture when an open was started
        // before, so both the "open in progress" and the "not opened yet" case go through here.
        openAppender().onComplete(onOpenAppender(writerFuture, creator));
    }

    /**
     * Builds the callback that is run once opening the appender has finished.
     *
     * @param writerFuture the future to complete
     * @param creator factory for the concrete writer type
     * @param <T> the writer type
     * @return a callback that fails {@code writerFuture} with the open error if there is one, and
     *     otherwise completes it with a writer created on the current write buffer
     */
    private <T extends LogStreamWriter> BiConsumer<LogStorageAppender, Throwable> onOpenAppender(CompletableActorFuture<T> writerFuture, WriterCreator<T> creator) {
        return (openedAppender, error) -> {
            if (error != null) {
                writerFuture.completeExceptionally(error);
            } else {
                writerFuture.complete(creator.create(partitionId, writeBuffer));
            }
        };
    }

    /**
     * Closes the appender and, once that has succeeded, the write buffer.
     *
     * @return a future that is already completed when there is no appender; otherwise it completes
     *     with the outcome of closing the write buffer, or exceptionally with the error reported
     *     by closing the appender
     */
    private ActorFuture<Void> closeAppender() {
        final CompletableActorFuture<Void> closed = new CompletableActorFuture<>();
        if (appender == null) {
            closed.complete(null);
            return closed;
        }

        appenderFuture = null;
        LOG.info("Close appender for log stream {}", logName);

        // Detach appender and write buffer from the fields before closing them asynchronously, so
        // createWriter() no longer sees them and a later open starts from scratch.
        final LogStorageAppender appenderToClose = appender;
        final Dispatcher bufferToClose = writeBuffer;
        appender = null;
        writeBuffer = null;

        appenderToClose.closeAsync().onComplete((ignored, error) -> {
            if (error == null) {
                bufferToClose.closeAsync().onComplete(closed);
            } else {
                // NOTE(review): the write buffer is not closed on this path — confirm this is intended.
                closed.completeExceptionally(error);
            }
        });
        return closed;
    }

    /**
     * Opens the appender: creates the write buffer positioned right after the last position found
     * in the storage, opens the appender subscription on it and submits a new appender actor.
     * If an open was already started, its future is returned and nothing else happens.
     *
     * @return the future of the (possibly still pending) open; completed with the appender once the
     *     appender actor has been submitted successfully, or exceptionally if opening the
     *     subscription or submitting the actor failed
     */
    private ActorFuture<LogStorageAppender> openAppender() {
        if (appenderFuture != null) {
            return appenderFuture;
        }
        final CompletableActorFuture<LogStorageAppender> openFuture = new CompletableActorFuture<>();
        appenderFuture = openFuture;

        final long lastPosition = getLastPosition();
        final long initialPosition;
        if (lastPosition > 0) {
            internalSetCommitPosition(lastPosition);
            initialPosition = lastPosition + 1;
        } else {
            initialPosition = 1;
        }

        writeBuffer = Dispatchers.create(buildActorName(nodeId, "dispatcher-" + partitionId))
                .maxFragmentLength(maxFrameLength)
                .initialPosition(initialPosition)
                .name(logName + "-write-buffer")
                .actorScheduler(actorScheduler)
                .build();

        // The two callbacks use distinct parameter names: a lambda parameter must not redeclare a
        // parameter of an enclosing lambda.
        writeBuffer.openSubscriptionAsync(APPENDER_SUBSCRIPTION_NAME).onComplete((subscription, subscriptionError) -> {
            if (subscriptionError != null) {
                onOpenAppenderFailed(subscriptionError);
            } else {
                appender = new LogStorageAppender(buildActorName(nodeId, "LogAppender-" + partitionId), partitionId, logStorage, subscription, maxFrameLength, this::setCommitPosition);
                actorScheduler.submitActor(appender).onComplete((ignored, submitError) -> {
                    if (submitError != null) {
                        onOpenAppenderFailed(submitError);
                    } else {
                        appenderFuture.complete(appender);
                        appender.addFailureListener(this);
                    }
                });
            }
        });
        return openFuture;
    }

    private void onOpenAppenderFailed(Throwable th) {
        LOG.error("Unexpected error when opening appender", th);
        this.appenderFuture.completeExceptionally(th);
        onFailure();
    }

    /**
     * Determines the last position in the storage using a temporary reader, which is closed again
     * before returning (also when {@code seekToEnd()} throws).
     *
     * @return the value returned by the reader's {@code seekToEnd()}
     */
    private long getLastPosition() {
        try (LogStreamReaderImpl endReader = new LogStreamReaderImpl(logStorage)) {
            return endReader.seekToEnd();
        }
    }

    /**
     * @return {@code UNHEALTHY} once the actor is closed, {@code HEALTHY} otherwise
     */
    public HealthStatus getHealthStatus() {
        if (actor.isClosed()) {
            return HealthStatus.UNHEALTHY;
        }
        return HealthStatus.HEALTHY;
    }

    /**
     * Sets the listener that is notified by {@link #onFailure()} and {@link #onRecovered()}. Only
     * one listener is kept; a later call replaces the earlier listener. The assignment is
     * submitted through {@code actor.run}.
     *
     * @param failureListener the listener to notify
     */
    public void addFailureListener(FailureListener failureListener) {
        final Runnable assignment = () -> {
            this.failureListener = failureListener;
        };
        actor.run(assignment);
    }

    /**
     * Reports a failure: notifies the registered listener, if any, and then starts closing the log
     * stream. Both steps are submitted together through {@code actor.run}.
     */
    public void onFailure() {
        actor.run(() -> {
            final boolean hasListener = failureListener != null;
            if (hasListener) {
                failureListener.onFailure();
            }
            closeAsync();
        });
    }

    /**
     * Forwards a recovery notification to the registered listener, if any. The notification is
     * submitted through {@code actor.run}.
     */
    public void onRecovered() {
        actor.run(() -> {
            if (failureListener == null) {
                return;
            }
            failureListener.onRecovered();
        });
    }
}
