/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.relational.history;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlReadOnlyIncrementalSnapshotContext;
import io.debezium.connector.mysql.SourceInfo;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.doc.FixFor;
import io.debezium.kafka.KafkaCluster;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.storage.kafka.history.KafkaDatabaseHistory;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.io.File;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class KafkaDatabaseHistoryTest {
    // Kafka cluster shared by every test; assigned in startKafka() and shut down in stopKafka().
    private static KafkaCluster kafka;
    // History instance under test; recreated in beforeEach() and stopped/cleared in afterEach().
    private KafkaDatabaseHistory history;
    // Partition/offset pair built in beforeEach(); supplies the arguments for history.record()
    // and is passed to history.recover().
    private Offsets<Partition, MySqlOffsetContext> offsets;
    // Offset context wrapped by 'offsets'; setLogPosition() moves its binlog start point.
    private MySqlOffsetContext position;
    // NOTE(review): not referenced by name in the visible code; the producer records pass
    // partition 0 directly, presumably because the constant was inlined.
    private static final int PARTITION_NO = 0;

    /**
     * Starts the single-broker Kafka cluster shared by all tests in this class.
     * Automatic topic creation is switched off on the broker, so topics only
     * exist when a test (or the history under test) creates them explicitly.
     *
     * @throws Exception if the testing directory or the cluster cannot be set up
     */
    @BeforeClass
    public static void startKafka() throws Exception {
        File clusterDir = Testing.Files.createTestingDirectory("history_cluster");
        // Remove the freshly obtained directory again before handing it to the cluster.
        Testing.Files.delete(clusterDir);

        kafka = new KafkaCluster()
                .usingDirectory(clusterDir)
                .deleteDataPriorToStartup(true)
                .deleteDataUponShutdown(true)
                .addBrokers(1)
                .withKafkaConfiguration(Collect.propertiesOf(
                        "auto.create.topics.enable", "false",
                        "zookeeper.session.timeout.ms", "20000"))
                .startup();
    }

    /**
     * Shuts the shared Kafka cluster down, if one was started.
     */
    @AfterClass
    public static void stopKafka() {
        if (kafka == null) {
            // Nothing was assigned by startKafka(), so there is nothing to stop.
            return;
        }
        kafka.shutdown();
    }

    /**
     * Builds a fresh partition/offset pair positioned at binlog offset 0 and a new,
     * not yet configured {@link KafkaDatabaseHistory} for the test about to run.
     *
     * @throws Exception if any of the fixtures cannot be created
     */
    @Before
    public void beforeEach() throws Exception {
        MySqlPartition source = new MySqlPartition("my-server", "my-db");
        Configuration config = Configuration.empty()
                .edit()
                .with(RelationalDatabaseConnectorConfig.SERVER_NAME, "dbserver1")
                .build();
        this.position = new MySqlOffsetContext(
                false,
                true,
                new TransactionContext(),
                new MySqlReadOnlyIncrementalSnapshotContext(),
                new SourceInfo(new MySqlConnectorConfig(config)));
        // No widening cast on the offset argument: casting it to OffsetContext would make
        // Offsets.of(...) infer a type that does not match the field's
        // Offsets<Partition, MySqlOffsetContext> declaration.
        this.offsets = Offsets.of(source, this.position);
        this.setLogPosition(0);
        this.history = new KafkaDatabaseHistory();
    }

    /**
     * Stops the history used by the test (when there is one) and always clears the
     * field afterwards, even if stopping throws.
     */
    @After
    public void afterEach() {
        KafkaDatabaseHistory current = this.history;
        try {
            if (current != null) {
                current.stop();
            }
        }
        finally {
            this.history = null;
        }
    }

    /**
     * Runs the record/recover scenario against a newly created, empty topic
     * (1 partition, replication factor 1) without skipping unparseable DDL.
     */
    @Test
    public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exception {
        final String emptyTopic = "empty-and-recovery-schema-changes";
        kafka.createTopic(emptyTopic, 1, 1);
        testHistoryTopicContent(emptyTopic, false);
    }

    /**
     * Drives a complete record-then-recover cycle against the given history topic.
     *
     * <p>The history is configured and started (twice, as is storage initialisation, to
     * exercise repeated calls), three DDL batches are recorded at binlog positions 10, 39
     * and 10003, and three {@link Tables} snapshots are kept that reflect the schema after
     * the first, second and third batch respectively. A second, freshly configured history
     * then recovers up to several positions and each result is compared with the matching
     * snapshot. On return, {@code this.history} refers to that second instance.
     *
     * @param topicName          name of the Kafka topic holding the schema history
     * @param skipUnparseableDDL value for {@code SKIP_UNPARSEABLE_DDL_STATEMENTS}
     */
    private void testHistoryTopicContent(String topicName, boolean skipUnparseableDDL) {
        Configuration config = Configuration.create()
                .with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, kafka.brokerList())
                .with(KafkaDatabaseHistory.TOPIC, topicName)
                .with(DatabaseHistory.NAME, "my-db-history")
                .with(KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, 500)
                .with(KafkaDatabaseHistory.consumerConfigPropertyName("max.poll.interval.ms"), 100)
                .with(KafkaDatabaseHistory.consumerConfigPropertyName("session.timeout.ms"), 50000)
                .with(KafkaDatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL)
                .with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, "org.apache.kafka.connect.source.SourceConnector")
                .with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, "dbz-test")
                .build();
        this.history.configure(config, null, DatabaseHistoryMetrics.NOOP, true);

        // start() and initializeStorage() are each invoked twice on purpose: the second call
        // must be harmless.
        this.history.start();
        this.history.start();
        this.history.initializeStorage();
        this.history.initializeStorage();

        DdlParser recoveryParser = new MySqlAntlrDdlParser();
        MySqlAntlrDdlParser ddlParser = new MySqlAntlrDdlParser();
        ddlParser.setCurrentSchema("db1");
        Tables tables1 = new Tables();
        Tables tables2 = new Tables();
        Tables tables3 = new Tables();

        // Recovering before anything was recorded by this method must yield no tables.
        this.setLogPosition(0);
        this.history.recover(this.offsets, tables1, recoveryParser);
        Assertions.assertThat(tables1.size()).isEqualTo(0);

        // First batch: applied to all three snapshots.
        this.setLogPosition(10);
        String ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n"
                + "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n"
                + "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, description VARCHAR(255) NOT NULL); \n";
        this.recordDdl(ddl);
        ddlParser.parse(ddl, tables1);
        Assertions.assertThat(tables1.size()).isEqualTo(3);
        ddlParser.parse(ddl, tables2);
        Assertions.assertThat(tables2.size()).isEqualTo(3);
        ddlParser.parse(ddl, tables3);
        Assertions.assertThat(tables3.size()).isEqualTo(3);

        // Second batch: applied to snapshots 2 and 3 only.
        this.setLogPosition(39);
        ddl = "DROP TABLE foo;";
        this.recordDdl(ddl);
        ddlParser.parse(ddl, tables2);
        Assertions.assertThat(tables2.size()).isEqualTo(2);
        ddlParser.parse(ddl, tables3);
        Assertions.assertThat(tables3.size()).isEqualTo(2);

        // Third batch: applied to snapshot 3 only.
        this.setLogPosition(10003);
        ddl = "CREATE TABLE suppliers ( supplierId INTEGER NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL);";
        this.recordDdl(ddl);
        ddlParser.parse(ddl, tables3);
        Assertions.assertThat(tables3.size()).isEqualTo(3);

        // Replace the history with a new instance that is configured but never started.
        this.history.stop();
        this.history = new KafkaDatabaseHistory();
        this.history.configure(config, null, DatabaseHistoryListener.NOOP, true);

        // Recover to just past each recorded change, then far beyond the last one.
        this.assertRecoveredTables(15, recoveryParser, tables1);
        this.assertRecoveredTables(50, recoveryParser, tables2);
        this.assertRecoveredTables(10010, recoveryParser, tables3);
        this.assertRecoveredTables(100000010, recoveryParser, tables3);
    }

    /** Records {@code ddl} for database "db1" using the current partition and offset of {@code offsets}. */
    private void recordDdl(String ddl) {
        this.history.record(
                this.offsets.getTheOnlyPartition().getSourcePartition(),
                this.offsets.getTheOnlyOffset().getOffset(),
                "db1",
                ddl);
    }

    /** Moves the log position, recovers into a fresh {@link Tables} and compares it with {@code expected}. */
    private void assertRecoveredTables(int logPosition, DdlParser parser, Tables expected) {
        Tables recovered = new Tables();
        this.setLogPosition(logPosition);
        this.history.recover(this.offsets, recovered, parser);
        Assertions.assertThat(recovered).isEqualTo(expected);
    }

    /**
     * Points the shared offset context at the given position within the binlog
     * file "my-txn-file.log".
     *
     * @param index position inside the binlog file
     */
    protected void setLogPosition(int index) {
        long binlogPosition = index;
        this.position.setBinlogStartPoint("my-txn-file.log", binlogPosition);
    }

    /**
     * Pre-populates the history topic with records that cannot be used (null and empty
     * values, records lacking a source or a position, malformed JSON, invalid SQL) and
     * then runs the regular record/recover scenario with unparseable DDL skipping enabled.
     *
     * @throws Exception if the topic cannot be created or a record cannot be sent
     */
    @Test
    public void shouldIgnoreUnparseableMessages() throws Exception {
        String topicName = "ignore-unparseable-schema-changes";
        kafka.createTopic(topicName, 1, 1);

        // All records go to the topic's single partition, without a key.
        final Integer partition = 0;
        ProducerRecord<String, String> nullRecord = new ProducerRecord<>(topicName, partition, null, null);
        ProducerRecord<String, String> emptyRecord = new ProducerRecord<>(topicName, partition, null, "");
        ProducerRecord<String, String> noSourceRecord = new ProducerRecord<>(topicName, partition, null,
                "{\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
        ProducerRecord<String, String> noPositionRecord = new ProducerRecord<>(topicName, partition, null,
                "{\"source\":{\"server\":\"my-server\"},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
        // Missing closing brace.
        ProducerRecord<String, String> invalidJSONRecord1 = new ProducerRecord<>(topicName, partition, null,
                "{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"");
        // Missing opening brace.
        ProducerRecord<String, String> invalidJSONRecord2 = new ProducerRecord<>(topicName, partition, null,
                "\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
        ProducerRecord<String, String> invalidSQL = new ProducerRecord<>(topicName, partition, null,
                "{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}");
        ProducerRecord<String, String> invalidSQLProcedure = new ProducerRecord<>(topicName, partition, null,
                "{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"CREATE DEFINER=`myUser`@`%` PROCEDURE `tableAFetchCount`(        in p_uniqueID int        )BEGINselect count(*) into @propCount from tableA  where uniqueID = p_uniqueID;    select count(*) into @completeCount from tableA  where uniqueID = p_uniqueID and isComplete = 1;       select  uniqueID,   @propCount as propCount, @completeCount as completeCount, @completeCount/ @propCount * 100 as completePct        where uniqueID = p_uniqueID;END\"}");

        Configuration intruderConfig = Configuration.create()
                .withDefault("bootstrap.servers", kafka.brokerList())
                .withDefault("client.id", "intruder")
                .withDefault("key.serializer", StringSerializer.class)
                .withDefault("value.serializer", StringSerializer.class)
                .build();

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(intruderConfig.asProperties())) {
            // Wait on every send so the records are in the topic, in this order, before recovery starts.
            producer.send(nullRecord).get();
            producer.send(emptyRecord).get();
            producer.send(noSourceRecord).get();
            producer.send(noPositionRecord).get();
            producer.send(invalidJSONRecord1).get();
            producer.send(invalidJSONRecord2).get();
            producer.send(invalidSQL).get();
            producer.send(invalidSQLProcedure).get();
        }

        this.testHistoryTopicContent(topicName, true);
    }

    /**
     * Writes one record with invalid SQL to the history topic and runs the record/recover
     * scenario with unparseable DDL skipping disabled; the test passes only when a
     * {@link ParsingException} is thrown.
     *
     * @throws Exception if the topic cannot be created or the record cannot be sent
     */
    @Test(expected=ParsingException.class)
    public void shouldStopOnUnparseableSQL() throws Exception {
        String topicName = "stop-on-unparseable-schema-changes";
        kafka.createTopic(topicName, 1, 1);

        ProducerRecord<String, String> invalidSQL = new ProducerRecord<>(topicName, Integer.valueOf(0), null,
                "{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}");

        Configuration intruderConfig = Configuration.create()
                .withDefault("bootstrap.servers", kafka.brokerList())
                .withDefault("client.id", "intruder")
                .withDefault("key.serializer", StringSerializer.class)
                .withDefault("value.serializer", StringSerializer.class)
                .build();

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(intruderConfig.asProperties())) {
            // Block until the broker has acknowledged the record.
            producer.send(invalidSQL).get();
        }

        this.testHistoryTopicContent(topicName, false);
    }

    /**
     * Checks {@code exists()} in both directions: it is true after the record/recover
     * scenario has run against a topic, and false once the same history instance is
     * re-configured to use the topic "dummytopic".
     */
    @Test
    public void testExists() {
        final String populatedTopic = "exists-schema-changes";
        testHistoryTopicContent(populatedTopic, true);
        Assert.assertTrue(this.history.exists());

        // Same settings as the scenario above, except for the topic name.
        Configuration dummyTopicConfig = Configuration.create()
                .with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, kafka.brokerList())
                .with(KafkaDatabaseHistory.TOPIC, "dummytopic")
                .with(DatabaseHistory.NAME, "my-db-history")
                .with(KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, 500)
                .with(KafkaDatabaseHistory.consumerConfigPropertyName("max.poll.interval.ms"), 100)
                .with(KafkaDatabaseHistory.consumerConfigPropertyName("session.timeout.ms"), 50000)
                .with(KafkaDatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
                .with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, "org.apache.kafka.connect.source.SourceConnector")
                .with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, "dbz-test")
                .build();
        this.history.configure(dummyTopicConfig, null, DatabaseHistoryMetrics.NOOP, true);
        this.history.start();

        Assert.assertFalse(this.history.exists());
    }

    /**
     * Distinguishes "the storage exists" from "the history exists": before storage
     * initialisation neither is asserted to hold, after initialisation only
     * {@code storageExists()} is true, and after one DDL record both are true.
     */
    @Test
    @FixFor(value={"DBZ-1886"})
    public void differentiateStorageExistsFromHistoryExists() {
        final String topic = "differentiate-storage-exists-schema-changes";
        Configuration historyConfig = Configuration.create()
                .with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, kafka.brokerList())
                .with(KafkaDatabaseHistory.TOPIC, topic)
                .with(DatabaseHistory.NAME, "my-db-history")
                .with(KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, 500)
                .with(KafkaDatabaseHistory.consumerConfigPropertyName("max.poll.interval.ms"), 100)
                .with(KafkaDatabaseHistory.consumerConfigPropertyName("session.timeout.ms"), 50000)
                .build();
        this.history.configure(historyConfig, null, DatabaseHistoryMetrics.NOOP, true);

        // Configured only: no storage yet.
        Assert.assertFalse(this.history.storageExists());

        // Storage initialised, nothing recorded: storage exists, history does not.
        this.history.initializeStorage();
        Assert.assertTrue(this.history.storageExists());
        Assert.assertFalse(this.history.exists());

        // After recording one DDL batch both checks succeed.
        this.history.start();
        this.setLogPosition(0);
        String createTables = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \n"
                + "CREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \n"
                + "CREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n";
        this.history.record(
                this.offsets.getTheOnlyPartition().getSourcePartition(),
                this.offsets.getTheOnlyOffset().getOffset(),
                "db1",
                createTables);
        Assert.assertTrue(this.history.exists());
        Assert.assertTrue(this.history.storageExists());
    }

    /**
     * Validates an empty configuration against all fields of {@link KafkaDatabaseHistory}
     * and checks that the validation result is keyed by exactly the expected property names.
     */
    @Test
    @FixFor(value={"DBZ-2144"})
    public void shouldValidateMandatoryValues() {
        Configuration emptyConfig = Configuration.create().build();
        Assertions.assertThat(emptyConfig.validate(KafkaDatabaseHistory.ALL_FIELDS).keySet())
                .isEqualTo(Collect.unmodifiableSet(
                        "database.history.name",
                        "database.history.connector.class",
                        "database.history.kafka.topic",
                        "database.history.kafka.bootstrap.servers",
                        "database.history.kafka.recovery.poll.interval.ms",
                        "database.history.connector.id",
                        "database.history.kafka.recovery.attempts",
                        "database.history.kafka.query.timeout.ms"));
    }

    /**
     * Configures the history with a 1 ms Kafka query timeout and verifies that storage
     * initialisation cannot complete: any exception it throws must be caused by a
     * {@link TimeoutException}, and the topic must not exist afterwards.
     */
    @Test
    @FixFor(value={"DBZ-4518"})
    public void shouldConnectionTimeoutIfValueIsTooLow() {
        Configuration config = Configuration.create()
                .with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, kafka.brokerList())
                .with(KafkaDatabaseHistory.TOPIC, "this-should-not-get-created")
                .with(DatabaseHistory.NAME, "my-db-history")
                .with(KafkaDatabaseHistory.KAFKA_QUERY_TIMEOUT_MS, 1)
                .build();
        this.history.configure(config, null, DatabaseHistoryMetrics.NOOP, true);
        this.history.start();
        try {
            this.history.initializeStorage();
        }
        catch (Exception ex) {
            Assert.assertEquals(TimeoutException.class, ex.getCause().getClass());
        }
        // The topic must NOT have been created. This also fails the test if
        // initializeStorage() unexpectedly succeeded without throwing.
        Assert.assertFalse(this.history.storageExists());
    }
}

