/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.binlog;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.binlog.AbstractBinlogConnectorIT;
import io.debezium.connector.binlog.BinlogConnectorConfig;
import io.debezium.connector.binlog.BinlogStreamingChangeEventSource;
import io.debezium.connector.binlog.junit.SkipTestDependingOnDatabaseRule;
import io.debezium.connector.binlog.junit.SkipWhenDatabaseIs;
import io.debezium.connector.binlog.junit.SkipWhenDatabaseIsMultiple;
import io.debezium.connector.binlog.util.BinlogTestConnection;
import io.debezium.connector.binlog.util.TestHelper;
import io.debezium.connector.binlog.util.UniqueDatabase;
import io.debezium.data.KeyValueStore;
import io.debezium.data.SchemaChangeHistory;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
import java.lang.management.ManagementFactory;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.management.MBeanServer;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

@SkipWhenDatabaseIs(value=SkipWhenDatabaseIs.Type.MYSQL, versions={@SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN, major=5, minor=6, reason="DDL uses fractional second data types, not supported until MySQL 5.6")})
public abstract class BinlogStreamingSourceIT<C extends SourceConnector>
extends AbstractBinlogConnectorIT<C> {
    // File used as the schema history store for every database created by this test class; it is deleted
    // in beforeEach() and again in afterEach().
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-schema-history-binlog.txt").toAbsolutePath();
    // Primary test database; it is created and initialized in beforeEach() and records its schema history
    // at SCHEMA_HISTORY_PATH.
    private final UniqueDatabase DATABASE = TestHelper.getUniqueDatabase("logical_server_name", "connector_test_ro").withDbHistoryPath(SCHEMA_HISTORY_PATH);
    // Configuration key the TLS tests use to pass the TLS protocol version(s) to the connector.
    private static final String SET_TLS_PROTOCOLS = "database.enabledTLSProtocols";
    // Connector configuration built by the currently running test.
    private Configuration config;
    // Receives every record consumed through consumeAtLeast(...); recreated in beforeEach().
    private KeyValueStore store;
    // Receives the same records as `store`; the tests inspect it for schema change records.
    private SchemaChangeHistory schemaChanges;
    // NOTE(review): presumably this rule is what evaluates the @SkipWhenDatabaseIs annotations — confirm.
    @Rule
    public TestRule skipRule = new SkipTestDependingOnDatabaseRule();

    /**
     * Prepares the fixture for a test: stops the connector, creates and initializes the test database,
     * initializes the connector test framework, removes the schema history file and installs fresh,
     * empty record stores keyed by the database's server name.
     */
    @Before
    public void beforeEach() {
        stopConnector();
        DATABASE.createAndInitialize();
        initializeConnectorTestFramework();
        Testing.Files.delete(SCHEMA_HISTORY_PATH);

        // Only topics starting with "<serverName>." are collected by the key/value store.
        final String topicPrefix = DATABASE.getServerName() + ".";
        store = KeyValueStore.createForTopicsBeginningWith(topicPrefix);
        schemaChanges = new SchemaChangeHistory(DATABASE.getServerName());
    }

    /**
     * Stops the connector after a test. The schema history file is removed even when stopping the
     * connector throws.
     */
    @After
    public void afterEach() {
        try {
            stopConnector();
        }
        finally {
            // Runs regardless of the outcome above so the next test starts without a history file.
            Testing.Files.delete(SCHEMA_HISTORY_PATH);
        }
    }

    /**
     * Convenience overload that delegates to {@link #consumeAtLeast(int, long, TimeUnit)} with a
     * timeout argument of 20 seconds.
     *
     * @param minNumber the number of records to request
     * @return the number of records that were consumed
     * @throws InterruptedException if the delegate is interrupted
     */
    protected int consumeAtLeast(int minNumber) throws InterruptedException {
        return this.consumeAtLeast(minNumber, 20L, TimeUnit.SECONDS);
    }

    /**
     * Requests {@code minNumber} records via {@code consumeRecordsByTopic}, validates each consumed
     * record and adds it to both {@code store} and {@code schemaChanges}.
     *
     * <p>NOTE(review): {@code timeout} and {@code unit} are accepted but never read by this method —
     * confirm whether the wait performed by {@code consumeRecordsByTopic} is meant to honour them.
     *
     * @param minNumber the number of records to request
     * @param timeout currently unused
     * @param unit currently unused
     * @return the number of records returned by the consumer, in order
     * @throws InterruptedException if consuming records is interrupted
     */
    protected int consumeAtLeast(int minNumber, long timeout, TimeUnit unit) throws InterruptedException {
        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(minNumber);
        final int total = consumed.allRecordsInOrder().size();
        consumed.forEach(sourceRecord -> {
            VerifyRecord.isValid(sourceRecord);
            store.add(sourceRecord);
            schemaChanges.add(sourceRecord);
        });
        Testing.print(total + " records");
        return total;
    }

    /**
     * Keeps consuming single records until the streaming metric {@code NumberOfEventsFiltered} reaches
     * {@code minNumber} or the given timeout has elapsed, whichever happens first.
     *
     * @param minNumber the filtered-event count to wait for
     * @param timeout the maximum time to keep consuming
     * @param unit the unit of {@code timeout}
     * @return the filtered-event count read after the loop ends (may be below {@code minNumber} on timeout)
     * @throws InterruptedException if consuming a record is interrupted
     */
    protected long filterAtLeast(int minNumber, long timeout, TimeUnit unit) throws InterruptedException {
        final long startedAt = System.currentTimeMillis();
        while (getNumberOfEventsFiltered() < minNumber) {
            // The metric is checked first, the deadline second — the same order as a short-circuit `&&`.
            if (System.currentTimeMillis() - startedAt >= unit.toMillis(timeout)) {
                break;
            }
            consumeRecord();
        }
        return getNumberOfEventsFiltered();
    }

    /**
     * Reads the {@code NumberOfEventsFiltered} attribute of this connector's streaming metrics MBean.
     *
     * @return the current attribute value
     * @throws DebeziumException if the attribute cannot be read
     */
    private long getNumberOfEventsFiltered() {
        return readStreamingMetric("NumberOfEventsFiltered");
    }

    /**
     * Reads the {@code NumberOfSkippedEvents} attribute of this connector's streaming metrics MBean.
     *
     * @return the current attribute value
     * @throws DebeziumException if the attribute cannot be read
     */
    private long getNumberOfSkippedEvents() {
        return readStreamingMetric("NumberOfSkippedEvents");
    }

    /**
     * Looks up a {@code Long}-valued attribute on the streaming metrics MBean registered for this
     * connector and server name on the platform MBean server.
     *
     * @param attributeName the JMX attribute to read
     * @return the attribute value
     * @throws DebeziumException wrapping any failure raised while resolving or reading the attribute
     */
    private long readStreamingMetric(String attributeName) {
        final MBeanServer mbeanServer = ManagementFactory.getPlatformMBeanServer();
        try {
            return (Long) mbeanServer.getAttribute(
                    getStreamingMetricsObjectName(getConnectorName(), DATABASE.getServerName(), "streaming"),
                    attributeName);
        }
        catch (Exception e) {
            // JMX reports failures through several checked exceptions; surface them all as one unchecked type.
            throw new DebeziumException(e);
        }
    }

    /**
     * Builds the base connector configuration shared by the tests: the database's default
     * configuration with the {@code replicator} credentials, schema change events and SQL query
     * inclusion turned off, and snapshot mode {@code NEVER}.
     *
     * @return a builder that individual tests can extend before calling {@code build()}
     */
    protected Configuration.Builder simpleConfig() {
        return DATABASE.defaultConfig()
                .with(BinlogConnectorConfig.USER, "replicator")
                .with(BinlogConnectorConfig.PASSWORD, "replpass")
                .with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(BinlogConnectorConfig.INCLUDE_SQL_QUERY, false)
                .with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NEVER);
    }

    /**
     * Starts the connector with the base configuration (schema change events disabled) and verifies
     * that the consumed records contain only inserts for the expected tables and no schema changes.
     */
    @Test
    public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
        config = simpleConfig().build();
        start(getConnectorClass(), config);

        // The four tables asserted below account for 9 + 9 + 4 + 5 = 27 records.
        // NOTE(review): the 28th presumably belongs to the fifth collection — confirm.
        final int expected = 28;
        final int consumed = consumeAtLeast(expected);
        Assertions.assertThat(consumed).isGreaterThanOrEqualTo(expected);
        store.sourceRecords().forEach(System.out::println);

        Assertions.assertThat(schemaChanges.recordCount()).isEqualTo(0);
        assertTablesContainOnlyInserts();
    }

    /**
     * Same scenario as {@link #shouldCreateSnapshotOfSingleDatabase()} but with schema change events
     * enabled; additionally verifies the number of schema change records, the table each one names
     * and that all of them refer to the test database.
     */
    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
        config = simpleConfig()
                .with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                .build();
        start(getConnectorClass(), config);

        final int expectedSchemaChangeCount = 7;
        final int expected = 28 + expectedSchemaChangeCount;
        final int consumed = consumeAtLeast(expected);
        Assertions.assertThat(consumed).isGreaterThanOrEqualTo(expected);
        Assertions.assertThat(schemaChanges.recordCount()).isEqualTo(expectedSchemaChangeCount);

        // NOTE(review): the leading null is presumably a database-level DDL event with no table, and
        // "Products" appearing twice presumably reflects two DDL statements on that table — confirm.
        final List<String> expectedAffectedTables = Arrays.asList(null, "Products", "Products", "products_on_hand", "customers", "orders", "dbz_342_timetest");
        final List<String> affectedTables = new ArrayList<>();
        schemaChanges.forEach(record -> {
            final Struct source = ((Struct) record.value()).getStruct("source");
            affectedTables.add(source.getString("table"));
            Assertions.assertThat(source.get("db")).isEqualTo(DATABASE.getDatabaseName());
        });
        Assertions.assertThat(affectedTables).isEqualTo(expectedAffectedTables);

        assertTablesContainOnlyInserts();
    }

    /**
     * Asserts that the store holds five collections and that the products, products_on_hand,
     * customers and orders collections contain 9, 9, 4 and 5 inserts respectively and nothing else.
     *
     * @throws SQLException if resolving the products table name fails
     */
    private void assertTablesContainOnlyInserts() throws SQLException {
        Assertions.assertThat(store.collectionCount()).isEqualTo(5);
        assertOnlyInserts(store.collection(DATABASE.getDatabaseName(), productsTableName()), 9L);
        assertOnlyInserts(store.collection(DATABASE.getDatabaseName(), "products_on_hand"), 9L);
        assertOnlyInserts(store.collection(DATABASE.getDatabaseName(), "customers"), 4L);
        assertOnlyInserts(store.collection(DATABASE.getDatabaseName(), "orders"), 5L);
    }

    /**
     * Asserts that a collection saw exactly {@code expectedCreates} create events, no updates,
     * deletes, reads or tombstones, and exactly one key schema and one value schema.
     *
     * @param collection the per-table collection to check
     * @param expectedCreates the expected number of create events
     */
    private static void assertOnlyInserts(KeyValueStore.Collection collection, long expectedCreates) {
        Assertions.assertThat(collection.numberOfCreates()).isEqualTo(expectedCreates);
        Assertions.assertThat(collection.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat(collection.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat(collection.numberOfValueSchemaChanges()).isEqualTo(1L);
    }

    /**
     * Restricts the connector to a database name that does not exist and verifies that events are
     * counted as filtered rather than emitted or skipped: no records reach the stores and the
     * skipped-event metric stays at zero.
     */
    @Test
    @FixFor(value={"DBZ-1206"})
    public void shouldFilterAllRecordsBasedOnDatabaseIncludeListFilter() throws Exception {
        config = simpleConfig()
                .with(BinlogConnectorConfig.DATABASE_INCLUDE_LIST, "db-does-not-exist")
                .build();
        start(getConnectorClass(), config);
        waitForStreamingRunning(getConnectorName(), DATABASE.getServerName(), "streaming");

        // Single source of truth for the threshold; previously the literal was repeated and this local was dead.
        final int expectedFilterCount = 35;
        final long numberFiltered = filterAtLeast(expectedFilterCount, 20L, TimeUnit.SECONDS);
        Assertions.assertThat(numberFiltered).isGreaterThanOrEqualTo((long) expectedFilterCount);

        Assertions.assertThat(schemaChanges.recordCount()).isEqualTo(0);
        Assertions.assertThat(store.collectionCount()).isEqualTo(0);
        Assertions.assertThat(getNumberOfSkippedEvents()).isEqualTo(0L);
    }

    /**
     * Captures the single row of {@code dbz_85_fractest} from the regression database and verifies
     * that column {@code c4} is emitted as the expected local date-time, interpreted in
     * {@code UniqueDatabase.TIMEZONE}, converted to UTC and formatted with
     * {@code ZonedTimestamp.FORMATTER}.
     */
    @Test
    @FixFor(value={"DBZ-183"})
    public void shouldHandleTimestampTimezones() throws Exception {
        final UniqueDatabase regressionDatabase = TestHelper.getUniqueDatabase("logical_server_name", "regression_test")
                .withDbHistoryPath(SCHEMA_HISTORY_PATH);
        regressionDatabase.createAndInitialize();

        final String tableName = "dbz_85_fractest";
        config = simpleConfig()
                .with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(BinlogConnectorConfig.DATABASE_INCLUDE_LIST, regressionDatabase.getDatabaseName())
                .with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, regressionDatabase.qualifiedTableName(tableName))
                .build();
        start(getConnectorClass(), config);

        final int expectedChanges = 1;
        consumeAtLeast(expectedChanges);

        // The expected fractional seconds differ between MariaDB and the other supported database.
        final String dateTime;
        if (isMariaDb()) {
            dateTime = "2014-09-08T17:51:04.77";
        }
        else {
            dateTime = "2014-09-08T17:51:04.780";
        }

        final List<SourceRecord> sourceRecords = store.sourceRecords();
        Assertions.assertThat(sourceRecords.size()).isEqualTo(1);

        final LocalDateTime localDateTime = LocalDateTime.parse(dateTime);
        final ZonedDateTime expectedTimestamp = ZonedDateTime.of(localDateTime, UniqueDatabase.TIMEZONE)
                .withZoneSameInstant(ZoneOffset.UTC);
        final String expectedTimestampString = expectedTimestamp.format(ZonedTimestamp.FORMATTER);

        final Struct value = (Struct) sourceRecords.get(0).value();
        final String actualTimestampString = value.getStruct("after").getString("c4");
        Assertions.assertThat(actualTimestampString).isEqualTo(expectedTimestampString);
    }

    /**
     * Captures the single row of {@code dbz_342_timetest} from the regression database and verifies
     * columns {@code c1}..{@code c11}. Each column is read as a 64-bit integer, scaled by 1000 into
     * nanoseconds, and compared against an ISO-8601 duration, an exact nanosecond count and an
     * explicitly constructed {@link Duration}. {@code c2}..{@code c10} must be negative and
     * {@code c11} must be zero.
     */
    @Test
    @FixFor(value={"DBZ-342"})
    public void shouldHandleMySQLTimeCorrectly() throws Exception {
        final UniqueDatabase regressionDatabase = TestHelper.getUniqueDatabase("logical_server_name", "regression_test")
                .withDbHistoryPath(SCHEMA_HISTORY_PATH);
        regressionDatabase.createAndInitialize();

        final String tableName = "dbz_342_timetest";
        config = simpleConfig()
                .with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
                .with(BinlogConnectorConfig.DATABASE_INCLUDE_LIST, regressionDatabase.getDatabaseName())
                .with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, regressionDatabase.qualifiedTableName(tableName))
                .build();
        start(getConnectorClass(), config);

        final int expectedChanges = 1;
        consumeAtLeast(expectedChanges);

        final List<SourceRecord> sourceRecords = store.sourceRecords();
        Assertions.assertThat(sourceRecords.size()).isEqualTo(1);
        final Struct after = ((Struct) sourceRecords.get(0).value()).getStruct("after");

        // c1 is the only column whose expected fractional part depends on the database flavour.
        String c1IsoDuration = "PT517H51M4.78S";
        long c1Nanos = 1864264780000000L;
        int c1Millis = 780;
        if (isMariaDb()) {
            c1IsoDuration = "PT517H51M4.77S";
            c1Nanos = 1864264770000000L;
            c1Millis = 770;
        }
        assertTimeColumn(after, "c1", c1IsoDuration, c1Nanos,
                Duration.ofHours(517L).plusMinutes(51L).plusSeconds(4L).plusMillis(c1Millis));

        assertNegativeTimeColumn(after, "c2", "-PT13H14M50S", -47690000000000L,
                Duration.ofHours(-13L).minusMinutes(14L).minusSeconds(50L));
        assertNegativeTimeColumn(after, "c3", "-PT733H0M0.001S", -2638800001000000L,
                Duration.ofHours(-733L).minusMillis(1L));
        assertNegativeTimeColumn(after, "c4", "-PT1H59M59.001S", -7199001000000L,
                Duration.ofHours(-1L).minusMinutes(59L).minusSeconds(59L).minusMillis(1L));
        assertNegativeTimeColumn(after, "c5", "-PT838H59M58.999999S", -3020398999999000L,
                Duration.ofHours(-838L).minusMinutes(59L).minusSeconds(58L).minusNanos(999999000L));
        assertNegativeTimeColumn(after, "c6", "-PT00H20M38.000000S", -1238000000000L,
                Duration.ofMinutes(-20L).minusSeconds(38L));
        assertNegativeTimeColumn(after, "c7", "-PT01H01M01.000001S", -3661000001000L,
                Duration.ofHours(-1L).minusMinutes(1L).minusSeconds(1L).minusNanos(1000L));
        assertNegativeTimeColumn(after, "c8", "-PT01H01M01.000000S", -3661000000000L,
                Duration.ofHours(-1L).minusMinutes(1L).minusSeconds(1L));
        assertNegativeTimeColumn(after, "c9", "-PT01H01M00.000000S", -3660000000000L,
                Duration.ofHours(-1L).minusMinutes(1L));
        assertNegativeTimeColumn(after, "c10", "-PT01H00M00.000000S", -3600000000000L,
                Duration.ofHours(-1L));

        final Duration c11Time = assertTimeColumn(after, "c11", "PT00H00M00.000000S", 0L, Duration.ZERO);
        Assert.assertTrue(c11Time.isZero());
    }

    /**
     * Reads {@code column} from {@code after}, converts it to a {@link Duration} by scaling the
     * stored value by 1000 into nanoseconds, and checks it three ways: against the parsed ISO-8601
     * text, against an exact nanosecond count, and against an explicitly built duration.
     *
     * @param after the {@code after} struct of the change event
     * @param column the column name to read
     * @param isoDuration the expected value in ISO-8601 duration notation
     * @param expectedNanos the expected value in nanoseconds
     * @param expected the expected value as a {@link Duration}
     * @return the duration decoded from the column, for further checks by the caller
     */
    private Duration assertTimeColumn(Struct after, String column, String isoDuration, long expectedNanos, Duration expected) {
        final long rawValue = after.getInt64(column);
        final Duration actual = Duration.ofNanos(rawValue * 1000L);
        final Duration parsed = toDuration(isoDuration);
        Assert.assertEquals(parsed, actual);
        Assert.assertEquals(parsed.toNanos(), actual.toNanos());
        Assertions.assertThat(actual.toNanos()).isEqualTo(expectedNanos);
        Assertions.assertThat(actual).isEqualTo(expected);
        return actual;
    }

    /**
     * Same checks as {@link #assertTimeColumn(Struct, String, String, long, Duration)}, plus an
     * assertion that the decoded duration is negative.
     */
    private void assertNegativeTimeColumn(Struct after, String column, String isoDuration, long expectedNanos, Duration expected) {
        final Duration actual = assertTimeColumn(after, column, isoDuration, expectedNanos, expected);
        Assert.assertTrue(actual.isNegative());
    }

    /**
     * Runs the inconsistent-schema scenario without configuring a handling mode ({@code null} is
     * passed) and expects it to end with a {@link ConnectException}.
     */
    @Test(expected=ConnectException.class)
    public void shouldFailOnSchemaInconsistency() throws Exception {
        this.inconsistentSchema(null);
    }

    /**
     * Runs the inconsistent-schema scenario with the handling mode set to {@code WARN}; the test
     * passes when the scenario completes without throwing.
     */
    @Test
    public void shouldWarnOnSchemaInconsistency() throws Exception {
        this.inconsistentSchema(CommonConnectorConfig.EventProcessingFailureHandlingMode.WARN);
    }

    /**
     * Runs the inconsistent-schema scenario with the handling mode set to {@code SKIP}; the test
     * passes when the scenario completes without throwing.
     */
    @Test
    public void shouldIgnoreOnSchemaInconsistency() throws Exception {
        this.inconsistentSchema(CommonConnectorConfig.EventProcessingFailureHandlingMode.SKIP);
    }

    /**
     * Requires an SSL connection with the unsupported protocol value {@code TLSv1.7} and verifies
     * that starting the connector reports failure with the expected validation message.
     */
    @Test
    @SkipWhenDatabaseIsMultiple(value={@SkipWhenDatabaseIs(value=SkipWhenDatabaseIs.Type.MYSQL, versions={@SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN_OR_EQUAL, major=5, minor=6, reason="MySQL 5.6 does not support SSL")}), @SkipWhenDatabaseIs(value=SkipWhenDatabaseIs.Type.MARIADB, reason="MariaDB does not support SSL by default")})
    @FixFor(value={"DBZ-1208"})
    public void shouldFailOnUnknownTlsProtocol() {
        final UniqueDatabase regressionDatabase = TestHelper.getUniqueDatabase("logical_server_name", "regression_test")
                .withDbHistoryPath(SCHEMA_HISTORY_PATH);
        regressionDatabase.createAndInitialize();

        config = simpleConfig()
                .with(BinlogConnectorConfig.SSL_MODE, BinlogConnectorConfig.SecureConnectionMode.REQUIRED)
                .with(SET_TLS_PROTOCOLS, "TLSv1.7")
                .build();

        // Captures what the callback passed to start(...) reports.
        final HashMap<String, Object> outcome = new HashMap<>();
        start(getConnectorClass(), config, (success, message, error) -> {
            outcome.put("success", success);
            outcome.put("message", message);
        });

        Assert.assertEquals(Boolean.FALSE, outcome.get("success"));
        final String reportedMessage = outcome.get("message").toString();
        Assert.assertEquals("Connector configuration is not valid. Unable to connect: Specified list of TLS versions only contains non valid TLS protocols. Accepted values are TLSv1.2 and TLSv1.3.", reportedMessage);
    }

    /**
     * Requires an SSL connection restricted to {@code TLSv1.2} and verifies that the connector
     * reaches the streaming phase without the callback reporting an error.
     */
    @Test
    @SkipWhenDatabaseIsMultiple(value={@SkipWhenDatabaseIs(value=SkipWhenDatabaseIs.Type.MYSQL, versions={@SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN_OR_EQUAL, major=5, minor=6, reason="MySQL 5.6 does not support SSL")}), @SkipWhenDatabaseIs(value=SkipWhenDatabaseIs.Type.MARIADB, reason="MariaDB does not support SSL by default")})
    @FixFor(value={"DBZ-1208"})
    public void shouldAcceptTls12() throws Exception {
        final UniqueDatabase regressionDatabase = TestHelper.getUniqueDatabase("logical_server_name", "regression_test")
                .withDbHistoryPath(SCHEMA_HISTORY_PATH);
        regressionDatabase.createAndInitialize();

        config = simpleConfig()
                .with(BinlogConnectorConfig.SSL_MODE, BinlogConnectorConfig.SecureConnectionMode.REQUIRED)
                .with(SET_TLS_PROTOCOLS, "TLSv1.2")
                .build();

        final AtomicReference<Throwable> reportedError = new AtomicReference<>();
        start(getConnectorClass(), config, (success, message, error) -> reportedError.set(error));
        waitForStreamingRunning(getConnectorName(), DATABASE.getServerName(), "streaming");

        Assertions.assertThat(reportedError.get()).isNull();
    }

    /**
     * Configures a heartbeat action query that inserts into {@code test_heartbeat_table} every
     * 100 ms and waits until at least one such row is visible in the database. Also verifies that
     * the connector did not report an error through its start callback.
     */
    @Test
    @FixFor(value={"DBZ-4029"})
    public void testHeartbeatActionQueryExecuted() throws Exception {
        final String heartbeatTopicPrefix = "myheartbeat";
        config = simpleConfig()
                .with(BinlogConnectorConfig.USER, "snapper")
                .with(BinlogConnectorConfig.PASSWORD, "snapperpass")
                // NOTE(review): "__debezium-heartbeat" looks like a topic-prefix value rather than a
                // property name — confirm this is the intended configuration key.
                .with("__debezium-heartbeat", heartbeatTopicPrefix)
                .with(Heartbeat.HEARTBEAT_INTERVAL, "100")
                .with("heartbeat.action.query", String.format("INSERT INTO %s.test_heartbeat_table (text) VALUES ('test_heartbeat');", DATABASE.getDatabaseName()))
                .build();

        // The target table must exist before the connector starts issuing the action query.
        try (BinlogTestConnection connection = getTestDatabaseConnection(DATABASE.getDatabaseName())) {
            connection.execute("CREATE TABLE test_heartbeat_table (text TEXT);");
        }

        final AtomicReference<Throwable> reportedError = new AtomicReference<>();
        start(getConnectorClass(), config, (success, message, error) -> reportedError.set(error));
        waitForStreamingRunning(getConnectorName(), DATABASE.getServerName(), "streaming");

        final String countQuery = String.format("SELECT COUNT(*) FROM %s.test_heartbeat_table;", DATABASE.getDatabaseName());
        final JdbcConnection.ResultSetMapper<Integer> countMapper = rs -> {
            rs.next();
            return rs.getInt(1);
        };

        Awaitility.await()
                .alias("Awaiting heartbeat action query insert")
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .atMost((long) (waitTimeForRecords() * 30), TimeUnit.SECONDS)
                .until(() -> {
                    // A fresh connection per poll; closed again by try-with-resources.
                    try (BinlogTestConnection connection = getTestDatabaseConnection(DATABASE.getDatabaseName())) {
                        final int heartbeatRows = connection.queryAndMap(countQuery, countMapper);
                        return heartbeatRows > 0;
                    }
                });

        // Previously the captured error was never inspected, so a reported connector failure went unnoticed.
        Assertions.assertThat(reportedError.get()).isNull();
    }

    /**
     * Drives the inconsistent-schema scenario. The connector first runs capturing only
     * {@code orders} with history limited to captured tables, is stopped, and is then restarted
     * with {@code customers} added to the include list. A row is then inserted into
     * {@code customers}.
     *
     * <p>With {@code mode == null} no handling mode is configured; the method waits for the log
     * message "Error during binlog processing." and for the engine to shut down. With a non-null
     * mode it only waits for streaming to be running. In both cases any error reported through the
     * start callback is rethrown.
     *
     * @param mode the inconsistent-schema handling mode to configure, or {@code null} for none
     * @throws InterruptedException if waiting or consuming is interrupted
     * @throws SQLException if the insert or the JDBC resource handling fails
     * @throws RuntimeException the error reported by the connector, rethrown as-is when it is
     *         unchecked and wrapped in a {@link DebeziumException} otherwise
     */
    private void inconsistentSchema(CommonConnectorConfig.EventProcessingFailureHandlingMode mode) throws InterruptedException, SQLException {
        final LogInterceptor logInterceptor = new LogInterceptor(BinlogStreamingChangeEventSource.class);

        final Configuration.Builder builder = simpleConfig()
                .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)
                .with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("orders"));
        config = mode == null
                ? builder.build()
                : builder.with(BinlogConnectorConfig.INCONSISTENT_SCHEMA_HANDLING_MODE, mode).build();
        start(getConnectorClass(), config);

        final int expected = 5;
        final int consumed = consumeAtLeast(expected);
        Assertions.assertThat(consumed).isGreaterThanOrEqualTo(expected);
        stopConnector();

        // Restart with an additional table the first run never captured.
        config = builder
                .with(BinlogConnectorConfig.TABLE_INCLUDE_LIST, DATABASE.qualifiedTableName("orders") + "," + DATABASE.qualifiedTableName("customers"))
                .build();
        final AtomicReference<Throwable> reportedError = new AtomicReference<>();
        start(getConnectorClass(), config, (success, message, error) -> reportedError.set(error));

        try (BinlogTestConnection db = getTestDatabaseConnection(DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();
             Connection jdbc = connection.connection();
             Statement statement = jdbc.createStatement()) {
            if (mode == null) {
                waitForStreamingRunning(getConnectorName(), DATABASE.getServerName(), "streaming");
            }
            statement.executeUpdate("INSERT INTO customers VALUES (default,'John','Lazy','john.lazy@acme.com')");
        }

        if (mode == null) {
            Awaitility.await()
                    .atMost(Duration.ofSeconds(waitTimeForRecords()))
                    .until(() -> logInterceptor.containsMessage("Error during binlog processing."));
            waitForEngineShutdown();
        }
        else {
            waitForStreamingRunning(getConnectorName(), DATABASE.getServerName(), "streaming");
        }
        stopConnector();

        // A blind cast to RuntimeException would replace a checked exception or an Error with a
        // ClassCastException and hide the real failure, so dispatch on the actual type instead.
        final Throwable failure = reportedError.get();
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        if (failure instanceof Error) {
            throw (Error) failure;
        }
        if (failure != null) {
            throw new DebeziumException(failure);
        }
    }

    /**
     * Parses a duration string with {@link Duration#parse(CharSequence)}.
     *
     * @param duration the text to parse, e.g. {@code "-PT13H14M50S"}
     * @return the parsed duration
     */
    private Duration toDuration(String duration) {
        return Duration.parse(duration);
    }

    /**
     * Returns the name under which the products table is looked up in the store: {@code "products"}
     * when the test connection reports case-sensitive table identifiers, {@code "Products"} otherwise.
     *
     * @return the table name to use
     * @throws SQLException if querying or closing the test connection fails
     */
    private String productsTableName() throws SQLException {
        try (BinlogTestConnection db = getTestDatabaseConnection(DATABASE.getDatabaseName())) {
            if (db.isTableIdCaseSensitive()) {
                return "products";
            }
            return "Products";
        }
    }
}

