package io.zeebe.broker.clustering.base.snapshots;

import io.zeebe.broker.clustering.api.ErrorResponse;
import io.zeebe.broker.clustering.api.FetchSnapshotChunkRequest;
import io.zeebe.broker.clustering.api.FetchSnapshotChunkResponse;
import io.zeebe.broker.clustering.api.ListSnapshotsRequest;
import io.zeebe.broker.clustering.api.ListSnapshotsResponse;
import io.zeebe.broker.clustering.base.ClusterBaseLayerServiceNames;
import io.zeebe.broker.clustering.base.partitions.Partition;
import io.zeebe.broker.clustering.base.topology.NodeInfo;
import io.zeebe.broker.clustering.base.topology.PartitionInfo;
import io.zeebe.broker.exporter.util.TestJarExporter;
import io.zeebe.broker.logstreams.processor.TypedStreamProcessorTest;
import io.zeebe.broker.system.ConfigurationTest;
import io.zeebe.broker.util.BufferingClientOutput;
import io.zeebe.broker.util.ControlledTopologyManager;
import io.zeebe.clustering.management.ErrorResponseCode;
import io.zeebe.logstreams.impl.snapshot.fs.FsReadableSnapshot;
import io.zeebe.logstreams.impl.snapshot.fs.FsSnapshotMetadata;
import io.zeebe.logstreams.impl.snapshot.fs.FsSnapshotStorage;
import io.zeebe.logstreams.impl.snapshot.fs.FsSnapshotStorageConfiguration;
import io.zeebe.logstreams.spi.SnapshotMetadata;
import io.zeebe.logstreams.spi.SnapshotStorage;
import io.zeebe.raft.state.RaftState;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.testing.ServiceContainerRule;
import io.zeebe.transport.ClientTransport;
import io.zeebe.transport.SocketAddress;
import io.zeebe.util.StringUtil;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.buffer.BufferWriter;
import io.zeebe.util.sched.testing.ControlledActorSchedulerRule;
import java.security.MessageDigest;
import java.time.Duration;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

/* loaded from: input_file:io/zeebe/broker/clustering/base/snapshots/SnapshotReplicationServiceTest.class */
/**
 * Tests {@link SnapshotReplicationService} against a mocked {@link ClientTransport} whose output
 * is a {@link BufferingClientOutput}, a spied {@link ControlledTopologyManager} and a controlled
 * actor scheduler. Replicated snapshots are written to an {@link FsSnapshotStorage} rooted in a
 * JUnit temporary folder.
 *
 * <p>Every test follows the same rhythm: install the service, inspect the last request captured
 * by {@link #output}, answer it, and let the scheduler run (or wait for a timer) before checking
 * the next request.
 */
public class SnapshotReplicationServiceTest {
    private static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofSeconds(5);

    /** Template id carried by the requests this test also identifies as {@link ListSnapshotsRequest}. */
    private static final int LIST_SNAPSHOTS_TEMPLATE_ID = 6;

    /** Template id carried by the requests this test also identifies as {@link FetchSnapshotChunkRequest}. */
    private static final int FETCH_SNAPSHOT_CHUNK_TEMPLATE_ID = 8;

    // Used both as snapshot name and as snapshot content.
    // NOTE(review): these constants belong to unrelated test classes; presumably the original
    // source used plain string literals here - confirm and replace with literals.
    private static final String FIRST_SNAPSHOT = TypedStreamProcessorTest.STREAM_NAME;
    private static final String SECOND_SNAPSHOT = TestJarExporter.FOO;

    private FsSnapshotStorage storage;
    private Partition partition;

    // The declaration order of the following fields matters: later initializers read earlier fields.
    private final TemporaryFolder tempFolder = new TemporaryFolder();
    private final ControlledActorSchedulerRule actorSchedulerRule = new ControlledActorSchedulerRule();
    private final ServiceContainerRule serviceContainerRule = new ServiceContainerRule(actorSchedulerRule);
    private final Duration snapshotPollInterval = Duration.ofSeconds(1);

    @Rule
    public RuleChain ruleChain =
            RuleChain.outerRule(tempFolder).around(actorSchedulerRule).around(serviceContainerRule);

