/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

@SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN, major=5, minor=6, reason="DDL uses fractional second data types, not supported until MySQL 5.6")
public class MySqlConnectorSchemaValidateIT
extends AbstractConnectorTest {
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-db-history-connect.txt").toAbsolutePath();
    private final UniqueDatabase DATABASE = new UniqueDatabase("sql_bin_log_off", "sql_bin_log_off_test").withDbHistoryPath(DB_HISTORY_PATH);
    private Configuration config;
    private static final int INITIAL_EVENT_COUNT = 6;

    @Before
    public void beforeEach() {
        this.stopConnector();
        this.DATABASE.createAndInitialize();
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)DB_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            this.stopConnector();
        }
        finally {
            Testing.Files.delete((Path)DB_HISTORY_PATH);
        }
    }

    @Test
    @FixFor(value={"DBZ-7093"})
    public void shouldRecoverToSyncSchemaWhenAddColumnToEndWithSqlLogBinIsOff() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)).build();
        AtomicReference exception = new AtomicReference();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> exception.set(error));
        MySqlConnectorSchemaValidateIT.waitForSnapshotToBeCompleted((String)"mysql", (String)this.DATABASE.getServerName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.execute(new String[]{"SET SQL_LOG_BIN=OFF;"});
            connection.execute(new String[]{"ALTER TABLE dbz7093 ADD newcol VARCHAR(20);"});
            connection.execute(new String[]{"SET SQL_LOG_BIN=ON;"});
            connection.execute(new String[]{"INSERT INTO dbz7093(id, age, name, newcol) VALUES (201, 1,'name1','newcol1');"});
            connection.execute(new String[]{"UPDATE dbz7093 SET age=2, name='name2', newcol='newcol2' WHERE id=201"});
            connection.execute(new String[]{"DELETE FROM dbz7093 WHERE id=201"});
        }
        MySqlConnectorSchemaValidateIT.waitForConnectorShutdown((String)"mysql", (String)this.DATABASE.getServerName());
        this.stopConnector();
        Throwable e = (Throwable)exception.get();
        if (e == null) {
            Assert.fail();
        }
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        this.config = ((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY)).build();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> exception.set(error));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(6);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(6);
        records = this.consumeRecordsByTopic(6);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(6);
        records = this.consumeRecordsByTopic(4);
        List recordsForTopic = records.recordsForTopic(this.DATABASE.topicForTable("dbz7093"));
        Assertions.assertThat((int)recordsForTopic.size()).isEqualTo(4);
        SourceRecord insertEvent = (SourceRecord)recordsForTopic.get(0);
        this.assertInsert(insertEvent, "id", 201);
        this.assertValueField(insertEvent, "after/age", 1);
        this.assertValueField(insertEvent, "after/name", "name1");
        this.assertValueField(insertEvent, "after/newcol", "newcol1");
        SourceRecord updateEvent = (SourceRecord)recordsForTopic.get(1);
        this.assertUpdate(updateEvent, "id", 201);
        this.assertValueField(updateEvent, "before/age", 1);
        this.assertValueField(updateEvent, "before/name", "name1");
        this.assertValueField(updateEvent, "before/newcol", "newcol1");
        this.assertValueField(updateEvent, "after/age", 2);
        this.assertValueField(updateEvent, "after/name", "name2");
        this.assertValueField(updateEvent, "after/newcol", "newcol2");
        SourceRecord deleteEvent = (SourceRecord)recordsForTopic.get(2);
        this.assertDelete(deleteEvent, "id", 201);
        this.assertValueField(deleteEvent, "before/age", 2);
        this.assertValueField(deleteEvent, "before/name", "name2");
        this.assertValueField(deleteEvent, "before/newcol", "newcol2");
        SourceRecord tombstoneEvent = (SourceRecord)recordsForTopic.get(3);
        this.assertTombstone(tombstoneEvent);
    }

    @Test
    @FixFor(value={"DBZ-7093"})
    public void shouldRecoverToSyncSchemaWhenAddColumnInMiddleWithSqlLogBinIsOff() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)).build();
        AtomicReference exception = new AtomicReference();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> exception.set(error));
        MySqlConnectorSchemaValidateIT.waitForSnapshotToBeCompleted((String)"mysql", (String)this.DATABASE.getServerName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.execute(new String[]{"SET SQL_LOG_BIN=OFF;"});
            connection.execute(new String[]{"ALTER TABLE dbz7093 ADD newcol VARCHAR(20) AFTER age;"});
            connection.execute(new String[]{"SET SQL_LOG_BIN=ON;"});
            connection.execute(new String[]{"INSERT INTO dbz7093(id, age, name, newcol) VALUES (201, 1,'name1','newcol1');"});
            connection.execute(new String[]{"UPDATE dbz7093 SET age=2, name='name2', newcol='newcol2' WHERE id=201"});
            connection.execute(new String[]{"DELETE FROM dbz7093 WHERE id=201"});
        }
        MySqlConnectorSchemaValidateIT.waitForConnectorShutdown((String)"mysql", (String)this.DATABASE.getServerName());
        this.stopConnector();
        Throwable e = (Throwable)exception.get();
        if (e == null) {
            Assert.fail();
        }
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        this.config = ((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY)).build();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> exception.set(error));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(6);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(6);
        records = this.consumeRecordsByTopic(6);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(6);
        records = this.consumeRecordsByTopic(4);
        List recordsForTopic = records.recordsForTopic(this.DATABASE.topicForTable("dbz7093"));
        Assertions.assertThat((int)recordsForTopic.size()).isEqualTo(4);
        SourceRecord insertEvent = (SourceRecord)recordsForTopic.get(0);
        this.assertInsert(insertEvent, "id", 201);
        this.assertValueField(insertEvent, "after/age", 1);
        this.assertValueField(insertEvent, "after/name", "name1");
        this.assertValueField(insertEvent, "after/newcol", "newcol1");
        SourceRecord updateEvent = (SourceRecord)recordsForTopic.get(1);
        this.assertUpdate(updateEvent, "id", 201);
        this.assertValueField(updateEvent, "before/age", 1);
        this.assertValueField(updateEvent, "before/name", "name1");
        this.assertValueField(updateEvent, "before/newcol", "newcol1");
        this.assertValueField(updateEvent, "after/age", 2);
        this.assertValueField(updateEvent, "after/name", "name2");
        this.assertValueField(updateEvent, "after/newcol", "newcol2");
        SourceRecord deleteEvent = (SourceRecord)recordsForTopic.get(2);
        this.assertDelete(deleteEvent, "id", 201);
        this.assertValueField(deleteEvent, "before/age", 2);
        this.assertValueField(deleteEvent, "before/name", "name2");
        this.assertValueField(deleteEvent, "before/newcol", "newcol2");
        SourceRecord tombstoneEvent = (SourceRecord)recordsForTopic.get(3);
        this.assertTombstone(tombstoneEvent);
    }

    @Test
    @FixFor(value={"DBZ-7093"})
    public void shouldRecoverToSyncSchemaWhenDropColumnWithSqlLogBinIsOff() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)).build();
        AtomicReference exception = new AtomicReference();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> exception.set(error));
        MySqlConnectorSchemaValidateIT.waitForSnapshotToBeCompleted((String)"mysql", (String)this.DATABASE.getServerName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.execute(new String[]{"SET SQL_LOG_BIN=OFF;"});
            connection.execute(new String[]{"ALTER TABLE dbz7093 DROP age;"});
            connection.execute(new String[]{"SET SQL_LOG_BIN=ON;"});
            connection.execute(new String[]{"INSERT INTO dbz7093(id, name) VALUES (201, 'name1');"});
            connection.execute(new String[]{"UPDATE dbz7093 SET name='name2' WHERE id=201;"});
            connection.execute(new String[]{"DELETE FROM dbz7093 WHERE id=201;"});
        }
        MySqlConnectorSchemaValidateIT.waitForConnectorShutdown((String)"mysql", (String)this.DATABASE.getServerName());
        this.stopConnector();
        Throwable e = (Throwable)exception.get();
        if (e == null) {
            Assert.fail();
        }
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        this.config = ((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY)).build();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> exception.set(error));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(6);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(6);
        records = this.consumeRecordsByTopic(6);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(6);
        records = this.consumeRecordsByTopic(4);
        List recordsForTopic = records.recordsForTopic(this.DATABASE.topicForTable("dbz7093"));
        Assertions.assertThat((int)recordsForTopic.size()).isEqualTo(4);
        SourceRecord insertEvent = (SourceRecord)recordsForTopic.get(0);
        this.assertInsert(insertEvent, "id", 201);
        this.assertValueField(insertEvent, "after/name", "name1");
        SourceRecord updateEvent = (SourceRecord)recordsForTopic.get(1);
        this.assertUpdate(updateEvent, "id", 201);
        this.assertValueField(updateEvent, "before/name", "name1");
        this.assertValueField(updateEvent, "after/name", "name2");
        SourceRecord deleteEvent = (SourceRecord)recordsForTopic.get(2);
        this.assertDelete(deleteEvent, "id", 201);
        this.assertValueField(deleteEvent, "before/name", "name2");
        SourceRecord tombstoneEvent = (SourceRecord)recordsForTopic.get(3);
        this.assertTombstone(tombstoneEvent);
    }

    @Test
    @FixFor(value={"DBZ-7093"})
    public void shouldRecoverToSyncSchemaWhenAddColumnToEndWithSqlLogBinIsOffAndColumnInclude() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(MySqlConnectorConfig.COLUMN_INCLUDE_LIST, "dbz7093.id,dbz7093.newcol")).with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)).build();
        AtomicReference exception = new AtomicReference();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> exception.set(error));
        MySqlConnectorSchemaValidateIT.waitForSnapshotToBeCompleted((String)"mysql", (String)this.DATABASE.getServerName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.execute(new String[]{"SET SQL_LOG_BIN=OFF;"});
            connection.execute(new String[]{"ALTER TABLE dbz7093 ADD newcol VARCHAR(20);"});
            connection.execute(new String[]{"SET SQL_LOG_BIN=ON;"});
            connection.execute(new String[]{"INSERT INTO dbz7093(id, age, name, newcol) VALUES (201, 1,'name1','newcol1');"});
            connection.execute(new String[]{"UPDATE dbz7093 SET newcol='newcol2' WHERE id=201;"});
            connection.execute(new String[]{"DELETE FROM dbz7093 WHERE id=201;"});
        }
        MySqlConnectorSchemaValidateIT.waitForConnectorShutdown((String)"mysql", (String)this.DATABASE.getServerName());
        this.stopConnector();
        Throwable e = (Throwable)exception.get();
        if (e == null) {
            Assert.fail();
        }
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        this.config = ((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY)).build();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> exception.set(error));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(6);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(6);
        records = this.consumeRecordsByTopic(6);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(6);
        records = this.consumeRecordsByTopic(4);
        List recordsForTopic = records.recordsForTopic(this.DATABASE.topicForTable("dbz7093"));
        Assertions.assertThat((int)recordsForTopic.size()).isEqualTo(4);
        SourceRecord insertEvent = (SourceRecord)recordsForTopic.get(0);
        this.assertInsert(insertEvent, "id", 201);
        this.assertValueField(insertEvent, "after/newcol", "newcol1");
        SourceRecord updateEvent = (SourceRecord)recordsForTopic.get(1);
        this.assertUpdate(updateEvent, "id", 201);
        this.assertValueField(updateEvent, "before/newcol", "newcol1");
        this.assertValueField(updateEvent, "after/newcol", "newcol2");
        SourceRecord deleteEvent = (SourceRecord)recordsForTopic.get(2);
        this.assertDelete(deleteEvent, "id", 201);
        this.assertValueField(deleteEvent, "before/newcol", "newcol2");
        SourceRecord tombstoneEvent = (SourceRecord)recordsForTopic.get(3);
        this.assertTombstone(tombstoneEvent);
    }
}

