/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.relational.history;

import io.debezium.config.Configuration;
import io.debezium.kafka.KafkaCluster;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.ddl.DdlParserSql2003;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.DatabaseHistoryMetrics;
import io.debezium.relational.history.KafkaDatabaseHistory;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.io.File;
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KafkaDatabaseHistoryTest {
    private KafkaDatabaseHistory history;
    private KafkaCluster kafka;
    private Map<String, String> source;
    private Map<String, Object> position;
    private String topicName;
    private String ddl;
    private static final int PARTITION_NO = 0;

    @Before
    public void beforeEach() throws Exception {
        this.source = Collect.hashMapOf((Object)"server", (Object)"my-server");
        this.setLogPosition(0);
        this.topicName = "schema-changes-topic";
        File dataDir = Testing.Files.createTestingDirectory("history_cluster");
        Testing.Files.delete(dataDir);
        this.kafka = new KafkaCluster().usingDirectory(dataDir).deleteDataPriorToStartup(true).deleteDataUponShutdown(true).addBrokers(1).withKafkaConfiguration(Collect.propertiesOf((String)"auto.create.topics.enable", (String)"false", (String)"zookeeper.session.timeout.ms", (String)"20000")).startup();
        this.history = new KafkaDatabaseHistory();
    }

    @After
    public void afterEach() {
        try {
            if (this.history != null) {
                this.history.stop();
            }
        }
        finally {
            this.history = null;
            try {
                if (this.kafka != null) {
                    this.kafka.shutdown();
                }
            }
            finally {
                this.kafka = null;
            }
        }
    }

    @Test
    public void shouldStartWithEmptyTopicAndStoreDataAndRecoverAllState() throws Exception {
        this.kafka.createTopic(this.topicName, 1, 1);
        this.testHistoryTopicContent(false);
    }

    private void testHistoryTopicContent(boolean skipUnparseableDDL) {
        Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, this.kafka.brokerList())).with(KafkaDatabaseHistory.TOPIC, this.topicName)).with(DatabaseHistory.NAME, "my-db-history")).with(KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, 500)).with(KafkaDatabaseHistory.consumerConfigPropertyName((String)"max.poll.interval.ms"), 100)).with(KafkaDatabaseHistory.consumerConfigPropertyName((String)"session.timeout.ms"), 50000)).with(KafkaDatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, skipUnparseableDDL)).with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, "org.apache.kafka.connect.source.SourceConnector")).with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, "dbz-test")).build();
        this.history.configure(config, null, DatabaseHistoryMetrics.NOOP, true);
        this.history.start();
        this.history.start();
        this.history.initializeStorage();
        this.history.initializeStorage();
        DdlParserSql2003 recoveryParser = new DdlParserSql2003();
        DdlParserSql2003 ddlParser = new DdlParserSql2003();
        ddlParser.setCurrentSchema("db1");
        Tables tables1 = new Tables();
        Tables tables2 = new Tables();
        Tables tables3 = new Tables();
        this.setLogPosition(0);
        this.history.recover(this.source, this.position, tables1, (DdlParser)recoveryParser);
        Assertions.assertThat((int)tables1.size()).isEqualTo(0);
        this.setLogPosition(10);
        this.ddl = "CREATE TABLE foo ( name VARCHAR(255) NOT NULL PRIMARY KEY); \nCREATE TABLE customers ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(100) NOT NULL ); \nCREATE TABLE products ( productId INTEGER NOT NULL PRIMARY KEY, desc VARCHAR(255) NOT NULL); \n";
        this.history.record(this.source, this.position, "db1", this.ddl);
        ddlParser.parse(this.ddl, tables1);
        Assertions.assertThat((int)tables1.size()).isEqualTo(3);
        ddlParser.parse(this.ddl, tables2);
        Assertions.assertThat((int)tables2.size()).isEqualTo(3);
        ddlParser.parse(this.ddl, tables3);
        Assertions.assertThat((int)tables3.size()).isEqualTo(3);
        this.setLogPosition(39);
        this.ddl = "DROP TABLE foo;";
        this.history.record(this.source, this.position, "db1", this.ddl);
        ddlParser.parse(this.ddl, tables2);
        Assertions.assertThat((int)tables2.size()).isEqualTo(2);
        ddlParser.parse(this.ddl, tables3);
        Assertions.assertThat((int)tables3.size()).isEqualTo(2);
        this.setLogPosition(10003);
        this.ddl = "CREATE TABLE suppliers ( supplierId INTEGER NOT NULL PRIMARY KEY, name VARCHAR(255) NOT NULL);";
        this.history.record(this.source, this.position, "db1", this.ddl);
        ddlParser.parse(this.ddl, tables3);
        Assertions.assertThat((int)tables3.size()).isEqualTo(3);
        this.history.stop();
        this.history = new KafkaDatabaseHistory();
        this.history.configure(config, null, DatabaseHistoryListener.NOOP, true);
        Tables recoveredTables = new Tables();
        this.setLogPosition(15);
        this.history.recover(this.source, this.position, recoveredTables, (DdlParser)recoveryParser);
        Assertions.assertThat((Object)recoveredTables).isEqualTo((Object)tables1);
        recoveredTables = new Tables();
        this.setLogPosition(50);
        this.history.recover(this.source, this.position, recoveredTables, (DdlParser)recoveryParser);
        Assertions.assertThat((Object)recoveredTables).isEqualTo((Object)tables2);
        recoveredTables = new Tables();
        this.setLogPosition(10010);
        this.history.recover(this.source, this.position, recoveredTables, (DdlParser)recoveryParser);
        Assertions.assertThat((Object)recoveredTables).isEqualTo((Object)tables3);
        recoveredTables = new Tables();
        this.setLogPosition(100000010);
        this.history.recover(this.source, this.position, recoveredTables, (DdlParser)recoveryParser);
        Assertions.assertThat((Object)recoveredTables).isEqualTo((Object)tables3);
    }

    protected void setLogPosition(int index) {
        this.position = Collect.hashMapOf((Object)"filename", (Object)"my-txn-file.log", (Object)"position", (Object)index);
    }

    @Test
    public void shouldIgnoreUnparseableMessages() throws Exception {
        this.kafka.createTopic(this.topicName, 1, 1);
        ProducerRecord nullRecord = new ProducerRecord(this.topicName, Integer.valueOf(0), null, null);
        ProducerRecord emptyRecord = new ProducerRecord(this.topicName, Integer.valueOf(0), null, (Object)"");
        ProducerRecord noSourceRecord = new ProducerRecord(this.topicName, Integer.valueOf(0), null, (Object)"{\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
        ProducerRecord noPositionRecord = new ProducerRecord(this.topicName, Integer.valueOf(0), null, (Object)"{\"source\":{\"server\":\"my-server\"},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
        ProducerRecord invalidJSONRecord1 = new ProducerRecord(this.topicName, Integer.valueOf(0), null, (Object)"{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"");
        ProducerRecord invalidJSONRecord2 = new ProducerRecord(this.topicName, Integer.valueOf(0), null, (Object)"\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"DROP TABLE foo;\"}");
        ProducerRecord invalidSQL = new ProducerRecord(this.topicName, Integer.valueOf(0), null, (Object)"{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}");
        Configuration intruderConfig = ((Configuration.Builder)((Configuration.Builder)Configuration.create().withDefault("bootstrap.servers", this.kafka.brokerList()).withDefault("client.id", "intruder").withDefault("key.serializer", StringSerializer.class)).withDefault("value.serializer", StringSerializer.class)).build();
        try (KafkaProducer producer = new KafkaProducer(intruderConfig.asProperties());){
            producer.send(nullRecord).get();
            producer.send(emptyRecord).get();
            producer.send(noSourceRecord).get();
            producer.send(noPositionRecord).get();
            producer.send(invalidJSONRecord1).get();
            producer.send(invalidJSONRecord2).get();
            producer.send(invalidSQL).get();
        }
        this.testHistoryTopicContent(true);
    }

    @Test(expected=ParsingException.class)
    public void shouldStopOnUnparseableSQL() throws Exception {
        this.kafka.createTopic(this.topicName, 1, 1);
        ProducerRecord invalidSQL = new ProducerRecord(this.topicName, Integer.valueOf(0), null, (Object)"{\"source\":{\"server\":\"my-server\"},\"position\":{\"filename\":\"my-txn-file.log\",\"position\":39},\"databaseName\":\"db1\",\"ddl\":\"xxxDROP TABLE foo;\"}");
        Configuration intruderConfig = ((Configuration.Builder)((Configuration.Builder)Configuration.create().withDefault("bootstrap.servers", this.kafka.brokerList()).withDefault("client.id", "intruder").withDefault("key.serializer", StringSerializer.class)).withDefault("value.serializer", StringSerializer.class)).build();
        try (KafkaProducer producer = new KafkaProducer(intruderConfig.asProperties());){
            producer.send(invalidSQL).get();
        }
        this.testHistoryTopicContent(false);
    }

    @Test
    public void testExists() {
        this.testHistoryTopicContent(true);
        Assert.assertTrue((boolean)this.history.exists());
        Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, this.kafka.brokerList())).with(KafkaDatabaseHistory.TOPIC, "dummytopic")).with(DatabaseHistory.NAME, "my-db-history")).with(KafkaDatabaseHistory.RECOVERY_POLL_INTERVAL_MS, 500)).with(KafkaDatabaseHistory.consumerConfigPropertyName((String)"max.poll.interval.ms"), 100)).with(KafkaDatabaseHistory.consumerConfigPropertyName((String)"session.timeout.ms"), 50000)).with(KafkaDatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)).with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_CLASS, "org.apache.kafka.connect.source.SourceConnector")).with(KafkaDatabaseHistory.INTERNAL_CONNECTOR_ID, "dbz-test")).build();
        this.history.configure(config, null, DatabaseHistoryMetrics.NOOP, true);
        this.history.start();
        Assert.assertFalse((boolean)this.history.exists());
    }
}

