/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.engine.DebeziumEngine;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.kafka.KafkaCluster;
import io.debezium.pipeline.signal.actions.snapshotting.ExecuteSnapshot;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.io.File;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class SignalsIT
extends AbstractAsyncEngineConnectorTest {
    protected static final String SERVER_NAME = "is_test";
    protected static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-schema-history-is.txt").toAbsolutePath();
    protected final UniqueDatabase DATABASE = new UniqueDatabase("is_test", "incremental_snapshot-test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
    protected static KafkaCluster kafka;

    @BeforeClass
    public static void startKafka() throws Exception {
        File dataDir = Testing.Files.createTestingDirectory((String)"signal_cluster");
        Testing.Files.delete((File)dataDir);
        kafka = new KafkaCluster().usingDirectory(dataDir).deleteDataPriorToStartup(true).deleteDataUponShutdown(true).addBrokers(1).withKafkaConfiguration(Collect.propertiesOf((String)"auto.create.topics.enable", (String)"true", (String)"zookeeper.session.timeout.ms", (String)"20000")).startup();
    }

    @AfterClass
    public static void stopKafka() {
        if (kafka != null) {
            kafka.shutdown();
        }
    }

    @Before
    public void before() throws SQLException {
        this.stopConnector();
        this.DATABASE.createAndInitialize();
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)SCHEMA_HISTORY_PATH);
    }

    @After
    public void after() {
        try {
            this.stopConnector();
        }
        finally {
            Testing.Files.delete((Path)SCHEMA_HISTORY_PATH);
        }
    }

    protected Configuration.Builder config() {
        return (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)).with(MySqlConnectorConfig.USER, "mysqluser")).with(MySqlConnectorConfig.PASSWORD, "mysqlpw")).with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NO_DATA.getValue())).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, this.DATABASE.qualifiedTableName("debezium_signal"))).with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 1)).with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)).with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true)).with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, (EnumeratedValue)CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO);
    }

    @Test
    public void givenOffsetCommitDisabledAndASignalSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed() throws ExecutionException, InterruptedException {
        String signalTopic = "signals_topic-1";
        LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        this.startConnector(x -> (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")).with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic-1")).with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        this.assertConnectorIsRunning();
        this.sendExecuteSnapshotKafkaSignal("b", "signals_topic-1");
        this.waitForAvailableRecords(1000L, TimeUnit.MILLISECONDS);
        Thread.sleep(5000L);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.allRecordsInOrder()).hasSize(2);
        Assertions.assertThat((boolean)logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue();
    }

    @Test
    public void givenOffsetCommitDisabledAndASignalSentWithConnectorDown_whenConnectorComesBackUp_thenNoSignalsProcessed() throws ExecutionException, InterruptedException {
        String signalTopic = "signals_topic-2";
        LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        this.sendExecuteSnapshotKafkaSignal("b", "signals_topic-2");
        Thread.sleep(5000L);
        this.startConnector(x -> (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")).with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic-2")).with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(1000L, TimeUnit.MILLISECONDS);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.allRecordsInOrder()).hasSize(0);
        Assertions.assertThat((long)logInterceptor.countOccurrences("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isEqualTo(0L);
    }

    @Test
    public void givenOffsetCommitEnabledAndASignalSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed() throws ExecutionException, InterruptedException {
        String signalTopic = "signals_topic-3";
        LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        this.startConnector(x -> (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")).with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic-3")).with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)).with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        this.assertConnectorIsRunning();
        this.sendExecuteSnapshotKafkaSignal("b", "signals_topic-3");
        this.waitForAvailableRecords(1000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((boolean)logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue();
    }

    @Test
    public void givenOffsetCommitEnabledAndMultipleSignalsSentWithConnectorRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed() throws ExecutionException, InterruptedException {
        String signalTopic = "signals_topic-4";
        LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        this.sendExecuteSnapshotKafkaSignal("b", "signals_topic-4");
        this.startConnector(x -> (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")).with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic-4")).with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)).with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(1000L, TimeUnit.MILLISECONDS);
        this.stopConnector();
        this.sendExecuteSnapshotKafkaSignal("c", "signals_topic-4");
        this.startConnector(x -> (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")).with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic-4")).with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)).with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        this.assertConnectorIsRunning();
        SignalsIT.waitForStreamingRunning((String)"mysql", (String)this.DATABASE.getServerName());
        this.waitForAvailableRecords(1000L, TimeUnit.MILLISECONDS);
        Assertions.assertThat((List)logInterceptor.getLogEntriesThatContainsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).hasSize(1);
        Assertions.assertThat((List)logInterceptor.getLogEntriesThatContainsMessage("Requested 'INCREMENTAL' snapshot of data collections '[c]'")).hasSize(1);
    }

    @Test
    public void givenOffsetCommitEnabledAndASignalSentWithConnectorNotRunning_whenConnectorComesBackUp_thenAllSignalsAreCorrectlyProcessed() throws ExecutionException, InterruptedException {
        String signalTopic = "signals_topic-5";
        LogInterceptor logInterceptor = new LogInterceptor(ExecuteSnapshot.class);
        this.sendExecuteSnapshotKafkaSignal("b", "signals_topic-5");
        this.startConnector(x -> (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)x.with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")).with(KafkaSignalChannel.SIGNAL_TOPIC, "signals_topic-5")).with(KafkaSignalChannel.SIGNAL_CONSUMER_OFFSET_COMMIT_ENABLED, true)).with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        this.assertConnectorIsRunning();
        Assertions.assertThat((boolean)logInterceptor.containsMessage("Requested 'INCREMENTAL' snapshot of data collections '[b]'")).isTrue();
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig) {
        this.startConnector(custConfig, this.loggingCompletion());
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig, DebeziumEngine.CompletionCallback callback) {
        Configuration config = custConfig.apply(this.config()).build();
        this.start(MySqlConnector.class, config, callback);
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(5L, TimeUnit.SECONDS);
    }

    protected void sendExecuteSnapshotKafkaSignal(String table, String signalTopic) throws ExecutionException, InterruptedException {
        String signalValue = String.format("{\"type\":\"execute-snapshot\",\"data\": {\"data-collections\": [\"%s\"], \"type\": \"INCREMENTAL\"}}", table);
        this.sendKafkaSignal(signalValue, signalTopic);
    }

    protected void sendKafkaSignal(String signalValue, String signalTopic) throws ExecutionException, InterruptedException {
        ProducerRecord executeSnapshotSignal = new ProducerRecord(signalTopic, Integer.valueOf(0), (Object)SERVER_NAME, (Object)signalValue);
        Configuration signalProducerConfig = ((Configuration.Builder)((Configuration.Builder)Configuration.create().withDefault("bootstrap.servers", kafka.brokerList()).withDefault("client.id", "signals").withDefault("key.serializer", StringSerializer.class)).withDefault("value.serializer", StringSerializer.class)).build();
        try (KafkaProducer producer = new KafkaProducer(signalProducerConfig.asProperties());){
            producer.send(executeSnapshotSignal).get();
        }
    }
}

