/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.informix;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.informix.InformixConnection;
import io.debezium.connector.informix.InformixConnector;
import io.debezium.connector.informix.InformixConnectorConfig;
import io.debezium.connector.informix.util.TestHelper;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TransactionMetadataIT
extends AbstractAsyncEngineConnectorTest {
    private InformixConnection connection;

    @Before
    public void before() throws SQLException {
        this.connection = TestHelper.testConnection();
        this.connection.execute(new String[]{"DROP TABLE IF EXISTS tablea", "DROP TABLE IF EXISTS tableb", "CREATE TABLE tablea (id int not null, cola varchar(30), primary key (id))", "CREATE TABLE tableb (id int not null, colb varchar(30), primary key (id))", "INSERT INTO tablea VALUES(1, 'a')"});
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)TestHelper.SCHEMA_HISTORY_PATH);
        Testing.Print.enable();
    }

    @After
    public void after() throws SQLException {
        this.stopConnector();
        TransactionMetadataIT.waitForConnectorShutdown((String)"informix_server", (String)"testdb");
        this.assertConnectorNotRunning();
        if (this.connection != null) {
            this.connection.rollback().execute(new String[]{"DROP TABLE tablea", "DROP TABLE tableb"}).close();
        }
    }

    @Test
    public void transactionMetadata() throws Exception {
        int RECORDS_PER_TABLE = 5;
        int ID_START = 10;
        Configuration config = ((Configuration.Builder)((Configuration.Builder)TestHelper.defaultConfig().with(InformixConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)InformixConnectorConfig.SnapshotMode.INITIAL)).with(InformixConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)).build();
        this.start(InformixConnector.class, config);
        this.assertConnectorIsRunning();
        TransactionMetadataIT.waitForSnapshotToBeCompleted((String)"informix_server", (String)"testdb");
        this.consumeRecordsByTopic(1);
        TransactionMetadataIT.waitForStreamingRunning((String)"informix_server", (String)"testdb");
        this.connection.setAutoCommit(false);
        for (int i = 10; i < 15; ++i) {
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO tablea VALUES(" + i + ", 'a')", "INSERT INTO tableb VALUES(" + i + ", 'b')"});
        }
        this.connection.commit();
        this.connection.setAutoCommit(true);
        this.connection.execute(new String[]{"INSERT INTO tableb VALUES(1000, 'b')"});
        this.waitForAvailableRecords();
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(15);
        List tableA = records.recordsForTopic("testdb.informix.tablea");
        List tableB = records.recordsForTopic("testdb.informix.tableb");
        List tx = records.recordsForTopic("testdb.transaction");
        Assertions.assertThat((List)tableA).hasSize(5);
        Assertions.assertThat((List)tableB).hasSize(6);
        Assertions.assertThat((List)tx).hasSize(4);
        List all = records.allRecordsInOrder();
        String txId = this.assertBeginTransaction((SourceRecord)all.get(0));
        for (int i = 1; i <= 10; ++i) {
            this.assertRecordTransactionMetadata((SourceRecord)all.get(i), txId, i, (i + 1) / 2);
        }
        this.assertEndTransaction((SourceRecord)all.get(11), txId, 10L, Collect.hashMapOf((Object)"testdb.informix.tablea", (Object)5, (Object)"testdb.informix.tableb", (Object)5));
    }

    protected String assertBeginTransaction(SourceRecord record) {
        Struct begin = (Struct)record.value();
        Struct beginKey = (Struct)record.key();
        Map offset = record.sourceOffset();
        Assertions.assertThat((String)begin.getString("status")).isEqualTo((Object)"BEGIN");
        Assertions.assertThat((Long)begin.getInt64("event_count")).isNull();
        String txId = begin.getString("id");
        Assertions.assertThat((String)beginKey.getString("id")).isEqualTo((Object)txId);
        String expectedId = Arrays.stream(txId.split(":")).findFirst().get();
        Assertions.assertThat(offset.get("transaction_id")).isEqualTo((Object)expectedId);
        return txId;
    }

    protected void assertEndTransaction(SourceRecord record, String beginTxId, long expectedEventCount, Map<String, Number> expectedPerTableCount) {
        Struct end = (Struct)record.value();
        Struct endKey = (Struct)record.key();
        Map offset = record.sourceOffset();
        String expectedId = Arrays.stream(beginTxId.split(":")).findFirst().get();
        String expectedTxId = String.format("%s:%s", expectedId, offset.get("commit_lsn"));
        Assertions.assertThat((String)end.getString("status")).isEqualTo((Object)"END");
        Assertions.assertThat((String)end.getString("id")).isEqualTo((Object)expectedTxId);
        Assertions.assertThat((Long)end.getInt64("event_count")).isEqualTo(expectedEventCount);
        Assertions.assertThat((String)endKey.getString("id")).isEqualTo((Object)expectedTxId);
        Assertions.assertThat(end.getArray("data_collections").stream().map(x -> (Struct)x).collect(Collectors.toMap(x -> x.getString("data_collection"), x -> x.getInt64("event_count")))).isEqualTo(expectedPerTableCount.entrySet().stream().collect(Collectors.toMap(x -> (String)x.getKey(), x -> ((Number)x.getValue()).longValue())));
        Assertions.assertThat(offset.get("transaction_id")).isEqualTo((Object)expectedId);
    }

    protected void assertRecordTransactionMetadata(SourceRecord record, String beginTxId, long expectedTotalOrder, long expectedCollectionOrder) {
        Struct change = ((Struct)record.value()).getStruct("transaction");
        Map offset = record.sourceOffset();
        String expectedId = Arrays.stream(beginTxId.split(":")).findFirst().get();
        String expectedTxId = String.format("%s:%s", expectedId, offset.get("commit_lsn"));
        Assertions.assertThat((String)change.getString("id")).isEqualTo((Object)expectedTxId);
        Assertions.assertThat((Long)change.getInt64("total_order")).isEqualTo(expectedTotalOrder);
        Assertions.assertThat((Long)change.getInt64("data_collection_order")).isEqualTo(expectedCollectionOrder);
        Assertions.assertThat(offset.get("transaction_id")).isEqualTo((Object)expectedId);
    }
}

