/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mysql.BinlogReader;
import io.debezium.connector.mysql.MySQLConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.data.KeyValueStore;
import io.debezium.data.SchemaChangeHistory;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class BinlogReaderIT {
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-db-history-binlog.txt").toAbsolutePath();
    private final UniqueDatabase DATABASE = new UniqueDatabase("logical_server_name", "connector_test_ro").withDbHistoryPath(DB_HISTORY_PATH);
    private Configuration config;
    private MySqlTaskContext context;
    private BinlogReader reader;
    private KeyValueStore store;
    private SchemaChangeHistory schemaChanges;

    @Before
    public void beforeEach() {
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        this.DATABASE.createAndInitialize();
        this.store = KeyValueStore.createForTopicsBeginningWith((String)(this.DATABASE.getServerName() + "."));
        this.schemaChanges = new SchemaChangeHistory(this.DATABASE.getServerName());
    }

    @After
    public void afterEach() {
        if (this.reader != null) {
            try {
                this.reader.stop();
            }
            finally {
                if (this.context != null) {
                    try {
                        this.context.shutdown();
                    }
                    finally {
                        this.context = null;
                        Testing.Files.delete((Path)DB_HISTORY_PATH);
                    }
                }
            }
        }
    }

    protected int consumeAtLeast(int minNumber) throws InterruptedException {
        return this.consumeAtLeast(minNumber, 20L, TimeUnit.SECONDS);
    }

    protected int consumeAtLeast(int minNumber, long timeout, TimeUnit unit) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        List records = null;
        long startTime = System.currentTimeMillis();
        while (counter.get() < minNumber && System.currentTimeMillis() - startTime < unit.toMillis(timeout)) {
            records = this.reader.poll();
            if (records == null) continue;
            records.forEach(record -> {
                counter.incrementAndGet();
                VerifyRecord.isValid((SourceRecord)record);
                this.store.add(record);
                this.schemaChanges.add(record);
            });
            Testing.print((Object)("" + counter.get() + " records"));
        }
        return counter.get();
    }

    protected Configuration.Builder simpleConfig() {
        return (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.USER, "replicator")).with(MySqlConnectorConfig.PASSWORD, "replpass")).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
        this.config = this.simpleConfig().build();
        this.context = new MySqlTaskContext(this.config);
        this.context.start();
        this.context.source().setBinlogStartPoint("", 0L);
        this.context.initializeHistory();
        this.reader = new BinlogReader("binlog", this.context);
        this.reader.start();
        int expected = 27;
        int consumed = this.consumeAtLeast(expected);
        Assertions.assertThat((int)consumed).isGreaterThanOrEqualTo(expected);
        Assertions.assertThat((int)this.schemaChanges.recordCount()).isEqualTo(0);
        Assertions.assertThat((int)this.store.collectionCount()).isEqualTo(5);
        KeyValueStore.Collection products = this.store.collection(this.DATABASE.getDatabaseName(), this.productsTableName());
        Assertions.assertThat((long)products.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection products_on_hand = this.store.collection(this.DATABASE.getDatabaseName(), "products_on_hand");
        Assertions.assertThat((long)products_on_hand.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products_on_hand.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection customers = this.store.collection(this.DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat((long)customers.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat((long)customers.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)customers.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection orders = this.store.collection(this.DATABASE.getDatabaseName(), "orders");
        Assertions.assertThat((long)orders.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat((long)orders.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)orders.numberOfValueSchemaChanges()).isEqualTo(1L);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.context = new MySqlTaskContext(this.config);
        this.context.start();
        this.context.source().setBinlogStartPoint("", 0L);
        this.context.initializeHistory();
        this.reader = new BinlogReader("binlog", this.context);
        this.reader.start();
        int expectedSchemaChangeCount = 7;
        int expected = 28 + expectedSchemaChangeCount;
        int consumed = this.consumeAtLeast(expected);
        Assertions.assertThat((int)consumed).isGreaterThanOrEqualTo(expected);
        Assertions.assertThat((int)this.schemaChanges.recordCount()).isEqualTo(expectedSchemaChangeCount);
        Assertions.assertThat((int)this.store.collectionCount()).isEqualTo(5);
        KeyValueStore.Collection products = this.store.collection(this.DATABASE.getDatabaseName(), this.productsTableName());
        Assertions.assertThat((long)products.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection products_on_hand = this.store.collection(this.DATABASE.getDatabaseName(), "products_on_hand");
        Assertions.assertThat((long)products_on_hand.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products_on_hand.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection customers = this.store.collection(this.DATABASE.getDatabaseName(), "customers");
        Assertions.assertThat((long)customers.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat((long)customers.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)customers.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection orders = this.store.collection(this.DATABASE.getDatabaseName(), "orders");
        Assertions.assertThat((long)orders.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat((long)orders.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)orders.numberOfValueSchemaChanges()).isEqualTo(1L);
    }

    @Test
    @FixFor(value={"DBZ-183"})
    public void shouldHandleTimestampTimezones() throws Exception {
        UniqueDatabase REGRESSION_DATABASE = new UniqueDatabase("logical_server_name", "regression_test").withDbHistoryPath(DB_HISTORY_PATH);
        REGRESSION_DATABASE.createAndInitialize();
        String tableName = "dbz_85_fractest";
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.DATABASE_WHITELIST, REGRESSION_DATABASE.getDatabaseName())).with(MySqlConnectorConfig.TABLE_WHITELIST, REGRESSION_DATABASE.qualifiedTableName(tableName))).build();
        this.context = new MySqlTaskContext(this.config);
        this.context.start();
        this.context.source().setBinlogStartPoint("", 0L);
        this.context.initializeHistory();
        this.reader = new BinlogReader("binlog", this.context);
        this.reader.start();
        int expectedChanges = 1;
        this.consumeAtLeast(expectedChanges);
        List sourceRecords = this.store.sourceRecords();
        Assertions.assertThat((int)sourceRecords.size()).isEqualTo(1);
        ZonedDateTime expectedTimestamp = ZonedDateTime.ofInstant(LocalDateTime.parse("2014-09-08T17:51:04.780").atZone(ZoneId.of("UTC")).toInstant(), ZoneId.systemDefault());
        String expectedTimestampString = expectedTimestamp.format(ZonedTimestamp.FORMATTER);
        SourceRecord sourceRecord = (SourceRecord)sourceRecords.get(0);
        Struct value = (Struct)sourceRecord.value();
        Struct after = value.getStruct("after");
        String actualTimestampString = after.getString("c4");
        Assertions.assertThat((String)actualTimestampString).isEqualTo((Object)expectedTimestampString);
    }

    @Test
    @FixFor(value={"DBZ-342"})
    public void shouldHandleMySQLTimeCorrectly() throws Exception {
        UniqueDatabase REGRESSION_DATABASE = new UniqueDatabase("logical_server_name", "regression_test").withDbHistoryPath(DB_HISTORY_PATH);
        REGRESSION_DATABASE.createAndInitialize();
        String tableName = "dbz_342_timetest";
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.DATABASE_WHITELIST, REGRESSION_DATABASE.getDatabaseName())).with(MySqlConnectorConfig.TABLE_WHITELIST, REGRESSION_DATABASE.qualifiedTableName(tableName))).build();
        this.context = new MySqlTaskContext(this.config);
        this.context.start();
        this.context.source().setBinlogStartPoint("", 0L);
        this.context.initializeHistory();
        this.reader = new BinlogReader("binlog", this.context);
        this.reader.start();
        int expectedChanges = 1;
        this.consumeAtLeast(expectedChanges);
        List sourceRecords = this.store.sourceRecords();
        Assertions.assertThat((int)sourceRecords.size()).isEqualTo(1);
        SourceRecord sourceRecord = (SourceRecord)sourceRecords.get(0);
        Struct value = (Struct)sourceRecord.value();
        Struct after = value.getStruct("after");
        long c1 = after.getInt64("c1");
        Duration c1Time = Duration.ofNanos(c1 * 1000L);
        Duration c1ExpectedTime = this.toDuration("PT517H51M4.78S");
        Assert.assertEquals((Object)c1ExpectedTime, (Object)c1Time);
        Assert.assertEquals((long)c1ExpectedTime.toNanos(), (long)c1Time.toNanos());
        Assertions.assertThat((long)c1Time.toNanos()).isEqualTo(1864264780000000L);
        Assertions.assertThat((Object)c1Time).isEqualTo((Object)Duration.ofHours(517L).plusMinutes(51L).plusSeconds(4L).plusMillis(780L));
        long c2 = after.getInt64("c2");
        Duration c2Time = Duration.ofNanos(c2 * 1000L);
        Duration c2ExpectedTime = this.toDuration("-PT13H14M50S");
        Assert.assertEquals((Object)c2ExpectedTime, (Object)c2Time);
        Assert.assertEquals((long)c2ExpectedTime.toNanos(), (long)c2Time.toNanos());
        Assertions.assertThat((long)c2Time.toNanos()).isEqualTo(-47690000000000L);
        Assert.assertTrue((boolean)c2Time.isNegative());
        Assertions.assertThat((Object)c2Time).isEqualTo((Object)Duration.ofHours(-13L).minusMinutes(14L).minusSeconds(50L));
        long c3 = after.getInt64("c3");
        Duration c3Time = Duration.ofNanos(c3 * 1000L);
        Duration c3ExpectedTime = this.toDuration("-PT733H0M0.001S");
        Assert.assertEquals((Object)c3ExpectedTime, (Object)c3Time);
        Assert.assertEquals((long)c3ExpectedTime.toNanos(), (long)c3Time.toNanos());
        Assertions.assertThat((long)c3Time.toNanos()).isEqualTo(-2638800001000000L);
        Assert.assertTrue((boolean)c3Time.isNegative());
        Assertions.assertThat((Object)c3Time).isEqualTo((Object)Duration.ofHours(-733L).minusMillis(1L));
        long c4 = after.getInt64("c4");
        Duration c4Time = Duration.ofNanos(c4 * 1000L);
        Duration c4ExpectedTime = this.toDuration("-PT1H59M59.001S");
        Assert.assertEquals((Object)c4ExpectedTime, (Object)c4Time);
        Assert.assertEquals((long)c4ExpectedTime.toNanos(), (long)c4Time.toNanos());
        Assertions.assertThat((long)c4Time.toNanos()).isEqualTo(-7199001000000L);
        Assert.assertTrue((boolean)c4Time.isNegative());
        Assertions.assertThat((Object)c4Time).isEqualTo((Object)Duration.ofHours(-1L).minusMinutes(59L).minusSeconds(59L).minusMillis(1L));
        long c5 = after.getInt64("c5");
        Duration c5Time = Duration.ofNanos(c5 * 1000L);
        Duration c5ExpectedTime = this.toDuration("-PT838H59M58.999999S");
        Assert.assertEquals((Object)c5ExpectedTime, (Object)c5Time);
        Assert.assertEquals((long)c5ExpectedTime.toNanos(), (long)c5Time.toNanos());
        Assertions.assertThat((long)c5Time.toNanos()).isEqualTo(-3020398999999000L);
        Assert.assertTrue((boolean)c5Time.isNegative());
        Assertions.assertThat((Object)c5Time).isEqualTo((Object)Duration.ofHours(-838L).minusMinutes(59L).minusSeconds(58L).minusNanos(999999000L));
    }

    @Test(expected=ConnectException.class)
    public void shouldFailOnSchemaInconsistency() throws Exception {
        this.inconsistentSchema(null);
        this.consumeAtLeast(2);
    }

    @Test
    public void shouldWarnOnSchemaInconsistency() throws Exception {
        this.inconsistentSchema(MySqlConnectorConfig.EventProcessingFailureHandlingMode.WARN);
        int consumed = this.consumeAtLeast(2, 2L, TimeUnit.SECONDS);
        Assertions.assertThat((int)consumed).isZero();
    }

    @Test
    public void shouldIgnoreOnSchemaInconsistency() throws Exception {
        this.inconsistentSchema(MySqlConnectorConfig.EventProcessingFailureHandlingMode.IGNORE);
        int consumed = this.consumeAtLeast(2, 2L, TimeUnit.SECONDS);
        Assertions.assertThat((int)consumed).isZero();
    }

    private void inconsistentSchema(MySqlConnectorConfig.EventProcessingFailureHandlingMode mode) throws InterruptedException, SQLException {
        this.config = mode == null ? this.simpleConfig().build() : ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.INCONSISTENT_SCHEMA_HANDLING_MODE, (EnumeratedValue)mode)).build();
        this.context = new MySqlTaskContext(this.config);
        this.context.start();
        this.context.source().setBinlogStartPoint("", 0L);
        this.context.initializeHistory();
        this.reader = new BinlogReader("binlog", this.context);
        this.reader.start();
        int expected = 27;
        int consumed = this.consumeAtLeast(expected);
        Assertions.assertThat((int)consumed).isGreaterThanOrEqualTo(expected);
        this.reader.stop();
        this.reader.start();
        this.reader.context.dbSchema().applyDdl(this.context.source(), this.DATABASE.getDatabaseName(), "DROP TABLE customers", null);
        try (MySQLConnection db = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();
             Connection jdbc = connection.connection();
             Statement statement = jdbc.createStatement();){
            statement.executeUpdate("INSERT INTO customers VALUES (default,'John','Lazy','john.lazy@acme.com')");
        }
    }

    private Duration toDuration(String duration) {
        return Duration.parse(duration);
    }

    private String productsTableName() {
        return this.context.isTableIdCaseInsensitive() ? "products" : "Products";
    }
}

