/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mysql;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mysql.MySQLConnection;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.UniqueDatabase;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

@SkipWhenDatabaseVersion(check=EqualityCheck.LESS_THAN, major=5, minor=6, reason="DDL uses fractional second data types, not supported until MySQL 5.6")
public class BinlogReaderBufferIT
extends AbstractConnectorTest {
    private static final Path DB_HISTORY_PATH = Testing.Files.createTestingPath((String)"file-db-history-connect.txt").toAbsolutePath();
    private final UniqueDatabase DATABASE = new UniqueDatabase("myServer1", "connector_test").withDbHistoryPath(DB_HISTORY_PATH);
    private final UniqueDatabase RO_DATABASE = new UniqueDatabase("myServer2", "connector_test_ro", this.DATABASE).withDbHistoryPath(DB_HISTORY_PATH);
    private Configuration config;

    @Before
    public void beforeEach() {
        this.stopConnector();
        this.DATABASE.createAndInitialize();
        this.RO_DATABASE.createAndInitialize();
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)DB_HISTORY_PATH);
    }

    @After
    public void afterEach() {
        try {
            this.stopConnector();
        }
        finally {
            Testing.Files.delete((Path)DB_HISTORY_PATH);
        }
    }

    @Test
    public void shouldCorrectlyManageRollback() throws SQLException, InterruptedException {
        String replicaPort;
        String masterPort = System.getProperty("database.port", "3306");
        boolean replicaIsMaster = masterPort.equals(replicaPort = System.getProperty("database.replica.port", "3306"));
        if (!replicaIsMaster) {
            Thread.sleep(5000L);
        }
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))).with(MySqlConnectorConfig.USER, "snapper")).with(MySqlConnectorConfig.PASSWORD, "snapperpass")).with(MySqlConnectorConfig.SERVER_ID, 18765)).with(MySqlConnectorConfig.SERVER_NAME, this.DATABASE.getServerName())).with(MySqlConnectorConfig.SSL_MODE, (EnumeratedValue)MySqlConnectorConfig.SecureConnectionMode.DISABLED)).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER, 10000)).with(FileDatabaseHistory.FILE_PATH, (Object)DB_HISTORY_PATH)).build();
        this.start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(39);
        if (replicaIsMaster) {
            try (MySQLConnection db = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                 JdbcConnection connection = db.connect();){
                Connection jdbc = connection.connection();
                connection.setAutoCommit(false);
                Statement statement = jdbc.createStatement();
                statement.executeUpdate("CREATE TEMPORARY TABLE tmp_ids (a int)");
                statement.executeUpdate("INSERT INTO tmp_ids VALUES(5)");
                jdbc.commit();
                statement.executeUpdate("DROP TEMPORARY TABLE tmp_ids");
                statement.executeUpdate("UPDATE products SET weight=100.12 WHERE id=109");
                jdbc.rollback();
                connection.query("SELECT * FROM products", rs -> {
                    if (Testing.Print.isEnabled()) {
                        connection.print(rs);
                    }
                });
                connection.setAutoCommit(true);
            }
            Thread.sleep(5000L);
            this.assertNoRecordsToConsume();
            this.assertEngineIsRunning();
            Testing.print((Object)"*** Done with rollback TX");
        }
    }

    @Test
    public void shouldProcessSavepoint() throws SQLException, InterruptedException {
        String replicaPort;
        String masterPort = System.getProperty("database.port", "3306");
        boolean replicaIsMaster = masterPort.equals(replicaPort = System.getProperty("database.replica.port", "3306"));
        if (!replicaIsMaster) {
            Thread.sleep(5000L);
        }
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))).with(MySqlConnectorConfig.USER, "snapper")).with(MySqlConnectorConfig.PASSWORD, "snapperpass")).with(MySqlConnectorConfig.SERVER_ID, 18765)).with(MySqlConnectorConfig.SERVER_NAME, this.DATABASE.getServerName())).with(MySqlConnectorConfig.SSL_MODE, (EnumeratedValue)MySqlConnectorConfig.SecureConnectionMode.DISABLED)).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(FileDatabaseHistory.FILE_PATH, (Object)DB_HISTORY_PATH)).build();
        this.start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(39);
        try (MySQLConnection db = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
             JdbcConnection connection = db.connect();){
            Connection jdbc = connection.connection();
            connection.setAutoCommit(false);
            Statement statement = jdbc.createStatement();
            statement.executeUpdate("INSERT INTO customers VALUES(default, 'first', 'first', 'first')");
            jdbc.setSavepoint();
            statement.executeUpdate("INSERT INTO customers VALUES(default, 'second', 'second', 'second')");
            jdbc.commit();
            connection.query("SELECT * FROM customers", rs -> {
                if (Testing.Print.isEnabled()) {
                    connection.print(rs);
                }
            });
            connection.setAutoCommit(true);
        }
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        Assertions.assertThat((List)records.recordsForTopic(this.DATABASE.topicForTable("customers"))).hasSize(2);
        Assertions.assertThat((List)records.allRecordsInOrder()).hasSize(2);
        Testing.print((Object)"*** Done with savepoint TX");
    }

    @Test
    public void shouldProcessLargeTransaction() throws SQLException, InterruptedException {
        String replicaPort;
        String masterPort = System.getProperty("database.port", "3306");
        boolean replicaIsMaster = masterPort.equals(replicaPort = System.getProperty("database.replica.port", "3306"));
        if (!replicaIsMaster) {
            Thread.sleep(5000L);
        }
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))).with(MySqlConnectorConfig.USER, "snapper")).with(MySqlConnectorConfig.PASSWORD, "snapperpass")).with(MySqlConnectorConfig.SERVER_ID, 18765)).with(MySqlConnectorConfig.SERVER_NAME, this.DATABASE.getServerName())).with(MySqlConnectorConfig.SSL_MODE, (EnumeratedValue)MySqlConnectorConfig.SecureConnectionMode.DISABLED)).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER, 9)).with(FileDatabaseHistory.FILE_PATH, (Object)DB_HISTORY_PATH)).build();
        this.start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(39);
        try (MySQLConnection db = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());){
            int numRecords = 40;
            JdbcConnection connection = db.connect();
            Object object = null;
            try {
                Connection jdbc = connection.connection();
                connection.setAutoCommit(false);
                Statement statement = jdbc.createStatement();
                for (int i = 0; i < 40; ++i) {
                    statement.executeUpdate(String.format("INSERT INTO customers\nVALUES (default,\"%s\",\"%s\",\"%s\")", i, i, i));
                }
                jdbc.commit();
                connection.query("SELECT * FROM customers", rs -> {
                    if (Testing.Print.isEnabled()) {
                        connection.print(rs);
                    }
                });
                connection.setAutoCommit(true);
            }
            catch (Throwable jdbc) {
                object = jdbc;
                throw jdbc;
            }
            finally {
                if (connection != null) {
                    if (object != null) {
                        try {
                            connection.close();
                        }
                        catch (Throwable jdbc) {
                            ((Throwable)object).addSuppressed(jdbc);
                        }
                    } else {
                        connection.close();
                    }
                }
            }
            records = this.consumeRecordsByTopic(40);
            int recordIndex = 0;
            for (SourceRecord r : records.allRecordsInOrder()) {
                Struct envelope = (Struct)r.value();
                Assertions.assertThat((String)envelope.getString("op")).isEqualTo((Object)"c");
                Assertions.assertThat((String)envelope.getStruct("after").getString("email")).isEqualTo((Object)Integer.toString(recordIndex++));
            }
            Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
            Testing.print((Object)"*** Done with large TX");
        }
    }

    @Test
    @FixFor(value={"DBZ-411"})
    public void shouldProcessRolledBackSavepoint() throws SQLException, InterruptedException {
        String replicaPort;
        String masterPort = System.getProperty("database.port", "3306");
        boolean replicaIsMaster = masterPort.equals(replicaPort = System.getProperty("database.replica.port", "3306"));
        if (!replicaIsMaster) {
            Thread.sleep(5000L);
        }
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)Configuration.create().with(MySqlConnectorConfig.HOSTNAME, System.getProperty("database.replica.hostname", "localhost"))).with(MySqlConnectorConfig.PORT, System.getProperty("database.replica.port", "3306"))).with(MySqlConnectorConfig.USER, "snapper")).with(MySqlConnectorConfig.PASSWORD, "snapperpass")).with(MySqlConnectorConfig.SERVER_ID, 18765)).with(MySqlConnectorConfig.SERVER_NAME, this.DATABASE.getServerName())).with(MySqlConnectorConfig.SSL_MODE, (EnumeratedValue)MySqlConnectorConfig.SecureConnectionMode.DISABLED)).with(MySqlConnectorConfig.POLL_INTERVAL_MS, 10)).with(MySqlConnectorConfig.DATABASE_INCLUDE_LIST, this.DATABASE.getDatabaseName())).with(MySqlConnectorConfig.DATABASE_HISTORY, FileDatabaseHistory.class)).with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(FileDatabaseHistory.FILE_PATH, (Object)DB_HISTORY_PATH)).build();
        this.start(MySqlConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(39);
        if (replicaIsMaster) {
            int topicCount;
            int customerEventsCount;
            int recordCount;
            try (MySQLConnection db = MySQLConnection.forTestDatabase(this.DATABASE.getDatabaseName());
                 JdbcConnection connection = db.connect();){
                Connection jdbc = connection.connection();
                connection.setAutoCommit(false);
                Statement statement = jdbc.createStatement();
                statement.executeUpdate("CREATE TEMPORARY TABLE tmp_ids (a int)");
                statement.executeUpdate("INSERT INTO tmp_ids VALUES(5)");
                jdbc.commit();
                statement.executeUpdate("DROP TEMPORARY TABLE tmp_ids");
                statement.executeUpdate("INSERT INTO customers VALUES(default, 'first', 'first', 'first')");
                Savepoint savepoint = jdbc.setSavepoint();
                statement.executeUpdate("INSERT INTO customers VALUES(default, 'second', 'second', 'second')");
                jdbc.rollback(savepoint);
                jdbc.commit();
                connection.query("SELECT * FROM customers", rs -> {
                    if (Testing.Print.isEnabled()) {
                        connection.print(rs);
                    }
                });
                connection.setAutoCommit(true);
            }
            if (MySQLConnection.isMySQL5()) {
                recordCount = 3;
                customerEventsCount = 2;
                topicCount = 2;
            } else {
                recordCount = 1;
                customerEventsCount = 1;
                topicCount = 1;
            }
            records = this.consumeRecordsByTopic(recordCount);
            Assertions.assertThat((int)records.topics().size()).isEqualTo(topicCount);
            Assertions.assertThat((List)records.recordsForTopic(this.DATABASE.topicForTable("customers"))).hasSize(customerEventsCount);
            Assertions.assertThat((List)records.allRecordsInOrder()).hasSize(recordCount);
            Testing.print((Object)"*** Done with savepoint TX");
        }
    }
}

