/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.BinlogReader;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.data.KeyValueStore;
import io.debezium.data.SchemaChangeHistory;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class BinlogReaderIT {
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-db-history-binlog.txt").toAbsolutePath();
    private static final String DB_NAME = "connector_test_ro";
    private static final String LOGICAL_NAME = "logical_server_name";
    private Configuration config;
    private MySqlTaskContext context;
    private BinlogReader reader;
    private KeyValueStore store;
    private SchemaChangeHistory schemaChanges;

    @Before
    public void beforeEach() {
        Testing.Files.delete((Path)DB_HISTORY_PATH);
        this.store = KeyValueStore.createForTopicsBeginningWith((String)"logical_server_name.");
        this.schemaChanges = new SchemaChangeHistory(LOGICAL_NAME);
    }

    @After
    public void afterEach() {
        if (this.reader != null) {
            try {
                this.reader.stop();
            }
            finally {
                if (this.context != null) {
                    try {
                        this.context.shutdown();
                    }
                    finally {
                        this.context = null;
                        Testing.Files.delete((Path)DB_HISTORY_PATH);
                    }
                }
            }
        }
    }

    protected int consumeAtLeast(int minNumber) throws InterruptedException {
        return this.consumeAtLeast(minNumber, 20L, TimeUnit.SECONDS);
    }

    protected int consumeAtLeast(int minNumber, long timeout, TimeUnit unit) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        List records = null;
        long startTime = System.currentTimeMillis();
        while (counter.get() < minNumber && System.currentTimeMillis() - startTime < unit.toMillis(timeout)) {
            records = this.reader.poll();
            if (records == null) continue;
            records.forEach(record -> {
                counter.incrementAndGet();
                VerifyRecord.isValid((SourceRecord)record);
                this.store.add(record);
                this.schemaChanges.add(record);
            });
            Testing.print((Object)("" + counter.get() + " records"));
        }
        return counter.get();
    }

    protected Configuration.Builder simpleConfig() {
        String hostname = System.getProperty("database.hostname");
        String port = System.getProperty("database.port");
        Assertions.assertThat((String)hostname).isNotNull();
        Assertions.assertThat((String)port).isNotNull();
        return (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with(MySqlConnectorConfig.HOSTNAME, hostname)).with(MySqlConnectorConfig.PORT, port)).with(MySqlConnectorConfig.USER, "replicator")).with(MySqlConnectorConfig.PASSWORD, "replpass")).with(MySqlConnectorConfig.SSL_MODE, MySqlConnectorConfig.SecureConnectionMode.DISABLED.name().toLowerCase())).with(MySqlConnectorConfig.SERVER_ID, 18911)).with(MySqlConnectorConfig.SERVER_NAME, LOGICAL_NAME)).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.DATABASE_WHITELIST, DB_NAME)).with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)).with(FileDatabaseHistory.FILE_PATH, (Object)DB_HISTORY_PATH);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabase() throws Exception {
        this.config = this.simpleConfig().build();
        this.context = new MySqlTaskContext(this.config);
        this.context.start();
        this.context.source().setBinlogStartPoint("", 0L);
        this.context.initializeHistory();
        this.reader = new BinlogReader("binlog", this.context);
        this.reader.start();
        int expected = 27;
        int consumed = this.consumeAtLeast(expected);
        Assertions.assertThat((int)consumed).isGreaterThanOrEqualTo(expected);
        Assertions.assertThat((int)this.schemaChanges.recordCount()).isEqualTo(0);
        Assertions.assertThat((int)this.store.collectionCount()).isEqualTo(4);
        KeyValueStore.Collection products = this.store.collection(DB_NAME, "products");
        Assertions.assertThat((long)products.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection products_on_hand = this.store.collection(DB_NAME, "products_on_hand");
        Assertions.assertThat((long)products_on_hand.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products_on_hand.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection customers = this.store.collection(DB_NAME, "customers");
        Assertions.assertThat((long)customers.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat((long)customers.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)customers.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection orders = this.store.collection(DB_NAME, "orders");
        Assertions.assertThat((long)orders.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat((long)orders.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)orders.numberOfValueSchemaChanges()).isEqualTo(1L);
    }

    @Test
    public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception {
        this.config = ((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.context = new MySqlTaskContext(this.config);
        this.context.start();
        this.context.source().setBinlogStartPoint("", 0L);
        this.context.initializeHistory();
        this.reader = new BinlogReader("binlog", this.context);
        this.reader.start();
        int expectedSchemaChangeCount = 6;
        int expected = 27 + expectedSchemaChangeCount;
        int consumed = this.consumeAtLeast(expected);
        Assertions.assertThat((int)consumed).isGreaterThanOrEqualTo(expected);
        Assertions.assertThat((int)this.schemaChanges.recordCount()).isEqualTo(expectedSchemaChangeCount);
        Assertions.assertThat((int)this.store.collectionCount()).isEqualTo(4);
        KeyValueStore.Collection products = this.store.collection(DB_NAME, "products");
        Assertions.assertThat((long)products.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection products_on_hand = this.store.collection(DB_NAME, "products_on_hand");
        Assertions.assertThat((long)products_on_hand.numberOfCreates()).isEqualTo(9L);
        Assertions.assertThat((long)products_on_hand.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)products_on_hand.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)products_on_hand.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection customers = this.store.collection(DB_NAME, "customers");
        Assertions.assertThat((long)customers.numberOfCreates()).isEqualTo(4L);
        Assertions.assertThat((long)customers.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)customers.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)customers.numberOfValueSchemaChanges()).isEqualTo(1L);
        KeyValueStore.Collection orders = this.store.collection(DB_NAME, "orders");
        Assertions.assertThat((long)orders.numberOfCreates()).isEqualTo(5L);
        Assertions.assertThat((long)orders.numberOfUpdates()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfDeletes()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfReads()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfTombstones()).isEqualTo(0L);
        Assertions.assertThat((long)orders.numberOfKeySchemaChanges()).isEqualTo(1L);
        Assertions.assertThat((long)orders.numberOfValueSchemaChanges()).isEqualTo(1L);
    }

    @Test
    @FixFor(value={"DBZ-183"})
    public void shouldHandleTimestampTimezones() throws Exception {
        String dbName = "regression_test";
        String tableName = "dbz_85_fractest";
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.simpleConfig().with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.DATABASE_WHITELIST, dbName)).with(MySqlConnectorConfig.TABLE_WHITELIST, dbName + "." + tableName)).build();
        this.context = new MySqlTaskContext(this.config);
        this.context.start();
        this.context.source().setBinlogStartPoint("", 0L);
        this.context.initializeHistory();
        this.reader = new BinlogReader("binlog", this.context);
        this.reader.start();
        int expectedChanges = 1;
        this.consumeAtLeast(expectedChanges);
        List sourceRecords = this.store.sourceRecords();
        Assertions.assertThat((int)sourceRecords.size()).isEqualTo(1);
        ZonedDateTime expectedTimestamp = ZonedDateTime.of(LocalDateTime.parse("2014-09-08T17:51:04.780"), ZoneId.systemDefault());
        String expectedTimestampString = expectedTimestamp.format(ZonedTimestamp.FORMATTER);
        SourceRecord sourceRecord = (SourceRecord)sourceRecords.get(0);
        Struct value = (Struct)sourceRecord.value();
        Struct after = value.getStruct("after");
        String actualTimestampString = after.getString("c4");
        Assertions.assertThat((String)actualTimestampString).isEqualTo((Object)expectedTimestampString);
    }
}

