/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.kafka;

import io.debezium.junit.SkipLongRunning;
import io.debezium.junit.SkipTestRule;
import io.debezium.kafka.KafkaCluster;
import io.debezium.util.Stopwatch;
import io.debezium.util.Testing;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/**
 * Tests for {@link KafkaCluster}: starting and stopping clusters with one or more brokers,
 * checking what happens to the broker data directories on shutdown, and round-tripping
 * messages through a running cluster with the producers and consumers exposed by
 * {@code cluster.useTo()}.
 *
 * <p>Each test gets a fresh {@link KafkaCluster} rooted in its own testing directory; both are
 * cleaned up in {@link #afterEach()}.
 */
public class KafkaClusterTest {
    // NOTE(review): presumably this rule is what honors @SkipLongRunning on the tests below --
    // confirm against SkipTestRule.
    @Rule
    public TestRule skipTestRule = new SkipTestRule();
    private KafkaCluster cluster;
    private File dataDir;

    /**
     * Creates the testing directory, deletes it again so each test starts without an existing
     * directory on disk, and configures a new (not yet started) cluster to use that location.
     */
    @Before
    public void beforeEach() {
        this.dataDir = Testing.Files.createTestingDirectory("cluster");
        Testing.Files.delete(this.dataDir);
        this.cluster = new KafkaCluster().usingDirectory(this.dataDir);
    }

    /**
     * Shuts the cluster down and removes the testing directory.
     *
     * <p>Both fields are null-checked because JUnit runs {@code @After} methods even when
     * {@code @Before} failed part-way, and the directory is removed in a {@code finally} block so
     * that a failing shutdown cannot leave data behind for subsequent tests.
     */
    @After
    public void afterEach() {
        try {
            if (this.cluster != null) {
                this.cluster.shutdown();
            }
        } finally {
            if (this.dataDir != null) {
                Testing.Files.delete(this.dataDir);
            }
        }
    }

    /**
     * With delete-on-shutdown enabled, a single broker's data directory is valid while the
     * cluster runs and no longer exists after shutdown.
     */
    @Test
    @SkipLongRunning
    public void shouldStartClusterWithOneBrokerAndRemoveData() throws Exception {
        this.cluster.deleteDataUponShutdown(true).addBrokers(1).startup();
        this.cluster.onEachDirectory(this::assertValidDataDirectory);
        this.cluster.shutdown();
        this.cluster.onEachDirectory(this::assertDoesNotExist);
    }

    /**
     * With delete-on-shutdown enabled, the data directories of three brokers are valid while the
     * cluster runs and no longer exist after shutdown.
     */
    @Test
    @SkipLongRunning
    public void shouldStartClusterWithMultipleBrokerAndRemoveData() throws Exception {
        this.cluster.deleteDataUponShutdown(true).addBrokers(3).startup();
        this.cluster.onEachDirectory(this::assertValidDataDirectory);
        this.cluster.shutdown();
        this.cluster.onEachDirectory(this::assertDoesNotExist);
    }

    /**
     * With delete-on-shutdown disabled, a single broker's data directory is still valid after
     * shutdown.
     */
    @Test
    @SkipLongRunning
    public void shouldStartClusterWithOneBrokerAndLeaveData() throws Exception {
        this.cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
        this.cluster.onEachDirectory(this::assertValidDataDirectory);
        this.cluster.shutdown();
        this.cluster.onEachDirectory(this::assertValidDataDirectory);
    }

    /**
     * With delete-on-shutdown disabled, the data directories of three brokers are still valid
     * after shutdown.
     */
    @Test
    @SkipLongRunning
    public void shouldStartClusterWithMultipleBrokerAndLeaveData() throws Exception {
        this.cluster.deleteDataUponShutdown(false).addBrokers(3).startup();
        this.cluster.onEachDirectory(this::assertValidDataDirectory);
        this.cluster.shutdown();
        this.cluster.onEachDirectory(this::assertValidDataDirectory);
    }

    /**
     * Starts a one-broker cluster, runs an integer consumer and an integer producer against the
     * same topic, and verifies that the consumer callback saw every message.
     */
    @Test
    @SkipLongRunning
    public void shouldStartClusterAndAllowProducersAndConsumersToUseIt() throws Exception {
        Testing.Debug.enable();
        final String topicName = "topicA";
        // One count for the consumer's completion callback, one for the producer's.
        final CountDownLatch completion = new CountDownLatch(2);
        final int numMessages = 100;
        final AtomicLong messagesRead = new AtomicLong(0L);

        this.cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
        this.cluster.createTopics(topicName);

        Stopwatch sw = Stopwatch.reusable().start();
        this.cluster.useTo().consumeIntegers(topicName, numMessages, 10L, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
            messagesRead.incrementAndGet();
            return true;
        });
        this.cluster.useTo().produceIntegers(topicName, numMessages, 1, completion::countDown);

