/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlBinaryProtocolFieldReader;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.doc.FixFor;
import io.debezium.engine.DebeziumEngine;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotWithSchemaChangesSupportTest;
import io.debezium.relational.TableId;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

public class IncrementalSnapshotIT
extends AbstractIncrementalSnapshotWithSchemaChangesSupportTest<MySqlConnector> {
    protected static final String SERVER_NAME = "is_test";
    protected final UniqueDatabase DATABASE = new UniqueDatabase("is_test", "incremental_snapshot-test").withDbHistoryPath(DB_HISTORY_PATH);

    @Before
    public void before() throws SQLException {
        Assume.assumeFalse((boolean)"legacy".equals(System.getProperty("internal.implementation")));
        this.stopConnector();
        this.DATABASE.createAndInitialize();
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)DB_HISTORY_PATH);
    }

    @After
    public void after() {
        try {
            this.stopConnector();
        }
        finally {
            Testing.Files.delete((Path)DB_HISTORY_PATH);
        }
    }

    protected Configuration.Builder config() {
        return ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)).with(MySqlConnectorConfig.USER, "mysqluser")).with(MySqlConnectorConfig.PASSWORD, "mysqlpw")).with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY.getValue())).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, this.DATABASE.qualifiedTableName("debezium_signal"))).with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)).with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true)).with("internal.implementation", "new");
    }

    protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
        String tableIncludeList = signalTableOnly ? this.DATABASE.qualifiedTableName("c") + "," + this.DATABASE.qualifiedTableName("debezium_signal") : this.DATABASE.qualifiedTableName("a") + ", " + this.DATABASE.qualifiedTableName("c") + "," + this.DATABASE.qualifiedTableName("debezium_signal");
        return (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)).with(MySqlConnectorConfig.USER, "mysqluser")).with(MySqlConnectorConfig.PASSWORD, "mysqlpw")).with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL.getValue())).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, this.DATABASE.qualifiedTableName("debezium_signal"))).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList)).with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)).with(MySqlConnectorConfig.INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, true)).with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl);
    }

    protected Class<MySqlConnector> connectorClass() {
        return MySqlConnector.class;
    }

    protected JdbcConnection databaseConnection() {
        return MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
    }

    protected String topicName() {
        return this.DATABASE.topicForTable("a");
    }

    protected String tableName() {
        return TableId.parse((String)this.DATABASE.qualifiedTableName("a")).toQuotedString('`');
    }

    protected List<String> tableNames() {
        String tableA = TableId.parse((String)this.DATABASE.qualifiedTableName("a")).toQuotedString('`');
        String tableB = TableId.parse((String)this.DATABASE.qualifiedTableName("c")).toQuotedString('`');
        return Collect.arrayListOf((Object)tableA, (Object[])new String[]{tableB});
    }

    protected String signalTableName() {
        return TableId.parse((String)this.DATABASE.qualifiedTableName("debezium_signal")).toQuotedString('`');
    }

    protected String tableName(String table) {
        return TableId.parse((String)this.DATABASE.qualifiedTableName(table)).toQuotedString('`');
    }

    protected String alterColumnStatement(String table, String column, String type) {
        return String.format("ALTER TABLE %s MODIFY COLUMN %s %s", table, column, type);
    }

    protected String alterColumnSetNotNullStatement(String table, String column, String type) {
        return String.format("ALTER TABLE %s MODIFY COLUMN %s %s NOT NULL", table, column, type);
    }

    protected String alterColumnDropNotNullStatement(String table, String column, String type) {
        return String.format("ALTER TABLE %s MODIFY COLUMN %s %s NULL", table, column, type);
    }

    protected String alterColumnSetDefaultStatement(String table, String column, String type, String defaultValue) {
        return String.format("ALTER TABLE %s MODIFY COLUMN %s %s DEFAULT %s", table, column, type, defaultValue);
    }

    protected String alterColumnDropDefaultStatement(String table, String column, String type) {
        return String.format("ALTER TABLE %s MODIFY COLUMN %s %s", table, column, type);
    }

    protected void executeRenameTable(JdbcConnection connection, String newTable) throws SQLException {
        connection.setAutoCommit(false);
        String query = String.format("RENAME TABLE %s to %s, %s to %s", this.tableName(), "old_table", newTable, this.tableName());
        this.logger.info(query);
        connection.executeWithoutCommitting(new String[]{query});
        connection.commit();
    }

    protected String createTableStatement(String newTable, String copyTable) {
        return String.format("CREATE TABLE %s LIKE %s", newTable, copyTable);
    }

    @Test
    public void updates() throws Exception {
        this.populateTable();
        this.startConnector();
        this.sendAdHocSnapshotSignal();
        int batchSize = 10;
        try (JdbcConnection connection = this.databaseConnection();){
            connection.setAutoCommit(false);
            connection.execute(new String[]{"SET binlog_rows_query_log_events=ON"});
            for (int i = 0; i < 1000; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("UPDATE %s SET aa = aa + 2000 WHERE pk > %s AND pk <= %s", this.tableName(), i * 10, (i + 1) * 10)});
                connection.commit();
            }
        }
        int expectedRecordCount = 1000;
        Map dbChanges = this.consumeRecordsMixedWithIncrementalSnapshot(1000, x -> ((Struct)((SourceRecord)x.getValue()).value()).getStruct("after").getInt32(this.valueFieldName()) >= 2000, null);
        for (int i = 0; i < 1000; ++i) {
            SourceRecord record = (SourceRecord)dbChanges.get(i + 1);
            int value = ((Struct)record.value()).getStruct("after").getInt32(this.valueFieldName());
            Assert.assertEquals((long)(i + 2000), (long)value);
            Object query = ((Struct)record.value()).getStruct("source").get("query");
            String snapshot = ((Struct)record.value()).getStruct("source").get("snapshot").toString();
            if (snapshot.equals("false")) {
                Assert.assertNotNull((Object)query);
                continue;
            }
            Assert.assertNull((Object)query);
            Assert.assertEquals((Object)"incremental", (Object)snapshot);
        }
    }

    @Test
    @FixFor(value={"DBZ-4939"})
    public void tableWithDatetime() throws Exception {
        Testing.Print.enable();
        int ROWS = 10;
        try (JdbcConnection connection = this.databaseConnection();){
            connection.setAutoCommit(false);
            for (int i = 0; i < 10; ++i) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO a_dt (pk, dt, d, t) VALUES (%s, TIMESTAMP('%s-05-01'), '%s-05-01', '%s:00:00')", i + 1, i + 2000, i + 2000, i)});
            }
            connection.commit();
        }
        Configuration config = ((Configuration.Builder)this.config().with(MySqlConnectorConfig.SNAPSHOT_FETCH_SIZE, 5)).build();
        this.start(this.connectorClass(), config, (DebeziumEngine.CompletionCallback)this.loggingCompletion());
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(5L, TimeUnit.SECONDS);
        this.sendAdHocSnapshotSignal(new String[]{this.tableName("a_dt")});
        int expectedRecordCount = 10;
        Map dbChanges = this.consumeMixedWithIncrementalSnapshot(10, x -> true, k -> k.getInt32(this.pkFieldName()), record -> {
            long ts = ((Struct)record.value()).getStruct("after").getInt64("dt");
            long tsSeconds = ts / 1000L;
            long tsMillis = ts % 1000L;
            LocalDateTime tsDateTime = LocalDateTime.ofEpochSecond(tsSeconds, (int)TimeUnit.MILLISECONDS.toNanos(tsMillis), ZoneOffset.UTC);
            int dateTs = ((Struct)record.value()).getStruct("after").getInt32("d");
            LocalDate date = LocalDate.ofEpochDay(dateTs);
            long timeTs = ((Struct)record.value()).getStruct("after").getInt64("t");
            LocalTime time = LocalTime.ofSecondOfDay(timeTs / 1000000L);
            return Arrays.asList(tsDateTime.toLocalDate(), date, time);
        }, this.DATABASE.topicForTable("a_dt"), null);
        for (int i = 0; i < 10; ++i) {
            LocalDateTime dateTime = LocalDateTime.parse(String.format("%s-05-01T00:00:00", 2000 + i));
            LocalDate dt = dateTime.toLocalDate();
            LocalDate d = LocalDate.parse(String.format("%s-05-01", 2000 + i));
            LocalTime t = LocalTime.parse(String.format("0%s:00:00", i));
            Assertions.assertThat((Map)dbChanges).includes(new MapAssert.Entry[]{MapAssert.entry((Object)(i + 1), Arrays.asList(dt, d, t))});
        }
    }

    @Test
    @FixFor(value={"DBZ-5099"})
    public void tableWithZeroDate() throws Exception {
        Testing.Print.enable();
        LogInterceptor logInterceptor = new LogInterceptor(MySqlBinaryProtocolFieldReader.class);
        try (JdbcConnection connection = this.databaseConnection();){
            connection.setAutoCommit(false);
            connection.executeWithoutCommitting(new String[]{"INSERT INTO a_date (pk) VALUES (1)"});
            connection.commit();
        }
        Configuration config = ((Configuration.Builder)this.config().with(MySqlConnectorConfig.SNAPSHOT_FETCH_SIZE, 5)).build();
        this.start(this.connectorClass(), config, (DebeziumEngine.CompletionCallback)this.loggingCompletion());
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(5L, TimeUnit.SECONDS);
        this.sendAdHocSnapshotSignal(new String[]{this.tableName("a_date")});
        boolean expectedRecordCount = true;
        Map dbChanges = this.consumeMixedWithIncrementalSnapshot(1, x -> true, k -> k.getInt32(this.pkFieldName()), record -> {
            Integer d = ((Struct)record.value()).getStruct("after").getInt32("d");
            Integer d_opt = ((Struct)record.value()).getStruct("after").getInt32("d_opt");
            return Arrays.asList(d, d_opt);
        }, this.DATABASE.topicForTable("a_date"), null);
        Assertions.assertThat((Map)dbChanges).includes(new MapAssert.Entry[]{MapAssert.entry((Object)1, Arrays.asList(0, null))});
        Assert.assertFalse((boolean)logInterceptor.containsWarnMessage("Invalid length when read MySQL DATE value. BIN_LEN_DATE is 0."));
    }
}

