/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.storage.jdbc;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.storage.jdbc.history.JdbcSchemaHistory;
import io.debezium.storage.jdbc.history.JdbcSchemaHistoryConfig;
import io.debezium.storage.jdbc.offset.JdbcOffsetBackingStoreConfig;
import io.debezium.util.Testing;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;

@SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN, major=5, minor=6, reason="DDL uses fractional second data types, not supported until MySQL 5.6")
public class JdbcOffsetBackingStoreIT
extends AbstractConnectorTest {
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath((String)"schema-history.db").toAbsolutePath();
    private static final String USER = "debezium";
    private static final String PASSWORD = "dbz";
    private static final String PRIVILEGED_USER = "mysqluser";
    private static final String PRIVILEGED_PASSWORD = "mysqlpassword";
    private static final String ROOT_PASSWORD = "debezium";
    private static final String DBNAME = "inventory";
    private static final String IMAGE = "debezium/example-mysql";
    private static final Integer PORT = 3306;
    private static final String TOPIC_PREFIX = "test";
    private static final String TABLE_NAME = "schematest";
    private static final GenericContainer<?> container = new GenericContainer("debezium/example-mysql").waitingFor((WaitStrategy)Wait.forLogMessage((String)".*mysqld: ready for connections.*", (int)2)).withEnv("MYSQL_ROOT_PASSWORD", "debezium").withEnv("MYSQL_USER", "mysqluser").withEnv("MYSQL_PASSWORD", "mysqlpassword").withExposedPorts(new Integer[]{PORT}).withStartupTimeout(Duration.ofSeconds(180L));

    @BeforeClass
    public static void startDatabase() {
        container.start();
    }

    @AfterClass
    public static void stopDatabase() {
        container.stop();
    }

    @Before
    public void beforeEach() throws SQLException {
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)SCHEMA_HISTORY_PATH);
        try (JdbcConnection conn = this.testConnection();){
            conn.execute(new String[]{"DROP TABLE IF EXISTS schematest", "CREATE TABLE schematest (id INT PRIMARY KEY, val VARCHAR(16))", "INSERT INTO schematest VALUES (1, 'one'), (2, 'two'), (3, 'three'), (4, 'four')"});
        }
        this.stopConnector();
    }

    @After
    public void afterEach() throws SQLException {
        try {
            this.stopConnector();
        }
        finally {
            Testing.Files.delete((Path)SCHEMA_HISTORY_PATH);
        }
        try (JdbcConnection conn = this.testConnection();){
            conn.execute(new String[]{"DROP TABLE IF EXISTS schematest"});
        }
    }

    protected Configuration.Builder schemaHistory(Configuration.Builder builder) {
        return builder.with("schema.history.internal." + JdbcSchemaHistoryConfig.PROP_JDBC_URL.name(), "jdbc:sqlite:" + SCHEMA_HISTORY_PATH).with("schema.history.internal." + JdbcSchemaHistoryConfig.PROP_USER.name(), "user").with("schema.history.internal." + JdbcSchemaHistoryConfig.PROP_PASSWORD.name(), "pass");
    }

    private Configuration.Builder config(String jdbcUrl) {
        Configuration.Builder builder = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with(MySqlConnectorConfig.HOSTNAME, container.getHost())).with(MySqlConnectorConfig.PORT, (Object)container.getMappedPort(PORT.intValue()))).with(MySqlConnectorConfig.USER, "debezium")).with(MySqlConnectorConfig.PASSWORD, PASSWORD)).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, DBNAME)).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, "inventory.schematest")).with(MySqlConnectorConfig.SERVER_ID, 18765)).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)).with(MySqlConnectorConfig.SCHEMA_HISTORY, JdbcSchemaHistory.class)).with(CommonConnectorConfig.TOPIC_PREFIX, TOPIC_PREFIX)).with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)BinlogConnectorConfig.SnapshotMode.INITIAL)).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_JDBC_URL.name(), jdbcUrl).with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_USER.name(), "user").with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_PASSWORD.name(), "pass").with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_TABLE_NAME.name(), "offsets_jdbc").with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_TABLE_DDL.name(), "CREATE TABLE %s(id VARCHAR(36) NOT NULL, offset_key VARCHAR(1255), offset_val VARCHAR(1255),record_insert_ts TIMESTAMP NOT NULL,record_insert_seq INTEGER NOT NULL)").with("offset.storage." + JdbcOffsetBackingStoreConfig.PROP_TABLE_SELECT.name(), "SELECT id, offset_key, offset_val FROM %s ORDER BY record_insert_ts, record_insert_seq").with("offset.flush.interval.ms", "1000").with("offset.storage", "io.debezium.storage.jdbc.offset.JdbcOffsetBackingStore");
        return this.schemaHistory(builder);
    }

    private JdbcConnection testConnection() {
        JdbcConfiguration jdbcConfig = (JdbcConfiguration)JdbcConfiguration.create().withHostname(container.getHost()).withPort(container.getMappedPort(PORT.intValue()).intValue()).withUser(PRIVILEGED_USER).withPassword(PRIVILEGED_PASSWORD).withDatabase(DBNAME).build();
        String url = "jdbc:mysql://${hostname}:${port}/${dbname}";
        return new JdbcConnection(jdbcConfig, JdbcConnection.patternBasedFactory((String)"jdbc:mysql://${hostname}:${port}/${dbname}", (Field[])new Field[0]), "`", "`");
    }

    @Test
    public void shouldStartCorrectlyWithJdbcOffsetStorage() throws InterruptedException, IOException {
        String replicaPort;
        String masterPort = System.getProperty("database.port", "3306");
        boolean replicaIsMaster = masterPort.equals(replicaPort = System.getProperty("database.replica.port", "3306"));
        if (!replicaIsMaster) {
            Thread.sleep(5000L);
        }
        File dbFile = File.createTempFile("test-", "db");
        String jdbcUrl = String.format("jdbc:sqlite:%s", dbFile.getAbsolutePath());
        Configuration config = this.config(jdbcUrl).build();
        this.start(MySqlConnector.class, config);
        JdbcOffsetBackingStoreIT.waitForStreamingRunning((String)"mysql", (String)TOPIC_PREFIX);
        this.consumeRecordsByTopic(4);
        this.validateIfDataIsCreatedInJDBCDatabase(jdbcUrl, "user", "pass", "offsets_jdbc");
    }

    private void validateIfDataIsCreatedInJDBCDatabase(String jdbcUrl, String jdbcUser, String jdbcPassword, String jdbcTableName) {
        Connection connection = null;
        try {
            connection = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
            Statement statement = connection.createStatement();
            statement.setQueryTimeout(30);
            ResultSet rs = statement.executeQuery(String.format("select * from %s", jdbcTableName));
            while (rs.next()) {
                String offsetKey = rs.getString("offset_key");
                String offsetValue = rs.getString("offset_val");
                String recordInsertTimestamp = rs.getString("record_insert_ts");
                String recordInsertSequence = rs.getString("record_insert_seq");
                Assert.assertFalse((offsetKey.isBlank() && offsetKey.isEmpty() ? 1 : 0) != 0);
                Assert.assertFalse((offsetValue.isBlank() && offsetValue.isEmpty() ? 1 : 0) != 0);
                Assert.assertFalse((recordInsertTimestamp.isBlank() && recordInsertTimestamp.isEmpty() ? 1 : 0) != 0);
                Assert.assertFalse((recordInsertSequence.isBlank() && recordInsertSequence.isEmpty() ? 1 : 0) != 0);
            }
        }
        catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
}

