/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.sqlserver;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.sqlserver.Lsn;
import io.debezium.connector.sqlserver.SqlServerChangeTable;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.data.SchemaAndValueField;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenKafkaVersion;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/**
 * Integration test for the transaction metadata emitted by the SQL Server connector when
 * {@code SqlServerConnectorConfig.PROVIDE_TRANSACTION_METADATA} is enabled: BEGIN/END events on the
 * {@code server1.transaction} topic and the transaction information attached to change records,
 * including across connector restarts in the middle of a transaction.
 */
@SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
public class TransactionMetadataIT extends AbstractConnectorTest {

    private SqlServerConnection connection;

    @Rule
    public SkipTestRule skipRule = new SkipTestRule();

    /**
     * Creates the test database with CDC-enabled tables {@code tablea} (one pre-existing row) and
     * {@code tableb}, then resets the embedded connector framework and the database history file.
     *
     * @throws SQLException if any of the setup statements fails
     */
    @Before
    public void before() throws SQLException {
        TestHelper.createTestDatabase();
        connection = TestHelper.testConnection();
        connection.execute(
                "CREATE TABLE tablea (id int primary key, cola varchar(30))",
                "CREATE TABLE tableb (id int primary key, colb varchar(30))",
                "INSERT INTO tablea VALUES(1, 'a')");
        TestHelper.enableTableCdc(connection, "tablea");
        TestHelper.enableTableCdc(connection, "tableb");

        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
    }

    /**
     * Closes the test connection if {@link #before()} managed to open one.
     *
     * @throws SQLException if closing the connection fails
     */
    @After
    public void after() throws SQLException {
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Inserts rows into both tables inside one explicit transaction, followed by a single
     * auto-committed insert, and verifies the BEGIN event, the per-record ordering metadata and the
     * END event of the first transaction.
     */
    @Test
    public void transactionMetadata() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        final int ID_START = 10;
        final Configuration config = transactionMetadataConfig();

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();

        // Consume the one record expected before streaming starts
        // (presumably the snapshot of the row inserted in before()).
        consumeRecordsByTopic(1);

        // One transaction containing interleaved inserts into tablea and tableb.
        connection.setAutoCommit(false);
        final String[] inserts = new String[RECORDS_PER_TABLE * 2];
        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_START + i;
            inserts[2 * i] = "INSERT INTO tablea VALUES(" + id + ", 'a')";
            inserts[2 * i + 1] = "INSERT INTO tableb VALUES(" + id + ", 'b')";
        }
        connection.execute(inserts);

        // A second, auto-committed statement so that the first transaction gets closed.
        connection.setAutoCommit(true);
        connection.execute("INSERT INTO tableb VALUES(1000, 'b')");

