/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TransactionMetadataIT
extends AbstractConnectorTest {
    private static final String PRODUCT_INSERT_STMT = "INSERT INTO products (name, description, weight) VALUES ('robot', 'Toy robot', 1.304);";
    private static final String CUSTOMER_INSERT_STMT_1 = "INSERT INTO customers (first_name, last_name, email) VALUES ('Nitin', 'Agarwal', 'test1@abc.com' ); ";
    private static final String CUSTOMER_INSERT_STMT_2 = "INSERT INTO customers (first_name, last_name, email) VALUES ('Rajesh', 'Agarwal', 'test2@abc.com' ); ";
    private static final String ORDER_INSERT_STMT = "INSERT INTO orders (order_date, purchaser, quantity, product_id) VALUES ('2016-01-16', 1001, 1, 1); ";
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-db-history-tm.txt").toAbsolutePath();
    private static final String SERVER_NAME = "tm_test";
    private final UniqueDatabase DATABASE = new UniqueDatabase("tm_test", "transaction_metadata_test").withDbHistoryPath(DB_HISTORY_PATH);
    private Configuration config;

    @Before
    public void beforeEach() {
        this.stopConnector();
        this.DATABASE.createAndInitialize();
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)DB_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            this.stopConnector();
        }
        finally {
            Testing.Files.delete((Path)DB_HISTORY_PATH);
        }
    }

    @Test
    public void transactionMetadataEnabled() throws InterruptedException, SQLException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)).build();
        this.start(MySqlConnector.class, this.config);
        this.assertConnectorIsRunning();
        TransactionMetadataIT.waitForSnapshotToBeCompleted((String)"mysql", (String)this.DATABASE.getServerName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.setAutoCommit(false);
            connection.execute(new String[]{CUSTOMER_INSERT_STMT_1, PRODUCT_INSERT_STMT, ORDER_INSERT_STMT, CUSTOMER_INSERT_STMT_2});
            connection.commit();
        }
        String txId = null;
        ArrayList<SourceRecord> allRecords = new ArrayList<SourceRecord>();
        for (int i = 0; txId == null && i < 50; ++i) {
            List records = this.consumeRecordsByTopic(100).allRecordsInOrder();
            txId = this.getTxId(records);
            allRecords.addAll(records);
        }
        Assert.assertNotNull((String)"Failed to find the transaction", txId);
        int beginIndex = this.findFirstEvent(allRecords, txId);
        if (allRecords.size() < beginIndex + 6) {
            allRecords.addAll(this.consumeRecordsByTopic(6).allRecordsInOrder());
        }
        List transactionRecords = allRecords.subList(beginIndex, beginIndex + 1 + 4 + 1);
        Assert.assertFalse((boolean)transactionRecords.isEmpty());
        Assert.assertEquals((long)6L, (long)transactionRecords.size());
        String databaseName = this.DATABASE.getDatabaseName();
        String beginTxId = this.assertBeginTransaction((SourceRecord)transactionRecords.get(0));
        Assert.assertEquals((Object)txId, (Object)beginTxId);
        this.assertEndTransaction((SourceRecord)transactionRecords.get(5), txId, 4L, Collect.hashMapOf((Object)(databaseName + ".products"), (Object)1, (Object)(databaseName + ".customers"), (Object)2, (Object)(databaseName + ".orders"), (Object)1));
    }

    @Test
    @FixFor(value={"DBZ-4077"})
    public void shouldUseConfiguredTransactionTopicName() throws InterruptedException, SQLException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)).with(MySqlConnectorConfig.TRANSACTION_TOPIC, "tx.of.${database.server.name}")).build();
        this.start(MySqlConnector.class, this.config);
        this.assertConnectorIsRunning();
        TransactionMetadataIT.waitForSnapshotToBeCompleted((String)"mysql", (String)this.DATABASE.getServerName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.setAutoCommit(false);
            connection.execute(new String[]{CUSTOMER_INSERT_STMT_1, PRODUCT_INSERT_STMT, ORDER_INSERT_STMT, CUSTOMER_INSERT_STMT_2});
            connection.commit();
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(6);
        List txnEvents = records.recordsForTopic("tx.of." + this.DATABASE.getServerName());
        Assertions.assertThat((List)txnEvents).hasSize(2);
    }

    @Test
    @FixFor(value={"DBZ-4077"})
    public void shouldUseConfiguredTransactionTopicNameWithoutServerName() throws InterruptedException, SQLException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.SCHEMA_ONLY)).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)).with(MySqlConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)).with(MySqlConnectorConfig.TRANSACTION_TOPIC, "mytransactions")).build();
        this.start(MySqlConnector.class, this.config);
        this.assertConnectorIsRunning();
        TransactionMetadataIT.waitForSnapshotToBeCompleted((String)"mysql", (String)this.DATABASE.getServerName());
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.setAutoCommit(false);
            connection.execute(new String[]{CUSTOMER_INSERT_STMT_1, PRODUCT_INSERT_STMT, ORDER_INSERT_STMT, CUSTOMER_INSERT_STMT_2});
            connection.commit();
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(6);
        List txnEvents = records.recordsForTopic("mytransactions");
        Assertions.assertThat((List)txnEvents).hasSize(2);
    }

    private String getTxId(List<SourceRecord> records) {
        Optional<Struct> product = records.stream().map(sr -> (Struct)sr.value()).filter(sr -> sr.schema().field("source") != null).filter(sr -> sr.getStruct("source").getString("table").equals("products")).filter(s -> s.getStruct("after").getString("description").equals("Toy robot")).findFirst();
        return product.map(struct -> (String)struct.getStruct("transaction").get("id")).orElse(null);
    }

    private int findFirstEvent(List<SourceRecord> records, String txId) {
        int i = 0;
        for (SourceRecord sr : records) {
            if (((Struct)sr.value()).getString("id").equals(txId)) {
                return i;
            }
            ++i;
        }
        return -1;
    }
}

