package io.zeebe.distributedlog.restore.log;

import io.atomix.cluster.MemberId;
import io.zeebe.distributedlog.restore.impl.ControllableRestoreClient;
import io.zeebe.distributedlog.restore.log.impl.DefaultLogReplicationResponse;
import io.zeebe.distributedlog.restore.log.impl.RecordingLogReplicationAppender;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import org.agrona.concurrent.UnsafeBuffer;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/zeebe/distributedlog/restore/log/LogReplicatorTest.class */
public class LogReplicatorTest {
    private final ControllableRestoreClient client = new ControllableRestoreClient();
    private final RecordingLogReplicationAppender appender = new RecordingLogReplicationAppender();
    private final Executor executor = (v0) -> {
        v0.run();
    };
    private final MemberId server = MemberId.anonymous();
    private final LogReplicator replicator = new LogReplicator(this.appender, this.client, this.executor);

    @Before
    public void setUp() {
        this.client.reset();
        this.appender.reset();
    }

    @Test
    public void shouldAppendEventsOnResponse() {
        LogReplicationResponse newResponse = newResponse(false, 1L);
        this.replicator.replicate(this.server, -1L, -1L);
        this.client.completeLogReplication(-1L, newResponse);
        Assertions.assertThat(this.appender.getInvocations()).hasSize(1).first().extracting(new String[]{"commitPosition", "serializedEvents"}).contains(new Object[]{Long.valueOf(newResponse.getToPosition()), newResponse.getSerializedEvents()});
    }

    @Test
    public void shouldReplicateAgain() {
        LogReplicationResponse newResponse = newResponse(true, 2L);
        CompletableFuture replicate = this.replicator.replicate(this.server, -1L, newResponse.getToPosition() + 1);
        this.client.completeLogReplication(-1L, newResponse);
        Assertions.assertThat(replicate).isNotCompleted();
        Assertions.assertThat(this.client.getLogReplicationRequests()).hasSize(2);
        Assertions.assertThat(this.client.getLogReplicationRequests().get(Long.valueOf(newResponse.getToPosition()))).isNotNull().isNotCompleted();
    }

    @Test
    public void shouldExcludeFromPositionWhenReplicateAgain() {
        LogReplicationResponse newResponse = newResponse(true, 2L);
        this.replicator.replicate(this.server, -1L, newResponse.getToPosition() + 1, true);
        this.client.completeLogReplication(-1L, newResponse);
        Assertions.assertThat(this.client.getLogReplicationRequests()).hasSize(2);
        Assertions.assertThat(this.client.getRequestLog().get(0).includeFromPosition()).isTrue();
        Assertions.assertThat(this.client.getRequestLog().get(1).includeFromPosition()).isFalse();
    }

    @Test
    public void shouldNotReplicateAgainIfNoMoreAvailable() {
        LogReplicationResponse newResponse = newResponse(false, 2L);
        CompletableFuture replicate = this.replicator.replicate(this.server, -1L, newResponse.getToPosition() + 1);
        this.client.completeLogReplication(-1L, newResponse);
        Assertions.assertThat(replicate).isCompletedWithValue(Long.valueOf(newResponse.getToPosition()));
        Assertions.assertThat(this.appender.getInvocations()).hasSize(1);
    }

    @Test
    public void shouldNotReplicateAgainIfPositionReached() {
        LogReplicationResponse[] logReplicationResponseArr = {newResponse(true, 2L), newResponse(true, 4L)};
        CompletableFuture replicate = this.replicator.replicate(this.server, -1L, logReplicationResponseArr[1].getToPosition());
        this.client.completeLogReplication(-1L, logReplicationResponseArr[0]);
        Assertions.assertThat(replicate).isNotCompleted();
        this.client.completeLogReplication(logReplicationResponseArr[0].getToPosition(), logReplicationResponseArr[1]);
        Assertions.assertThat(replicate).isCompletedWithValue(Long.valueOf(logReplicationResponseArr[1].getToPosition()));
    }

    @Test
    public void shouldReplicateAgainIfMoreAvailable() {
        LogReplicationResponse newResponse = newResponse(false, 2L);
        CompletableFuture replicate = this.replicator.replicate(this.server, -1L, newResponse.getToPosition() + 1);
        this.client.completeLogReplication(-1L, newResponse);
        Assertions.assertThat(replicate).isCompletedWithValue(Long.valueOf(newResponse.getToPosition()));
        Assertions.assertThat(this.appender.getInvocations()).hasSize(1);
    }

    @Test
    public void shouldCompleteExceptionallyOnError() {
        IllegalStateException illegalStateException = new IllegalStateException("fail");
        CompletableFuture replicate = this.replicator.replicate(this.server, -1L, -1L);
        this.client.completeLogReplication(-1L, illegalStateException);
        Assertions.assertThat(this.appender.getInvocations()).isEmpty();
        Assertions.assertThat(replicate).isCompletedExceptionally().hasFailedWithThrowableThat().isInstanceOf(IllegalStateException.class).hasMessage("fail");
    }

    @Test
    public void shouldCompleteExceptionallyIfRequestIsInvalid() {
        LogReplicationResponse newResponse = newResponse(false, -1L, new byte[0]);
        CompletableFuture replicate = this.replicator.replicate(this.server, -1L, -1L);
        this.client.completeLogReplication(-1L, newResponse);
        Assertions.assertThat(this.appender.getInvocations()).isEmpty();
        Assertions.assertThat(replicate).isCompletedExceptionally().hasFailedWithThrowableThat().isInstanceOf(InvalidLogReplicationResponse.class);
    }

    @Test
    public void shouldCompleteExceptionallyIfAppenderReturnsNegativeResult() {
        LogReplicator logReplicator = new LogReplicator((j, bArr) -> {
            return -1L;
        }, this.client, this.executor);
        LogReplicationResponse newResponse = newResponse(false);
        CompletableFuture replicate = logReplicator.replicate(this.server, -1L, -1L);
        this.client.completeLogReplication(-1L, newResponse);
        Assertions.assertThat(replicate).isCompletedExceptionally().hasFailedWithThrowableThat().isInstanceOf(FailedAppendException.class);
    }

    @Test
    public void shouldCompleteExceptionallyIfAppenderThrowsAnException() {
        RuntimeException runtimeException = new RuntimeException();
        LogReplicator logReplicator = new LogReplicator((j, bArr) -> {
            throw runtimeException;
        }, this.client, this.executor);
        LogReplicationResponse newResponse = newResponse(false);
        CompletableFuture replicate = logReplicator.replicate(this.server, -1L, -1L);
        this.client.completeLogReplication(-1L, newResponse);
        Assertions.assertThat(replicate).isCompletedExceptionally().hasFailedWithThrowableThat().isEqualTo(runtimeException);
    }

    private LogReplicationResponse newResponse(boolean z) {
        return newResponse(z, ThreadLocalRandom.current().nextLong(1L, Long.MAX_VALUE));
    }

    private LogReplicationResponse newResponse(boolean z, long j) {
        byte[] bArr = new byte[1024];
        ThreadLocalRandom.current().nextBytes(bArr);
        return newResponse(z, j, bArr);
    }

    private LogReplicationResponse newResponse(boolean z, long j, byte[] bArr) {
        DefaultLogReplicationResponse defaultLogReplicationResponse = new DefaultLogReplicationResponse();
        defaultLogReplicationResponse.setMoreAvailable(z);
        defaultLogReplicationResponse.setSerializedEvents(new UnsafeBuffer(bArr), 0, bArr.length);
        defaultLogReplicationResponse.setToPosition(j);
        return defaultLogReplicationResponse;
    }
}