        // A timeout here is only logged; the assertion below is what decides pass/fail.
        if (completion.await(10L, TimeUnit.SECONDS)) {
            sw.stop();
            Testing.debug("Both consumer and producer completed normally in " + sw.durations());
        } else {
            Testing.debug("Consumer and/or producer did not completed normally");
        }
        Assertions.assertThat(messagesRead.get()).isEqualTo(numMessages);
    }

    /**
     * Starts a one-broker cluster, writes three messages synchronously through a manually created
     * producer, and verifies that the automatic consumer callback saw all of them.
     */
    @Test
    public void shouldStartClusterAndAllowInteractiveProductionAndAutomaticConsumersToUseIt() throws Exception {
        Testing.Debug.enable();
        final String topicName = "topicA";
        // Only the consumer signals completion; the producer is driven inline and closed below.
        final CountDownLatch completion = new CountDownLatch(1);
        final int numMessages = 3;
        final AtomicLong messagesRead = new AtomicLong(0L);

        this.cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
        this.cluster.createTopics(topicName);

        Stopwatch sw = Stopwatch.reusable().start();
        this.cluster.useTo().consumeIntegers(topicName, numMessages, 10L, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
            messagesRead.incrementAndGet();
            return true;
        });
        // Exactly numMessages writes, so the consumer's expected count matches what is produced.
        this.cluster.useTo()
                .createProducer("manual", new StringSerializer(), new IntegerSerializer())
                .write(topicName, "key1", 1)
                .write(topicName, "key2", 2)
                .write(topicName, "key3", 3)
                .close();

        // A timeout here is only logged; the assertion below is what decides pass/fail.
        if (completion.await(10L, TimeUnit.SECONDS)) {
            sw.stop();
            Testing.debug("The consumer completed normally in " + sw.durations());
        } else {
            Testing.debug("Consumer did not completed normally");
        }
        Assertions.assertThat(messagesRead.get()).isEqualTo(numMessages);
    }

    /**
     * Starts a one-broker cluster, writes three messages from a producer callback handed to
     * {@code produce(...)}, and verifies that the automatic consumer callback saw all of them.
     */
    @Test
    @SkipLongRunning
    public void shouldStartClusterAndAllowAsynchronousProductionAndAutomaticConsumersToUseIt() throws Exception {
        Testing.Debug.enable();
        final String topicName = "topicA";
        // One count for the consumer's completion callback, one counted down by the producer lambda.
        final CountDownLatch completion = new CountDownLatch(2);
        final int numMessages = 3;
        final AtomicLong messagesRead = new AtomicLong(0L);

        this.cluster.deleteDataUponShutdown(false).addBrokers(1).startup();
        this.cluster.createTopics(topicName);

        Stopwatch sw = Stopwatch.reusable().start();
        this.cluster.useTo().consumeIntegers(topicName, numMessages, 10L, TimeUnit.SECONDS, completion::countDown, (key, value) -> {
            messagesRead.incrementAndGet();
            return true;
        });
        // Exactly numMessages writes, so the consumer's expected count matches what is produced.
        this.cluster.useTo().produce("manual", new StringSerializer(), new IntegerSerializer(), producer -> {
            producer.write(topicName, "key1", 1);
            producer.write(topicName, "key2", 2);
            producer.write(topicName, "key3", 3);
            completion.countDown();
        });

        // A timeout here is only logged; the assertion below is what decides pass/fail.
        if (completion.await(10L, TimeUnit.SECONDS)) {
            sw.stop();
            Testing.debug("The consumer completed normally in " + sw.durations());
        } else {
            Testing.debug("Consumer did not completed normally");
        }
        Assertions.assertThat(messagesRead.get()).isEqualTo(numMessages);
    }

    /**
     * Asserts that {@code dir} exists, is a readable and writable directory, and is accepted by
     * {@code Testing.Files.inTargetDir}.
     *
     * @param dir the broker data directory to check
     */
    protected void assertValidDataDirectory(File dir) {
        Assertions.assertThat(dir.exists()).isTrue();
        Assertions.assertThat(dir.isDirectory()).isTrue();
        Assertions.assertThat(dir.canWrite()).isTrue();
        Assertions.assertThat(dir.canRead()).isTrue();
        Assertions.assertThat(Testing.Files.inTargetDir(dir)).isTrue();
    }

    /**
     * Asserts that {@code dir} is no longer present on disk.
     *
     * @param dir the broker data directory expected to have been removed
     */
    protected void assertDoesNotExist(File dir) {
        Assertions.assertThat(dir.exists()).isFalse();
    }
}

