/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mysql.Filters;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.connector.mysql.SnapshotReader;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.data.KeyValueStore;
import io.debezium.data.SchemaChangeHistory;
import io.debezium.data.VerifyRecord;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.fest.assertions.ObjectAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

@SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN, major=5, minor=6, reason="DDL uses fractional second data types, not supported until MySQL 5.6")
public class SnapshotReaderIT {
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-db-history-snapshot.txt").toAbsolutePath();
    private final UniqueDatabase DATABASE = new UniqueDatabase("logical_server_name", "connector_test_ro").withDbHistoryPath(DB_HISTORY_PATH);
    private final UniqueDatabase OTHER_DATABASE = new UniqueDatabase("logical_server_name", "connector_test", this.DATABASE);
    private Configuration config;
    private MySqlTaskContext context;
    private SnapshotReader reader;
    private CountDownLatch completed;
    @Rule
    public SkipTestRule skipRule = new SkipTestRule();
    private final Function<SourceRecord, String> getTableNameFromSourceRecord = sourceRecord -> ((Struct)sourceRecord.value()).getStruct("source").getString("table");

    @Before
    public void beforeEach() {
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        this.DATABASE.createAndInitialize();
        this.OTHER_DATABASE.createAndInitialize();
        this.completed = new CountDownLatch(1);
    }

    @After
    public void afterEach() {
        if (this.reader != null) {
            try {
                this.reader.stop();
            }
            finally {
                this.reader = null;
            }
        }
        if (this.context != null) {
            try {
                this.context.shutdown();
            }
            finally {
                this.context = null;
                Testing.Files.delete((Path)DB_HISTORY_PATH);
            }
        }
    }

