/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.connector.mysql.SourceInfo;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.connector.mysql.legacy.Filters;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.DebeziumEngine;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import junit.framework.TestCase;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

@SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN, major=5, minor=6, reason="DDL uses fractional second data types, not supported until MySQL 5.6")
public class MySqlConnectorIT
extends AbstractConnectorTest {
    // Absolute path of the testing file that the tests in this class pass as the database history location.
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-db-history-connect.txt").toAbsolutePath();
    // Test database for server "myServer1" / database "connector_test"; created and initialized in beforeEach().
    private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "connector_test").withDbHistoryPath(DB_HISTORY_PATH);
    // Second test database ("myServer2" / "connector_test_ro") constructed from DATABASE.
    // It must stay declared after DATABASE because its initializer reads this.DATABASE.
    private final UniqueDatabase RO_DATABASE = new UniqueDatabase("myServer2", "connector_test_ro", this.DATABASE).withDbHistoryPath(DB_HISTORY_PATH);
    // NOTE(review): the three counts below are not referenced by name in the visible part of this file;
    // presumably they describe the number of events expected from the test data set -- confirm before changing.
    private static final int PRODUCTS_TABLE_EVENT_COUNT = 9;
    private static final int ORDERS_TABLE_EVENT_COUNT = 5;
    private static final int INITIAL_EVENT_COUNT = 33;
    // Connector configuration assigned by individual tests before they start the connector.
    private Configuration config;

    /**
     * Runs before every test: stops the connector, creates and initializes both test
     * databases, initializes the connector test framework and deletes the database
     * history file.
     */
    @Before
    public void beforeEach() {
        stopConnector();

        DATABASE.createAndInitialize();
        RO_DATABASE.createAndInitialize();

        initializeConnectorTestFramework();
        Testing.Files.delete(DB_HISTORY_PATH);
    }

    /**
     * Runs after every test: stops the connector and, whether or not stopping succeeded,
     * deletes the database history file.
     */
    @After
    public void afterEach() {
        try {
            stopConnector();
        } finally {
            // Always remove the history file so the next test starts without it.
            Testing.Files.delete(DB_HISTORY_PATH);
        }
    }

    /**
     * Starts the connector with a configuration that only sets the server name and the
     * database history settings, and expects the completion callback to report a failure
     * with a non-null error and the connector not to be running afterwards.
     */
    @Test
    public void shouldNotStartWithInvalidConfiguration() {
        config = Configuration.create()
                .with(MySqlConnectorConfig.SERVER_NAME, "myserver")
                .with(KafkaDatabaseHistory.TOPIC, "myserver")
                .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
                .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
                .build();

        // The log noise produced by the failing start is expected; say so up front.
        logger.info("Attempting to start the connector with an INVALID configuration, so MULTIPLE error messages and exceptions will appear in the log");

        start(MySqlConnector.class, config, (success, message, error) -> {
            Assertions.assertThat(success).isFalse();
            Assertions.assertThat(error).isNotNull();
        });

        assertConnectorNotRunning();
    }

    /**
     * Validates a configuration that only contains the database history settings and checks
     * which fields are reported with errors (HOSTNAME, USER, SERVER_NAME) and which are not.
     */
    @Test
    public void shouldFailToValidateInvalidConfiguration() {
        Configuration incompleteConfig = Configuration.create()
                .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
                .with(FileDatabaseHistory.FILE_PATH, DB_HISTORY_PATH)
                .build();
        Config result = new MySqlConnector().validate(incompleteConfig.asMap());

        // Fields with an explicit expectation, checked in the original order.
        assertConfigurationErrors(result, MySqlConnectorConfig.HOSTNAME, 1);
        assertNoConfigurationErrors(result, MySqlConnectorConfig.PORT);
        assertConfigurationErrors(result, MySqlConnectorConfig.USER, 1);
        assertConfigurationErrors(result, MySqlConnectorConfig.SERVER_NAME, 2);

        // Every remaining field must validate cleanly; each one is asserted individually, in order.
        Field[] errorFreeFields = {
                MySqlConnectorConfig.SERVER_ID,
                MySqlConnectorConfig.TABLES_IGNORE_BUILTIN,
                MySqlConnectorConfig.DATABASE_WHITELIST,
                MySqlConnectorConfig.DATABASE_INCLUDE_LIST,
                MySqlConnectorConfig.DATABASE_BLACKLIST,
                MySqlConnectorConfig.DATABASE_EXCLUDE_LIST,
                MySqlConnectorConfig.TABLE_WHITELIST,
                MySqlConnectorConfig.TABLE_INCLUDE_LIST,
                MySqlConnectorConfig.TABLE_BLACKLIST,
                MySqlConnectorConfig.TABLE_EXCLUDE_LIST,
                MySqlConnectorConfig.COLUMN_BLACKLIST,
                MySqlConnectorConfig.COLUMN_EXCLUDE_LIST,
                MySqlConnectorConfig.COLUMN_INCLUDE_LIST,
                MySqlConnectorConfig.CONNECTION_TIMEOUT_MS,
                MySqlConnectorConfig.KEEP_ALIVE,
                MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS,
                MySqlConnectorConfig.MAX_QUEUE_SIZE,
                MySqlConnectorConfig.MAX_BATCH_SIZE,
                MySqlConnectorConfig.POLL_INTERVAL_MS,
                MySqlConnectorConfig.DATABASE_HISTORY,
                MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES,
                MySqlConnectorConfig.SNAPSHOT_MODE,
                MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE,
                MySqlConnectorConfig.SNAPSHOT_NEW_TABLES,
                MySqlConnectorConfig.SSL_MODE,
                MySqlConnectorConfig.SSL_KEYSTORE,
                MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD,
                MySqlConnectorConfig.SSL_TRUSTSTORE,
                MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD,
                MySqlConnectorConfig.DECIMAL_HANDLING_MODE,
                MySqlConnectorConfig.TIME_PRECISION_MODE,
        };
        for (Field field : errorFreeFields) {
            assertNoConfigurationErrors(result, field);
        }
    }

    /**
     * Validates a configuration built from the default JDBC settings plus SSL mode REQUIRED,
     * keystore/truststore settings and Kafka database history settings, and checks that no
     * field other than (possibly) HOSTNAME is reported with errors.
     */
    @Test
    public void shouldValidateValidConfigurationWithSSL() {
        Configuration sslConfig = DATABASE.defaultJdbcConfigBuilder()
                .with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.REQUIRED)
                .with(MySqlConnectorConfig.SSL_KEYSTORE, "/some/path/to/keystore")
                .with(MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD, "keystore1234")
                .with(MySqlConnectorConfig.SSL_TRUSTSTORE, "/some/path/to/truststore")
                .with(MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD, "truststore1234")
                .with(MySqlConnectorConfig.SERVER_ID, 18765)
                .with(MySqlConnectorConfig.SERVER_NAME, "myServer")
                .with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com")
                .with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .build();
        Config result = new MySqlConnector().validate(sslConfig.asMap());

        // NOTE(review): the two trailing arguments look like a min/max error-count range (0..1)
        // for HOSTNAME -- confirm against assertConfigurationErrors.
        assertConfigurationErrors(result, MySqlConnectorConfig.HOSTNAME, 0, 1);

        // All other fields must validate cleanly; each one is asserted individually, in order.
        Field[] errorFreeFields = {
                MySqlConnectorConfig.PORT,
                MySqlConnectorConfig.USER,
                MySqlConnectorConfig.PASSWORD,
                MySqlConnectorConfig.SERVER_NAME,
                MySqlConnectorConfig.SERVER_ID,
                MySqlConnectorConfig.TABLES_IGNORE_BUILTIN,
                MySqlConnectorConfig.DATABASE_WHITELIST,
                MySqlConnectorConfig.DATABASE_INCLUDE_LIST,
                MySqlConnectorConfig.DATABASE_BLACKLIST,
                MySqlConnectorConfig.DATABASE_EXCLUDE_LIST,
                MySqlConnectorConfig.TABLE_WHITELIST,
                MySqlConnectorConfig.TABLE_INCLUDE_LIST,
                MySqlConnectorConfig.TABLE_BLACKLIST,
                MySqlConnectorConfig.TABLE_EXCLUDE_LIST,
                MySqlConnectorConfig.COLUMN_BLACKLIST,
                MySqlConnectorConfig.COLUMN_EXCLUDE_LIST,
                MySqlConnectorConfig.COLUMN_INCLUDE_LIST,
                MySqlConnectorConfig.CONNECTION_TIMEOUT_MS,
                MySqlConnectorConfig.KEEP_ALIVE,
                MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS,
                MySqlConnectorConfig.MAX_QUEUE_SIZE,
                MySqlConnectorConfig.MAX_BATCH_SIZE,
                MySqlConnectorConfig.POLL_INTERVAL_MS,
                MySqlConnectorConfig.DATABASE_HISTORY,
                MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES,
                MySqlConnectorConfig.SNAPSHOT_MODE,
                MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE,
                MySqlConnectorConfig.SNAPSHOT_NEW_TABLES,
                MySqlConnectorConfig.SSL_MODE,
                MySqlConnectorConfig.SSL_KEYSTORE,
                MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD,
                MySqlConnectorConfig.SSL_TRUSTSTORE,
                MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD,
                MySqlConnectorConfig.DECIMAL_HANDLING_MODE,
                MySqlConnectorConfig.TIME_PRECISION_MODE,
                KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
                KafkaDatabaseHistory.TOPIC,
                KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
                KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS,
        };
        for (Field field : errorFreeFields) {
            assertNoConfigurationErrors(result, field);
        }
    }

    /**
     * Validates a configuration built from the default JDBC settings with SSL disabled, Kafka
     * database history settings and an on-connect statement, and checks that none of the
     * listed fields is reported with errors.
     */
    @Test
    public void shouldValidateAcceptableConfiguration() {
        Configuration acceptableConfig = DATABASE.defaultJdbcConfigBuilder()
                .with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED)
                .with(MySqlConnectorConfig.SERVER_ID, 18765)
                .with(MySqlConnectorConfig.SERVER_NAME, "myServer")
                .with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com")
                .with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(MySqlConnectorConfig.ON_CONNECT_STATEMENTS, "SET SESSION wait_timeout=2000")
                .build();
        Config result = new MySqlConnector().validate(acceptableConfig.asMap());

        // Each field is asserted individually, in the original order.
        Field[] errorFreeFields = {
                MySqlConnectorConfig.HOSTNAME,
                MySqlConnectorConfig.PORT,
                MySqlConnectorConfig.USER,
                MySqlConnectorConfig.PASSWORD,
                MySqlConnectorConfig.ON_CONNECT_STATEMENTS,
                MySqlConnectorConfig.SERVER_NAME,
                MySqlConnectorConfig.SERVER_ID,
                MySqlConnectorConfig.TABLES_IGNORE_BUILTIN,
                MySqlConnectorConfig.DATABASE_WHITELIST,
                MySqlConnectorConfig.DATABASE_INCLUDE_LIST,
                MySqlConnectorConfig.DATABASE_BLACKLIST,
                MySqlConnectorConfig.DATABASE_EXCLUDE_LIST,
                MySqlConnectorConfig.TABLE_WHITELIST,
                MySqlConnectorConfig.TABLE_INCLUDE_LIST,
                MySqlConnectorConfig.TABLE_BLACKLIST,
                MySqlConnectorConfig.TABLE_EXCLUDE_LIST,
                MySqlConnectorConfig.COLUMN_BLACKLIST,
                MySqlConnectorConfig.COLUMN_EXCLUDE_LIST,
                MySqlConnectorConfig.COLUMN_INCLUDE_LIST,
                MySqlConnectorConfig.MSG_KEY_COLUMNS,
                MySqlConnectorConfig.CONNECTION_TIMEOUT_MS,
                MySqlConnectorConfig.KEEP_ALIVE,
                MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS,
                MySqlConnectorConfig.MAX_QUEUE_SIZE,
                MySqlConnectorConfig.MAX_BATCH_SIZE,
                MySqlConnectorConfig.POLL_INTERVAL_MS,
                MySqlConnectorConfig.DATABASE_HISTORY,
                MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES,
                MySqlConnectorConfig.SNAPSHOT_MODE,
                MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE,
                MySqlConnectorConfig.SNAPSHOT_NEW_TABLES,
                MySqlConnectorConfig.SSL_MODE,
                MySqlConnectorConfig.SSL_KEYSTORE,
                MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD,
                MySqlConnectorConfig.SSL_TRUSTSTORE,
                MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD,
                MySqlConnectorConfig.DECIMAL_HANDLING_MODE,
                MySqlConnectorConfig.TIME_PRECISION_MODE,
                KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
                KafkaDatabaseHistory.TOPIC,
                KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
                KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS,
        };
        for (Field field : errorFreeFields) {
            assertNoConfigurationErrors(result, field);
        }
    }

    /**
     * For every value of {@link MySqlConnectorConfig.SnapshotMode}, validates a configuration
     * that combines that snapshot mode with snapshot locking mode NONE, and checks that the
     * locking-mode field has no errors and that the parsed connector configuration reports
     * locking mode NONE.
     */
    @Test
    @FixFor("DBZ-639")
    public void shouldValidateLockingModeNoneWithValidSnapshotModeConfiguration() {
        // Typed as List<String>: a raw List yields Object elements and cannot be iterated as String.
        List<String> acceptableValues = Arrays.stream(MySqlConnectorConfig.SnapshotMode.values())
                .map(MySqlConnectorConfig.SnapshotMode::getValue)
                .collect(Collectors.toList());

        for (String acceptableValue : acceptableValues) {
            Configuration config = DATABASE.defaultJdbcConfigBuilder()
                    .with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED)
                    .with(MySqlConnectorConfig.SERVER_ID, 18765)
                    .with(MySqlConnectorConfig.SERVER_NAME, "myServer")
                    .with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com")
                    .with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic")
                    .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                    .with(MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, MySqlConnectorConfig.SnapshotLockingMode.NONE.getValue())
                    .with(MySqlConnectorConfig.SNAPSHOT_MODE, acceptableValue)
                    .build();

            MySqlConnector connector = new MySqlConnector();
            Config result = connector.validate(config.asMap());

            assertNoConfigurationErrors(result, MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE);
            Assertions.assertThat(new MySqlConnectorConfig(config).getSnapshotLockingMode())
                    .isEqualTo(MySqlConnectorConfig.SnapshotLockingMode.NONE);
        }
    }

    /**
     * Looks up the header named {@code __debezium.newkey} on the given record.
     *
     * @param record the record whose headers are searched
     * @return the first header with that key, or an empty {@link Optional} if there is none
     */
    private Optional<Header> getPKUpdateNewKeyHeader(SourceRecord record) {
        final String newKeyHeaderName = "__debezium.newkey";
        return getHeaderField(record, newKeyHeaderName);
    }

    /**
     * Looks up the header named {@code __debezium.oldkey} on the given record.
     *
     * @param record the record whose headers are searched
     * @return the first header with that key, or an empty {@link Optional} if there is none
     */
    private Optional<Header> getPKUpdateOldKeyHeader(SourceRecord record) {
        final String oldKeyHeaderName = "__debezium.oldkey";
        return getHeaderField(record, oldKeyHeaderName);
    }

    /**
     * Returns the first header of the record whose key equals the given name.
     *
     * @param record the record whose headers are searched in iteration order
     * @param fieldName the header key to look for
     * @return the first matching header, or an empty {@link Optional} if no header matches
     */
    private Optional<Header> getHeaderField(SourceRecord record, String fieldName) {
        for (Header candidate : record.headers()) {
            if (fieldName.equals(candidate.key())) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Runs the shared snapshot scenario with the database selected through
     * {@code MySqlConnectorConfig.DATABASE_INCLUDE_LIST} and server ID 18765.
     */
    @Test
    public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException, InterruptedException {
        final int serverId = 18765;
        shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, serverId);
    }

    /**
     * Runs the shared snapshot scenario with the database selected through
     * {@code MySqlConnectorConfig.DATABASE_WHITELIST} and server ID 18775.
     */
    @Test
    public void shouldConsumeAllEventsFromDatabaseUsingSnapshotOld() throws SQLException, InterruptedException {
        final int serverId = 18775;
        shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(MySqlConnectorConfig.DATABASE_WHITELIST, serverId);
    }

    private void shouldConsumeAllEventsFromDatabaseUsingSnapshotByField(Field dbIncludeListField, int serverId) throws SQLException, InterruptedException {
        JdbcConnection connection;
        JdbcConnection connection2;
        String replicaPort;
        String masterPort = System.getProperty("database.port", "3306");
        boolean replicaIsMaster = masterPort.equals(replicaPort = System.getProperty("database.replica.port", "3306"));
        if (!replicaIsMaster) {
            Thread.sleep(5000L);
        }
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))).with(MySqlConnectorConfig.USER, "snapper")).with(MySqlConnectorConfig.PASSWORD, "snapperpass")).with(MySqlConnectorConfig.SERVER_ID, serverId)).with(MySqlConnectorConfig.SERVER_NAME, this.DATABASE.getServerName())).with(MySqlConnectorConfig.SSL_MODE, (EnumeratedValue)MySqlConnectorConfig.SecureConnectionMode.DISABLED)).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)).with(dbIncludeListField, this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(FileDatabaseHistory.FILE_PATH, (Object)DB_HISTORY_PATH)).build();
        this.start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(39);
        Assertions.assertThat((int)records.recordsForTopic(this.DATABASE.getServerName()).size()).isEqualTo(12);
        Assertions.assertThat((int)records.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(9);
        Assertions.assertThat((int)records.recordsForTopic(this.DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(9);
        Assertions.assertThat((int)records.recordsForTopic(this.DATABASE.topicForTable("customers")).size()).isEqualTo(4);
        Assertions.assertThat((int)records.recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(5);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(5);
        Assertions.assertThat((int)records.databaseNames().size()).isEqualTo(2);
        Assertions.assertThat((int)records.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).size()).isEqualTo(11);
        Assertions.assertThat((List)records.ddlRecordsForDatabase("readbinlog_test")).isNull();
        Assertions.assertThat((int)records.ddlRecordsForDatabase("").size()).isEqualTo(1);
        records.ddlRecordsForDatabase(this.DATABASE.getDatabaseName()).forEach(arg_0 -> ((MySqlConnectorIT)this).print(arg_0));
        records.forEach(arg_0 -> ((MySqlConnectorIT)this).validate(arg_0));
        List allRecords = records.allRecordsInOrder();
        SourceRecord last = (SourceRecord)allRecords.get(allRecords.size() - 1);
        SourceRecord secondToLast = (SourceRecord)allRecords.get(allRecords.size() - 2);
        Assertions.assertThat((boolean)secondToLast.sourceOffset().containsKey("snapshot")).isTrue();
        Assertions.assertThat((boolean)last.sourceOffset().containsKey("snapshot")).isFalse();
        Assertions.assertThat((String)((Struct)secondToLast.value()).getStruct("source").getString("snapshot")).isEqualTo((Object)"true");
        Assertions.assertThat((String)((Struct)last.value()).getStruct("source").getString("snapshot")).isEqualTo((Object)"last");
        this.waitForAvailableRecords(3L, TimeUnit.SECONDS);
        int totalConsumed = this.consumeAvailableRecords(arg_0 -> ((MySqlConnectorIT)this).print(arg_0));
        System.out.println("TOTAL CONSUMED = " + totalConsumed);
        this.stopConnector();
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection3 = db.connect();){
            connection3.query("SELECT * FROM products", rs -> {
                if (Testing.Print.isEnabled()) {
                    connection3.print(rs);
                }
            });
            connection3.execute(new String[]{"INSERT INTO products VALUES (default,'robot','Toy robot',1.304);"});
            connection3.query("SELECT * FROM products", rs -> {
                if (Testing.Print.isEnabled()) {
                    connection3.print(rs);
                }
            });
        }
        Testing.print((Object)"*** Restarting connector after inserts were made");
        this.start(MySqlConnector.class, this.config);
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(1);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        List inserts = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        this.assertInsert((SourceRecord)inserts.get(0), "id", 110);
        Testing.print((Object)"*** Done with inserts and restart");
        Testing.print((Object)"*** Stopping connector");
        this.stopConnector();
        Testing.print((Object)"*** Restarting connector");
        this.start(MySqlConnector.class, this.config);
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());){
            connection2 = db.connect();
            try {
                connection2.execute(new String[]{"INSERT INTO products VALUES (1001,'roy','old robot',1234.56);"});
                connection2.query("SELECT * FROM products", rs -> {
                    if (Testing.Print.isEnabled()) {
                        connection2.print(rs);
                    }
                });
            }
            finally {
                if (connection2 != null) {
                    connection2.close();
                }
            }
        }
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(1);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        inserts = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        this.assertInsert((SourceRecord)inserts.get(0), "id", 1001);
        Testing.print((Object)"*** Done with simple insert");
        db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        try {
            connection2 = db.connect();
            try {
                connection2.execute(new String[]{"UPDATE products SET id=2001, description='really old robot' WHERE id=1001"});
                connection2.query("SELECT * FROM products", rs -> {
                    if (Testing.Print.isEnabled()) {
                        connection2.print(rs);
                    }
                });
            }
            finally {
                if (connection2 != null) {
                    connection2.close();
                }
            }
        }
        finally {
            if (db != null) {
                db.close();
            }
        }
        records = this.consumeRecordsByTopic(3);
        List updates = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        Assertions.assertThat((int)updates.size()).isEqualTo(3);
        SourceRecord deleteRecord = (SourceRecord)updates.get(0);
        this.assertDelete(deleteRecord, "id", 1001);
        Header keyPKUpdateHeader = this.getPKUpdateNewKeyHeader(deleteRecord).get();
        TestCase.assertEquals((Object)2001, (Object)((Struct)keyPKUpdateHeader.value()).getInt32("id"));
        this.assertTombstone((SourceRecord)updates.get(1), "id", 1001);
        SourceRecord insertRecord = (SourceRecord)updates.get(2);
        this.assertInsert(insertRecord, "id", 2001);
        keyPKUpdateHeader = this.getPKUpdateOldKeyHeader(insertRecord).get();
        TestCase.assertEquals((Object)1001, (Object)((Struct)keyPKUpdateHeader.value()).getInt32("id"));
        Testing.print((Object)"*** Done with PK change");
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());){
            connection = db.connect();
            try {
                connection.execute(new String[]{"UPDATE products SET weight=1345.67 WHERE id=2001"});
                connection.query("SELECT * FROM products", rs -> {
                    if (Testing.Print.isEnabled()) {
                        connection.print(rs);
                    }
                });
            }
            finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        updates = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        Assertions.assertThat((int)updates.size()).isEqualTo(1);
        this.assertUpdate((SourceRecord)updates.get(0), "id", 2001);
        updates.forEach(arg_0 -> ((MySqlConnectorIT)this).validate(arg_0));
        Testing.print((Object)"*** Done with simple update");
        db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        try {
            connection = db.connect();
            try {
                connection.execute(new String[]{String.format("ALTER TABLE %s.products ADD COLUMN volume FLOAT, ADD COLUMN alias VARCHAR(30) NULL AFTER description", this.DATABASE.getDatabaseName())});
                connection.execute(new String[]{"UPDATE products SET volume=13.5 WHERE id=2001"});
                connection.query("SELECT * FROM products", rs -> {
                    if (Testing.Print.isEnabled()) {
                        connection.print(rs);
                    }
                });
            }
            finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
        finally {
            if (db != null) {
                db.close();
            }
        }
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(2);
        Assertions.assertThat((int)records.recordsForTopic(this.DATABASE.getServerName()).size()).isEqualTo(1);
        updates = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        Assertions.assertThat((int)updates.size()).isEqualTo(1);
        this.assertUpdate((SourceRecord)updates.get(0), "id", 2001);
        updates.forEach(arg_0 -> ((MySqlConnectorIT)this).validate(arg_0));
        Testing.print((Object)"*** Done with schema change (same db and fully-qualified name)");
        db = MySqlTestConnection.forTestDatabase("emptydb");
        try {
            connection = db.connect();
            try {
                connection.execute(new String[]{String.format("CREATE TABLE %s.stores ( id INT(11) PRIMARY KEY NOT NULL AUTO_INCREMENT, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL );", this.DATABASE.getDatabaseName())});
            }
            finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
        finally {
            if (db != null) {
                db.close();
            }
        }
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        Assertions.assertThat((int)records.recordsForTopic(this.DATABASE.getServerName()).size()).isEqualTo(1);
        records.recordsForTopic(this.DATABASE.getServerName()).forEach(arg_0 -> ((MySqlConnectorIT)this).validate(arg_0));
        Testing.print((Object)"*** Done with PK change (different db and fully-qualified name)");
        db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
        try {
            connection = db.connect();
            try {
                connection.execute(new String[]{"UPDATE products_on_hand SET quantity=20 WHERE product_id=109"});
                connection.query("SELECT * FROM products_on_hand", rs -> {
                    if (Testing.Print.isEnabled()) {
                        connection.print(rs);
                    }
                });
            }
            finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }
        finally {
            if (db != null) {
                db.close();
            }
        }
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        updates = records.recordsForTopic(this.DATABASE.topicForTable("products_on_hand"));
        Assertions.assertThat((int)updates.size()).isEqualTo(1);
        this.assertUpdate((SourceRecord)updates.get(0), "product_id", 109);
        updates.forEach(arg_0 -> ((MySqlConnectorIT)this).validate(arg_0));
        Testing.print((Object)"*** Done with verifying no additional events");
        this.stopConnector();
        Testing.print((Object)"*** Restarting connector");
        EmbeddedEngine.CompletionResult completion = new EmbeddedEngine.CompletionResult();
        this.start(MySqlConnector.class, this.config, (DebeziumEngine.CompletionCallback)completion, record -> {
            Struct key = (Struct)record.key();
            Number id = (Number)key.get("id");
            return id.intValue() == 3003;
        });
        BinlogPosition positionBeforeInserts = new BinlogPosition();
        BinlogPosition positionAfterInserts = new BinlogPosition();
        BinlogPosition positionAfterUpdate = new BinlogPosition();
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection4 = db.connect();){
            connection4.query("SHOW MASTER STATUS", positionBeforeInserts::readFromDatabase);
            connection4.execute(new String[]{"INSERT INTO products(id,name,description,weight,volume,alias) VALUES (3001,'ashley','super robot',34.56,0.00,'ashbot'), (3002,'arthur','motorcycle',87.65,0.00,'arcycle'), (3003,'oak','tree',987.65,0.00,'oak');"});
            connection4.query("SELECT * FROM products", rs -> {
                if (Testing.Print.isEnabled()) {
                    connection4.print(rs);
                }
            });
            connection4.query("SHOW MASTER STATUS", positionAfterInserts::readFromDatabase);
            connection4.execute(new String[]{"UPDATE products_on_hand SET quantity=40 WHERE product_id=109"});
            connection4.query("SELECT * FROM products_on_hand", rs -> {
                if (Testing.Print.isEnabled()) {
                    connection4.print(rs);
                }
            });
            connection4.query("SHOW MASTER STATUS", positionAfterUpdate::readFromDatabase);
        }
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)records.recordsForTopic(this.DATABASE.topicForTable("products")).size()).isEqualTo(2);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        inserts = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        this.assertInsert((SourceRecord)inserts.get(0), "id", 3001);
        this.assertInsert((SourceRecord)inserts.get(1), "id", 3002);
        completion.await(10L, TimeUnit.SECONDS);
        Assertions.assertThat((boolean)completion.hasCompleted()).isTrue();
        Assertions.assertThat((boolean)completion.hasError()).isTrue();
        Assertions.assertThat((boolean)completion.success()).isFalse();
        this.assertNoRecordsToConsume();
        this.assertConnectorNotRunning();
        this.stopConnector();
        String serverName = this.config.getString(MySqlConnectorConfig.SERVER_NAME);
        MySqlOffsetContext.Loader loader = new MySqlOffsetContext.Loader(new MySqlConnectorConfig(((Configuration.Builder)Configuration.create().with(MySqlConnectorConfig.SERVER_NAME, serverName)).build()));
        Map partition = new MySqlPartition(serverName).getSourcePartition();
        Map lastCommittedOffset = this.readLastCommittedOffset(this.config, partition);
        MySqlOffsetContext offsetContext = loader.load(lastCommittedOffset);
        SourceInfo persistedOffsetSource = offsetContext.getSource();
        Testing.print((Object)("Position before inserts: " + positionBeforeInserts));
        Testing.print((Object)("Position after inserts:  " + positionAfterInserts));
        Testing.print((Object)("Offset: " + lastCommittedOffset));
        Testing.print((Object)("Position after update:  " + positionAfterUpdate));
        if (replicaIsMaster) {
            Assertions.assertThat((String)persistedOffsetSource.binlogFilename()).isEqualTo((Object)positionBeforeInserts.binlogFilename());
            Assertions.assertThat((String)persistedOffsetSource.binlogFilename()).isEqualTo((Object)positionAfterInserts.binlogFilename());
            MySqlTestConnection.MySqlVersion mysqlVersion = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName()).getMySqlVersion();
            if (mysqlVersion == MySqlTestConnection.MySqlVersion.MYSQL_5_5 || mysqlVersion == MySqlTestConnection.MySqlVersion.MYSQL_5_6) {
                Assertions.assertThat((long)persistedOffsetSource.binlogPosition()).isGreaterThanOrEqualTo(positionBeforeInserts.binlogPosition());
            } else {
                Assertions.assertThat((long)persistedOffsetSource.binlogPosition()).isGreaterThan(positionBeforeInserts.binlogPosition());
            }
            Assertions.assertThat((long)persistedOffsetSource.binlogPosition()).isLessThan(positionAfterInserts.binlogPosition());
        }
        Assertions.assertThat((long)offsetContext.eventsToSkipUponRestart()).isEqualTo(2L);
        Testing.print((Object)"*** Restarting connector, and should begin with inserting 3003 (not 109!)");
        this.start(MySqlConnector.class, this.config);
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        inserts = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        if (inserts == null && (updates = records.recordsForTopic(this.DATABASE.topicForTable("products_on_hand"))) != null) {
            Assert.fail((String)"Restarted connector and missed the insert of product id=3003!");
        }
        SourceRecord prod3003 = (SourceRecord)inserts.get(0);
        this.assertInsert(prod3003, "id", 3003);
        this.assertOffset(prod3003, "file", lastCommittedOffset.get("file"));
        this.assertOffset(prod3003, "pos", lastCommittedOffset.get("pos"));
        this.assertOffset(prod3003, "row", 3);
        this.assertOffset(prod3003, "event", lastCommittedOffset.get("event"));
        this.assertValueField(prod3003, "after/id", 3003);
        this.assertValueField(prod3003, "after/name", "oak");
        this.assertValueField(prod3003, "after/description", "tree");
        this.assertValueField(prod3003, "after/weight", 987.65);
        this.assertValueField(prod3003, "after/volume", 0.0);
        this.assertValueField(prod3003, "after/alias", "oak");
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        updates = records.recordsForTopic(this.DATABASE.topicForTable("products_on_hand"));
        Assertions.assertThat((int)updates.size()).isEqualTo(1);
        this.assertUpdate((SourceRecord)updates.get(0), "product_id", 109);
        updates.forEach(arg_0 -> ((MySqlConnectorIT)this).validate(arg_0));
        Testing.print((Object)"*** Done with simple insert");
    }

    /**
     * Starts the connector with a snapshot SELECT override for the {@code products} table
     * ({@code id>=108}) and checks that the snapshot yields exactly the two matching product
     * rows (ids 108 and 109) alongside six records on the server-name topic.
     *
     * @throws SQLException declared by the test signature
     * @throws InterruptedException if the optional wait for the replica is interrupted
     */
    @Test
    public void shouldUseOverriddenSelectStatementDuringSnapshotting() throws SQLException, InterruptedException {
        final String masterPort = System.getProperty("database.port", "3306");
        final String replicaPort = System.getProperty("database.replica.port", "3306");
        if (!masterPort.equals(replicaPort)) {
            // A separate replica is in use; presumably this pause lets it catch up with the master.
            Thread.sleep(5000L);
        }

        final String databaseName = this.DATABASE.getDatabaseName();
        final String productsTable = databaseName + ".products";
        this.config = Configuration.create()
                .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))
                .with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))
                .with(MySqlConnectorConfig.USER, "snapper")
                .with(MySqlConnectorConfig.PASSWORD, "snapperpass")
                .with(MySqlConnectorConfig.SERVER_ID, 28765)
                .with(MySqlConnectorConfig.SERVER_NAME, this.DATABASE.getServerName())
                .with(MySqlConnectorConfig.SSL_MODE, (EnumeratedValue)MySqlConnectorConfig.SecureConnectionMode.DISABLED)
                .with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, databaseName)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, productsTable)
                .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, productsTable)
                .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + productsTable, String.format("SELECT * from %s.products where id>=108 order by id", databaseName))
                .with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(FileDatabaseHistory.FILE_PATH, (Object)DB_HISTORY_PATH)
                .build();
        this.start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(8);
        final List<SourceRecord> serverTopicRecords = consumed.recordsForTopic(this.DATABASE.getServerName());
        final List<SourceRecord> productRecords = consumed.recordsForTopic(this.DATABASE.topicForTable("products"));
        Assertions.assertThat(serverTopicRecords.size()).isEqualTo(6);
        Assertions.assertThat(productRecords.size()).isEqualTo(2);

        // The override orders by id, so the two surviving rows arrive as 108 then 109.
        final Struct firstKey = (Struct)productRecords.get(0).key();
        final Struct secondKey = (Struct)productRecords.get(1).key();
        Assertions.assertThat(firstKey.getInt32("id")).isEqualTo(108);
        Assertions.assertThat(secondKey.getInt32("id")).isEqualTo(109);
        consumed.forEach(this::validate);
    }

    /**
     * Starts the connector with snapshot SELECT overrides for both {@code products} and
     * {@code products_on_hand} (each restricted to ids {@code >=108}) and checks that each table
     * contributes exactly rows 108 and 109, alongside eight records on the server-name topic.
     *
     * @throws SQLException declared by the test signature
     * @throws InterruptedException if the optional wait for the replica is interrupted
     */
    @Test
    public void shouldUseMultipleOverriddenSelectStatementsDuringSnapshotting() throws SQLException, InterruptedException {
        final String masterPort = System.getProperty("database.port", "3306");
        final String replicaPort = System.getProperty("database.replica.port", "3306");
        if (!masterPort.equals(replicaPort)) {
            // A separate replica is in use; presumably this pause lets it catch up with the master.
            Thread.sleep(5000L);
        }

        final String databaseName = this.DATABASE.getDatabaseName();
        final String tables = String.format("%s.products,%s.products_on_hand", databaseName, databaseName);
        final String productsOverrideKey = MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + databaseName + ".products";
        final String onHandOverrideKey = MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE + "." + databaseName + ".products_on_hand";
        this.config = Configuration.create()
                .with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))
                .with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))
                .with(MySqlConnectorConfig.USER, "snapper")
                .with(MySqlConnectorConfig.PASSWORD, "snapperpass")
                .with(MySqlConnectorConfig.SERVER_ID, 28765)
                .with(MySqlConnectorConfig.SERVER_NAME, this.DATABASE.getServerName())
                .with(MySqlConnectorConfig.SSL_MODE, (EnumeratedValue)MySqlConnectorConfig.SecureConnectionMode.DISABLED)
                .with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, databaseName)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables)
                .with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                .with(MySqlConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, tables)
                .with(productsOverrideKey, String.format("SELECT * from %s.products where id>=108 order by id", databaseName))
                .with(onHandOverrideKey, String.format("SELECT * from %s.products_on_hand where product_id>=108 order by product_id", databaseName))
                .with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(FileDatabaseHistory.FILE_PATH, (Object)DB_HISTORY_PATH)
                .build();
        this.start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(12);
        final List<SourceRecord> serverTopicRecords = consumed.recordsForTopic(this.DATABASE.getServerName());
        final List<SourceRecord> productRecords = consumed.recordsForTopic(this.DATABASE.topicForTable("products"));
        final List<SourceRecord> onHandRecords = consumed.recordsForTopic(this.DATABASE.topicForTable("products_on_hand"));
        Assertions.assertThat(serverTopicRecords.size()).isEqualTo(8);
        Assertions.assertThat(productRecords.size()).isEqualTo(2);
        Assertions.assertThat(onHandRecords.size()).isEqualTo(2);

        // Both overrides order by their id column, so rows arrive as 108 then 109.
        Assertions.assertThat(((Struct)productRecords.get(0).key()).getInt32("id")).isEqualTo(108);
        Assertions.assertThat(((Struct)productRecords.get(1).key()).getInt32("id")).isEqualTo(109);
        Assertions.assertThat(((Struct)onHandRecords.get(0).key()).getInt32("product_id")).isEqualTo(108);
        Assertions.assertThat(((Struct)onHandRecords.get(1).key()).getInt32("product_id")).isEqualTo(109);
        consumed.forEach(this::validate);
    }

    /**
     * With only {@code customers} captured and {@code STORE_ONLY_CAPTURED_TABLES_DDL} enabled,
     * alters both {@code orders} (not captured) and {@code customers}, inserts one customer, and
     * expects exactly one schema change record and one {@code customers} data record afterwards.
     *
     * @throws SQLException if the test statements cannot be executed
     * @throws InterruptedException if consuming records is interrupted
     */
    @Test
    @FixFor(value={"DBZ-977"})
    public void shouldIgnoreAlterTableForNonCapturedTablesNotStoredInHistory() throws SQLException, InterruptedException {
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        final String databaseName = this.DATABASE.getDatabaseName();
        // One placeholder, one argument: the include list holds just the customers table.
        final String tables = String.format("%s.customers", databaseName);
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                .build();
        this.start(MySqlConnector.class, this.config);

        // Schema-only snapshot: six records are consumed, five of them DDL for the test database.
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(6);
        Assertions.assertThat(records.ddlRecordsForDatabase(databaseName).size()).isEqualTo(5);

        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(databaseName);
             JdbcConnection connection = db.connect()) {
            connection.execute(new String[]{"ALTER TABLE orders ADD COLUMN (newcol INT)"});
            connection.execute(new String[]{"ALTER TABLE customers ADD COLUMN (newcol INT)"});
            connection.execute(new String[]{"INSERT INTO customers VALUES (default,'name','surname','email',1);"});
        }

        // Only two records are expected: the customers insert and a single DDL record.
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable("customers")).size()).isEqualTo(1);
        Assertions.assertThat(records.ddlRecordsForDatabase(databaseName).size()).isEqualTo(1);
        this.stopConnector();
    }

    /**
     * Runs a schema-only snapshot with the database include list set to
     * {@code "no_" + <test database name>} and {@code STORE_ONLY_CAPTURED_TABLES_DDL} enabled,
     * and expects exactly one DDL record registered under the empty database name.
     *
     * @throws SQLException declared by the test signature
     * @throws InterruptedException if consuming records is interrupted
     */
    @Test
    @FixFor(value={"DBZ-1201"})
    public void shouldSaveSetCharacterSetWhenStoringOnlyCapturededTables() throws SQLException, InterruptedException {
        Testing.Files.delete((Path)DB_HISTORY_PATH);

        final String includedDatabase = "no_" + this.DATABASE.getDatabaseName();
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, includedDatabase)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                .build();
        this.start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
        final int ddlWithoutDatabase = consumed.ddlRecordsForDatabase("").size();
        Assertions.assertThat(ddlWithoutDatabase).isEqualTo(1);
        this.stopConnector();
    }

    /**
     * Creates the {@code migration_test} table and a unique index on {@code mgb_no} while the
     * connector is streaming, inserts one row, and checks that the resulting record is keyed by
     * {@code mgb_no} and that thirteen DDL records were seen for the test database.
     *
     * @throws SQLException if the test statements cannot be executed
     * @throws InterruptedException if waiting for streaming or consuming records is interrupted
     */
    @Test
    @FixFor(value={"DBZ-1246"})
    public void shouldProcessCreateUniqueIndex() throws SQLException, InterruptedException {
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        final String databaseName = this.DATABASE.getDatabaseName();
        // One placeholder, one argument: the include list holds just the migration_test table.
        final String tables = String.format("%s.migration_test", databaseName);
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .build();
        this.start(MySqlConnector.class, this.config);
        // The table is created only after streaming has started.
        this.waitForStreamingRunning(this.DATABASE.getServerName());

        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(databaseName);
             JdbcConnection connection = db.connect()) {
            connection.execute(new String[]{"create table migration_test (id varchar(20) null,mgb_no varchar(20) null)", "create unique index migration_test_mgb_no_uindex on migration_test (mgb_no)", "insert into migration_test values(1,'2')"});
        }

        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(15);
        List<SourceRecord> migrationTestRecords = records.recordsForTopic(this.DATABASE.topicForTable("migration_test"));
        Assertions.assertThat(migrationTestRecords.size()).isEqualTo(1);
        // The inserted row has mgb_no='2'; the record key is expected to expose that column.
        SourceRecord record = migrationTestRecords.get(0);
        Assertions.assertThat(((Struct)record.key()).getString("mgb_no")).isEqualTo("2");
        Assertions.assertThat(records.ddlRecordsForDatabase(databaseName).size()).isEqualTo(13);
        this.stopConnector();
    }

    /**
     * With only {@code customers} captured (and without {@code STORE_ONLY_CAPTURED_TABLES_DDL}),
     * alters both {@code orders} and {@code customers}, inserts one customer, and expects two
     * schema change records and one {@code customers} data record afterwards.
     *
     * @throws SQLException if dropping databases or executing the test statements fails
     * @throws InterruptedException if consuming records is interrupted
     */
    @Test
    @FixFor(value={"DBZ-977"})
    public void shouldIgnoreAlterTableForNonCapturedTablesStoredInHistory() throws SQLException, InterruptedException {
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        final String databaseName = this.DATABASE.getDatabaseName();
        // One placeholder, one argument: the include list holds just the customers table.
        final String tables = String.format("%s.customers", databaseName);
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .build();
        // Remove other non-built-in databases before starting so the record counts below hold.
        this.dropDatabases();
        this.start(MySqlConnector.class, this.config);

        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(12);
        Assertions.assertThat(records.ddlRecordsForDatabase(databaseName).size()).isEqualTo(11);

        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(databaseName);
             JdbcConnection connection = db.connect()) {
            connection.execute(new String[]{"ALTER TABLE orders ADD COLUMN (newcol INT)"});
            connection.execute(new String[]{"ALTER TABLE customers ADD COLUMN (newcol INT)"});
            connection.execute(new String[]{"INSERT INTO customers VALUES (default,'name','surname','email',1);"});
        }

        // Three records are expected: two DDL records plus the customers insert.
        records = this.consumeRecordsByTopic(3);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable("customers")).size()).isEqualTo(1);
        Assertions.assertThat(records.ddlRecordsForDatabase(databaseName).size()).isEqualTo(2);
        this.stopConnector();
    }

    /**
     * With only {@code customers} captured and {@code STORE_ONLY_CAPTURED_TABLES_DDL} enabled,
     * creates a unique index on the non-captured {@code nonmon} table followed by a customer
     * insert, and expects the next consumed record to belong to the {@code customers} topic.
     *
     * @throws SQLException if the test statements cannot be executed
     * @throws InterruptedException if consuming records is interrupted
     */
    @Test
    @FixFor(value={"DBZ-1264"})
    public void shouldIgnoreCreateIndexForNonCapturedTablesNotStoredInHistory() throws SQLException, InterruptedException {
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        final String databaseName = this.DATABASE.getDatabaseName();
        // One placeholder, one argument: the include list holds just the customers table.
        final String tables = String.format("%s.customers", databaseName);
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, tables)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                .build();

        // The non-captured table must exist before the connector starts.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(databaseName);
             JdbcConnection connection = db.connect()) {
            connection.execute(new String[]{"CREATE TABLE nonmon (id INT)"});
        }

        this.start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(6);
        Assertions.assertThat(records.ddlRecordsForDatabase(databaseName).size()).isEqualTo(5);

        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(databaseName);
             JdbcConnection connection = db.connect()) {
            connection.execute(new String[]{"CREATE UNIQUE INDEX pk ON nonmon(id)", "INSERT INTO customers VALUES (default,'name','surname','email');"});
        }

        // The index DDL on nonmon must not surface; the next record is the customers insert.
        SourceRecord record = this.consumeRecord();
        Assertions.assertThat(record.topic()).isEqualTo(this.DATABASE.topicForTable("customers"));
    }

    /**
     * Captures {@code customers} and {@code orders} with a database include list of {@code .*},
     * creates an extra database {@code non_wh} containing table {@code t1} before start-up, and
     * checks that the schema-only snapshot emits 17 records, 11 of them DDL for the test database.
     *
     * @throws SQLException if dropping or creating databases fails
     * @throws InterruptedException if consuming records is interrupted
     */
    @Test
    @FixFor(value={"DBZ-683"})
    public void shouldReceiveSchemaForNonWhitelistedTablesAndDatabases() throws SQLException, InterruptedException {
        Testing.Files.delete((Path)DB_HISTORY_PATH);

        final String databaseName = this.DATABASE.getDatabaseName();
        final String includedTables = String.format("%s.customers,%s.orders", databaseName, databaseName);
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, includedTables)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, ".*")
                .build();

        this.dropDatabases();
        try (MySqlTestConnection mysqlDb = MySqlTestConnection.forTestDatabase("mysql");
             JdbcConnection jdbc = mysqlDb.connect()) {
            final String[] setupStatements = {"CREATE DATABASE non_wh", "USE non_wh", "CREATE TABLE t1 (ID INT PRIMARY KEY)"};
            jdbc.execute(setupStatements);
        }

        this.start(MySqlConnector.class, this.config);
        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(17);
        final int ddlCount = consumed.ddlRecordsForDatabase(databaseName).size();
        Assertions.assertThat(ddlCount).isEqualTo(11);
        this.stopConnector();
    }

    /**
     * Configures the table include list with a comma-plus-space separated pair of tables and a
     * database include list of {@code .*}, then checks that 17 records are consumed, 11 of them
     * DDL for the test database.
     *
     * @throws SQLException if dropping databases fails
     * @throws InterruptedException if consuming records is interrupted
     */
    @Test
    @FixFor(value={"DBZ-1546"})
    public void shouldHandleIncludeListTables() throws SQLException, InterruptedException {
        Testing.Files.delete((Path)DB_HISTORY_PATH);

        final String databaseName = this.DATABASE.getDatabaseName();
        // Note the space after the comma: the list is deliberately written "a, b".
        final String includedTables = String.format("%s.customers, %s.orders", databaseName, databaseName);
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, includedTables)
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, ".*")
                .build();

        this.dropDatabases();
        this.start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(17);
        final int ddlCount = consumed.ddlRecordsForDatabase(databaseName).size();
        Assertions.assertThat(ddlCount).isEqualTo(11);
        this.stopConnector();
    }

    /**
     * Same scenario as the include-list variant, but configured through the
     * {@code TABLE_WHITELIST} and {@code DATABASE_WHITELIST} fields: 17 records are consumed,
     * 11 of them DDL for the test database.
     *
     * @throws SQLException if dropping databases fails
     * @throws InterruptedException if consuming records is interrupted
     */
    @Test
    public void shouldHandleWhitelistedTables() throws SQLException, InterruptedException {
        Testing.Files.delete((Path)DB_HISTORY_PATH);

        final String databaseName = this.DATABASE.getDatabaseName();
        // Note the space after the comma: the list is deliberately written "a, b".
        final String whitelistedTables = String.format("%s.customers, %s.orders", databaseName, databaseName);
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.TABLE_WHITELIST, whitelistedTables)
                .with(MySqlConnectorConfig.DATABASE_WHITELIST, ".*")
                .build();

        this.dropDatabases();
        this.start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(17);
        final int ddlCount = consumed.ddlRecordsForDatabase(databaseName).size();
        Assertions.assertThat(ddlCount).isEqualTo(11);
        this.stopConnector();
    }

    /**
     * Drops every database listed by {@code SHOW DATABASES} except those that
     * {@link Filters#isBuiltInDatabase} reports as built-in and this test's own {@code DATABASE}.
     *
     * @throws SQLException if listing the databases or dropping one of them fails
     */
    private void dropDatabases() throws SQLException {
        final String ownDatabase = this.DATABASE.getDatabaseName();
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase("mysql");
             JdbcConnection connection = db.connect()) {
            connection.query("SHOW DATABASES", rs -> {
                while (rs.next()) {
                    final String dbName = rs.getString(1);
                    if (Filters.isBuiltInDatabase(dbName) || dbName.equals(ownDatabase)) {
                        continue;
                    }
                    // Backtick-quote the identifier (doubling any embedded backtick) so that names
                    // which are not plain identifiers, e.g. containing '-', do not break the DDL.
                    final String quotedName = "`" + dbName.replace("`", "``") + "`";
                    connection.execute(new String[]{"DROP DATABASE IF EXISTS " + quotedName});
                }
            });
        }
    }

    /**
     * Returns the {@code after} field of the given record's value, cast to a {@link Struct}.
     *
     * @param record the record whose value is itself a {@link Struct}
     * @return the content of the {@code after} field
     */
    private Struct getAfter(SourceRecord record) {
        final Struct envelope = (Struct)record.value();
        final Object afterState = envelope.get("after");
        return (Struct)afterState;
    }

    /**
     * Starts the connector against the read-only database with snapshot mode {@code NEVER} and
     * checks the per-topic record counts, the number of DDL records, and that the product named
     * {@code hammer2} carries a weight of {@code 0.875}.
     *
     * @throws SQLException declared by the test signature
     * @throws InterruptedException if consuming records is interrupted
     */
    @Test
    public void shouldConsumeEventsWithNoSnapshot() throws SQLException, InterruptedException {
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        this.config = this.RO_DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.NEVER)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .build();
        this.start(MySqlConnector.class, this.config);

        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(33);
        Assertions.assertThat(this.recordsForTopicForRoProductsTable(records).size()).isEqualTo(9);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(9);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("customers")).size()).isEqualTo(4);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("orders")).size()).isEqualTo(5);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("Products")).size()).isEqualTo(9);
        Assertions.assertThat(records.topics().size()).isEqualTo(5);
        Assertions.assertThat(records.ddlRecordsForDatabase(this.RO_DATABASE.getDatabaseName()).size()).isEqualTo(6);

        Optional<SourceRecord> recordWithScientific = records.recordsForTopic(this.RO_DATABASE.topicForTable("Products")).stream()
                .filter(x -> "hammer2".equals(this.getAfter(x).get("name")))
                .findFirst();
        // assertThat(...) alone verifies nothing; isTrue() is what actually checks presence.
        Assertions.assertThat(recordWithScientific.isPresent()).isTrue();
        Assertions.assertThat(this.getAfter(recordWithScientific.get()).get("weight")).isEqualTo((Object)0.875);

        records.forEach(this::validate);
        this.stopConnector();

        // Print the orders and customers records after the connector has stopped.
        records.recordsForTopic(this.RO_DATABASE.topicForTable("orders")).forEach(record -> this.print((SourceRecord)record));
        records.recordsForTopic(this.RO_DATABASE.topicForTable("customers")).forEach(record -> this.print((SourceRecord)record));
    }

    /**
     * Restricts the {@code orders} table to its {@code order_number} column through the column
     * include list and checks that every {@code orders} record exposes {@code order_number} in
     * its {@code after} struct while {@code order_date} is absent.
     *
     * @throws SQLException declared by the test signature
     * @throws InterruptedException if consuming records is interrupted
     */
    @Test
    @FixFor(value={"DBZ-1962"})
    public void shouldConsumeEventsWithIncludedColumns() throws SQLException, InterruptedException {
        Testing.Files.delete((Path)DB_HISTORY_PATH);

        final String includedColumn = this.RO_DATABASE.qualifiedTableName("orders") + ".order_number";
        this.config = this.RO_DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.COLUMN_INCLUDE_LIST, includedColumn)
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .build();
        this.start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(28);
        Assertions.assertThat(consumed.recordsForTopic(this.RO_DATABASE.topicForTable("orders")).size()).isEqualTo(5);
        Assertions.assertThat(consumed.topics().size()).isEqualTo(5);
        consumed.forEach(this::validate);
        this.stopConnector();

        for (SourceRecord orderRecord : consumed.recordsForTopic(this.RO_DATABASE.topicForTable("orders"))) {
            this.print(orderRecord);
            final Struct after = ((Struct)orderRecord.value()).getStruct("after");

            // Reading an included column must not raise a DataException.
            try {
                after.get("order_number");
            }
            catch (DataException e) {
                Assert.fail((String)"The 'order_number' field was not found but should exist");
            }

            // Reading a filtered column is expected to raise a DataException.
            boolean orderDateReadable = true;
            try {
                after.get("order_date");
            }
            catch (DataException expected) {
                orderDateReadable = false;
            }
            if (orderDateReadable) {
                Assert.fail((String)"The 'order_date' field was found but should be filtered");
            }
        }
    }

    /**
     * Combines a column exclude list ({@code orders.order_number}) with a 12-character mask
     * on {@code customers.email}. After checking the per-topic record counts, the test asserts
     * that {@code order_number} cannot be read from order records and that every customer
     * {@code email} (in both {@code before} and {@code after}, when present) is twelve asterisks.
     *
     * @throws SQLException declared by the test signature
     * @throws InterruptedException if consuming records is interrupted
     */
    @Test
    public void shouldConsumeEventsWithMaskedAndBlacklistedColumns() throws SQLException, InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.RO_DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.COLUMN_EXCLUDE_LIST, this.RO_DATABASE.qualifiedTableName("orders") + ".order_number")
                .with("column.mask.with.12.chars", this.RO_DATABASE.qualifiedTableName("customers") + ".email")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .build();
        this.start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(28);
        Assertions.assertThat(this.recordsForTopicForRoProductsTable(records).size()).isEqualTo(9);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("products_on_hand")).size()).isEqualTo(9);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("customers")).size()).isEqualTo(4);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("orders")).size()).isEqualTo(5);
        Assertions.assertThat(records.topics().size()).isEqualTo(5);
        records.forEach(this::validate);
        this.stopConnector();

        // Excluded column: reading it from the row state must fail.
        for (SourceRecord order : records.recordsForTopic(this.RO_DATABASE.topicForTable("orders"))) {
            this.print(order);
            final Struct envelope = (Struct)order.value();
            try {
                envelope.getStruct("after").get("order_number");
                Assert.fail("The 'order_number' field was found but should not exist");
            }
            catch (DataException expected) {
                // expected: the excluded column is not available
            }
        }

        // Masked column: whichever row images exist must carry the mask instead of the address.
        for (SourceRecord customer : records.recordsForTopic(this.RO_DATABASE.topicForTable("customers"))) {
            final Struct envelope = (Struct)customer.value();
            final Struct after = envelope.getStruct("after");
            if (after != null) {
                Assertions.assertThat(after.getString("email")).isEqualTo("************");
            }
            final Struct before = envelope.getStruct("before");
            if (before != null) {
                Assertions.assertThat(before.getString("email")).isEqualTo("************");
            }
            this.print(customer);
        }
    }

    /**
     * Configures a salted SHA-256 hash mask on {@code customers.email} and, after checking the
     * per-topic record counts, compares the {@code email} of the first four customer records
     * (when an {@code after} image is present) against fixed expected digests.
     *
     * @throws InterruptedException if consuming records is interrupted
     */
    @Test
    @FixFor(value={"DBZ-1692"})
    public void shouldConsumeEventsWithMaskedHashedColumns() throws InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.RO_DATABASE.defaultConfig()
                .with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K", this.RO_DATABASE.qualifiedTableName("customers") + ".email")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .build();
        this.start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(28);
        Assertions.assertThat(this.recordsForTopicForRoProductsTable(records)).hasSize(9);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("products_on_hand"))).hasSize(9);
        final List<SourceRecord> customers = records.recordsForTopic(this.RO_DATABASE.topicForTable("customers"));
        Assertions.assertThat(customers).hasSize(4);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("orders"))).hasSize(5);
        Assertions.assertThat(records.topics()).hasSize(5);
        records.forEach(this::validate);
        this.stopConnector();

        // Expected digests, in the order the customer records were received.
        final String[] expectedHashes = {
                "d540e71abf15be8b51c7967397ba359db27d6f6ae85a297fe8d0d7005ffd0e82",
                "b1f1a1a63559c1d3a98bd7bb5c363d7e21a37463a7266bc2ff341eaef7ac8ef3",
                "bbe1de7b1068bc8f86bbb19f432ce1d44fbd461339916f42544b3f7ebff674d6",
                "ff21be44fb224e57d822ea9a51d343d77e4c49ac3dedd3d144024ac2012af0a1"
        };
        for (int i = 0; i < expectedHashes.length; i++) {
            final Struct envelope = (Struct)customers.get(i).value();
            final Struct after = envelope.getStruct("after");
            // Records without an "after" image are skipped rather than failed.
            if (after != null) {
                Assertions.assertThat(after.getString("email")).isEqualTo(expectedHashes[i]);
            }
        }
    }

    /**
     * Configures truncation of {@code customers.email} to seven characters and, after checking
     * the per-topic record counts, compares the {@code email} of the first four customer records
     * (when an {@code after} image is present) against the expected seven-character prefixes.
     *
     * @throws InterruptedException if consuming records is interrupted
     */
    @Test
    @FixFor(value={"DBZ-1972"})
    public void shouldConsumeEventsWithTruncatedColumns() throws InterruptedException {
        Testing.Files.delete(DB_HISTORY_PATH);
        this.config = this.RO_DATABASE.defaultConfig()
                .with("column.truncate.to.7.chars", this.RO_DATABASE.qualifiedTableName("customers") + ".email")
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .build();
        this.start(MySqlConnector.class, this.config);

        final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(28);
        Assertions.assertThat(this.recordsForTopicForRoProductsTable(records)).hasSize(9);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("products_on_hand"))).hasSize(9);
        final List<SourceRecord> customers = records.recordsForTopic(this.RO_DATABASE.topicForTable("customers"));
        Assertions.assertThat(customers).hasSize(4);
        Assertions.assertThat(records.recordsForTopic(this.RO_DATABASE.topicForTable("orders"))).hasSize(5);
        Assertions.assertThat(records.topics()).hasSize(5);
        records.forEach(this::validate);
        this.stopConnector();

        // Expected truncated values, in the order the customer records were received.
        final String[] expectedPrefixes = {"sally.t", "gbailey", "ed@walk", "annek@n"};
        for (int i = 0; i < expectedPrefixes.length; i++) {
            final Struct envelope = (Struct)customers.get(i).value();
            final Struct after = envelope.getStruct("after");
            // Records without an "after" image are skipped rather than failed.
            if (after != null) {
                Assertions.assertThat(after.getString("email")).isEqualTo(expectedPrefixes[i]);
            }
        }
    }

    /**
     * Runs the connector with snapshot mode {@code NEVER} and default tombstone settings.
     * An UPDATE that changes {@code order_number} is expected to produce three records on the
     * orders topic (delete, tombstone, insert); a subsequent DELETE is expected to produce
     * two (delete, tombstone).
     *
     * @throws Exception if the database statements or record consumption fail
     */
    @Test
    @FixFor(value={"DBZ-582"})
    public void shouldEmitTombstoneOnDeleteByDefault() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
                .build();
        this.start(MySqlConnector.class, this.config);

        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(33);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(5);

        // Step 1: change the value of the order_number column.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("UPDATE orders SET order_number=10101 WHERE order_number=10001");
        }
        records = this.consumeRecordsByTopic(3);
        List<SourceRecord> changes = records.recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(changes.size()).isEqualTo(3);
        this.assertDelete(changes.get(0), "order_number", 10001);
        this.assertTombstone(changes.get(1), "order_number", 10001);
        this.assertInsert(changes.get(2), "order_number", 10101);

        // Step 2: remove the row that was just re-keyed.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("DELETE FROM orders WHERE order_number=10101");
        }
        records = this.consumeRecordsByTopic(2);
        changes = records.recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(changes.size()).isEqualTo(2);
        this.assertDelete(changes.get(0), "order_number", 10101);
        this.assertTombstone(changes.get(1), "order_number", 10101);

        this.stopConnector();
    }

    /**
     * Runs the connector with snapshot mode {@code NEVER} and {@code TOMBSTONES_ON_DELETE}
     * set to {@code false}. An UPDATE that changes {@code order_number} is expected to produce
     * only a delete and an insert record, and two DELETE statements are expected to produce
     * exactly two delete records, with no tombstones in between.
     *
     * @throws Exception if the database statements or record consumption fail
     */
    @Test
    @FixFor(value={"DBZ-582"})
    public void shouldEmitNoTombstoneOnDelete() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .build();
        this.start(MySqlConnector.class, this.config);

        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(33);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(5);

        // Step 1: change the value of the order_number column.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("UPDATE orders SET order_number=10101 WHERE order_number=10001");
        }
        records = this.consumeRecordsByTopic(2);
        List<SourceRecord> changes = records.recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(changes.size()).isEqualTo(2);
        this.assertDelete(changes.get(0), "order_number", 10001);
        this.assertInsert(changes.get(1), "order_number", 10101);

        // Step 2: delete two rows with separate statements.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("DELETE FROM orders WHERE order_number = 10101;");
            connection.execute("DELETE FROM orders WHERE order_number = 10002;");
        }
        records = this.consumeRecordsByTopic(2);
        changes = records.recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(changes.size()).isEqualTo(2);
        this.assertDelete(changes.get(0), "order_number", 10101);
        this.assertDelete(changes.get(1), "order_number", 10002);

        this.stopConnector();
    }

    /**
     * Issues two DELETE statements separated by a {@code SAVEPOINT} inside one manually
     * committed transaction and checks that the two consumed records are the two deletes on
     * the orders topic, with no DDL records reported for the database.
     *
     * <p>The JDBC {@link Statement} is opened in a try-with-resources block so that it is
     * closed deterministically; the commit is issued before the statement is closed.
     *
     * @throws Exception if the database statements or record consumption fail
     */
    @Test
    @FixFor(value={"DBZ-794"})
    public void shouldEmitNoSavepoints() throws Exception {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .build();
        this.start(MySqlConnector.class, this.config);

        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(33);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(5);
        this.waitForStreamingRunning(this.DATABASE.getServerName());

        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            final Connection jdbc = connection.connection();
            // Auto-commit is disabled so both deletes and the savepoint share one transaction.
            connection.setAutoCommit(false);
            try (Statement statement = jdbc.createStatement()) {
                statement.executeUpdate("DELETE FROM orders WHERE order_number = 10001");
                statement.executeUpdate("SavePoint sp2");
                statement.executeUpdate("DELETE FROM orders WHERE order_number = 10002");
                jdbc.commit();
            }
        }

        records = this.consumeRecordsByTopic(2);
        // The savepoint must not show up as a schema change record.
        Assertions.assertThat(records.ddlRecordsForDatabase(this.DATABASE.getDatabaseName())).isNullOrEmpty();
        final List<SourceRecord> deletes = records.recordsForTopic(this.DATABASE.topicForTable("orders"));
        this.assertDelete(deletes.get(0), "order_number", 10001);
        this.assertDelete(deletes.get(1), "order_number", 10002);

        this.stopConnector();
    }

    /**
     * Enables {@code INCLUDE_SQL_QUERY} on the connector but turns
     * {@code binlog_rows_query_log_events} OFF for the inserting session, then asserts that
     * the resulting insert record for id 110 carries no source query.
     *
     * @throws Exception if the database statements or record consumption fail
     */
    @Test
    @FixFor(value={"DBZ-706"})
    public void shouldNotParseQueryIfServerOptionDisabled() throws Exception {
        final String tableName = "products";
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        this.start(MySqlConnector.class, this.config);
        this.waitForStreamingRunning(this.DATABASE.getServerName());

        // Drain the nine records that precede the change made below.
        this.consumeRecords(9, null);

        final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=OFF");
            connection.execute(insertSqlStatement);
        }

        final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable(tableName)).size()).isEqualTo(1);
        final SourceRecord inserted = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(0);
        this.validate(inserted);
        this.assertInsert(inserted, "id", 110);
        this.assertHasNoSourceQuery(inserted);
    }

    /**
     * Turns {@code binlog_rows_query_log_events} ON for the inserting session but leaves
     * {@code INCLUDE_SQL_QUERY} disabled on the connector, then asserts that the resulting
     * insert record for id 110 carries no source query.
     *
     * @throws Exception if the database statements or record consumption fail
     */
    @Test
    @FixFor(value={"DBZ-706"})
    public void shouldNotParseQueryIfConnectorNotConfiguredTo() throws Exception {
        final String tableName = "products";
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, false)
                .build();
        this.start(MySqlConnector.class, this.config);

        // Drain the nine records that precede the change made below.
        this.consumeRecords(9, null);

        final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(insertSqlStatement);
        }

        final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable(tableName)).size()).isEqualTo(1);
        final SourceRecord inserted = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(0);
        this.logger.info("Record: {}", inserted);
        this.validate(inserted);
        this.assertInsert(inserted, "id", 110);
        this.assertHasNoSourceQuery(inserted);
    }

    /**
     * Enables both {@code INCLUDE_SQL_QUERY} on the connector and
     * {@code binlog_rows_query_log_events} for the inserting session, then asserts that the
     * resulting insert record for id 110 carries the executed INSERT statement as its source query.
     *
     * @throws Exception if the database statements or record consumption fail
     */
    @Test
    @FixFor(value={"DBZ-706"})
    public void shouldParseQueryIfAvailableAndConnectorOptionEnabled() throws Exception {
        final String tableName = "products";
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        this.start(MySqlConnector.class, this.config);

        // Drain the nine records that precede the change made below.
        this.consumeRecords(9, null);

        final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(insertSqlStatement);
        }

        final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable(tableName)).size()).isEqualTo(1);
        final SourceRecord inserted = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(0);
        this.logger.info("Record: {}", inserted);
        this.validate(inserted);
        this.assertInsert(inserted, "id", 110);
        this.assertSourceQuery(inserted, insertSqlStatement);
    }

    /**
     * Executes two separate single-row INSERT statements with query logging enabled and
     * asserts that each resulting insert record (ids 110 and 111) carries its own statement
     * as the source query.
     *
     * @throws Exception if the database statements or record consumption fail
     */
    @Test
    @FixFor(value={"DBZ-706"})
    public void parseMultipleInsertStatements() throws Exception {
        final String tableName = "products";
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        this.start(MySqlConnector.class, this.config);

        // Drain the nine records that precede the changes made below.
        this.consumeRecords(9, null);

        final String insertSqlStatement1 = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304)";
        final String insertSqlStatement2 = "INSERT INTO products VALUES (default,'toaster','Toaster',3.33)";
        this.logger.warn(this.DATABASE.getDatabaseName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(insertSqlStatement1);
            connection.execute(insertSqlStatement2);
        }

        final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable(tableName)).size()).isEqualTo(2);

        final SourceRecord firstInsert = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(0);
        this.validate(firstInsert);
        this.assertInsert(firstInsert, "id", 110);
        this.assertSourceQuery(firstInsert, insertSqlStatement1);

        final SourceRecord secondInsert = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(1);
        this.validate(secondInsert);
        this.assertInsert(secondInsert, "id", 111);
        this.assertSourceQuery(secondInsert, insertSqlStatement2);
    }

    /**
     * Executes one INSERT statement that adds two rows, with query logging enabled, and
     * asserts that both resulting insert records (ids 110 and 111) carry that same statement
     * as the source query.
     *
     * @throws Exception if the database statements or record consumption fail
     */
    @Test
    @FixFor(value={"DBZ-706"})
    public void parseMultipleRowInsertStatement() throws Exception {
        final String tableName = "products";
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        this.start(MySqlConnector.class, this.config);

        // Drain the nine records that precede the change made below.
        this.consumeRecords(9, null);

        final String insertSqlStatement = "INSERT INTO products VALUES (default,'robot','Toy robot',1.304), (default,'toaster','Toaster',3.33)";
        this.logger.warn(this.DATABASE.getDatabaseName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(insertSqlStatement);
        }

        final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable(tableName)).size()).isEqualTo(2);

        final SourceRecord firstInsert = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(0);
        this.validate(firstInsert);
        this.assertInsert(firstInsert, "id", 110);
        this.assertSourceQuery(firstInsert, insertSqlStatement);

        final SourceRecord secondInsert = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(1);
        this.validate(secondInsert);
        this.assertInsert(secondInsert, "id", 111);
        this.assertSourceQuery(secondInsert, insertSqlStatement);
    }

    /**
     * Executes a single-row DELETE on the orders table with query logging enabled and asserts
     * that the resulting delete record for order 10001 carries the statement as its source query.
     *
     * @throws Exception if the database statements or record consumption fail
     */
    @Test
    @FixFor(value={"DBZ-706"})
    public void parseDeleteQuery() throws Exception {
        final String tableName = "orders";
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        this.start(MySqlConnector.class, this.config);

        // Drain the five records that precede the change made below.
        this.consumeRecords(5, null);

        final String deleteSqlStatement = "DELETE FROM orders WHERE order_number=10001 LIMIT 1";
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(deleteSqlStatement);
        }

        final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable(tableName)).size()).isEqualTo(1);
        final SourceRecord deleted = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(0);
        this.validate(deleted);
        this.assertDelete(deleted, "order_number", 10001);
        this.assertSourceQuery(deleted, deleteSqlStatement);
    }

    /**
     * Executes one DELETE that removes the orders of purchaser 1002, with query logging
     * enabled, and asserts that both resulting delete records (orders 10002 and 10004) carry
     * that same statement as the source query.
     *
     * @throws Exception if the database statements or record consumption fail
     */
    @Test
    @FixFor(value={"DBZ-706"})
    public void parseMultiRowDeleteQuery() throws Exception {
        final String tableName = "orders";
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        this.start(MySqlConnector.class, this.config);

        // Drain the five records that precede the change made below.
        this.consumeRecords(5, null);

        final String deleteSqlStatement = "DELETE FROM orders WHERE purchaser=1002";
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(deleteSqlStatement);
        }

        final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable(tableName)).size()).isEqualTo(2);

        final SourceRecord firstDelete = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(0);
        this.validate(firstDelete);
        this.assertDelete(firstDelete, "order_number", 10002);
        this.assertSourceQuery(firstDelete, deleteSqlStatement);

        final SourceRecord secondDelete = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(1);
        this.validate(secondDelete);
        this.assertDelete(secondDelete, "order_number", 10004);
        this.assertSourceQuery(secondDelete, deleteSqlStatement);
    }

    /**
     * Executes a single-row UPDATE on the products table with query logging enabled and
     * asserts that the resulting update record for id 109 carries the statement as its source query.
     *
     * @throws Exception if the database statements or record consumption fail
     */
    @Test
    @FixFor(value={"DBZ-706"})
    public void parseUpdateQuery() throws Exception {
        final String tableName = "products";
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        this.start(MySqlConnector.class, this.config);

        // Drain the nine records that precede the change made below.
        this.consumeRecords(9, null);

        final String updateSqlStatement = "UPDATE products set name='toaster' where id=109 LIMIT 1";
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(updateSqlStatement);
        }

        final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable(tableName)).size()).isEqualTo(1);
        final SourceRecord updated = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(0);
        this.validate(updated);
        this.assertUpdate(updated, "id", 109);
        this.assertSourceQuery(updated, updateSqlStatement);
    }

    /**
     * Executes one UPDATE that touches orders 10001 and 10004, with query logging enabled,
     * and asserts that both resulting update records carry that same statement as the source query.
     *
     * @throws Exception if the database statements or record consumption fail
     */
    @Test
    @FixFor(value={"DBZ-706"})
    public void parseMultiRowUpdateQuery() throws Exception {
        final String tableName = "orders";
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName))
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
                .build();
        this.start(MySqlConnector.class, this.config);

        // Drain the five records that precede the change made below.
        this.consumeRecords(5, null);

        final String updateSqlStatement = "UPDATE orders set quantity=0 where order_number in (10001, 10004)";
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect()) {
            connection.execute("SET binlog_rows_query_log_events=ON");
            connection.execute(updateSqlStatement);
        }

        final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
        Assertions.assertThat(records.recordsForTopic(this.DATABASE.topicForTable(tableName)).size()).isEqualTo(2);

        final SourceRecord firstUpdate = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(0);
        this.validate(firstUpdate);
        this.assertUpdate(firstUpdate, "order_number", 10001);
        this.assertSourceQuery(firstUpdate, updateSqlStatement);

        final SourceRecord secondUpdate = records.recordsForTopic(this.DATABASE.topicForTable(tableName)).get(1);
        this.validate(secondUpdate);
        this.assertUpdate(secondUpdate, "order_number", 10004);
        this.assertSourceQuery(secondUpdate, updateSqlStatement);
    }

    /**
     * Validates (without starting the connector) a configuration that selects the
     * {@code ADAPTIVE} time precision mode and asserts that validation reports errors on
     * {@code TIME_PRECISION_MODE}.
     *
     * @throws InterruptedException declared by the test signature
     */
    @Test
    @FixFor(value={"DBZ-1234"})
    public void shouldFailToValidateAdaptivePrecisionMode() throws InterruptedException {
        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.NEVER)
                .with(MySqlConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE)
                .build();

        final Config validation = new MySqlConnector().validate(this.config.asMap());
        this.assertConfigurationErrors(validation, MySqlConnectorConfig.TIME_PRECISION_MODE);
    }

    /**
     * Starts the connector with a database include list of {@code my_database} and asserts,
     * when the connector is stopped, that the "no changes will be captured" warning was logged.
     *
     * @throws Exception if starting, consuming from, or stopping the connector fails
     */
    @Test
    @FixFor(value={"DBZ-1242"})
    public void testEmptySchemaLogWarningWithDatabaseWhitelist() throws Exception {
        final LogInterceptor logInterceptor = new LogInterceptor();
        final String emptyFilterWarning = "After applying the include/exclude list filters, no changes will be captured. Please check your configuration!";

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "my_database")
                .build();
        this.start(MySqlConnector.class, this.config);

        this.consumeRecordsByTopic(12);
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);

        // The log is inspected from the stop callback, i.e. only once the connector has been stopped.
        this.stopConnector(ignored -> Assertions.assertThat(logInterceptor.containsWarnMessage(emptyFilterWarning)).isTrue());
    }

    /**
     * Starts the connector with the default configuration plus snapshot mode {@code INITIAL}
     * and asserts, when the connector is stopped, that the "no changes will be captured"
     * warning was not logged.
     *
     * @throws Exception if starting, consuming from, or stopping the connector fails
     */
    @Test
    @FixFor(value={"DBZ-1242"})
    public void testNoEmptySchemaLogWarningWithDatabaseWhitelist() throws Exception {
        final LogInterceptor logInterceptor = new LogInterceptor();
        final String emptyFilterWarning = "After applying the include/exclude list filters, no changes will be captured. Please check your configuration!";

        this.config = this.DATABASE.defaultConfig()
                .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL)
                .build();
        this.start(MySqlConnector.class, this.config);

        this.consumeRecordsByTopic(12);
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);

        // The log is inspected from the stop callback, i.e. only once the connector has been stopped.
        this.stopConnector(ignored -> Assertions.assertThat(logInterceptor.containsWarnMessage(emptyFilterWarning)).isFalse());
    }

    /**
     * Checks that the "no changes will be captured" warning is logged when the table
     * include list is set to the qualified name of {@code my_products}.
     *
     * <p>The test waits for the snapshot to complete before consuming records; the log
     * check is performed inside the callback handed to {@code stopConnector}.
     *
     * @throws Exception if starting, consuming from, or stopping the connector fails
     */
    @Test
    @FixFor(value={"DBZ-1242"})
    public void testEmptySchemaWarningWithTableWhitelist() throws Exception {
        final String expectedWarning = "After applying the include/exclude list filters, no changes will be captured. Please check your configuration!";
        // Instantiated before the connector is started.
        final LogInterceptor interceptor = new LogInterceptor();

        Configuration.Builder builder = this.DATABASE.defaultConfig();
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.INITIAL);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("my_products"));
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        this.assertConnectorIsRunning();
        MySqlConnectorIT.waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());
        this.consumeRecordsByTopic(12);
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        this.stopConnector(value -> {
            boolean warned = interceptor.containsWarnMessage(expectedWarning);
            Assertions.assertThat(warned).isTrue();
        });
    }

    /**
     * Checks that the "no changes will be captured" warning is NOT logged when the
     * connector runs with the default test configuration plus snapshot mode {@code INITIAL}.
     *
     * <p>NOTE(review): despite the method name, no table include list is configured here.
     *
     * @throws Exception if starting, consuming from, or stopping the connector fails
     */
    @Test
    @FixFor(value={"DBZ-1242"})
    public void testNoEmptySchemaWarningWithTableWhitelist() throws Exception {
        final String unexpectedWarning = "After applying the include/exclude list filters, no changes will be captured. Please check your configuration!";
        // Instantiated before the connector is started.
        final LogInterceptor interceptor = new LogInterceptor();

        Configuration.Builder builder = this.DATABASE.defaultConfig();
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.INITIAL);
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        this.assertConnectorIsRunning();
        this.consumeRecordsByTopic(12);
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        this.stopConnector(value -> {
            boolean warned = interceptor.containsWarnMessage(unexpectedWarning);
            Assertions.assertThat(warned).isFalse();
        });
    }

    /**
     * Verifies that every record on the {@code products} topic has a key containing both
     * {@code id} and {@code name} when {@code MSG_KEY_COLUMNS} is set to
     * {@code (.*).products:id,name}.
     *
     * @throws InterruptedException if interrupted while consuming records
     * @throws SQLException declared for callers; not thrown directly by the visible code
     */
    @Test
    @FixFor(value={"DBZ-1015"})
    public void shouldRewriteIdentityKey() throws InterruptedException, SQLException {
        String tableName = "products";

        Configuration.Builder builder = this.DATABASE.defaultConfig();
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
        builder = (Configuration.Builder)builder.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName));
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.MSG_KEY_COLUMNS, "(.*).products:id,name");
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(9);
        // Typed list: with a raw List the lambda parameter would be Object and key() would not resolve.
        List<SourceRecord> recordsForTopic = records.recordsForTopic(this.DATABASE.topicForTable(tableName));
        recordsForTopic.forEach(sourceRecord -> {
            Struct key = (Struct)sourceRecord.key();
            Assertions.assertThat((Object)key.get("id")).isNotNull();
            Assertions.assertThat((Object)key.get("name")).isNotNull();
        });
    }

    /**
     * Verifies that every record on the {@code products} topic has a key containing both
     * {@code id} and {@code name} when the {@code MSG_KEY_COLUMNS} value is padded with
     * leading and trailing whitespace.
     *
     * @throws InterruptedException if interrupted while consuming records
     * @throws SQLException declared for callers; not thrown directly by the visible code
     */
    @Test
    @FixFor(value={"DBZ-2957"})
    public void shouldRewriteIdentityKeyWithWhitespace() throws InterruptedException, SQLException {
        String tableName = "products";

        Configuration.Builder builder = this.DATABASE.defaultConfig();
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
        builder = (Configuration.Builder)builder.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName));
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true);
        // The surrounding blanks are the point of this test; keep them exactly as written.
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.MSG_KEY_COLUMNS, "   (.*).products:id,name   ");
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(9);
        // Typed list: with a raw List the lambda parameter would be Object and key() would not resolve.
        List<SourceRecord> recordsForTopic = records.recordsForTopic(this.DATABASE.topicForTable(tableName));
        recordsForTopic.forEach(sourceRecord -> {
            Struct key = (Struct)sourceRecord.key();
            Assertions.assertThat((Object)key.get("id")).isNotNull();
            Assertions.assertThat((Object)key.get("name")).isNotNull();
        });
    }

    /**
     * Verifies that every record on the {@code products} topic has a key containing both
     * {@code id} and {@code name} when the {@code MSG_KEY_COLUMNS} value ends with a
     * trailing semicolon.
     *
     * @throws InterruptedException if interrupted while consuming records
     * @throws SQLException declared for callers; not thrown directly by the visible code
     */
    @Test
    @FixFor(value={"DBZ-2957"})
    public void shouldRewriteIdentityKeyWithMsgKeyColumnsFieldRegexValidation() throws InterruptedException, SQLException {
        String tableName = "products";

        Configuration.Builder builder = this.DATABASE.defaultConfig();
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
        builder = (Configuration.Builder)builder.with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName));
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true);
        // The trailing ';' is the point of this test; keep it exactly as written.
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.MSG_KEY_COLUMNS, "(.*).products:id,name;");
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(9);
        // Typed list: with a raw List the lambda parameter would be Object and key() would not resolve.
        List<SourceRecord> recordsForTopic = records.recordsForTopic(this.DATABASE.topicForTable(tableName));
        recordsForTopic.forEach(sourceRecord -> {
            Struct key = (Struct)sourceRecord.key();
            Assertions.assertThat((Object)key.get("id")).isNotNull();
            Assertions.assertThat((Object)key.get("name")).isNotNull();
        });
    }

    /**
     * Runs each record captured for the {@code products} table through the three
     * {@code CloudEventsConverterTest} conversion checks (JSON, JSON with Avro data, Avro).
     *
     * <p>The database history file is deleted first, and only the {@code products} table
     * is included.
     *
     * @throws Exception if the connector or any of the conversion checks fails
     */
    @Test
    @FixFor(value={"DBZ-1292"})
    public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        String tableName = "products";

        Configuration.Builder builder = this.DATABASE.defaultConfig();
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName(tableName));
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(9);
        // Typed list: iterating a raw List with a SourceRecord loop variable does not compile.
        List<SourceRecord> table = records.recordsForTopic(this.DATABASE.topicForTable(tableName));
        for (SourceRecord sourceRecord : table) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "mysql", "myServer1", false);
        }
    }

    /**
     * Delegates to the static three-argument {@code waitForStreamingRunning}, passing the
     * connector type {@code "mysql"}, the given server name and the value returned by
     * {@code getStreamingNamespace()}.
     *
     * @param serverName logical server name forwarded to the static helper
     * @throws InterruptedException if the delegated wait is interrupted
     */
    private void waitForStreamingRunning(String serverName) throws InterruptedException {
        final String connectorType = "mysql";
        final String namespace = MySqlConnectorIT.getStreamingNamespace();
        MySqlConnectorIT.waitForStreamingRunning(connectorType, serverName, namespace);
    }

    /**
     * Returns the records for the read-only database's products table, trying the topic
     * for {@code "Products"} first and falling back to the topic for {@code "products"}.
     *
     * @param records the consumed records to look up topics in
     * @return the records for the {@code "Products"} topic when that lookup is non-null,
     *         otherwise whatever the {@code "products"} lookup returns (which may itself
     *         be {@code null})
     */
    private List<SourceRecord> recordsForTopicForRoProductsTable(AbstractConnectorTest.SourceRecords records) {
        List<SourceRecord> capitalized = records.recordsForTopic(this.RO_DATABASE.topicForTable("Products"));
        if (capitalized != null) {
            return capitalized;
        }
        return records.recordsForTopic(this.RO_DATABASE.topicForTable("products"));
    }

    /**
     * Verifies the headers emitted when a primary key is updated.
     *
     * <p>After changing {@code order_number} from 10003 to 10303, three records are expected
     * on the {@code orders} topic: the first must carry a "new key" header with 10303 and
     * the third an "old key" header with 10003. A subsequent update that does not touch the
     * key ({@code quantity}) must produce a single record without the "new key" header.
     *
     * @throws Exception if the connector, the database connection, or an assertion fails
     */
    @Test
    @FixFor(value={"DBZ-1531"})
    public void shouldEmitHeadersOnPrimaryKeyUpdate() throws Exception {
        Configuration.Builder builder = this.DATABASE.defaultConfig();
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.NEVER);
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(33);
        Assertions.assertThat((int)records.recordsForTopic(this.DATABASE.topicForTable("orders")).size()).isEqualTo(5);

        // Primary key update.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.execute(new String[]{"UPDATE orders SET order_number=10303 WHERE order_number=10003"});
        }
        records = this.consumeRecordsByTopic(3);
        List<SourceRecord> updates = records.recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(updates.size()).isEqualTo(3);

        SourceRecord deleteRecord = updates.get(0);
        // Fail with a readable message instead of NoSuchElementException when the header is missing.
        Header keyPKUpdateHeader = this.getPKUpdateNewKeyHeader(deleteRecord)
                .orElseThrow(() -> new AssertionError("Expected a new-key header on the first record of the primary key update"));
        TestCase.assertEquals((Object)10303, (Object)((Struct)keyPKUpdateHeader.value()).getInt32("order_number"));

        SourceRecord insertRecord = updates.get(2);
        keyPKUpdateHeader = this.getPKUpdateOldKeyHeader(insertRecord)
                .orElseThrow(() -> new AssertionError("Expected an old-key header on the third record of the primary key update"));
        TestCase.assertEquals((Object)10003, (Object)((Struct)keyPKUpdateHeader.value()).getInt32("order_number"));

        // Update that leaves the primary key untouched.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.execute(new String[]{"UPDATE orders SET quantity=5 WHERE order_number=10004"});
        }
        records = this.consumeRecordsByTopic(1);
        updates = records.recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat(updates.size()).isEqualTo(1);
        SourceRecord updateRecord = updates.get(0);
        Assertions.assertThat(this.getPKUpdateNewKeyHeader(updateRecord).isPresent()).isFalse();
        this.stopConnector();
    }

    /**
     * Verifies that no create events are emitted when {@code SKIPPED_OPERATIONS} is
     * {@code "c"}.
     *
     * <p>Three inserts, one update and three deletes are executed against {@code products}.
     * Exactly seven records are expected on the table's topic: the update for id 201,
     * followed by a delete and a tombstone for each of ids 201, 202 and 203.
     *
     * @throws Exception if the connector, the database connection, or an assertion fails
     */
    @Test
    @FixFor(value={"DBZ-1895"})
    public void shouldEmitNoEventsForSkippedCreateOperations() throws Exception {
        Configuration.Builder builder = this.DATABASE.defaultConfig();
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "c");
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        MySqlConnectorIT.waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.execute(new String[]{"INSERT INTO products VALUES (201,'rubberduck','Rubber Duck',2.12);"});
            connection.execute(new String[]{"UPDATE products SET weight=3.13 WHERE name = 'rubberduck'"});
            connection.execute(new String[]{"INSERT INTO products VALUES (202,'rubbercrocodile','Rubber Crocodile',4.14);"});
            connection.execute(new String[]{"DELETE FROM products WHERE name = 'rubberduck'"});
            connection.execute(new String[]{"INSERT INTO products VALUES (203,'rubberfish','Rubber Fish',5.15);"});
            connection.execute(new String[]{"DELETE FROM products WHERE name = 'rubbercrocodile'"});
            connection.execute(new String[]{"DELETE FROM products WHERE name = 'rubberfish'"});
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(7);
        List<SourceRecord> changeEvents = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        // Check the count first so a short list fails with a clear assertion, not an index error.
        Assertions.assertThat(changeEvents.size()).isEqualTo(7);
        this.assertUpdate(changeEvents.get(0), "id", 201);
        this.assertDelete(changeEvents.get(1), "id", 201);
        this.assertTombstone(changeEvents.get(2), "id", 201);
        this.assertDelete(changeEvents.get(3), "id", 202);
        this.assertTombstone(changeEvents.get(4), "id", 202);
        this.assertDelete(changeEvents.get(5), "id", 203);
        this.assertTombstone(changeEvents.get(6), "id", 203);
        this.stopConnector();
    }

    /**
     * Verifies that no update or delete events are emitted when {@code SKIPPED_OPERATIONS}
     * is {@code "u,d"} and tombstones on delete are disabled.
     *
     * <p>Three inserts, one update and one delete are executed against {@code products}.
     * Exactly three records are expected on the table's topic: the inserts for ids 204,
     * 205 and 206.
     *
     * @throws Exception if the connector, the database connection, or an assertion fails
     */
    @Test
    @FixFor(value={"DBZ-1895"})
    public void shouldEmitNoEventsForSkippedUpdateAndDeleteOperations() throws Exception {
        Configuration.Builder builder = this.DATABASE.defaultConfig();
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.TOMBSTONES_ON_DELETE, false);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.SKIPPED_OPERATIONS, "u,d");
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        MySqlConnectorIT.waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.execute(new String[]{"INSERT INTO products VALUES (204,'rubberduck','Rubber Duck',2.12);"});
            connection.execute(new String[]{"UPDATE products SET weight=3.13 WHERE name = 'rubberduck'"});
            connection.execute(new String[]{"INSERT INTO products VALUES (205,'rubbercrocodile','Rubber Crocodile',4.14);"});
            connection.execute(new String[]{"DELETE FROM products WHERE name = 'rubberduck'"});
            connection.execute(new String[]{"INSERT INTO products VALUES (206,'rubberfish','Rubber Fish',5.15);"});
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(3);
        List<SourceRecord> changeEvents = records.recordsForTopic(this.DATABASE.topicForTable("products"));
        // Check the count first so a short list fails with a clear assertion, not an index error.
        Assertions.assertThat(changeEvents.size()).isEqualTo(3);
        this.assertInsert(changeEvents.get(0), "id", 204);
        this.assertInsert(changeEvents.get(1), "id", 205);
        this.assertInsert(changeEvents.get(2), "id", 206);
        this.stopConnector();
    }

    /**
     * Mutable holder for a binlog filename, a binlog position and an optional GTID set,
     * populated from the first row of a {@link ResultSet}.
     */
    protected static class BinlogPosition {
        private String binlogFilename;
        private long binlogPosition;
        private String gtidSet;

        /** Creates an empty position; all fields keep their Java default values. */
        protected BinlogPosition() {
        }

        /**
         * Advances the result set by one row and, if a row exists, copies column 1 into the
         * filename, column 2 into the position and, when the result set has more than four
         * columns, column 5 into the GTID set. Does nothing when there is no row.
         *
         * @param rs the result set to read from; it is advanced by one row
         * @throws SQLException if reading from the result set fails
         */
        public void readFromDatabase(ResultSet rs) throws SQLException {
            if (!rs.next()) {
                return;
            }
            this.binlogFilename = rs.getString(1);
            this.binlogPosition = rs.getLong(2);
            boolean hasFifthColumn = rs.getMetaData().getColumnCount() > 4;
            if (hasFifthColumn) {
                this.gtidSet = rs.getString(5);
            }
        }

        /** @return the binlog filename, or {@code null} if none has been read */
        public String binlogFilename() {
            return this.binlogFilename;
        }

        /** @return the binlog position, or {@code 0} if none has been read */
        public long binlogPosition() {
            return this.binlogPosition;
        }

        /** @return the GTID set, or {@code null} if none has been read */
        public String gtidSet() {
            return this.gtidSet;
        }

        /** @return {@code true} when the GTID set is non-null (an empty string also counts) */
        public boolean hasGtids() {
            return this.gtidSet != null;
        }

        /**
         * Formats the state as {@code file=<name>, pos=<position>, gtids=<set>}, rendering
         * a {@code null} GTID set as an empty string.
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("file=").append(this.binlogFilename);
            text.append(", pos=").append(this.binlogPosition);
            text.append(", gtids=");
            if (this.gtidSet != null) {
                text.append(this.gtidSet);
            }
            return text.toString();
        }
    }
}

