/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlSnapshotChangeEventSource;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.data.KeyValueStore;
import io.debezium.data.SchemaChangeHistory;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

@SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN, major=5, minor=6, reason="DDL uses fractional second data types, not supported until MySQL 5.6")
public class SnapshotSourceIT
extends AbstractConnectorTest {
    // Absolute path of the testing file "file-schema-history-snapshot.txt"; it is deleted in beforeEach() and afterEach().
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-schema-history-snapshot.txt").toAbsolutePath();
    // Primary test database ("connector_test_ro"), configured with SCHEMA_HISTORY_PATH as its history path.
    protected final UniqueDatabase DATABASE = new UniqueDatabase("logical_server_name", "connector_test_ro").withDbHistoryPath(SCHEMA_HISTORY_PATH);
    // Second database ("connector_test"); DATABASE is passed as the third constructor argument
    // NOTE(review): presumably so both databases share the same unique identifier - confirm against UniqueDatabase.
    protected final UniqueDatabase OTHER_DATABASE = new UniqueDatabase("logical_server_name", "connector_test", this.DATABASE);
    // Database "connector_read_binary_field_test", used by shouldSnapshotCorrectlyReadFields().
    protected final UniqueDatabase BINARY_FIELD_DATABASE = new UniqueDatabase("logical_server_name", "connector_read_binary_field_test");
    // Database "mysql_dbz_6533"; created in beforeEach() like the others.
    protected final UniqueDatabase CONFLICT_NAMES_DATABASE = new UniqueDatabase("logical_server_name", "mysql_dbz_6533");
    // Connector configuration; each test assigns it before starting the connector.
    protected Configuration config;
    // JUnit rule instance. NOTE(review): presumably evaluates the @SkipWhenDatabaseVersion annotation on this class - confirm.
    @Rule
    public SkipTestRule skipRule = new SkipTestRule();
    // Reads the "table" field of the "source" struct inside a record's value.
    private final Function<SourceRecord, String> getTableNameFromSourceRecord = sourceRecord -> ((Struct)sourceRecord.value()).getStruct("source").getString("table");

    /**
     * Removes any schema history file left from a previous run and creates and initializes
     * every database used by the tests, in declaration order.
     */
    @Before
    public void beforeEach() {
        Testing.Files.delete(SCHEMA_HISTORY_PATH);
        for (UniqueDatabase database : Arrays.asList(DATABASE, OTHER_DATABASE, BINARY_FIELD_DATABASE, CONFLICT_NAMES_DATABASE)) {
            database.createAndInitialize();
        }
    }

    /**
     * Stops the connector; the schema history file is deleted afterwards even when
     * stopping the connector throws.
     */
    @After
    public void afterEach() {
        try {
            stopConnector();
        }
        finally {
            // Always clean up the history file so the next test starts from scratch.
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        }
    }

    /**
     * Builds the base configuration shared by the tests: the default configuration of
     * {@code DATABASE} with schema change events disabled, {@code MINIMAL} snapshot locking
     * and SQL query inclusion enabled.
     *
     * @return the configuration builder, ready for test-specific additions
     */
    protected Configuration.Builder simpleConfig() {
        final Configuration.Builder defaults = DATABASE.defaultConfig();
        return defaults
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, MySqlConnectorConfig.SnapshotLockingMode.MINIMAL)
                .with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true);
    }

    /** Snapshot with the global lock, all DDL stored, and table data captured. */
    @Test
    public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
        final boolean useGlobalLock = true;
        final boolean storeOnlyCapturedTables = false;
        final boolean data = true;
        snapshotOfSingleDatabase(useGlobalLock, storeOnlyCapturedTables, data);
    }

    /** Snapshot without the global lock, all DDL stored, and table data captured. */
    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLock() throws Exception {
        final boolean useGlobalLock = false;
        final boolean storeOnlyCapturedTables = false;
        final boolean data = true;
        snapshotOfSingleDatabase(useGlobalLock, storeOnlyCapturedTables, data);
    }

    /** Snapshot without the global lock, storing DDL of captured tables only, with table data. */
    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyCapturedTables() throws Exception {
        final boolean useGlobalLock = false;
        final boolean storeOnlyCapturedTables = true;
        final boolean data = true;
        snapshotOfSingleDatabase(useGlobalLock, storeOnlyCapturedTables, data);
    }

    /** Schema-only snapshot with the global lock and all DDL stored. */
    @Test
    public void shouldCreateSnapshotOfSingleDatabaseNoData() throws Exception {
        final boolean useGlobalLock = true;
        final boolean storeOnlyCapturedTables = false;
        final boolean data = false;
        snapshotOfSingleDatabase(useGlobalLock, storeOnlyCapturedTables, data);
    }

    /** Schema-only snapshot without the global lock and with all DDL stored. */
    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockNoData() throws Exception {
        final boolean useGlobalLock = false;
        final boolean storeOnlyCapturedTables = false;
        final boolean data = false;
        snapshotOfSingleDatabase(useGlobalLock, storeOnlyCapturedTables, data);
    }

    /** Schema-only snapshot without the global lock, storing DDL of captured tables only. */
    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyCapturedTablesNoData() throws Exception {
        final boolean useGlobalLock = false;
        final boolean storeOnlyCapturedTables = true;
        final boolean data = false;
        snapshotOfSingleDatabase(useGlobalLock, storeOnlyCapturedTables, data);
    }

    /**
     * Runs a snapshot restricted to the {@code customers} and {@code products} tables of
     * {@code DATABASE} and verifies the snapshot markers of every record, the number of schema
     * change events and, when data is captured, the row counts and the first customer row.
     *
     * @param useGlobalLock when {@code false} the connector runs as user {@code cloud} with
     *            {@code test.disable.global.locking} set to {@code "true"}
     * @param storeOnlyCapturedTables value for {@code SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL};
     *            it is only applied when {@code useGlobalLock} is {@code false}, but it always
     *            selects the expected schema event counts
     * @param data when {@code false} the snapshot mode is {@code SCHEMA_ONLY} and all row-level
     *            assertions are skipped
     * @throws Exception if the connector or any database access fails
     */
    private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyCapturedTables, boolean data) throws Exception {
        final LogInterceptor logInterceptor = new LogInterceptor(MySqlSnapshotChangeEventSource.class);

        final Configuration.Builder builder = simpleConfig()
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("customers") + "," + DATABASE.qualifiedTableName("products"))
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true);
        if (!useGlobalLock) {
            builder.with(MySqlConnectorConfig.USER, "cloud")
                    .with(MySqlConnectorConfig.PASSWORD, "cloudpass")
                    .with("test.disable.global.locking", "true")
                    .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedTables);
        }
        if (!data) {
            builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY);
        }
        config = builder.build();

        start(MySqlConnector.class, config);
        waitForSnapshotToBeCompleted("mysql", DATABASE.getServerName());

        final KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(DATABASE.getServerName() + ".");
        final SchemaChangeHistory schemaChanges = new SchemaChangeHistory(DATABASE.getServerName());
        final int schemaEventsCount = storeOnlyCapturedTables ? 8 : 14;

        // Consume until the first record whose offset no longer carries the "snapshot" key.
        final AbstractConnectorTest.SourceRecords sourceRecords = consumeRecordsByTopicUntil(
                (recordsConsumed, record) -> !record.sourceOffset().containsKey("snapshot"));

        String previousRecordTable = null;
        String previousSnapshotSourceField = null;
        final Iterator<SourceRecord> records = sourceRecords.allRecordsInOrder().iterator();
        while (records.hasNext()) {
            final SourceRecord snapshotRecord = records.next();
            VerifyRecord.isValid(snapshotRecord);
            VerifyRecord.hasNoSourceQuery(snapshotRecord);
            store.add(snapshotRecord);
            schemaChanges.add(snapshotRecord);

            final Struct source = ((Struct) snapshotRecord.value()).getStruct("source");
            final String snapshotSourceField = source.getString("snapshot");
            final String currentRecordTable = source.getString("table");

            if (records.hasNext()) {
                // Every record but the last one is still flagged as a snapshot record in its offset.
                final Object snapshotOffsetField = snapshotRecord.sourceOffset().get("snapshot");
                Assertions.assertThat(snapshotOffsetField).isEqualTo(true);

                if (Objects.equals(snapshotSourceField, "first")) {
                    Assertions.assertThat(previousRecordTable).isNull();
                }
                else if (Objects.equals(snapshotSourceField, "first_in_data_collection")) {
                    Assertions.assertThat(previousRecordTable).isNotEqualTo(currentRecordTable);
                }
                else if (Objects.equals(previousSnapshotSourceField, "last_in_data_collection")) {
                    Assertions.assertThat(previousRecordTable).isNotEqualTo(currentRecordTable);
                }
            }
            else {
                // The final record closes the snapshot: no offset flag and marker "last".
                Assertions.assertThat(snapshotRecord.sourceOffset().get("snapshot")).isNull();
                Assertions.assertThat(snapshotSourceField).isEqualTo("last");
            }

            // Records on the server-name topic do not advance the "previous table" tracking.
            if (!snapshotRecord.topic().equals(DATABASE.getServerName())) {
                previousRecordTable = currentRecordTable;
                previousSnapshotSourceField = snapshotSourceField;
            }
        }

        // DDL recorded under the empty database name counts towards both totals.
        final int unnamedDdlCount = schemaChanges.ddlRecordsForDatabaseOrEmpty("").size();
        final int expectedOtherDatabaseDdlCount = (storeOnlyCapturedTables || useGlobalLock) ? 1 : 5;
        Assertions.assertThat(unnamedDdlCount + schemaChanges.ddlRecordsForDatabaseOrEmpty(DATABASE.getDatabaseName()).size())
                .isEqualTo(schemaEventsCount);
        Assertions.assertThat(unnamedDdlCount + schemaChanges.ddlRecordsForDatabaseOrEmpty(OTHER_DATABASE.getDatabaseName()).size())
                .isEqualTo(expectedOtherDatabaseDdlCount);

        // NOTE(review): the boolean result of containsMessage(...) is discarded, so these calls
        // verify nothing. They are left unasserted because it is unclear whether the message is
        // emitted in every parameter combination - confirm before turning them into assertions.
        if (!useGlobalLock) {
            logInterceptor.containsMessage("Table level locking is in place, the schema will be capture in two phases, now capturing:");
        }
        else {
            logInterceptor.containsMessage("Releasing global read lock to enable MySQL writes");
        }

        if (!data) {
            return;
        }

        Assertions.assertThat(store.collectionCount()).isEqualTo(2);

        final KeyValueStore.Collection products = store.collection(DATABASE.getDatabaseName(), productsTableName());
        Assertions.assertThat(products.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(products.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(products.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(products.numberOfReads()).isEqualTo(9L);
        Assertions.assertThat(products.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(products.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(products.numberOfValueSchemaChanges()).isEqualTo(1L);

        final KeyValueStore.Collection customers = store.collection(DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat(customers.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(customers.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(customers.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(customers.numberOfReads()).isEqualTo(4L);
        Assertions.assertThat(customers.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(customers.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(customers.numberOfValueSchemaChanges()).isEqualTo(1L);

        // Typed list: with a raw list the comparator lambda below would see Object and not compile.
        final List<Struct> customerRecords = new ArrayList<>();
        customers.forEach(val -> customerRecords.add(((Struct) val.value()).getStruct("after")));

        // The customer with the lowest id; the read count asserted above guarantees the list is not empty.
        final Struct customer = customerRecords.stream()
                .sorted((a, b) -> a.getInt32("id").compareTo(b.getInt32("id")))
                .findFirst()
                .get();
        Assertions.assertThat(customer.get("first_name")).isInstanceOf(String.class);
        Assertions.assertThat(customer.get("last_name")).isInstanceOf(String.class);
        Assertions.assertThat(customer.get("email")).isInstanceOf(String.class);
        Assertions.assertThat(customer.get("first_name")).isEqualTo("Sally");
        Assertions.assertThat(customer.get("last_name")).isEqualTo("Thomas");
        Assertions.assertThat(customer.get("email")).isEqualTo("sally.thomas@acme.com");
    }

    /**
     * Starts a snapshot with {@code MINIMAL_PERCONA} locking while another connection is running a
     * slow read on {@code products_on_hand}, and expects the snapshot to deliver 28 records.
     * The test body is skipped when the server is not Percona.
     *
     * @throws Exception if the connector or any database access fails
     */
    @Test
    public void snapshotWithBackupLocksShouldNotWaitForReads() throws Exception {
        config = simpleConfig()
                .with(MySqlConnectorConfig.USER, "cloud")
                .with(MySqlConnectorConfig.PASSWORD, "cloudpass")
                .with(MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, MySqlConnectorConfig.SnapshotLockingMode.MINIMAL_PERCONA)
                .build();

        if (!MySqlTestConnection.isPerconaServer()) {
            return;
        }

        final MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName());
        final JdbcConnection connection = db.connect();
        try {
            final CountDownLatch latch = new CountDownLatch(1);
            final Thread slowReader = new Thread(() -> {
                try {
                    connection.executeWithoutCommitting(new String[]{ "SELECT *, SLEEP(20) FROM products_on_hand WHERE product_id=101" });
                    latch.countDown();
                }
                catch (Exception ignored) {
                    // Deliberately ignored: the query only exists to keep a read in flight, and the
                    // connection may be closed underneath it when the test finishes.
                }
            });
            slowReader.start();

            // The latch is only released after the query returns, so this waits up to 10 seconds
            // while the read is in flight before the connector is started.
            latch.await(10L, TimeUnit.SECONDS);

            start(MySqlConnector.class, config);
            waitForSnapshotToBeCompleted("mysql", DATABASE.getServerName());

            final int recordCount = 28;
            final AbstractConnectorTest.SourceRecords sourceRecords = consumeRecordsByTopic(recordCount);
            Assertions.assertThat(sourceRecords.allRecordsInOrder()).hasSize(recordCount);
        }
        finally {
            // Close the JDBC connection even when the snapshot or an assertion fails.
            connection.connection().close();
        }
    }

    /**
     * Snapshots with {@code SNAPSHOT_MODE_TABLES} set to {@code connector_(.*).CUSTOMERS} and
     * expects 8 records spread over two {@code customers} collections, no schema change
     * records and no {@code orders} collection.
     */
    @Test
    @FixFor(value={"DBZ-2456"})
    public void shouldCreateSnapshotSelectively() throws Exception {
        final String databaseName = DATABASE.getDatabaseName();
        final String serverName = DATABASE.getServerName();

        config = simpleConfig()
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "connector_(.*)_" + DATABASE.getIdentifier())
                .with(CommonConnectorConfig.SNAPSHOT_MODE_TABLES, "connector_(.*).CUSTOMERS")
                .build();
        start(MySqlConnector.class, config);
        waitForSnapshotToBeCompleted("mysql", serverName);

        final KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(serverName + ".");
        final SchemaChangeHistory schemaChanges = new SchemaChangeHistory(serverName);
        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(8);
        for (SourceRecord consumedRecord : consumed.allRecordsInOrder()) {
            VerifyRecord.isValid(consumedRecord);
            VerifyRecord.hasNoSourceQuery(consumedRecord);
            store.add(consumedRecord);
            schemaChanges.add(consumedRecord);
        }

        Assertions.assertThat(schemaChanges.recordCount()).isEqualTo(0);
        Assertions.assertThat(store.databases()).containsOnly(databaseName, OTHER_DATABASE.getDatabaseName());
        Assertions.assertThat(store.collectionCount()).isEqualTo(2);

        final KeyValueStore.Collection customerRows = store.collection(databaseName, "customers");
        Assertions.assertThat(customerRows.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(customerRows.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(customerRows.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(customerRows.numberOfReads()).isEqualTo(4L);
        Assertions.assertThat(customerRows.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(customerRows.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(customerRows.numberOfValueSchemaChanges()).isEqualTo(1L);

        // The orders table is not part of the snapshot table list, so nothing was stored for it.
        final KeyValueStore.Collection orderRows = store.collection(databaseName, "orders");
        Assertions.assertThat((Object) orderRows).isNull();
    }

    /**
     * Snapshots only {@code ORDERS} although {@code CUSTOMERS} is also in the table include list,
     * then inserts a customer and expects the insert to arrive as a single streamed record.
     */
    @Test
    @FixFor(value={"DBZ-3952"})
    public void shouldNotFailStreamingOnNonSnapshottedTable() throws Exception {
        final String databaseName = DATABASE.getDatabaseName();
        final String serverName = DATABASE.getServerName();

        config = simpleConfig()
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, databaseName)
                .with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("ORDERS") + "," + DATABASE.qualifiedTableName("CUSTOMERS"))
                .with(CommonConnectorConfig.SNAPSHOT_MODE_TABLES, DATABASE.qualifiedTableName("ORDERS"))
                .build();
        start(MySqlConnector.class, config);
        waitForSnapshotToBeCompleted("mysql", serverName);

        Testing.Print.enable();
        final KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(serverName + ".");
        final SchemaChangeHistory schemaChanges = new SchemaChangeHistory(serverName);
        final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(5);
        for (SourceRecord snapshotRecord : snapshotRecords.allRecordsInOrder()) {
            VerifyRecord.isValid(snapshotRecord);
            VerifyRecord.hasNoSourceQuery(snapshotRecord);
            store.add(snapshotRecord);
            schemaChanges.add(snapshotRecord);
        }

        Assertions.assertThat(schemaChanges.recordCount()).isEqualTo(0);

        // Only orders was snapshotted; customers produced no snapshot records.
        final KeyValueStore.Collection customerRows = store.collection(databaseName, "customers");
        Assertions.assertThat((Object) customerRows).isNull();
        final KeyValueStore.Collection orderRows = store.collection(databaseName, "orders");
        Assertions.assertThat(orderRows.numberOfReads()).isEqualTo(5L);

        // Write to the table that was skipped by the snapshot.
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(databaseName);
                JdbcConnection connection = db.connect();
                Connection jdbc = connection.connection();
                Statement statement = jdbc.createStatement()) {
            statement.executeUpdate("INSERT INTO customers VALUES (default,'John','Lazy','john.lazy@acme.com')");
        }

        final AbstractConnectorTest.SourceRecords streamingRecords = consumeRecordsByTopic(1);
        Assertions.assertThat(streamingRecords.topics()).hasSize(1);
        final SourceRecord insert = streamingRecords.recordsForTopic(DATABASE.topicForTable("customers")).get(0);
        final Struct insertedRow = ((Struct) insert.value()).getStruct("after");
        Assertions.assertThat(insertedRow.getString("email")).isEqualTo("john.lazy@acme.com");
    }

    /**
     * Snapshots the {@code binary_field} table of {@code BINARY_FIELD_DATABASE} with
     * {@code ROW_COUNT_FOR_STREAMING_RESULT_SETS} set to "0" and {@code SNAPSHOT_FETCH_SIZE}
     * set to "101", and expects exactly one read record.
     */
    @Test
    @FixFor(value={"DBZ-3238"})
    public void shouldSnapshotCorrectlyReadFields() throws Exception {
        final String databaseName = BINARY_FIELD_DATABASE.getDatabaseName();
        final String serverName = BINARY_FIELD_DATABASE.getServerName();

        config = simpleConfig()
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "connector_read_binary_field_test_" + BINARY_FIELD_DATABASE.getIdentifier())
                .with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, BINARY_FIELD_DATABASE.qualifiedTableName("binary_field"))
                .with(MySqlConnectorConfig.ROW_COUNT_FOR_STREAMING_RESULT_SETS, "0")
                .with(MySqlConnectorConfig.SNAPSHOT_FETCH_SIZE, "101")
                .build();
        start(MySqlConnector.class, config);
        waitForSnapshotToBeCompleted("mysql", serverName);

        final KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(serverName + ".");
        final SchemaChangeHistory schemaChanges = new SchemaChangeHistory(serverName);
        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        for (SourceRecord consumedRecord : consumed.allRecordsInOrder()) {
            VerifyRecord.isValid(consumedRecord);
            VerifyRecord.hasNoSourceQuery(consumedRecord);
            store.add(consumedRecord);
            schemaChanges.add(consumedRecord);
        }

        Assertions.assertThat(schemaChanges.recordCount()).isEqualTo(0);
        Assertions.assertThat(store.databases()).contains(databaseName);
        Assertions.assertThat(store.collectionCount()).isEqualTo(1);

        final KeyValueStore.Collection binaryFieldRows = store.collection(databaseName, "binary_field");
        Assertions.assertThat(binaryFieldRows.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(binaryFieldRows.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(binaryFieldRows.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(binaryFieldRows.numberOfReads()).isEqualTo(1L);
    }

    /**
     * Snapshots with the {@code ReadToInsertEvent} transformation configured and expects all
     * 55 snapshot records to be counted as creates instead of reads, plus the expected duration
     * values in the single {@code dbz_342_timetest} row.
     */
    @Test
    public void shouldCreateSnapshotOfSingleDatabaseUsingInsertEvents() throws Exception {
        final String databaseName = DATABASE.getDatabaseName();
        final String serverName = DATABASE.getServerName();

        config = simpleConfig()
                .with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "connector_(.*)_" + DATABASE.getIdentifier())
                .with("transforms", "snapshotasinsert")
                .with("transforms.snapshotasinsert.type", "io.debezium.connector.mysql.transforms.ReadToInsertEvent")
                .build();
        start(MySqlConnector.class, config);
        waitForSnapshotToBeCompleted("mysql", serverName);

        final KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(serverName + ".");
        final SchemaChangeHistory schemaChanges = new SchemaChangeHistory(serverName);
        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(55);
        for (SourceRecord consumedRecord : consumed.allRecordsInOrder()) {
            VerifyRecord.isValid(consumedRecord);
            VerifyRecord.hasNoSourceQuery(consumedRecord);
            store.add(consumedRecord);
            schemaChanges.add(consumedRecord);
        }

        Assertions.assertThat(schemaChanges.recordCount()).isEqualTo(0);
        Assertions.assertThat(store.databases()).containsOnly(databaseName, OTHER_DATABASE.getDatabaseName());
        Assertions.assertThat(store.collectionCount()).isEqualTo(9);

        final KeyValueStore.Collection products = store.collection(databaseName, productsTableName());
        assertCollectionHoldsOnlyInserts(products, 9L);

        final KeyValueStore.Collection productsOnHand = store.collection(databaseName, "products_on_hand");
        assertCollectionHoldsOnlyInserts(productsOnHand, 9L);

        final KeyValueStore.Collection customers = store.collection(databaseName, "customers");
        assertCollectionHoldsOnlyInserts(customers, 4L);

        final KeyValueStore.Collection orders = store.collection(databaseName, "orders");
        assertCollectionHoldsOnlyInserts(orders, 5L);

        final KeyValueStore.Collection timetest = store.collection(databaseName, "dbz_342_timetest");
        assertCollectionHoldsOnlyInserts(timetest, 1L);

        final List<Struct> timeRows = new ArrayList<>();
        timetest.forEach(val -> timeRows.add(((Struct) val.value()).getStruct("after")));
        final Struct after = timeRows.get(0);

        // MariaDB yields a different fractional value for c1 than MySQL.
        final String expectedC1;
        if (MySqlTestConnection.isMariaDb()) {
            expectedC1 = "PT517H51M04.77S";
        }
        else {
            expectedC1 = "PT517H51M04.78S";
        }
        Assertions.assertThat(after.get("c1")).isEqualTo((Object) toMicroSeconds(expectedC1));
        Assertions.assertThat(after.get("c2")).isEqualTo((Object) toMicroSeconds("-PT13H14M50S"));
        Assertions.assertThat(after.get("c3")).isEqualTo((Object) toMicroSeconds("-PT733H0M0.001S"));
        Assertions.assertThat(after.get("c4")).isEqualTo((Object) toMicroSeconds("-PT1H59M59.001S"));
        Assertions.assertThat(after.get("c5")).isEqualTo((Object) toMicroSeconds("-PT838H59M58.999999S"));
    }

    /**
     * Asserts that a collection saw only create events: the given number of creates, no
     * updates, deletes, reads or tombstones, and one key and one value schema change.
     */
    private static void assertCollectionHoldsOnlyInserts(KeyValueStore.Collection collection, long expectedCreates) {
        Assertions.assertThat(collection.numberOfCreates()).isEqualTo(expectedCreates);
        Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection.numberOfValueSchemaChanges()).isEqualTo(1L);
    }

    /**
     * Returns the name under which the products table is expected: {@code "products"} when
     * the server reports case-sensitive table identifiers, {@code "Products"} otherwise.
     *
     * @throws SQLException if the test connection cannot be queried or closed
     */
    private String productsTableName() throws SQLException {
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName())) {
            if (db.isTableIdCaseSensitive()) {
                return "products";
            }
            return "Products";
        }
    }

    /**
     * Snapshots {@code DATABASE} with schema change events enabled and expects 42 records:
     * 14 schema change records and read events for five tables, plus the expected duration
     * values in the single {@code dbz_342_timetest} row.
     */
    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
        final String databaseName = DATABASE.getDatabaseName();
        final String serverName = DATABASE.getServerName();

        config = simpleConfig()
                .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .build();
        start(MySqlConnector.class, config);
        waitForSnapshotToBeCompleted("mysql", serverName);

        final KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(serverName + ".");
        final SchemaChangeHistory schemaChanges = new SchemaChangeHistory(serverName);
        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(42);
        for (SourceRecord consumedRecord : consumed.allRecordsInOrder()) {
            VerifyRecord.isValid(consumedRecord);
            VerifyRecord.hasNoSourceQuery(consumedRecord);
            store.add(consumedRecord);
            schemaChanges.add(consumedRecord);
        }

        Assertions.assertThat(schemaChanges.recordCount()).isEqualTo(14);
        Assertions.assertThat(schemaChanges.databaseCount()).isEqualTo(2);
        Assertions.assertThat(schemaChanges.databases()).containsOnly(databaseName, "");
        Assertions.assertThat(store.collectionCount()).isEqualTo(5);

        final KeyValueStore.Collection products = store.collection(databaseName, productsTableName());
        assertCollectionHoldsOnlyReads(products, 9L);

        final KeyValueStore.Collection productsOnHand = store.collection(databaseName, "products_on_hand");
        assertCollectionHoldsOnlyReads(productsOnHand, 9L);

        final KeyValueStore.Collection customers = store.collection(databaseName, "customers");
        assertCollectionHoldsOnlyReads(customers, 4L);

        final KeyValueStore.Collection orders = store.collection(databaseName, "orders");
        assertCollectionHoldsOnlyReads(orders, 5L);

        final KeyValueStore.Collection timetest = store.collection(databaseName, "dbz_342_timetest");
        assertCollectionHoldsOnlyReads(timetest, 1L);

        final List<Struct> timeRows = new ArrayList<>();
        timetest.forEach(val -> timeRows.add(((Struct) val.value()).getStruct("after")));
        final Struct after = timeRows.get(0);

        // MariaDB yields a different fractional value for c1 than MySQL.
        final String expectedC1;
        if (MySqlTestConnection.isMariaDb()) {
            expectedC1 = "PT517H51M04.77S";
        }
        else {
            expectedC1 = "PT517H51M04.78S";
        }
        Assertions.assertThat(after.get("c1")).isEqualTo((Object) toMicroSeconds(expectedC1));
        Assertions.assertThat(after.get("c2")).isEqualTo((Object) toMicroSeconds("-PT13H14M50S"));
        Assertions.assertThat(after.get("c3")).isEqualTo((Object) toMicroSeconds("-PT733H0M0.001S"));
        Assertions.assertThat(after.get("c4")).isEqualTo((Object) toMicroSeconds("-PT1H59M59.001S"));
        Assertions.assertThat(after.get("c5")).isEqualTo((Object) toMicroSeconds("-PT838H59M58.999999S"));
    }

    /**
     * Asserts that a collection saw only read events: the given number of reads, no creates,
     * updates, deletes or tombstones, and one key and one value schema change.
     */
    private static void assertCollectionHoldsOnlyReads(KeyValueStore.Collection collection, long expectedReads) {
        Assertions.assertThat(collection.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfReads()).isEqualTo(expectedReads);
        Assertions.assertThat(collection.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection.numberOfValueSchemaChanges()).isEqualTo(1L);
    }

    /**
     * Starts the connector with {@code SNAPSHOT_MODE} set to {@code SCHEMA_ONLY_RECOVERY} on top of
     * {@code simpleConfig()}, captures the error handed to the completion callback, calls
     * {@code waitForConnectorShutdown} and then rethrows the captured error so that the
     * {@code expected} clause of the {@link Test} annotation can match it.
     *
     * @throws Exception the captured error (cast to {@link RuntimeException}), or anything raised
     *                   while starting or waiting for the connector
     */
    @Test(expected=DebeziumException.class)
    public void shouldCreateSnapshotSchemaOnlyRecovery_exception() throws Exception {
        Configuration.Builder recoveryBuilder = (Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
        this.config = recoveryBuilder.build();

        // The completion callback is the only place where the connector's failure is surfaced.
        AtomicReference<Throwable> failure = new AtomicReference<>();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> failure.set(error));

        SnapshotSourceIT.waitForConnectorShutdown("mysql", this.DATABASE.getServerName());

        Throwable reported = failure.get();
        throw (RuntimeException)reported;
    }

    /**
     * Runs the connector twice against the {@code customers} table using
     * {@link MemorySchemaHistory} as schema history.
     * <ol>
     *   <li>First run: snapshot mode {@code INITIAL}; exactly 4 records are consumed and the
     *       connector is stopped.</li>
     *   <li>Second run: the same builder is switched to {@code SCHEMA_ONLY_RECOVERY}; one row is
     *       inserted into {@code customers} over JDBC and exactly 1 record is consumed.</li>
     * </ol>
     *
     * @throws Exception if the connector, the JDBC statement or an assertion fails
     */
    @Test
    public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
        Configuration.Builder builder = (Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.INITIAL);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("customers"));
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName());

        // First run: initial snapshot.
        this.config = builder.build();
        this.start(MySqlConnector.class, this.config);
        final int snapshotRecordCount = 4;
        AbstractConnectorTest.SourceRecords snapshotRecords = this.consumeRecordsByTopic(snapshotRecordCount);
        Assertions.assertThat(snapshotRecords.allRecordsInOrder()).hasSize(snapshotRecordCount);
        this.stopConnector();

        // Second run: same builder, snapshot mode replaced by schema-only recovery.
        builder.with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
        this.config = builder.build();
        this.start(MySqlConnector.class, this.config);

        // Produce one change after the restart.
        try (MySqlTestConnection testDb = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection jdbcConnection = testDb.connect();
             Connection rawConnection = jdbcConnection.connection();
             Statement insert = rawConnection.createStatement()) {
            insert.executeUpdate("INSERT INTO customers VALUES (default,'John','Lazy','john.lazy@acme.com')");
        }

        final int changeRecordCount = 1;
        AbstractConnectorTest.SourceRecords changeRecords = this.consumeRecordsByTopic(changeRecordCount);
        Assertions.assertThat(changeRecords.allRecordsInOrder()).hasSize(changeRecordCount);
    }

    /**
     * Checks the order in which tables show up in the snapshot when {@code TABLE_INCLUDE_LIST}
     * names {@code orders}, {@code Products}, {@code products_on_hand} and
     * {@code dbz_342_timetest} in that order.
     * <p>
     * 24 records are consumed; every record is validated, and the table name of each record with a
     * non-null value is collected in first-seen order. That order must equal the order of the
     * include list.
     *
     * @throws Exception if the connector or an assertion fails
     */
    @Test
    public void shouldSnapshotTablesInOrderSpecifiedInTableIncludeList() throws Exception {
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, "connector_test_ro_(.*).orders,connector_test_ro_(.*).Products,connector_test_ro_(.*).products_on_hand,connector_test_ro_(.*).dbz_342_timetest")).build();
        this.start(MySqlConnector.class, this.config);
        SnapshotSourceIT.waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        // Insertion-ordered set: keeps the first occurrence of each table name.
        LinkedHashSet<Object> tablesInOrder = new LinkedHashSet<>();
        LinkedHashSet<String> tablesInOrderExpected = this.getTableNamesInSpecifiedOrder("orders", "Products", "products_on_hand", "dbz_342_timetest");

        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(24);
        for (SourceRecord record : sourceRecords.allRecordsInOrder()) {
            VerifyRecord.isValid(record);
            VerifyRecord.hasNoSourceQuery(record);
            if (record.value() != null) {
                tablesInOrder.add(this.getTableNameFromSourceRecord.apply(record));
            }
        }

        // assertArrayEquals takes (expecteds, actuals); passing them the other way round makes a
        // failure message report the observed order as the expected one.
        Assert.assertArrayEquals(tablesInOrderExpected.toArray(), tablesInOrder.toArray());
    }

    /**
     * Include-list ordering check for tables whose names overlap ({@code tablename},
     * {@code another}, {@code tablename_suffix}) in {@code CONFLICT_NAMES_DATABASE}.
     * <p>
     * Three records are consumed and validated; the table names of records with a non-null value,
     * in first-seen order, must match the order of the include list.
     *
     * @throws Exception if the connector or an assertion fails
     */
    @Test
    @FixFor(value={"DBZ-6533"})
    public void shouldSnapshotTablesInOrderSpecifiedInTableIncludeListWithConflictingNames() throws Exception {
        Configuration.Builder builder = (Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, this.CONFLICT_NAMES_DATABASE.getDatabaseName());
        String includedTables = this.CONFLICT_NAMES_DATABASE.qualifiedTableName("tablename") + "," + this.CONFLICT_NAMES_DATABASE.qualifiedTableName("another") + "," + this.CONFLICT_NAMES_DATABASE.qualifiedTableName("tablename_suffix");
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, includedTables);
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        SnapshotSourceIT.waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        LinkedHashSet<String> expectedOrder = this.getTableNamesInSpecifiedOrder("tablename", "another", "tablename_suffix");
        // Insertion-ordered set: keeps the first occurrence of each table name.
        LinkedHashSet<Object> observedOrder = new LinkedHashSet<>();

        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(3);
        for (SourceRecord record : consumed.allRecordsInOrder()) {
            VerifyRecord.isValid(record);
            VerifyRecord.hasNoSourceQuery(record);
            if (record.value() == null) {
                continue;
            }
            observedOrder.add(this.getTableNameFromSourceRecord.apply(record));
        }

        Assert.assertArrayEquals(expectedOrder.toArray(), observedOrder.toArray());
    }

    /**
     * Snapshot ordering check with {@code SNAPSHOT_TABLES_ORDER_BY_ROW_COUNT} set to
     * {@code ASCENDING}.
     * <p>
     * {@code ANALYZE TABLE} is executed for {@code Products} and {@code dbz_342_timetest} before
     * the connector starts (presumably to refresh the row-count statistics the ordering relies
     * on; confirm against the connector implementation). Ten records are consumed, and the
     * first-seen table order must be {@code dbz_342_timetest}, then {@code Products}.
     *
     * @throws Exception if the JDBC statements, the connector or an assertion fails
     */
    @Test
    public void shouldSnapshotTablesInRowCountOrderAsc() throws Exception {
        try (MySqlTestConnection testDb = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection jdbcConnection = testDb.connect();
             Connection rawConnection = jdbcConnection.connection();
             Statement analyze = rawConnection.createStatement()) {
            analyze.execute("ANALYZE TABLE Products");
            analyze.execute("ANALYZE TABLE dbz_342_timetest");
        }

        Configuration.Builder builder = (Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, "connector_test_ro_(.*).Products,connector_test_ro_(.*).dbz_342_timetest");
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.SNAPSHOT_TABLES_ORDER_BY_ROW_COUNT, (EnumeratedValue)RelationalDatabaseConnectorConfig.SnapshotTablesRowCountOrder.ASCENDING);
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        SnapshotSourceIT.waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        LinkedHashSet<String> expectedOrder = this.getTableNamesInSpecifiedOrder("dbz_342_timetest", "Products");
        // Insertion-ordered set: keeps the first occurrence of each table name.
        LinkedHashSet<Object> observedOrder = new LinkedHashSet<>();

        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(10);
        for (SourceRecord record : consumed.allRecordsInOrder()) {
            VerifyRecord.isValid(record);
            VerifyRecord.hasNoSourceQuery(record);
            if (record.value() == null) {
                continue;
            }
            observedOrder.add(this.getTableNameFromSourceRecord.apply(record));
        }

        Assert.assertArrayEquals(expectedOrder.toArray(), observedOrder.toArray());
    }

    /**
     * Snapshot ordering check with {@code SNAPSHOT_TABLES_ORDER_BY_ROW_COUNT} set to
     * {@code DESCENDING}.
     * <p>
     * {@code ANALYZE TABLE} is executed for {@code Products} and {@code dbz_342_timetest} before
     * the connector starts (presumably to refresh the row-count statistics the ordering relies
     * on; confirm against the connector implementation). The include list names
     * {@code dbz_342_timetest} first, yet the first-seen table order over the ten consumed
     * records must be {@code Products}, then {@code dbz_342_timetest}.
     *
     * @throws Exception if the JDBC statements, the connector or an assertion fails
     */
    @Test
    public void shouldSnapshotTablesInRowCountOrderDesc() throws Exception {
        try (MySqlTestConnection testDb = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection jdbcConnection = testDb.connect();
             Connection rawConnection = jdbcConnection.connection();
             Statement analyze = rawConnection.createStatement()) {
            analyze.execute("ANALYZE TABLE Products");
            analyze.execute("ANALYZE TABLE dbz_342_timetest");
        }

        Configuration.Builder builder = (Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, "connector_test_ro_(.*).dbz_342_timetest,connector_test_ro_(.*).Products");
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.SNAPSHOT_TABLES_ORDER_BY_ROW_COUNT, (EnumeratedValue)RelationalDatabaseConnectorConfig.SnapshotTablesRowCountOrder.DESCENDING);
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        SnapshotSourceIT.waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        LinkedHashSet<String> expectedOrder = this.getTableNamesInSpecifiedOrder("Products", "dbz_342_timetest");
        // Insertion-ordered set: keeps the first occurrence of each table name.
        LinkedHashSet<Object> observedOrder = new LinkedHashSet<>();

        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(10);
        for (SourceRecord record : consumed.allRecordsInOrder()) {
            VerifyRecord.isValid(record);
            VerifyRecord.hasNoSourceQuery(record);
            if (record.value() == null) {
                continue;
            }
            observedOrder.add(this.getTableNameFromSourceRecord.apply(record));
        }

        Assert.assertArrayEquals(expectedOrder.toArray(), observedOrder.toArray());
    }

    /**
     * Checks the table order of a snapshot taken with the unmodified {@code simpleConfig()}.
     * <p>
     * 28 records are consumed; every record is validated, and the table name of each record with a
     * non-null value is collected in first-seen order. The expected order is {@code Products},
     * {@code customers}, {@code dbz_342_timetest}, {@code orders}, {@code products_on_hand}.
     *
     * @throws Exception if the connector or an assertion fails
     */
    @Test
    public void shouldSnapshotTablesInLexicographicalOrder() throws Exception {
        this.config = this.simpleConfig().build();
        this.start(MySqlConnector.class, this.config);
        SnapshotSourceIT.waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        // Insertion-ordered set: keeps the first occurrence of each table name.
        LinkedHashSet<Object> tablesInOrder = new LinkedHashSet<>();
        LinkedHashSet<String> tablesInOrderExpected = this.getTableNamesInSpecifiedOrder("Products", "customers", "dbz_342_timetest", "orders", "products_on_hand");

        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(28);
        for (SourceRecord record : sourceRecords.allRecordsInOrder()) {
            VerifyRecord.isValid(record);
            VerifyRecord.hasNoSourceQuery(record);
            if (record.value() != null) {
                tablesInOrder.add(this.getTableNameFromSourceRecord.apply(record));
            }
        }

        // assertArrayEquals takes (expecteds, actuals); passing them the other way round makes a
        // failure message report the observed order as the expected one.
        Assert.assertArrayEquals(tablesInOrderExpected.toArray(), tablesInOrder.toArray());
    }

    /**
     * Collects the given table names into an insertion-ordered set.
     *
     * @param tables table names, in the order they are expected to be seen
     * @return a new {@link LinkedHashSet} holding the names in argument order (a repeated name
     *         keeps its first position)
     */
    private LinkedHashSet<String> getTableNamesInSpecifiedOrder(String ... tables) {
        LinkedHashSet<String> ordered = new LinkedHashSet<>();
        for (String table : tables) {
            ordered.add(table);
        }
        return ordered;
    }

    /**
     * Runs a {@code SCHEMA_ONLY} snapshot with {@code INCLUDE_SCHEMA_CHANGES} enabled and
     * {@code HEARTBEAT_INTERVAL} set to 300000, then inspects the 15 consumed records.
     * <p>
     * All records except the trailing one are validated and fed into a {@link KeyValueStore} and a
     * {@link SchemaChangeHistory}. For each of them that is not on a heartbeat topic:
     * <ul>
     *   <li>if it is not the last record of that leading group, the offset's {@code snapshot}
     *       entry must be {@code true} and the source's {@code snapshot} field {@code "true"};</li>
     *   <li>if it is the last one, the offset must carry no {@code snapshot} entry and the source
     *       field must be {@code "last"}.</li>
     * </ul>
     * Finally 14 schema changes and no data collections are expected, and the trailing record must
     * be a heartbeat whose offset has no {@code snapshot} entry.
     *
     * @throws Exception if the connector or an assertion fails
     */
    @Test
    public void shouldCreateSnapshotSchemaOnly() throws Exception {
        Configuration.Builder builder = (Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY);
        builder = (Configuration.Builder)builder.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true);
        builder = (Configuration.Builder)builder.with(Heartbeat.HEARTBEAT_INTERVAL, 300000);
        this.config = builder.build();

        this.start(MySqlConnector.class, this.config);
        SnapshotSourceIT.waitForSnapshotToBeCompleted("mysql", this.DATABASE.getServerName());

        KeyValueStore store = KeyValueStore.createForTopicsBeginningWith(this.DATABASE.getServerName() + ".");
        SchemaChangeHistory schemaChanges = new SchemaChangeHistory(this.DATABASE.getServerName());

        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(15);
        List<SourceRecord> allRecords = sourceRecords.allRecordsInOrder();

        // The trailing record is checked separately below; everything before it is walked here.
        List<SourceRecord> leadingRecords = allRecords.subList(0, allRecords.size() - 1);
        int lastLeadingIndex = leadingRecords.size() - 1;
        for (int index = 0; index <= lastLeadingIndex; index++) {
            SourceRecord record = leadingRecords.get(index);
            VerifyRecord.isValid(record);
            VerifyRecord.hasNoSourceQuery(record);
            store.add(record);
            schemaChanges.add(record);
            if (record.topic().startsWith("__debezium-heartbeat")) {
                // Heartbeats are stored but take no part in the snapshot-marker checks.
                continue;
            }
            String snapshotSourceField = ((Struct)record.value()).getStruct("source").getString("snapshot");
            if (index < lastLeadingIndex) {
                Object snapshotOffsetField = record.sourceOffset().get("snapshot");
                Assertions.assertThat(snapshotOffsetField).isEqualTo((Object)true);
                Assertions.assertThat(snapshotSourceField).isEqualTo("true");
            }
            else {
                // Final record of the leading group: the snapshot marker is gone from the offset.
                Assertions.assertThat(record.sourceOffset().get("snapshot")).isNull();
                Assertions.assertThat(snapshotSourceField).isEqualTo("last");
            }
        }

        SourceRecord heartbeatRecord = allRecords.get(allRecords.size() - 1);
        Assertions.assertThat((int)schemaChanges.recordCount()).isEqualTo(14);
        Assertions.assertThat((int)store.collectionCount()).isEqualTo(0);
        // Null check has to come before the record is dereferenced, otherwise it can never fail.
        Assertions.assertThat((Object)heartbeatRecord).isNotNull();
        Assertions.assertThat(heartbeatRecord.topic()).startsWith("__debezium-heartbeat");
        Assertions.assertThat(heartbeatRecord.sourceOffset().get("snapshot")).isNull();
    }

    /**
     * Converts an ISO-8601 duration string into whole microseconds.
     *
     * @param duration text accepted by {@link Duration#parse(CharSequence)}, e.g. {@code "-PT13H14M50S"}
     * @return the duration's nanosecond count divided by 1000 using integer division
     */
    private long toMicroSeconds(String duration) {
        final Duration parsed = Duration.parse(duration);
        final long nanos = parsed.toNanos();
        return nanos / 1000L;
    }
}