    protected Configuration.Builder simpleConfig() {
        return (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.SNAPSHOT_LOCKING_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotLockingMode.MINIMAL)).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
        this.snapshotOfSingleDatabase(true, false);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLock() throws Exception {
        this.snapshotOfSingleDatabase(false, false);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithoutGlobalLockAndStoreOnlyMonitoredTables() throws Exception {
        this.snapshotOfSingleDatabase(false, true);
    }

    private void snapshotOfSingleDatabase(boolean useGlobalLock, boolean storeOnlyMonitoredTables) throws Exception {
        Configuration.Builder builder = this.simpleConfig();
        if (!useGlobalLock) {
            ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)builder.with(MySqlConnectorConfig.USER, "cloud")).with(MySqlConnectorConfig.PASSWORD, "cloudpass")).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(DatabaseHistory.STORE_ONLY_MONITORED_TABLES_DDL, storeOnlyMonitoredTables);
        }
        this.config = builder.build();
        this.context = new MySqlTaskContext(this.config, new Filters.Builder(this.config).build());
        this.context.start();
        this.reader = new SnapshotReader("snapshot", this.context, useGlobalLock);
        this.reader.uponCompletion(this.completed::countDown);
        this.reader.generateInsertEvents();
        this.reader.start();
        List records = null;
        KeyValueStore store = KeyValueStore.createForTopicsBeginningWith((String)(this.DATABASE.getServerName() + "."));
        SchemaChangeHistory schemaChanges = new SchemaChangeHistory(this.DATABASE.getServerName());
        while ((records = this.reader.poll()) != null) {
            records.forEach(record -> {
                VerifyRecord.isValid((SourceRecord)record);
                VerifyRecord.hasNoSourceQuery((SourceRecord)record);
                store.add(record);
                schemaChanges.add(record);
            });
        }
        Assertions.assertThat((List)records).isNull();
        if (!useGlobalLock) {
            Assertions.assertThat((int)schemaChanges.recordCount()).isGreaterThan(0);
        } else {
            Assertions.assertThat((int)schemaChanges.recordCount()).isEqualTo(0);
        }
        Assertions.assertThat((int)store.collectionCount()).isEqualTo(5);
        KeyValueStore.Collection products = store.collection(this.DATABASE.getDatabaseName(), this.productsTableName());
        Assertions.assertThat((long)products.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection products_on_hand = store.collection(this.DATABASE.getDatabaseName(), "products_on_hand");
        Assertions.assertThat((long)products_on_hand.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products_on_hand.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection customers = store.collection(this.DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat((long)customers.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat((long)customers.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)customers.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection orders = store.collection(this.DATABASE.getDatabaseName(), "orders");
        Assertions.assertThat((long)orders.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat((long)orders.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)orders.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection timetest = store.collection(this.DATABASE.getDatabaseName(), "dbz_342_timetest");
        Assertions.assertThat((long)timetest.numberOfCreates()).isEqualTo(1L);
        Assertions.assertThat((long)timetest.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)timetest.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)timetest.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)timetest.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)timetest.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)timetest.numberOfValueSchemaChanges()).isEqualTo(1L);
        ArrayList timerecords = new ArrayList();
        timetest.forEach(val -> timerecords.add(((Struct)val.value()).getStruct("after")));
        Struct after = (Struct)timerecords.get(0);
        Assertions.assertThat((Object)after.get("c1")).isEqualTo((Object)this.toMicroSeconds("PT517H51M04.78S"));
        Assertions.assertThat((Object)after.get("c2")).isEqualTo((Object)this.toMicroSeconds("-PT13H14M50S"));
        Assertions.assertThat((Object)after.get("c3")).isEqualTo((Object)this.toMicroSeconds("-PT733H0M0.001S"));
        Assertions.assertThat((Object)after.get("c4")).isEqualTo((Object)this.toMicroSeconds("-PT1H59M59.001S"));
        Assertions.assertThat((Object)after.get("c5")).isEqualTo((Object)this.toMicroSeconds("-PT838H59M58.999999S"));
        if (this.completed.await(10L, TimeUnit.SECONDS)) {
            Testing.print((Object)"completed the snapshot");
        } else {
            Assert.fail((String)"failed to complete the snapshot within 10 seconds");
        }
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseUsingReadEvents() throws Exception {
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "connector_(.*)_" + this.DATABASE.getIdentifier())).build();
        this.context = new MySqlTaskContext(this.config, new Filters.Builder(this.config).build());
        this.context.start();
        this.reader = new SnapshotReader("snapshot", this.context);
        this.reader.uponCompletion(this.completed::countDown);
        this.reader.generateReadEvents();
        this.reader.start();
        List records = null;
        KeyValueStore store = KeyValueStore.createForTopicsBeginningWith((String)(this.DATABASE.getServerName() + "."));
        SchemaChangeHistory schemaChanges = new SchemaChangeHistory(this.DATABASE.getServerName());
        while ((records = this.reader.poll()) != null) {
            records.forEach(record -> {
                VerifyRecord.isValid((SourceRecord)record);
                VerifyRecord.hasNoSourceQuery((SourceRecord)record);
                store.add(record);
                schemaChanges.add(record);
            });
        }
        Assertions.assertThat((List)records).isNull();
        Assertions.assertThat((int)schemaChanges.recordCount()).isEqualTo(0);
        Assertions.assertThat((Collection)store.databases()).containsOnly(new Object[]{this.DATABASE.getDatabaseName(), this.OTHER_DATABASE.getDatabaseName()});
        Assertions.assertThat((int)store.collectionCount()).isEqualTo(9);
        KeyValueStore.Collection products = store.collection(this.DATABASE.getDatabaseName(), this.productsTableName());
        Assertions.assertThat((long)products.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfReads()).isEqualTo(9L);
        Assertions.assertThat((long)products.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection products_on_hand = store.collection(this.DATABASE.getDatabaseName(), "products_on_hand");
        Assertions.assertThat((long)products_on_hand.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfReads()).isEqualTo(9L);
        Assertions.assertThat((long)products_on_hand.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection customers = store.collection(this.DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat((long)customers.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfReads()).isEqualTo(4L);
        Assertions.assertThat((long)customers.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)customers.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection orders = store.collection(this.DATABASE.getDatabaseName(), "orders");
        Assertions.assertThat((long)orders.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfReads()).isEqualTo(5L);
        Assertions.assertThat((long)orders.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)orders.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection timetest = store.collection(this.DATABASE.getDatabaseName(), "dbz_342_timetest");
        Assertions.assertThat((long)timetest.numberOfCreates()).isEqualTo(0L);
        Assertions.assertThat((long)timetest.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)timetest.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)timetest.numberOfReads()).isEqualTo(1L);
        Assertions.assertThat((long)timetest.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)timetest.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)timetest.numberOfValueSchemaChanges()).isEqualTo(1L);
        ArrayList timerecords = new ArrayList();
        timetest.forEach(val -> timerecords.add(((Struct)val.value()).getStruct("after")));
        Struct after = (Struct)timerecords.get(0);
        Assertions.assertThat((Object)after.get("c1")).isEqualTo((Object)this.toMicroSeconds("PT517H51M04.78S"));
        Assertions.assertThat((Object)after.get("c2")).isEqualTo((Object)this.toMicroSeconds("-PT13H14M50S"));
        Assertions.assertThat((Object)after.get("c3")).isEqualTo((Object)this.toMicroSeconds("-PT733H0M0.001S"));
        Assertions.assertThat((Object)after.get("c4")).isEqualTo((Object)this.toMicroSeconds("-PT1H59M59.001S"));
        Assertions.assertThat((Object)after.get("c5")).isEqualTo((Object)this.toMicroSeconds("-PT838H59M58.999999S"));
        if (this.completed.await(10L, TimeUnit.SECONDS)) {
            Testing.print((Object)"completed the snapshot");
        } else {
            Assert.fail((String)"failed to complete the snapshot within 10 seconds");
        }
    }

    private String productsTableName() {
        return this.context.isTableIdCaseInsensitive() ? "products" : "Products";
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.context = new MySqlTaskContext(this.config, new Filters.Builder(this.config).build());
        this.context.start();
        this.reader = new SnapshotReader("snapshot", this.context);
        this.reader.uponCompletion(this.completed::countDown);
        this.reader.generateInsertEvents();
        this.reader.start();
        List records = null;
        KeyValueStore store = KeyValueStore.createForTopicsBeginningWith((String)(this.DATABASE.getServerName() + "."));
        SchemaChangeHistory schemaChanges = new SchemaChangeHistory(this.DATABASE.getServerName());
        while ((records = this.reader.poll()) != null) {
            records.forEach(record -> {
                VerifyRecord.isValid((SourceRecord)record);
                VerifyRecord.hasNoSourceQuery((SourceRecord)record);
                store.add(record);
                schemaChanges.add(record);
            });
        }
        Assertions.assertThat((List)records).isNull();
        Assertions.assertThat((int)schemaChanges.recordCount()).isEqualTo(14);
        Assertions.assertThat((int)schemaChanges.databaseCount()).isEqualTo(2);
        Assertions.assertThat((Collection)schemaChanges.databases()).containsOnly(new Object[]{this.DATABASE.getDatabaseName(), ""});
        Assertions.assertThat((int)store.collectionCount()).isEqualTo(5);
        KeyValueStore.Collection products = store.collection(this.DATABASE.getDatabaseName(), this.productsTableName());
        Assertions.assertThat((long)products.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection products_on_hand = store.collection(this.DATABASE.getDatabaseName(), "products_on_hand");
        Assertions.assertThat((long)products_on_hand.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products_on_hand.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection customers = store.collection(this.DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat((long)customers.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat((long)customers.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)customers.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection orders = store.collection(this.DATABASE.getDatabaseName(), "orders");
        Assertions.assertThat((long)orders.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat((long)orders.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)orders.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection timetest = store.collection(this.DATABASE.getDatabaseName(), "dbz_342_timetest");
        Assertions.assertThat((long)timetest.numberOfCreates()).isEqualTo(1L);
        Assertions.assertThat((long)timetest.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)timetest.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)timetest.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)timetest.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)timetest.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)timetest.numberOfValueSchemaChanges()).isEqualTo(1L);
        ArrayList timerecords = new ArrayList();
        timetest.forEach(val -> timerecords.add(((Struct)val.value()).getStruct("after")));
        Struct after = (Struct)timerecords.get(0);
        Assertions.assertThat((Object)after.get("c1")).isEqualTo((Object)this.toMicroSeconds("PT517H51M04.78S"));
        Assertions.assertThat((Object)after.get("c2")).isEqualTo((Object)this.toMicroSeconds("-PT13H14M50S"));
        Assertions.assertThat((Object)after.get("c3")).isEqualTo((Object)this.toMicroSeconds("-PT733H0M0.001S"));
        Assertions.assertThat((Object)after.get("c4")).isEqualTo((Object)this.toMicroSeconds("-PT1H59M59.001S"));
        Assertions.assertThat((Object)after.get("c5")).isEqualTo((Object)this.toMicroSeconds("-PT838H59M58.999999S"));
        if (this.completed.await(10L, TimeUnit.SECONDS)) {
            Testing.print((Object)"completed the snapshot");
        } else {
            Assert.fail((String)"failed to complete the snapshot within 10 seconds");
        }
    }

    @Test(expected=ConnectException.class)
    public void shouldCreateSnapshotSchemaOnlyRecovery_exception() throws Exception {
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY)).build();
        this.context = new MySqlTaskContext(this.config, new Filters.Builder(this.config).build());
        this.context.start();
        this.reader = new SnapshotReader("snapshot", this.context);
        this.reader.uponCompletion(this.completed::countDown);
        this.reader.generateInsertEvents();
        this.reader.start();
        List records = null;
        KeyValueStore store = KeyValueStore.createForTopicsBeginningWith((String)(this.DATABASE.getServerName() + "."));
        SchemaChangeHistory schemaChanges = new SchemaChangeHistory(this.DATABASE.getServerName());
        while ((records = this.reader.poll()) != null) {
            records.forEach(record -> {
                VerifyRecord.isValid((SourceRecord)record);
                VerifyRecord.hasNoSourceQuery((SourceRecord)record);
                store.add(record);
                schemaChanges.add(record);
            });
        }
    }

    @Test
    public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY)).build();
        this.context = new MySqlTaskContext(this.config, new Filters.Builder(this.config).build());
        this.context.start();
        this.context.source().setBinlogStartPoint("binlog1", 555L);
        this.reader = new SnapshotReader("snapshot", this.context);
        this.reader.uponCompletion(this.completed::countDown);
        this.reader.generateInsertEvents();
        this.reader.start();
        List records = null;
        KeyValueStore store = KeyValueStore.createForTopicsBeginningWith((String)(this.DATABASE.getServerName() + "."));
        SchemaChangeHistory schemaChanges = new SchemaChangeHistory(this.DATABASE.getServerName());
        while ((records = this.reader.poll()) != null) {
            records.forEach(record -> {
                VerifyRecord.isValid((SourceRecord)record);
                VerifyRecord.hasNoSourceQuery((SourceRecord)record);
                store.add(record);
                schemaChanges.add(record);
            });
        }
        Assertions.assertThat((List)records).isNull();
        Assertions.assertThat((int)schemaChanges.recordCount()).isEqualTo(0);
        Assertions.assertThat((int)store.collectionCount()).isEqualTo(0);
        if (this.completed.await(10L, TimeUnit.SECONDS)) {
            Testing.print((Object)"completed the snapshot");
        } else {
            Assert.fail((String)"failed to complete the snapshot within 10 seconds");
        }
    }

    @Test
    public void shouldSnapshotTablesInOrderSpecifiedInTableIncludeList() throws Exception {
        List records;
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, "connector_test_ro_(.*).orders,connector_test_ro_(.*).Products,connector_test_ro_(.*).products_on_hand,connector_test_ro_(.*).dbz_342_timetest")).build();
        this.context = new MySqlTaskContext(this.config, new Filters.Builder(this.config).build());
        this.context.start();
        this.reader = new SnapshotReader("snapshot", this.context);
        this.reader.uponCompletion(this.completed::countDown);
        this.reader.generateInsertEvents();
        this.reader.start();
        LinkedHashSet tablesInOrder = new LinkedHashSet();
        LinkedHashSet<String> tablesInOrderExpected = this.getTableNamesInSpecifiedOrder("orders", "Products", "products_on_hand", "dbz_342_timetest");
        while ((records = this.reader.poll()) != null) {
            records.forEach(record -> {
                VerifyRecord.isValid((SourceRecord)record);
                if (record.value() != null) {
                    tablesInOrder.add(this.getTableNameFromSourceRecord.apply((SourceRecord)record));
                }
            });
        }
        Assert.assertArrayEquals((Object[])tablesInOrder.toArray(), (Object[])tablesInOrderExpected.toArray());
    }

    @Test
    public void shouldSnapshotTablesInOrderSpecifiedInTablesWhitelist() throws Exception {
        List records;
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.TABLE_WHITELIST, "connector_test_ro_(.*).orders,connector_test_ro_(.*).Products,connector_test_ro_(.*).products_on_hand,connector_test_ro_(.*).dbz_342_timetest")).build();
        this.context = new MySqlTaskContext(this.config, new Filters.Builder(this.config).build());
        this.context.start();
        this.reader = new SnapshotReader("snapshot", this.context);
        this.reader.uponCompletion(this.completed::countDown);
        this.reader.generateInsertEvents();
        this.reader.start();
        LinkedHashSet tablesInOrder = new LinkedHashSet();
        LinkedHashSet<String> tablesInOrderExpected = this.getTableNamesInSpecifiedOrder("orders", "Products", "products_on_hand", "dbz_342_timetest");
        while ((records = this.reader.poll()) != null) {
            records.forEach(record -> {
                VerifyRecord.isValid((SourceRecord)record);
                if (record.value() != null) {
                    tablesInOrder.add(this.getTableNameFromSourceRecord.apply((SourceRecord)record));
                }
            });
        }
        Assert.assertArrayEquals((Object[])tablesInOrder.toArray(), (Object[])tablesInOrderExpected.toArray());
    }

    @Test
    public void shouldSnapshotTablesInLexicographicalOrder() throws Exception {
        List records;
        this.config = this.simpleConfig().build();
        this.context = new MySqlTaskContext(this.config, new Filters.Builder(this.config).build());
        this.context.start();
        this.reader = new SnapshotReader("snapshot", this.context);
        this.reader.uponCompletion(this.completed::countDown);
        this.reader.generateInsertEvents();
        this.reader.start();
        LinkedHashSet tablesInOrder = new LinkedHashSet();
        LinkedHashSet<String> tablesInOrderExpected = this.getTableNamesInSpecifiedOrder("Products", "customers", "dbz_342_timetest", "orders", "products_on_hand");
        while ((records = this.reader.poll()) != null) {
            records.forEach(record -> {
                VerifyRecord.isValid((SourceRecord)record);
                VerifyRecord.hasNoSourceQuery((SourceRecord)record);
                if (record.value() != null) {
                    tablesInOrder.add(this.getTableNameFromSourceRecord.apply((SourceRecord)record));
                }
            });
        }
        Assert.assertArrayEquals((Object[])tablesInOrder.toArray(), (Object[])tablesInOrderExpected.toArray());
    }

    private LinkedHashSet<String> getTableNamesInSpecifiedOrder(String ... tables) {
        return new LinkedHashSet<String>(Arrays.asList(tables));
    }

    @Test
    public void shouldCreateSnapshotSchemaOnly() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(Heartbeat.HEARTBEAT_INTERVAL, 300000)).build();
        this.context = new MySqlTaskContext(this.config, new Filters.Builder(this.config).build());
        this.context.start();
        this.reader = new SnapshotReader("snapshot", this.context);
        this.reader.uponCompletion(this.completed::countDown);
        this.reader.generateInsertEvents();
        this.reader.start();
        List records = null;
        KeyValueStore store = KeyValueStore.createForTopicsBeginningWith((String)(this.DATABASE.getServerName() + "."));
        SchemaChangeHistory schemaChanges = new SchemaChangeHistory(this.DATABASE.getServerName());
        SourceRecord heartbeatRecord = null;
        while ((records = this.reader.poll()) != null) {
            ((ObjectAssert)Assertions.assertThat(heartbeatRecord).describedAs("Heartbeat record must be the last one")).isNull();
            if (heartbeatRecord == null && records.size() > 0 && ((SourceRecord)records.get(records.size() - 1)).topic().startsWith("__debezium-heartbeat")) {
                heartbeatRecord = (SourceRecord)records.get(records.size() - 1);
            }
            records.forEach(record -> {
                if (!record.topic().startsWith("__debezium-heartbeat")) {
                    Assertions.assertThat(record.sourceOffset().get("snapshot")).isEqualTo((Object)true);
                    VerifyRecord.isValid((SourceRecord)record);
                    VerifyRecord.hasNoSourceQuery((SourceRecord)record);
                    store.add(record);
                    schemaChanges.add(record);
                }
            });
        }
        Assertions.assertThat((List)records).isNull();
        Assertions.assertThat((int)schemaChanges.recordCount()).isEqualTo(14);
        Assertions.assertThat((int)store.collectionCount()).isEqualTo(0);
        Assertions.assertThat(heartbeatRecord).isNotNull();
        Assertions.assertThat(heartbeatRecord.sourceOffset().get("snapshot")).isNotEqualTo((Object)true);
        if (this.completed.await(10L, TimeUnit.SECONDS)) {
            Testing.print((Object)"completed the snapshot");
        } else {
            Assert.fail((String)"failed to complete the snapshot within 10 seconds");
        }
    }

    private long toMicroSeconds(String duration) {
        return Duration.parse(duration).toNanos() / 1000L;
    }
}