    private final SnapshotReplicationService service = new SnapshotReplicationService(snapshotPollInterval);
    private final ControlledTopologyManager topologyManager = Mockito.spy(new ControlledTopologyManager());
    private final BufferingClientOutput output = new BufferingClientOutput(DEFAULT_REQUEST_TIMEOUT);
    private final ClientTransport transport = createTransport();
    private final FsSnapshotStorageConfiguration config = new FsSnapshotStorageConfiguration();
    private final NodeInfo leaderNodeInfo = createLeaderNodeInfo(0);

    /**
     * Creates the snapshot storage and the partition, injects the collaborators into the service
     * and registers {@link #leaderNodeInfo} as leader of the partition.
     */
    @Before
    public void setUp() throws Exception {
        // The temporary folder is created by the rule chain before this method runs. Calling
        // create() again would replace it with a second folder and leave the first one behind.
        storage = createSnapshotStorage();
        partition = createPartition();

        service.getTopologyManagerInjector().inject(topologyManager);
        service.getManagementClientApiInjector().inject(transport);
        service.getPartitionInjector().inject(partition);

        topologyManager.setPartitionLeader(partition, leaderNodeInfo);
    }

    /**
     * With the leader removed from the topology no request is sent on install; once the leader is
     * registered and the error retry timer has elapsed, a single list snapshots request is sent.
     */
    @Test
    public void shouldRetryIfLeaderNotInTopologyYet() {
        topologyManager.getTopology().removeMember(leaderNodeInfo);

        installService();
        Assertions.assertThat(output.getSentRequests()).isEmpty();

        topologyManager.setPartitionLeader(partition, leaderNodeInfo);
        awaitErrorRetry();

        assertSentRequestCount(1);
        assertLastRequestListsSnapshotsOfPartition();
    }

    /**
     * While the topology manager is configured with a query error no request is sent; after the
     * error is cleared and the error retry timer has elapsed, a single list snapshots request is sent.
     */
    @Test
    public void shouldRetryOnTopologyError() {
        topologyManager.setQueryError(new RuntimeException("fail"));

        installService();
        Assertions.assertThat(output.getSentRequests()).isEmpty();

        topologyManager.setQueryError(null);
        awaitErrorRetry();

        assertSentRequestCount(1);
        assertLastRequestListsSnapshotsOfPartition();
    }

    /**
     * Answers the first list snapshots request with an exception (after a different leader was
     * registered) and expects a second list snapshots request after the error retry interval.
     */
    @Test
    public void shouldUpdateTopologyOnTransportError() {
        installService();
        assertSentRequestCount(1);
        assertLastRequestTemplate(LIST_SNAPSHOTS_TEMPLATE_ID);

        topologyManager.setPartitionLeader(partition, createNodeInfo(1, "0.0.0.0", 5));
        output.getLastRequest().respondWith(new RuntimeException("network error"));
        actorSchedulerRule.workUntilDone();
        awaitErrorRetry();

        assertSentRequestCount(2);
        assertLastRequestTemplate(LIST_SNAPSHOTS_TEMPLATE_ID);
    }

    /**
     * Answers the first list snapshots request with a PARTITION_NOT_FOUND error response and
     * expects a second list snapshots request after the error retry interval.
     */
    @Test
    public void shouldRetryIfListSnapshotsFails() {
        BufferWriter errorResponse = new ErrorResponse().setCode(ErrorResponseCode.PARTITION_NOT_FOUND).setData("fail");

        installService();
        assertSentRequestCount(1);
        assertLastRequestTemplate(LIST_SNAPSHOTS_TEMPLATE_ID);

        output.getLastRequest().respondWith(errorResponse);
        actorSchedulerRule.workUntilDone();
        awaitErrorRetry();

        assertSentRequestCount(2);
        assertLastRequestTemplate(LIST_SNAPSHOTS_TEMPLATE_ID);
    }

    /**
     * Same as {@link #shouldRetryIfListSnapshotsFails()}, but a different leader is registered
     * before the error response is delivered.
     */
    @Test
    public void shouldRefreshLeaderAddressOnListSnapshotsError() {
        BufferWriter errorResponse = new ErrorResponse().setCode(ErrorResponseCode.PARTITION_NOT_FOUND).setData("fail");

        installService();
        assertSentRequestCount(1);
        assertLastRequestTemplate(LIST_SNAPSHOTS_TEMPLATE_ID);

        topologyManager.setPartitionLeader(partition, createNodeInfo(1, "0.0.0.0", 12345));
        output.getLastRequest().respondWith(errorResponse);
        actorSchedulerRule.workUntilDone();
        awaitErrorRetry();

        assertSentRequestCount(2);
        assertLastRequestTemplate(LIST_SNAPSHOTS_TEMPLATE_ID);
    }

