/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlTestConnection;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.util.Testing;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

@SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN, major=8, minor=0, patch=20, reason="MySQL 8.0.20 started supporting binlog compression")
public class TransactionPayloadIT
extends AbstractConnectorTest {
    private static final UUID PRODUCT_CODE = UUID.randomUUID();
    private static final String PRODUCT_NAME = "robot";
    private static final float PRODUCT_WEIGHT = 1.304f;
    private static final String PRODUCT_INSERT_STMT_1 = "INSERT INTO products (name, description, weight, code) VALUES ('robot', 'Toy robot', 1.304, uuid_to_bin('" + PRODUCT_CODE + "'));";
    private static final String CUSTOMER_INSERT_STMT_1 = "INSERT INTO customers (first_name, last_name, email) VALUES ('Nitin', 'Agarwal', 'test1@abc.com' ); ";
    private static final String CUSTOMER_INSERT_STMT_2 = "INSERT INTO customers (first_name, last_name, email) VALUES ('Rajesh', 'Agarwal', 'test2@abc.com' ); ";
    private static final String ORDER_INSERT_STMT_1 = "INSERT INTO orders (order_date, purchaser, quantity, product_id) VALUES ('2016-01-16', 1001, 1, 1); ";
    private static final String CUSTOMER_UPDATE_STMT_1 = "UPDATE customers set first_name = 'Nitin1' where id = 1001; ";
    private static final String CUSTOMER_DELETE_STMT_1 = "DELETE from customers where id = 1001; ";
    private static final String ORDER_UPDATE_STMT_1 = "UPDATE orders set order_date = '2017-01-16' where order_number = 10001; ";
    private static final String ORDER_DELETE_STMT_1 = "DELETE from orders where order_number = 10001; ";
    private static final Path SCHEMA_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-schema-history-tp.txt").toAbsolutePath();
    private static final String SERVER_NAME = "transactionpayload_it";
    private final UniqueDatabase DATABASE = new UniqueDatabase("transactionpayload_it", "transactionpayload_test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
    @Rule
    public SkipTestRule skipTest = new SkipTestRule();
    private Configuration config;

    @Before
    public void beforeEach() throws TimeoutException, IOException, SQLException, InterruptedException {
        this.stopConnector();
        this.DATABASE.createAndInitialize();
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)SCHEMA_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            this.stopConnector();
        }
        finally {
            Testing.Files.delete((Path)SCHEMA_HISTORY_PATH);
        }
    }

    @Test
    public void shouldCaptureMultipleWriteEvents() throws Exception {
        this.config = ((Configuration.Builder)this.DATABASE.defaultConfig().with(MySqlConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MySqlConnectorConfig.SnapshotMode.NEVER)).build();
        this.start(MySqlConnector.class, this.config);
        Testing.Debug.enable();
        this.assertConnectorIsRunning();
        int numCreateDatabase = 1;
        int numCreateTables = 3;
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(numCreateDatabase + numCreateTables);
        Assertions.assertThat((Object)records).isNotNull();
        records.forEach(x$0 -> this.validate((SourceRecord)x$0));
        try (MySqlTestConnection db = MySqlTestConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            connection.execute(new String[]{"set binlog_transaction_compression=ON;"});
            connection.execute(new String[]{CUSTOMER_INSERT_STMT_1, CUSTOMER_INSERT_STMT_2, PRODUCT_INSERT_STMT_1, ORDER_INSERT_STMT_1, CUSTOMER_UPDATE_STMT_1, ORDER_UPDATE_STMT_1, ORDER_DELETE_STMT_1, CUSTOMER_DELETE_STMT_1});
        }
        AbstractConnectorTest.SourceRecords dmlRecords = this.consumeRecordsByTopic(10);
        List customerDmls = dmlRecords.recordsForTopic(this.DATABASE.topicForTable("customers"));
        List productDmls = dmlRecords.recordsForTopic(this.DATABASE.topicForTable("products"));
        List orderDmls = dmlRecords.recordsForTopic(this.DATABASE.topicForTable("orders"));
        Assertions.assertThat((List)customerDmls).hasSize(5);
        Assertions.assertThat((List)productDmls).hasSize(1);
        Struct product = ((Struct)((SourceRecord)productDmls.get(0)).value()).getStruct("after");
        Assertions.assertThat((Object)product.get("id")).isInstanceOf(Integer.class);
        Assertions.assertThat((Object)product.get("name")).isEqualTo((Object)PRODUCT_NAME);
        Assertions.assertThat((Object)product.get("weight")).isEqualTo((Object)Float.valueOf(1.304f));
        Assertions.assertThat((byte[])((ByteBuffer)product.get("code")).array()).isEqualTo((Object)this.uuidToByteArray(PRODUCT_CODE));
        Assertions.assertThat((List)orderDmls).hasSize(4);
    }

    private byte[] uuidToByteArray(UUID uuid) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[16]);
        buffer.putLong(uuid.getMostSignificantBits());
        buffer.putLong(uuid.getLeastSignificantBits());
        return buffer.array();
    }
}