        // BEGIN, data of the first transaction, END, BEGIN, one data record of the second one.
        final SourceRecords records = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * 2 + 1 + 1 + 1);
        final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
        final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
        final List<SourceRecord> tx = records.recordsForTopic("server1.transaction");
        Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE + 1);
        Assertions.assertThat(tx).hasSize(3);

        final List<SourceRecord> all = records.allRecordsInOrder();
        final String txId = assertBeginTransaction(all.get(0));

        // Total order increases by one per record; the per-table order increases every second
        // record because the inserts alternate between the two tables.
        long counter = 1;
        for (int i = 1; i <= 2 * RECORDS_PER_TABLE; i++) {
            assertRecordTransactionMetadata(all.get(i), txId, counter, (counter + 1) / 2);
            counter++;
        }

        assertEndTransaction(all.get(2 * RECORDS_PER_TABLE + 1), txId, 2 * RECORDS_PER_TABLE,
                Collect.hashMapOf("testDB.dbo.tablea", RECORDS_PER_TABLE, "testDB.dbo.tableb", RECORDS_PER_TABLE));
        stopConnector();
    }

    /**
     * Shared scenario: the connector is stopped by a stop predicate once it has emitted the
     * {@code tablea} record sitting in the middle of a large transaction, is then restarted, and
     * must continue the same transaction with consistent ordering metadata.
     *
     * @param restartJustAfterSnapshot if {@code true}, the connector is first run until one record
     *            has been consumed, stopped, and a marker row is inserted and awaited in the CDC
     *            change table before the main run starts
     * @param afterStreaming if {@code true}, a marker row is inserted and consumed while the
     *            connector is running, before the large transaction is written
     */
    private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean afterStreaming) throws Exception {
        final int RECORDS_PER_TABLE = 30;
        final int TABLES = 2;
        final int ID_START = 10;
        final int ID_RESTART = 1000;
        final int HALF_ID = ID_START + RECORDS_PER_TABLE / 2;
        final Configuration config = transactionMetadataConfig();

        if (restartJustAfterSnapshot) {
            start(SqlServerConnector.class, config);
            assertConnectorIsRunning();

            // Consume the single record expected from the first run, then stop.
            consumeRecordsByTopic(1);
            stopConnector();
            connection.execute("INSERT INTO tablea VALUES(-1, '-a')");

            // Poll until the row with id -1 is visible in the change table of tablea.
            Awaitility.await().atMost(30L, TimeUnit.SECONDS).until(() -> {
                if (!connection.getMaxLsn("testdb").isAvailable()) {
                    return false;
                }

                for (SqlServerChangeTable ct : connection.listOfChangeTables("testdb")) {
                    final String tableName = ct.getChangeTableId().table();
                    if (tableName.endsWith("dbo_" + connection.getNameOfChangeTable("tablea"))) {
                        try {
                            final Lsn minLsn = connection.getMinLsn("testdb", tableName);
                            final Lsn maxLsn = connection.getMaxLsn("testdb");
                            final AtomicReference<Boolean> found = new AtomicReference<>(false);
                            final SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[0]);
                            connection.getChangesForTables("testdb", tables, minLsn, maxLsn, resultsets -> {
                                final ResultSet rs = resultsets[0];
                                while (rs.next()) {
                                    if (rs.getInt("id") == -1) {
                                        found.set(true);
                                        break;
                                    }
                                }
                            });
                            return found.get();
                        }
                        catch (Exception e) {
                            // Same failure type and message as Assert.fail(), but keeps the cause.
                            throw new AssertionError("Failed to fetch changes for tablea: " + e.getMessage(), e);
                        }
                    }
                }
                return false;
            });
        }

        // Stop predicate: matches the tablea record with id HALF_ID and cola 'a'.
        start(SqlServerConnector.class, config, record -> {
            if (!"server1.dbo.tablea.Envelope".equals(record.valueSchema().name())) {
                return false;
            }
            final Struct envelope = (Struct) record.value();
            final Struct after = envelope.getStruct("after");
            final Integer rowId = after.getInt32("id");
            final String text = after.getString("cola");
            return rowId != null && rowId == HALF_ID && "a".equals(text);
        });
        assertConnectorIsRunning();

        String firstTxId = null;
        if (restartJustAfterSnapshot) {
            // BEGIN of the transaction that carried the marker row inserted while stopped.
            final SourceRecord begin = consumeRecordsByTopic(1).allRecordsInOrder().get(0);
            firstTxId = assertBeginTransaction(begin);
        }

        // One more record is expected before the large transaction is written.
        consumeRecordsByTopic(1);

        if (afterStreaming) {
            connection.execute("INSERT INTO tablea VALUES(-2, '-a')");
            final SourceRecords streamed = consumeRecordsByTopic(2);
            final List<SchemaAndValueField> expectedRow = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, -2),
                    new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "-a"));
            assertRecord(((Struct) streamed.allRecordsInOrder().get(1).value()).getStruct("after"), expectedRow);
            final SourceRecord begin = streamed.allRecordsInOrder().get(0);
            firstTxId = assertBeginTransaction(begin);
        }

        // The large transaction: RECORDS_PER_TABLE alternating inserts per table, one commit.
        connection.setAutoCommit(false);
        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_START + i;
            connection.executeWithoutCommitting("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.executeWithoutCommitting("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        connection.connection().commit();

        // Optional END of the previous transaction, BEGIN of the new one, then the change records
        // delivered up to the point where the stop predicate fired.
        final int txBeginIndex = firstTxId != null ? 1 : 0;
        final int expectedRecords = txBeginIndex + 1 + RECORDS_PER_TABLE;
        List<SourceRecord> records = consumeRecordsByTopic(expectedRecords).allRecordsInOrder();
        Assertions.assertThat(records).hasSize(expectedRecords);

        if (firstTxId != null) {
            assertEndTransaction(records.get(0), firstTxId, 1, Collect.hashMapOf("testDB.dbo.tablea", 1));
        }
        final String batchTxId = assertBeginTransaction(records.get(txBeginIndex));

        // The last record consumed before the restart is the tableb row just below HALF_ID.
        final SourceRecord lastRecordForOffset = records.get(RECORDS_PER_TABLE + txBeginIndex);
        final Struct value = (Struct) lastRecordForOffset.value();
        final List<SchemaAndValueField> expectedLastRow = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, HALF_ID - 1),
                new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        assertRecord((Struct) value.get("after"), expectedLastRow);
        assertRecordTransactionMetadata(lastRecordForOffset, batchTxId, RECORDS_PER_TABLE, RECORDS_PER_TABLE / 2);

        stopConnector();
        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();

        // After the restart the second half of the same transaction must be delivered.
        SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE);
        records = sourceRecords.allRecordsInOrder();
        Assertions.assertThat(records).hasSize(RECORDS_PER_TABLE);

        List<SourceRecord> tableA = sourceRecords.recordsForTopic("server1.dbo.tablea");
        List<SourceRecord> tableB = sourceRecords.recordsForTopic("server1.dbo.tableb");
        for (int i = 0; i < RECORDS_PER_TABLE / 2; i++) {
            final int id = HALF_ID + i;
            final SourceRecord recordA = tableA.get(i);
            final SourceRecord recordB = tableB.get(i);

            assertInsertedRow(recordA, "cola", id, "a");
            assertInsertedRow(recordB, "colb", id, "b");

            // Ordering continues from where the first run stopped, under the same transaction id.
            assertRecordTransactionMetadata(recordA, batchTxId, RECORDS_PER_TABLE + 2 * i + 1, RECORDS_PER_TABLE / 2 + i + 1);
            assertRecordTransactionMetadata(recordB, batchTxId, RECORDS_PER_TABLE + 2 * i + 2, RECORDS_PER_TABLE / 2 + i + 1);
        }

        // One small transaction (one row per table) per iteration.
        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_RESTART + i;
            connection.executeWithoutCommitting("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.executeWithoutCommitting("INSERT INTO tableb VALUES(" + id + ", 'b')");
            connection.connection().commit();
        }

        // END of the large transaction, the data records, a BEGIN for every pair of records and an
        // END for every pair but the last.
        sourceRecords = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * TABLES + (2 * RECORDS_PER_TABLE - 1));
        tableA = sourceRecords.recordsForTopic("server1.dbo.tablea");
        tableB = sourceRecords.recordsForTopic("server1.dbo.tableb");
        final List<SourceRecord> txMetadata = sourceRecords.recordsForTopic("server1.transaction");
        Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);
        Assertions.assertThat(txMetadata).hasSize(1 + 2 * RECORDS_PER_TABLE - 1);

        assertEndTransaction(txMetadata.get(0), batchTxId, 2 * RECORDS_PER_TABLE,
                Collect.hashMapOf("testDB.dbo.tablea", RECORDS_PER_TABLE, "testDB.dbo.tableb", RECORDS_PER_TABLE));

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = i + ID_RESTART;
            final SourceRecord recordA = tableA.get(i);
            final SourceRecord recordB = tableB.get(i);

            assertInsertedRow(recordA, "cola", id, "a");
            assertInsertedRow(recordB, "colb", id, "b");

            // txMetadata[0] is the END checked above; after it BEGIN/END events alternate.
            final String txId = assertBeginTransaction(txMetadata.get(2 * i + 1));
            assertRecordTransactionMetadata(recordA, txId, 1, 1);
            assertRecordTransactionMetadata(recordB, txId, 2, 1);
            if (i < RECORDS_PER_TABLE - 1) {
                assertEndTransaction(txMetadata.get(2 * i + 2), txId, 2,
                        Collect.hashMapOf("testDB.dbo.tablea", 1, "testDB.dbo.tableb", 1));
            }
        }
    }

    /** Runs the restart scenario with an initial run that is stopped after its first record. */
    @Test
    public void restartInTheMiddleOfTxAfterSnapshot() throws Exception {
        restartInTheMiddleOfTx(true, false);
    }

    /** Runs the restart scenario after a complete transaction has been consumed while running. */
    @Test
    public void restartInTheMiddleOfTxAfterCompletedTx() throws Exception {
        restartInTheMiddleOfTx(false, true);
    }

    /** Runs the restart scenario without any preceding marker transaction. */
    @Test
    public void restartInTheMiddleOfTx() throws Exception {
        restartInTheMiddleOfTx(false, false);
    }

    /** Builds the configuration shared by all tests: initial snapshot plus transaction metadata. */
    private static Configuration transactionMetadataConfig() {
        return TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with(SqlServerConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
                .build();
    }

    /**
     * Asserts that the record's {@code after} struct holds the given id and text column value and
     * that its {@code before} field is {@code null}.
     */
    private void assertInsertedRow(SourceRecord record, String textColumn, int expectedId, String expectedText) {
        final List<SchemaAndValueField> expectedRow = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, expectedId),
                new SchemaAndValueField(textColumn, Schema.OPTIONAL_STRING_SCHEMA, expectedText));
        final Struct value = (Struct) record.value();
        assertRecord((Struct) value.get("after"), expectedRow);
        Assert.assertNull(value.get("before"));
    }

    /** Applies every expected field assertion to the given struct. */
    private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
        expected.forEach(schemaAndValueField -> schemaAndValueField.assertFor(record));
    }
}