    /**
     * Lists two snapshots, fails the chunk request of the first one with a READ_ERROR response and
     * expects that nothing was stored and that, after the error retry interval, the second
     * snapshot is fetched.
     */
    @Test
    public void shouldAbortCurrentSnapshotIfOneChunkFailsAndMoveToNextOne() throws Exception {
        BufferWriter errorResponse = new ErrorResponse().setCode(ErrorResponseCode.READ_ERROR).setData("could not read");
        SnapshotMetadata[] snapshots = {
            createSnapshot(FIRST_SNAPSHOT, 3L, FIRST_SNAPSHOT),
            createSnapshot(SECOND_SNAPSHOT, 6L, SECOND_SNAPSHOT)
        };

        installService();
        assertSentRequestCount(1);
        assertLastRequestTemplate(LIST_SNAPSHOTS_TEMPLATE_ID);

        output.getLastRequest().respondWith((BufferWriter) generateListSnapshotsResponse(snapshots));
        actorSchedulerRule.workUntilDone();
        assertLastRequestTemplate(FETCH_SNAPSHOT_CHUNK_TEMPLATE_ID);
        assertFetchingSnapshot(snapshots[0]);

        output.getLastRequest().respondWith(errorResponse);
        actorSchedulerRule.workUntilDone();
        Assertions.assertThat(storage.listSnapshots()).isEmpty();

        awaitErrorRetry();
        assertFetchingSnapshot(snapshots[1]);
    }

    /**
     * Lists two snapshots, serves one chunk for each and expects both to be stored; after the
     * snapshot poll interval a new list snapshots request is expected.
     */
    @Test
    public void shouldReplicationSnapshot() throws Exception {
        SnapshotMetadata[] snapshots = {
            createSnapshot(FIRST_SNAPSHOT, 3L, FIRST_SNAPSHOT),
            createSnapshot(SECOND_SNAPSHOT, 6L, SECOND_SNAPSHOT)
        };

        installService();
        assertLastRequestTemplate(LIST_SNAPSHOTS_TEMPLATE_ID);

        output.getLastRequest().respondWith((BufferWriter) generateListSnapshotsResponse(snapshots));
        actorSchedulerRule.workUntilDone();
        assertFetchingSnapshot(snapshots[0]);

        output.getLastRequest().respondWith((BufferWriter) generateFetchSnapshotChunkResponse(
                FIRST_SNAPSHOT, (FetchSnapshotChunkRequest) output.getLastRequest().getRequest()));
        actorSchedulerRule.workUntilDone();
        assertReplicated(snapshots[0], FIRST_SNAPSHOT);
        assertFetchingSnapshot(snapshots[1]);

        output.getLastRequest().respondWith((BufferWriter) generateFetchSnapshotChunkResponse(
                SECOND_SNAPSHOT, (FetchSnapshotChunkRequest) output.getLastRequest().getRequest()));
        actorSchedulerRule.workUntilDone();
        assertReplicated(snapshots[1], SECOND_SNAPSHOT);

        actorSchedulerRule.waitForTimer(snapshotPollInterval);
        assertLastRequestTemplate(LIST_SNAPSHOTS_TEMPLATE_ID);
    }

    /**
     * Lists a single snapshot, fails its chunk request with a READ_ERROR response and expects a
     * new list snapshots request after the snapshot poll interval.
     */
    @Test
    public void shouldPollSnapshotsOnAbortIfNoMoreToReplicate() throws Exception {
        BufferWriter errorResponse = new ErrorResponse().setCode(ErrorResponseCode.READ_ERROR).setData("could not read");
        SnapshotMetadata[] snapshots = {createSnapshot(FIRST_SNAPSHOT, 3L, FIRST_SNAPSHOT)};

        installService();
        output.getLastRequest().respondWith((BufferWriter) generateListSnapshotsResponse(snapshots));
        actorSchedulerRule.workUntilDone();
        assertFetchingSnapshot(snapshots[0]);

        output.getLastRequest().respondWith(errorResponse);
        actorSchedulerRule.workUntilDone();

        actorSchedulerRule.waitForTimer(snapshotPollInterval);
        assertLastRequestTemplate(LIST_SNAPSHOTS_TEMPLATE_ID);
    }

