/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.storage.jdbc.history;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.storage.jdbc.history.JdbcSchemaHistory;
import io.debezium.storage.jdbc.history.JdbcSchemaHistoryConfig;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import io.debezium.util.Testing;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;

public class JdbcSchemaHistoryIT
extends AbstractConnectorTest {
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath((String)"schema-history.db").toAbsolutePath();
    private static final String USER = "debezium";
    private static final String PASSWORD = "dbz";
    private static final String PRIVILEGED_USER = "mysqluser";
    private static final String PRIVILEGED_PASSWORD = "mysqlpassword";
    private static final String ROOT_PASSWORD = "debezium";
    private static final String DBNAME = "inventory";
    private static final String IMAGE = "debezium/example-mysql";
    private static final Integer PORT = 3306;
    private static final String TOPIC_PREFIX = "test";
    private static final String TABLE_NAME = "schematest";
    private static final GenericContainer<?> container = new GenericContainer("debezium/example-mysql").waitingFor((WaitStrategy)Wait.forLogMessage((String)".*mysqld: ready for connections.*", (int)2)).withEnv("MYSQL_ROOT_PASSWORD", "debezium").withEnv("MYSQL_USER", "mysqluser").withEnv("MYSQL_PASSWORD", "mysqlpassword").withExposedPorts(new Integer[]{PORT}).withStartupTimeout(Duration.ofSeconds(180L));

    @BeforeClass
    public static void startDatabase() {
        container.start();
    }

    @AfterClass
    public static void stopDatabase() {
        container.stop();
    }

    @Before
    public void beforeEach() throws SQLException {
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)SCHEMA_HISTORY_PATH);
        try (JdbcConnection conn = this.testConnection();){
            conn.execute(new String[]{"DROP TABLE IF EXISTS schematest", "CREATE TABLE schematest (id INT PRIMARY KEY, val VARCHAR(16))", "INSERT INTO schematest VALUES (1, 'one'), (2, 'two'), (3, 'three'), (4, 'four')"});
        }
        this.stopConnector();
    }

    @After
    public void afterEach() throws SQLException {
        try {
            this.stopConnector();
        }
        finally {
            Testing.Files.delete((Path)SCHEMA_HISTORY_PATH);
        }
        try (JdbcConnection conn = this.testConnection();){
            conn.execute(new String[]{"DROP TABLE IF EXISTS schematest"});
        }
    }

    private String topicName() {
        return String.format("%s.%s.%s", TOPIC_PREFIX, DBNAME, TABLE_NAME);
    }

    protected Configuration.Builder schemaHistory(Configuration.Builder builder) {
        return builder.with("schema.history.internal." + JdbcSchemaHistoryConfig.PROP_JDBC_URL.name(), "jdbc:sqlite:" + SCHEMA_HISTORY_PATH).with("schema.history.internal." + JdbcSchemaHistoryConfig.PROP_USER.name(), "user").with("schema.history.internal." + JdbcSchemaHistoryConfig.PROP_PASSWORD.name(), "pass");
    }

    private Configuration.Builder config() throws IOException {
        File dbFile = File.createTempFile("test-", "db");
        String jdbcUrl = String.format("jdbc:sqlite:%s", dbFile.getAbsolutePath());
        Configuration.Builder builder = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with(MySqlConnectorConfig.HOSTNAME, container.getHost())).with(MySqlConnectorConfig.PORT, (Object)container.getMappedPort(PORT.intValue()))).with(MySqlConnectorConfig.USER, "debezium")).with(MySqlConnectorConfig.PASSWORD, PASSWORD)).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, DBNAME)).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, "inventory.schematest")).with(MySqlConnectorConfig.SERVER_ID, 18765)).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)).with(MySqlConnectorConfig.SCHEMA_HISTORY, JdbcSchemaHistory.class)).with(CommonConnectorConfig.TOPIC_PREFIX, TOPIC_PREFIX)).with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.INITIAL)).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_JDBC_URL.name(), jdbcUrl).with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_USER.name(), "user").with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_PASSWORD.name(), "pass").with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name(), "offsets_jdbc");
        return this.schemaHistory(builder);
    }

    private JdbcConnection testConnection() {
        JdbcConfiguration jdbcConfig = (JdbcConfiguration)JdbcConfiguration.create().withHostname(container.getHost()).withPort(container.getMappedPort(PORT.intValue()).intValue()).withUser(PRIVILEGED_USER).withPassword(PRIVILEGED_PASSWORD).withDatabase(DBNAME).build();
        String url = "jdbc:mysql://${hostname}:${port}/${dbname}";
        return new JdbcConnection(jdbcConfig, JdbcConnection.patternBasedFactory((String)"jdbc:mysql://${hostname}:${port}/${dbname}", (Field[])new Field[0]), "`", "`");
    }

    @Test
    public void shouldStreamChanges() throws InterruptedException, IOException {
        Configuration config = this.config().build();
        this.start(MySqlConnector.class, config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(4);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        Assertions.assertThat((List)records.recordsForTopic(this.topicName())).hasSize(4);
        this.stopConnector();
    }

    @Test
    public void shouldStreamChangesAfterRestart() throws InterruptedException, SQLException, IOException {
        Configuration config = this.config().build();
        this.start(MySqlConnector.class, config);
        JdbcSchemaHistoryIT.waitForStreamingRunning((String)"mysql", (String)TOPIC_PREFIX);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(4);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        Assertions.assertThat((List)records.recordsForTopic(this.topicName())).hasSize(4);
        try (BufferedReader br = new BufferedReader(new FileReader(String.valueOf(OFFSET_STORE_PATH)));){
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println(line);
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        this.stopConnector();
        try (JdbcConnection conn = this.testConnection();){
            conn.execute(new String[]{"INSERT INTO schematest VALUES (5, 'five')"});
        }
        this.start(MySqlConnector.class, config);
        JdbcSchemaHistoryIT.waitForStreamingRunning((String)"mysql", (String)TOPIC_PREFIX);
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        Assertions.assertThat((List)records.recordsForTopic(this.topicName())).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic(this.topicName()).get(0);
        Assertions.assertThat((Object)((Struct)record.key()).get("id")).isEqualTo((Object)5);
        this.stopConnector();
    }
}

