/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSource;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.data.KeyValueStore;
import io.debezium.data.SchemaChangeHistory;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.MBeanServer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

@SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN, major=5, minor=6, reason="DDL uses fractional second data types, not supported until MySQL 5.6")
public class StreamingSourceIT
extends AbstractConnectorTest {
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-db-history-binlog.txt").toAbsolutePath();
    private final UniqueDatabase DATABASE = new UniqueDatabase("logical_server_name", "connector_test_ro").withDbHistoryPath(DB_HISTORY_PATH);
    private static final String SET_TLS_PROTOCOLS = "database.enabledTLSProtocols";
    private Configuration config;
    private KeyValueStore store;
    private SchemaChangeHistory schemaChanges;
    @Rule
    public SkipTestRule skipRule = new SkipTestRule();

    @Before
    public void beforeEach() {
        this.stopConnector();
        this.DATABASE.createAndInitialize();
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        this.store = KeyValueStore.createForTopicsBeginningWith((String)(this.DATABASE.getServerName() + "."));
        this.schemaChanges = new SchemaChangeHistory(this.DATABASE.getServerName());
    }

    @After
    public void afterEach() {
        try {
            this.stopConnector();
        }
        finally {
            Testing.Files.delete((Path)DB_HISTORY_PATH);
        }
    }

    protected int consumeAtLeast(int minNumber) throws InterruptedException {
        return this.consumeAtLeast(minNumber, 20L, TimeUnit.SECONDS);
    }

    protected int consumeAtLeast(int minNumber, long timeout, TimeUnit unit) throws InterruptedException {
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(minNumber);
        int count = records.allRecordsInOrder().size();
        records.forEach(record -> {
            VerifyRecord.isValid((SourceRecord)record);
            this.store.add(record);
            this.schemaChanges.add(record);
        });
        Testing.print((Object)(count + " records"));
        return count;
    }

    protected long filterAtLeast(int minNumber, long timeout, TimeUnit unit) throws InterruptedException {
        long targetNumber = minNumber;
        long startTime = System.currentTimeMillis();
        while (this.getNumberOfEventsFiltered() < targetNumber && System.currentTimeMillis() - startTime < unit.toMillis(timeout)) {
            this.consumeRecord();
        }
        return this.getNumberOfEventsFiltered();
    }

    private long getNumberOfEventsFiltered() {
        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
        try {
            return (Long)mbeanServer.getAttribute(StreamingSourceIT.getStreamingMetricsObjectName((String)"mysql", (String)this.DATABASE.getServerName(), (String)"streaming"), "NumberOfEventsFiltered");
        }
        catch (Exception e) {
            throw new DebeziumException((Throwable)e);
        }
    }

    private long getNumberOfSkippedEvents() {
        MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
        try {
            return (Long)mbeanServer.getAttribute(StreamingSourceIT.getStreamingMetricsObjectName((String)"mysql", (String)this.DATABASE.getServerName(), (String)"streaming"), "NumberOfSkippedEvents");
        }
        catch (Exception e) {
            throw new DebeziumException((Throwable)e);
        }
    }

    protected Configuration.Builder simpleConfig() {
        return (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.USER, "replicator")).with(MySqlConnectorConfig.PASSWORD, "replpass")).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, false)).with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.NEVER);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
        this.config = this.simpleConfig().build();
        this.start(MySqlConnector.class, this.config);
        int expected = 28;
        int consumed = this.consumeAtLeast(expected);
        Assertions.assertThat((int)consumed).isGreaterThanOrEqualTo(expected);
        this.store.sourceRecords().forEach(System.out::println);
        Assertions.assertThat((int)this.schemaChanges.recordCount()).isEqualTo(0);
        Assertions.assertThat((int)this.store.collectionCount()).isEqualTo(5);
        KeyValueStore.Collection products = this.store.collection(this.DATABASE.getDatabaseName(), this.productsTableName());
        Assertions.assertThat((long)products.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection products_on_hand = this.store.collection(this.DATABASE.getDatabaseName(), "products_on_hand");
        Assertions.assertThat((long)products_on_hand.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products_on_hand.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection customers = this.store.collection(this.DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat((long)customers.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat((long)customers.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)customers.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection orders = this.store.collection(this.DATABASE.getDatabaseName(), "orders");
        Assertions.assertThat((long)orders.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat((long)orders.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)orders.numberOfValueSchemaChanges()).isEqualTo(1L);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(MySqlConnector.class, this.config);
        int expectedSchemaChangeCount = 7;
        int expected = 28 + expectedSchemaChangeCount;
        int consumed = this.consumeAtLeast(expected);
        Assertions.assertThat((int)consumed).isGreaterThanOrEqualTo(expected);
        Assertions.assertThat((int)this.schemaChanges.recordCount()).isEqualTo(expectedSchemaChangeCount);
        List<String> expectedAffectedTables = Arrays.asList(null, "Products", "Products", "products_on_hand", "customers", "orders", "dbz_342_timetest");
        ArrayList affectedTables = new ArrayList();
        this.schemaChanges.forEach(record -> {
            affectedTables.add(((Struct)record.value()).getStruct("source").getString("table"));
            Assertions.assertThat((Object)((Struct)record.value()).getStruct("source").get("db")).isEqualTo((Object)this.DATABASE.getDatabaseName());
        });
        Assertions.assertThat(affectedTables).isEqualTo(expectedAffectedTables);
        Assertions.assertThat((int)this.store.collectionCount()).isEqualTo(5);
        KeyValueStore.Collection products = this.store.collection(this.DATABASE.getDatabaseName(), this.productsTableName());
        Assertions.assertThat((long)products.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection products_on_hand = this.store.collection(this.DATABASE.getDatabaseName(), "products_on_hand");
        Assertions.assertThat((long)products_on_hand.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products_on_hand.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection customers = this.store.collection(this.DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat((long)customers.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat((long)customers.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)customers.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection orders = this.store.collection(this.DATABASE.getDatabaseName(), "orders");
        Assertions.assertThat((long)orders.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat((long)orders.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)orders.numberOfValueSchemaChanges()).isEqualTo(1L);
    }

    @Test
    @FixFor(value={"DBZ-1206"})
    public void shouldFilterAllRecordsBasedOnDatabaseIncludeListFilter() throws Exception {
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, "db-does-not-exist")).build();
        this.start(MySqlConnector.class, this.config);
        StreamingSourceIT.waitForStreamingRunning((String)"mysql", (String)this.DATABASE.getServerName(), (String)"streaming");
        int expectedFilterCount = 35;
        long numberFiltered = this.filterAtLeast(35, 20L, TimeUnit.SECONDS);
        Assertions.assertThat((long)numberFiltered).isGreaterThanOrEqualTo(35L);
        Assertions.assertThat((int)this.schemaChanges.recordCount()).isEqualTo(0);
        Assertions.assertThat((int)this.store.collectionCount()).isEqualTo(0);
        Assertions.assertThat((long)this.getNumberOfSkippedEvents()).isEqualTo(0L);
    }

    @Test
    @FixFor(value={"DBZ-183"})
    public void shouldHandleTimestampTimezones() throws Exception {
        UniqueDatabase REGRESSION_DATABASE = new UniqueDatabase("logical_server_name", "regression_test").withDbHistoryPath(DB_HISTORY_PATH);
        REGRESSION_DATABASE.createAndInitialize();
        String tableName = "dbz_85_fractest";
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, REGRESSION_DATABASE.getDatabaseName())).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, REGRESSION_DATABASE.qualifiedTableName(tableName))).build();
        this.start(MySqlConnector.class, this.config);
        int expectedChanges = 1;
        this.consumeAtLeast(expectedChanges);
        List sourceRecords = this.store.sourceRecords();
        Assertions.assertThat((int)sourceRecords.size()).isEqualTo(1);
        ZonedDateTime expectedTimestamp = ZonedDateTime.of(LocalDateTime.parse("2014-09-08T17:51:04.780"), UniqueDatabase.TIMEZONE).withZoneSameInstant(ZoneOffset.UTC);
        String expectedTimestampString = expectedTimestamp.format(ZonedTimestamp.FORMATTER);
        SourceRecord sourceRecord = (SourceRecord)sourceRecords.get(0);
        Struct value = (Struct)sourceRecord.value();
        Struct after = value.getStruct("after");
        String actualTimestampString = after.getString("c4");
        Assertions.assertThat((String)actualTimestampString).isEqualTo((Object)expectedTimestampString);
    }

    @Test
    @FixFor(value={"DBZ-342"})
    public void shouldHandleMySQLTimeCorrectly() throws Exception {
        UniqueDatabase REGRESSION_DATABASE = new UniqueDatabase("logical_server_name", "regression_test").withDbHistoryPath(DB_HISTORY_PATH);
        REGRESSION_DATABASE.createAndInitialize();
        String tableName = "dbz_342_timetest";
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, REGRESSION_DATABASE.getDatabaseName())).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, REGRESSION_DATABASE.qualifiedTableName(tableName))).build();
        this.start(MySqlConnector.class, this.config);
        int expectedChanges = 1;
        this.consumeAtLeast(expectedChanges);
        List sourceRecords = this.store.sourceRecords();
        Assertions.assertThat((int)sourceRecords.size()).isEqualTo(1);
        SourceRecord sourceRecord = (SourceRecord)sourceRecords.get(0);
        Struct value = (Struct)sourceRecord.value();
        Struct after = value.getStruct("after");
        long c1 = after.getInt64("c1");
        Duration c1Time = Duration.ofNanos(c1 * 1000L);
        Duration c1ExpectedTime = this.toDuration("PT517H51M4.78S");
        Assert.assertEquals((Object)c1ExpectedTime, (Object)c1Time);
        Assert.assertEquals((long)c1ExpectedTime.toNanos(), (long)c1Time.toNanos());
        Assertions.assertThat((long)c1Time.toNanos()).isEqualTo(1864264780000000L);
        Assertions.assertThat((Object)c1Time).isEqualTo((Object)Duration.ofHours(517L).plusMinutes(51L).plusSeconds(4L).plusMillis(780L));
        long c2 = after.getInt64("c2");
        Duration c2Time = Duration.ofNanos(c2 * 1000L);
        Duration c2ExpectedTime = this.toDuration("-PT13H14M50S");
        Assert.assertEquals((Object)c2ExpectedTime, (Object)c2Time);
        Assert.assertEquals((long)c2ExpectedTime.toNanos(), (long)c2Time.toNanos());
        Assertions.assertThat((long)c2Time.toNanos()).isEqualTo(-47690000000000L);
        Assert.assertTrue((boolean)c2Time.isNegative());
        Assertions.assertThat((Object)c2Time).isEqualTo((Object)Duration.ofHours(-13L).minusMinutes(14L).minusSeconds(50L));
        long c3 = after.getInt64("c3");
        Duration c3Time = Duration.ofNanos(c3 * 1000L);
        Duration c3ExpectedTime = this.toDuration("-PT733H0M0.001S");
        Assert.assertEquals((Object)c3ExpectedTime, (Object)c3Time);
        Assert.assertEquals((long)c3ExpectedTime.toNanos(), (long)c3Time.toNanos());
        Assertions.assertThat((long)c3Time.toNanos()).isEqualTo(-2638800001000000L);
        Assert.assertTrue((boolean)c3Time.isNegative());
        Assertions.assertThat((Object)c3Time).isEqualTo((Object)Duration.ofHours(-733L).minusMillis(1L));
        long c4 = after.getInt64("c4");
        Duration c4Time = Duration.ofNanos(c4 * 1000L);
        Duration c4ExpectedTime = this.toDuration("-PT1H59M59.001S");
        Assert.assertEquals((Object)c4ExpectedTime, (Object)c4Time);
        Assert.assertEquals((long)c4ExpectedTime.toNanos(), (long)c4Time.toNanos());
        Assertions.assertThat((long)c4Time.toNanos()).isEqualTo(-7199001000000L);
        Assert.assertTrue((boolean)c4Time.isNegative());
        Assertions.assertThat((Object)c4Time).isEqualTo((Object)Duration.ofHours(-1L).minusMinutes(59L).minusSeconds(59L).minusMillis(1L));
        long c5 = after.getInt64("c5");
        Duration c5Time = Duration.ofNanos(c5 * 1000L);
        Duration c5ExpectedTime = this.toDuration("-PT838H59M58.999999S");
        Assert.assertEquals((Object)c5ExpectedTime, (Object)c5Time);
        Assert.assertEquals((long)c5ExpectedTime.toNanos(), (long)c5Time.toNanos());
        Assertions.assertThat((long)c5Time.toNanos()).isEqualTo(-3020398999999000L);
        Assert.assertTrue((boolean)c5Time.isNegative());
        Assertions.assertThat((Object)c5Time).isEqualTo((Object)Duration.ofHours(-838L).minusMinutes(59L).minusSeconds(58L).minusNanos(999999000L));
    }

    @Test(expected=ConnectException.class)
    public void shouldFailOnSchemaInconsistency() throws Exception {
        this.inconsistentSchema(null);
    }

    @Test
    public void shouldWarnOnSchemaInconsistency() throws Exception {
        this.inconsistentSchema(CommonConnectorConfig.EventProcessingFailureHandlingMode.WARN);
    }

    @Test
    public void shouldIgnoreOnSchemaInconsistency() throws Exception {
        this.inconsistentSchema(CommonConnectorConfig.EventProcessingFailureHandlingMode.SKIP);
    }

    @Test(expected=DebeziumException.class)
    @SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN_OR_EQUAL, major=5, minor=6, reason="MySQL 5.6 does not support SSL")
    @FixFor(value={"DBZ-1208"})
    public void shouldFailOnUnknownTlsProtocol() {
        UniqueDatabase REGRESSION_DATABASE = new UniqueDatabase("logical_server_name", "regression_test").withDbHistoryPath(DB_HISTORY_PATH);
        REGRESSION_DATABASE.createAndInitialize();
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.SSL_MODE, (EnumeratedValue)MySqlConnectorConfig.SecureConnectionMode.REQUIRED)).with(SET_TLS_PROTOCOLS, "TLSv1.7").build();
        AtomicReference exception = new AtomicReference();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> exception.set(error));
        throw (RuntimeException)exception.get();
    }

    @Test
    @SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN_OR_EQUAL, major=5, minor=6, reason="MySQL 5.6 does not support SSL")
    @FixFor(value={"DBZ-1208"})
    public void shouldAcceptTls12() throws Exception {
        UniqueDatabase REGRESSION_DATABASE = new UniqueDatabase("logical_server_name", "regression_test").withDbHistoryPath(DB_HISTORY_PATH);
        REGRESSION_DATABASE.createAndInitialize();
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.SSL_MODE, (EnumeratedValue)MySqlConnectorConfig.SecureConnectionMode.REQUIRED)).with(SET_TLS_PROTOCOLS, "TLSv1.2").build();
        AtomicReference exception = new AtomicReference();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> exception.set(error));
        StreamingSourceIT.waitForStreamingRunning((String)"mysql", (String)this.DATABASE.getServerName(), (String)"streaming");
        Assertions.assertThat((Throwable)((Throwable)exception.get())).isNull();
    }

    @Test
    @FixFor(value={"DBZ-4029"})
    public void testHeartbeatActionQueryExecuted() throws Exception {
        String HEARTBEAT_TOPIC_PREFIX_VALUE = "myheartbeat";
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.USER, "snapper")).with(MySqlConnectorConfig.PASSWORD, "snapperpass")).with("__debezium-heartbeat", "myheartbeat").with(Heartbeat.HEARTBEAT_INTERVAL, "100")).with("heartbeat.action.query", String.format("INSERT INTO %s.test_heartbeat_table (text) VALUES ('test_heartbeat');", this.DATABASE.getDatabaseName())).build();
        try (MySqlTestConnection connection = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());){
            connection.execute(new String[]{"CREATE TABLE test_heartbeat_table (text TEXT);"});
        }
        AtomicReference exception = new AtomicReference();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> exception.set(error));
        StreamingSourceIT.waitForStreamingRunning((String)"mysql", (String)this.DATABASE.getServerName(), (String)"streaming");
        String slotQuery = String.format("SELECT COUNT(*) FROM %s.test_heartbeat_table;", this.DATABASE.getDatabaseName());
        JdbcConnection.ResultSetMapper slotQueryMapper = rs -> {
            rs.next();
            return rs.getInt(1);
        };
        Awaitility.await().alias("Awaiting heartbeat action query insert").pollInterval(100L, TimeUnit.MILLISECONDS).atMost((long)(StreamingSourceIT.waitTimeForRecords() * 30), TimeUnit.SECONDS).until(() -> {
            try (MySqlTestConnection connection = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());){
                int numOfHeartbeatActions = (Integer)connection.queryAndMap(slotQuery, slotQueryMapper);
                Boolean bl = numOfHeartbeatActions > 0;
                return bl;
            }
        });
    }

    private void inconsistentSchema(CommonConnectorConfig.EventProcessingFailureHandlingMode mode) throws InterruptedException, SQLException {
        LogInterceptor logInterceptor = new LogInterceptor(MySqlStreamingChangeEventSource.class);
        Configuration.Builder builder = (Configuration.Builder)((Configuration.Builder)this.simpleConfig().with(DatabaseHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)).with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("orders"));
        this.config = mode == null ? builder.build() : ((Configuration.Builder)builder.with(MySqlConnectorConfig.INCONSISTENT_SCHEMA_HANDLING_MODE, (EnumeratedValue)mode)).build();
        this.start(MySqlConnector.class, this.config);
        int expected = 5;
        int consumed = this.consumeAtLeast(expected);
        Assertions.assertThat((int)consumed).isGreaterThanOrEqualTo(expected);
        this.stopConnector();
        this.config = ((Configuration.Builder)builder.with(MySqlConnectorConfig.TABLE_INCLUDE_LIST, this.DATABASE.qualifiedTableName("orders") + "," + this.DATABASE.qualifiedTableName("customers"))).build();
        AtomicReference exception = new AtomicReference();
        this.start(MySqlConnector.class, this.config, (success, message, error) -> exception.set(error));
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();
             Connection jdbc = connection.connection();
             Statement statement = jdbc.createStatement();){
            if (mode == null) {
                StreamingSourceIT.waitForStreamingRunning((String)"mysql", (String)this.DATABASE.getServerName(), (String)"streaming");
            }
            statement.executeUpdate("INSERT INTO customers VALUES (default,'John','Lazy','john.lazy@acme.com')");
        }
        if (mode == null) {
            Awaitility.await().atMost(Duration.ofSeconds(StreamingSourceIT.waitTimeForRecords())).until(() -> logInterceptor.containsMessage("Error during binlog processing."));
            StreamingSourceIT.waitForConnectorShutdown((String)"mysql", (String)this.DATABASE.getServerName());
        } else {
            StreamingSourceIT.waitForStreamingRunning((String)"mysql", (String)this.DATABASE.getServerName(), (String)"streaming");
        }
        this.stopConnector();
        Throwable e = (Throwable)exception.get();
        if (e != null) {
            throw (RuntimeException)e;
        }
    }

    private Duration toDuration(String duration) {
        return Duration.parse(duration);
    }

    private String productsTableName() throws SQLException {
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());){
            String string = db.isTableIdCaseSensitive() ? "products" : "Products";
            return string;
        }
    }
}