    /**
     * Replicates a single snapshot, removes the service from the container and expects the
     * replicated snapshot to still be readable from the storage afterwards.
     */
    @Test
    public void shouldNotRemoveLastReplicatedSnapshotWhenClosing() throws Exception {
        ServiceName snapshotReplicationServiceName = ClusterBaseLayerServiceNames.snapshotReplicationServiceName(partition);
        SnapshotMetadata[] snapshots = {createSnapshot(FIRST_SNAPSHOT, 3L, FIRST_SNAPSHOT)};

        installService();
        output.getLastRequest().respondWith((BufferWriter) generateListSnapshotsResponse(snapshots));
        actorSchedulerRule.workUntilDone();

        output.getLastRequest().respondWith((BufferWriter) generateFetchSnapshotChunkResponse(
                FIRST_SNAPSHOT, (FetchSnapshotChunkRequest) output.getLastRequest().getRequest()));
        actorSchedulerRule.workUntilDone();
        assertReplicated(snapshots[0], FIRST_SNAPSHOT);

        serviceContainerRule.get().removeService(snapshotReplicationServiceName);
        actorSchedulerRule.workUntilDone();

        Assertions.assertThat(serviceContainerRule.get().hasService(snapshotReplicationServiceName)).isFalse();
        assertReplicated(snapshots[0], FIRST_SNAPSHOT);
    }

    /** Installs the service under test in the service container and runs the scheduler until idle. */
    private void installService() {
        serviceContainerRule.get()
                .createService(ClusterBaseLayerServiceNames.snapshotReplicationServiceName(partition), service)
                .install();
        actorSchedulerRule.workUntilDone();
    }

    /** Waits for the timer the service uses between retries after an error. */
    private void awaitErrorRetry() {
        actorSchedulerRule.waitForTimer(SnapshotReplicationService.ERROR_RETRY_INTERVAL);
    }

    /** Asserts the total number of requests captured by {@link #output} so far. */
    private void assertSentRequestCount(int expectedCount) {
        Assertions.assertThat(output.getSentRequests().size()).isEqualTo(expectedCount);
    }

    /** Asserts the template id of the most recently captured request. */
    private void assertLastRequestTemplate(int expectedTemplateId) {
        Assertions.assertThat(output.getLastRequest().getTemplateId()).isEqualTo(expectedTemplateId);
    }

    /** Asserts that the most recent request is a {@link ListSnapshotsRequest} for the test partition. */
    private void assertLastRequestListsSnapshotsOfPartition() {
        // Held as Object so the explicit cast below is valid whatever getRequest() is declared to return.
        Object lastRequest = output.getLastRequest().getRequest();
        Assertions.assertThat(lastRequest).isInstanceOf(ListSnapshotsRequest.class);
        Assertions.assertThat(((ListSnapshotsRequest) lastRequest).getPartitionId())
                .isEqualTo(partition.getInfo().getPartitionId());
    }

    /**
     * Asserts that the storage holds a snapshot with the name of {@code snapshotMetadata} whose
     * position, checksum and size match the metadata and whose content starts with the bytes of
     * {@code str}.
     */
    private void assertReplicated(SnapshotMetadata snapshotMetadata, String str) throws Exception {
        FsReadableSnapshot replicated = storage.getLastSnapshot(snapshotMetadata.getName());
        Assertions.assertThat(replicated).isNotNull();

        byte[] expectedContent = StringUtil.getBytes(str);
        byte[] actualContent = new byte[expectedContent.length];
        // NOTE(review): the number of bytes read is not checked and the stream is not closed here.
        replicated.getData().read(actualContent, 0, expectedContent.length);

        Assertions.assertThat(replicated.getPosition()).isEqualTo(snapshotMetadata.getPosition());
        Assertions.assertThat(replicated.getChecksum()).isEqualTo(snapshotMetadata.getChecksum());
        Assertions.assertThat(replicated.getSize()).isEqualTo(snapshotMetadata.getSize());
        Assertions.assertThat(actualContent).isEqualTo(expectedContent);
    }

