/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.schema.DefaultRegexTopicNamingStrategy;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class MySqlTopicNamingStrategyIT
extends AbstractConnectorTest {
    private static final String TABLE_NAME = "dbz4180";
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-db-history-comment.txt").toAbsolutePath();
    private final UniqueDatabase DATABASE = new UniqueDatabase("topic_strategy", "strategy_test").withDbHistoryPath(DB_HISTORY_PATH);
    private Configuration config;

    @Before
    public void beforeEach() {
        this.stopConnector();
        this.DATABASE.createAndInitialize();
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)DB_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            this.stopConnector();
        }
        finally {
            Testing.Files.delete((Path)DB_HISTORY_PATH);
        }
    }

    @Test
    @FixFor(value={"DBZ-4180"})
    public void testSpecifyDelimiterAndPrefixStrategy() throws SQLException, InterruptedException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.INITIAL)).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(TABLE_NAME))).with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, "true")).with(AbstractTopicNamingStrategy.TOPIC_PREFIX, "my_prefix")).with(AbstractTopicNamingStrategy.TOPIC_DELIMITER, "_")).build();
        this.start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(100);
        String expectedDataTopic = String.join((CharSequence)"_", "my_prefix", this.DATABASE.getDatabaseName(), TABLE_NAME);
        List dataChangeEvents = records.recordsForTopic(expectedDataTopic);
        Assertions.assertThat((int)dataChangeEvents.size()).isEqualTo(1);
        String expectedSchemaTopic = "my_prefix";
        List schemaChangeEvents = records.recordsForTopic(expectedSchemaTopic);
        Assertions.assertThat((int)schemaChangeEvents.size()).isEqualTo(10);
        try (Connection conn = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName()).connection();){
            conn.createStatement().execute("INSERT INTO dbz4180(a, b, c, d) VALUE (10.1, 10.2, 'strategy 1', 1290)");
        }
        dataChangeEvents = this.consumeRecordsByTopic(1).recordsForTopic(expectedDataTopic);
        Assertions.assertThat((int)dataChangeEvents.size()).isEqualTo(1);
        SourceRecord sourceRecord = (SourceRecord)dataChangeEvents.get(0);
        Struct change = ((Struct)sourceRecord.value()).getStruct("after");
        Assertions.assertThat((String)change.getString("c")).isEqualTo((Object)"strategy 1");
        this.stopConnector();
    }

    @Test
    @FixFor(value={"DBZ-4180"})
    public void testSpecifyByLogicalTableStrategy() throws SQLException, InterruptedException {
        String tables = this.DATABASE.qualifiedTableName("dbz_4180_00") + "," + this.DATABASE.qualifiedTableName("dbz_4180_01");
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables)).with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false")).with(DefaultRegexTopicNamingStrategy.TOPIC_REGEX, "(.*)(dbz_4180)(.*)")).with(DefaultRegexTopicNamingStrategy.TOPIC_REPLACEMENT, "$1$2_all_shards")).with(MySqlConnectorConfig.TOPIC_NAMING_STRATEGY, "io.debezium.schema.DefaultRegexTopicNamingStrategy")).build();
        this.start(MySqlConnector.class, this.config);
        this.assertConnectorIsRunning();
        MySqlTopicNamingStrategyIT.waitForSnapshotToBeCompleted((String)"mysql", (String)this.DATABASE.getServerName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection conn = db.connect();){
            String shard0 = "INSERT INTO dbz_4180_00(a, b, c, d) VALUE (10.1, 10.2, 'shard 0', 10);";
            String shard1 = "INSERT INTO dbz_4180_01(a, b, c, d) VALUE (10.1, 10.2, 'shard 1', 11);";
            conn.execute(new String[]{shard0, shard1});
        }
        String expectedTopic = this.DATABASE.topicForTable("dbz_4180_all_shards");
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(100);
        List records = sourceRecords.recordsForTopic(expectedTopic);
        Assert.assertEquals((long)2L, (long)records.size());
        this.stopConnector();
    }

    @Test
    @FixFor(value={"DBZ-4180"})
    public void testSpecifyTransactionStrategy() throws SQLException, InterruptedException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(TABLE_NAME))).with(RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_CHANGES, "false")).with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, "true")).with(AbstractTopicNamingStrategy.TOPIC_TRANSACTION, "my_transaction")).build();
        this.start(MySqlConnector.class, this.config);
        this.assertConnectorIsRunning();
        MySqlTopicNamingStrategyIT.waitForSnapshotToBeCompleted((String)"mysql", (String)this.DATABASE.getServerName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection conn = db.connect();){
            conn.setAutoCommit(false);
            conn.execute(new String[]{"INSERT INTO dbz4180(a, b, c, d) VALUE (10.1, 10.2, 'test transaction', 1290)"});
            conn.commit();
        }
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(100);
        String expectedTransactionTopic = this.DATABASE.getServerName() + ".my_transaction";
        List transactionRecords = sourceRecords.recordsForTopic(expectedTransactionTopic);
        Assert.assertEquals((long)2L, (long)transactionRecords.size());
        List records = sourceRecords.allRecordsInOrder();
        Assert.assertEquals((long)3L, (long)records.size());
        this.stopConnector();
    }
}

