package io.zeebe.broker.system.partitions.impl;

import io.atomix.raft.zeebe.ZeebeEntry;
import io.atomix.storage.journal.Indexed;
import io.zeebe.broker.system.partitions.SnapshotReplication;
import io.zeebe.db.impl.DefaultColumnFamily;
import io.zeebe.db.impl.rocksdb.ZeebeRocksDbFactory;
import io.zeebe.logstreams.util.RocksDBWrapper;
import io.zeebe.snapshots.broker.ConstructableSnapshotStore;
import io.zeebe.snapshots.broker.impl.FileBasedSnapshotStoreFactory;
import io.zeebe.snapshots.raft.PersistedSnapshot;
import io.zeebe.snapshots.raft.ReceivableSnapshotStore;
import io.zeebe.snapshots.raft.SnapshotChunk;
import io.zeebe.snapshots.raft.TransientSnapshot;
import io.zeebe.test.util.AutoCloseableRule;
import io.zeebe.util.sched.testing.ActorSchedulerRule;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.zip.CRC32;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:io/zeebe/broker/system/partitions/impl/ReplicateStateControllerTest.class */
public final class ReplicateStateControllerTest {
    private static final int VALUE = 51966;
    private static final String KEY = "test";

    @Rule
    public final TemporaryFolder tempFolderRule = new TemporaryFolder();

    @Rule
    public final AutoCloseableRule autoCloseableRule = new AutoCloseableRule();

    @Rule
    public final ActorSchedulerRule actorSchedulerRule = new ActorSchedulerRule();
    private StateControllerImpl replicatorSnapshotController;
    private StateControllerImpl receiverSnapshotController;
    private Replicator replicator;
    private ConstructableSnapshotStore senderStore;
    private ReceivableSnapshotStore receiverStore;

    /* loaded from: input_file:io/zeebe/broker/system/partitions/impl/ReplicateStateControllerTest$Replicator.class */
    protected static final class Replicator implements SnapshotReplication {
        private Consumer<SnapshotChunk> chunkConsumer;
        final List<SnapshotChunk> replicatedChunks = new ArrayList();
        private final ExecutorService executorService = Executors.newSingleThreadExecutor();

        protected Replicator() {
        }

        public void replicate(SnapshotChunk snapshotChunk) {
            this.replicatedChunks.add(snapshotChunk);
            if (this.chunkConsumer != null) {
                this.executorService.execute(() -> {
                    this.chunkConsumer.accept(snapshotChunk);
                });
            }
        }

        public void consume(Consumer<SnapshotChunk> consumer) {
            this.chunkConsumer = consumer;
        }

        public void close() {
        }
    }

    @Before
    public void setup() throws IOException {
        Path path = this.tempFolderRule.newFolder("sender").toPath();
        FileBasedSnapshotStoreFactory fileBasedSnapshotStoreFactory = new FileBasedSnapshotStoreFactory(this.actorSchedulerRule.get());
        fileBasedSnapshotStoreFactory.createReceivableSnapshotStore(path, "1");
        this.senderStore = fileBasedSnapshotStoreFactory.getConstructableSnapshotStore("1");
        Path path2 = this.tempFolderRule.newFolder("receiver").toPath();
        FileBasedSnapshotStoreFactory fileBasedSnapshotStoreFactory2 = new FileBasedSnapshotStoreFactory(this.actorSchedulerRule.get());
        this.receiverStore = fileBasedSnapshotStoreFactory2.createReceivableSnapshotStore(path2, "1");
        this.replicator = new Replicator();
        this.replicatorSnapshotController = new StateControllerImpl(1, ZeebeRocksDbFactory.newFactory(DefaultColumnFamily.class), this.senderStore, fileBasedSnapshotStoreFactory.getReceivableSnapshotStore("1"), path.resolve("runtime"), this.replicator, j -> {
            return Optional.of(new Indexed(j, new ZeebeEntry(1L, System.currentTimeMillis(), 1L, 10L, (ByteBuffer) null), 0));
        }, zeebeDb -> {
            return Long.MAX_VALUE;
        });
        this.senderStore.addSnapshotListener(this.replicatorSnapshotController);
        this.receiverSnapshotController = new StateControllerImpl(1, ZeebeRocksDbFactory.newFactory(DefaultColumnFamily.class), fileBasedSnapshotStoreFactory2.getConstructableSnapshotStore("1"), this.receiverStore, path2.resolve("runtime"), this.replicator, j2 -> {
            return Optional.of(new Indexed(j2, new ZeebeEntry(1L, System.currentTimeMillis(), 1L, 10L, (ByteBuffer) null), 0));
        }, zeebeDb2 -> {
            return Long.MAX_VALUE;
        });
        this.receiverStore.addSnapshotListener(this.receiverSnapshotController);
        this.autoCloseableRule.manage(this.replicatorSnapshotController);
        this.autoCloseableRule.manage(this.receiverSnapshotController);
        this.autoCloseableRule.manage(this.senderStore);
        this.autoCloseableRule.manage(this.receiverStore);
        RocksDBWrapper rocksDBWrapper = new RocksDBWrapper();
        rocksDBWrapper.wrap(this.replicatorSnapshotController.openDb());
        rocksDBWrapper.putInt("test", VALUE);
    }