    /**
     * Asserts that the most recent request is a {@link FetchSnapshotChunkRequest} asking for the
     * first chunk (offset 0) of the given snapshot on the test partition.
     */
    private void assertFetchingSnapshot(SnapshotMetadata snapshotMetadata) {
        // Held as Object so the explicit cast below is valid whatever getRequest() is declared to return.
        Object lastRequest = output.getLastRequest().getRequest();
        Assertions.assertThat(lastRequest).isInstanceOf(FetchSnapshotChunkRequest.class);

        FetchSnapshotChunkRequest request = (FetchSnapshotChunkRequest) lastRequest;
        Assertions.assertThat(request.getChunkOffset()).isEqualTo(0L);
        Assertions.assertThat(request.getPartitionId()).isEqualTo(partition.getInfo().getPartitionId());
        Assertions.assertThat(BufferUtil.bufferAsString(request.getName())).isEqualTo(snapshotMetadata.getName());
        Assertions.assertThat(request.getLogPosition()).isEqualTo(snapshotMetadata.getPosition());
    }

    /** Creates a mocked transport whose {@code getOutput()} returns the buffering {@link #output}. */
    private ClientTransport createTransport() {
        ClientTransport clientTransport = Mockito.mock(ClientTransport.class);
        Mockito.doAnswer(invocation -> output).when(clientTransport).getOutput();
        return clientTransport;
    }

    /**
     * Builds a chunk response that serves the bytes of {@code str} starting at the offset asked
     * for by the request, limited to the requested chunk length.
     */
    private FetchSnapshotChunkResponse generateFetchSnapshotChunkResponse(String str, FetchSnapshotChunkRequest fetchSnapshotChunkRequest) {
        byte[] content = StringUtil.getBytes(str);
        int offset = (int) fetchSnapshotChunkRequest.getChunkOffset();
        // Only the bytes after the offset are available, so the length must not exceed what remains.
        return new FetchSnapshotChunkResponse()
                .setData(content, offset, Math.min(fetchSnapshotChunkRequest.getChunkLength(), content.length - offset));
    }

    /** Builds a list snapshots response that contains one entry per given metadata, in order. */
    private ListSnapshotsResponse generateListSnapshotsResponse(SnapshotMetadata[] snapshotMetadataArr) {
        ListSnapshotsResponse response = new ListSnapshotsResponse();
        for (SnapshotMetadata metadata : snapshotMetadataArr) {
            response.addSnapshot(metadata.getName(), metadata.getPosition(), metadata.getChecksum(), metadata.getSize());
        }
        return response;
    }

    /**
     * Creates metadata for a snapshot named {@code str} at position {@code j} whose size and
     * checksum are derived from the bytes of {@code str2}.
     *
     * @throws Exception if the checksum algorithm configured in {@link #config} is not available
     */
    private SnapshotMetadata createSnapshot(String str, long j, String str2) throws Exception {
        // Size and checksum must describe the same bytes, so take them once.
        byte[] content = StringUtil.getBytes(str2);
        byte[] checksum = MessageDigest.getInstance(config.getChecksumAlgorithm()).digest(content);
        return new FsSnapshotMetadata(str, j, content.length, true, checksum);
    }

    /** Creates the node info used as partition leader, bound to 0.0.0.0 starting at the client port. */
    private NodeInfo createLeaderNodeInfo(int i) {
        return createNodeInfo(i, "0.0.0.0", ConfigurationTest.CLIENT_PORT);
    }

    /** Creates a node info with four socket addresses on {@code str}, using ports {@code i2} to {@code i2 + 3}. */
    private NodeInfo createNodeInfo(int i, String str, int i2) {
        return new NodeInfo(
                i,
                new SocketAddress(str, i2),
                new SocketAddress(str, i2 + 1),
                new SocketAddress(str, i2 + 2),
                new SocketAddress(str, i2 + 3));
    }

    /** Points {@link #config} at the temporary folder and creates a storage on top of it. */
    private FsSnapshotStorage createSnapshotStorage() {
        config.setRootPath(tempFolder.getRoot().getAbsolutePath());
        return new FsSnapshotStorage(config);
    }

    /** Creates a follower partition whose snapshot storage is the storage created by this test. */
    private Partition createPartition() {
        return new Partition(new PartitionInfo(1, 1), RaftState.FOLLOWER) {
            @Override
            public SnapshotStorage getSnapshotStorage() {
                // Qualified on purpose: always refers to the test's storage field.
                return SnapshotReplicationServiceTest.this.storage;
            }
        };
    }
}