    @Test
    public void shouldReplicateSnapshotChunks() {
        ((TransientSnapshot) this.replicatorSnapshotController.takeTransientSnapshot(1L).orElseThrow()).persist().join();
        List<SnapshotChunk> list = this.replicator.replicatedChunks;
        int size = list.size();
        Assertions.assertThat(size).isGreaterThan(0);
        Assertions.assertThat(size).isEqualTo(list.get(0).getTotalCount());
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.getTotalCount();
        }).containsOnly(new Integer[]{Integer.valueOf(size)});
        Assertions.assertThat(list).extracting((v0) -> {
            return v0.getSnapshotId();
        }).flatExtracting(str -> {
            return List.of((Object[]) str.split("-"));
        }).contains(new String[]{"1"});
    }

    @Test
    public void shouldContainChecksumPerChunk() {
        ((TransientSnapshot) this.replicatorSnapshotController.takeTransientSnapshot(1L).orElseThrow()).persist().join();
        List<SnapshotChunk> list = this.replicator.replicatedChunks;
        Assertions.assertThat(list.size()).isGreaterThan(0);
        list.forEach(snapshotChunk -> {
            CRC32 crc32 = new CRC32();
            crc32.update(snapshotChunk.getContent());
            Assertions.assertThat(snapshotChunk.getChecksum()).isEqualTo(crc32.getValue());
        });
    }

    @Test
    public void shouldReceiveSnapshotChunks() throws Exception {
        this.receiverSnapshotController.consumeReplicatedSnapshots();
        TransientSnapshot transientSnapshot = (TransientSnapshot) this.replicatorSnapshotController.takeTransientSnapshot(1L).orElseThrow();
        CompletableFuture completableFuture = new CompletableFuture();
        transientSnapshot.onSnapshotTaken((bool, th) -> {
            completableFuture.complete(null);
        });
        completableFuture.join();
        transientSnapshot.persist().join();
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.receiverStore.hasSnapshotId(transientSnapshot.snapshotId().getSnapshotIdAsString()));
        });
        RocksDBWrapper rocksDBWrapper = new RocksDBWrapper();
        this.receiverSnapshotController.recover();
        Assertions.assertThat(this.receiverSnapshotController.isDbOpened()).isTrue();
        rocksDBWrapper.wrap(this.receiverSnapshotController.openDb());
        Assertions.assertThat(rocksDBWrapper.getInt("test")).isEqualTo(VALUE);
    }

    @Test
    public void shouldNotFailOnReplicatingAndReceivingTwice() throws Exception {
        this.receiverSnapshotController.consumeReplicatedSnapshots();
        TransientSnapshot transientSnapshot = (TransientSnapshot) this.replicatorSnapshotController.takeTransientSnapshot(1L).orElseThrow();
        CompletableFuture completableFuture = new CompletableFuture();
        transientSnapshot.onSnapshotTaken((bool, th) -> {
            completableFuture.complete(null);
        });
        completableFuture.join();
        PersistedSnapshot persistedSnapshot = (PersistedSnapshot) transientSnapshot.persist().join();
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.receiverStore.hasSnapshotId(transientSnapshot.snapshotId().getSnapshotIdAsString()));
        });
        this.receiverSnapshotController.onNewSnapshot(persistedSnapshot);
        RocksDBWrapper rocksDBWrapper = new RocksDBWrapper();
        this.receiverSnapshotController.recover();
        Assertions.assertThat(this.receiverSnapshotController.isDbOpened()).isTrue();
        rocksDBWrapper.wrap(this.receiverSnapshotController.openDb());
        Assertions.assertThat(rocksDBWrapper.getInt("test")).isEqualTo(VALUE);
    }
}
