/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.db2;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.db2.Db2Connection;
import io.debezium.connector.db2.Db2Connector;
import io.debezium.connector.db2.Db2ConnectorConfig;
import io.debezium.connector.db2.util.TestHelper;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class Db2ConnectorIT
extends AbstractConnectorTest {
    private Db2Connection connection;

    /**
     * Prepares the database and the connector test framework before each test.
     *
     * <p>Opens a test connection, empties {@code ASNCDC.IBMSNAP_REGISTER}, creates the five
     * test tables, inserts one row into {@code tablea}, enables table-level CDC for every
     * created table, initializes the connector test framework, removes the database history
     * file and turns on test print output.
     *
     * @throws SQLException if any of the setup statements fails
     */
    @Before
    public void before() throws SQLException {
        connection = TestHelper.testConnection();
        connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER");
        connection.execute(
                "CREATE TABLE tablea (id int not null, cola varchar(30), primary key (id))",
                "CREATE TABLE tableb (id int not null, colb varchar(30), primary key (id))",
                "CREATE TABLE masked_hashed_column_table (id int not null, name varchar(255), name2 varchar(255), name3 varchar(20), primary key (id))",
                "CREATE TABLE truncated_column_table (id int not null, name varchar(20), primary key (id))",
                "CREATE TABLE dt_table (id int not null, c1 int, c2 int, c3a numeric(5,2), c3b varchar(128), f1 float(10), f2 decimal(8,4), primary key(id))",
                "INSERT INTO tablea VALUES(1, 'a')");

        // Table-level CDC is switched on in the same order in which the tables were created.
        final List<String> capturedTables = Arrays.asList(
                "TABLEA",
                "TABLEB",
                "MASKED_HASHED_COLUMN_TABLE",
                "TRUNCATED_COLUMN_TABLE",
                "DT_TABLE");
        for (String capturedTable : capturedTables) {
            TestHelper.enableTableCdc(connection, capturedTable);
        }

        initializeConnectorTestFramework();
        Testing.Files.delete((Path) TestHelper.DB_HISTORY_PATH);
        Testing.Print.enable();
    }

    /**
     * Tears down the database state created by {@link #before()}.
     *
     * <p>Disables database-level and table-level CDC, drops the test tables and empties the
     * {@code ASNCDC} bookkeeping tables. The connection is closed in a {@code finally} block so
     * that a failing cleanup statement does not leak the connection.
     *
     * @throws SQLException if a cleanup statement or closing the connection fails
     */
    @After
    public void after() throws SQLException {
        if (connection == null) {
            return;
        }
        try {
            TestHelper.disableDbCdc(connection);
            TestHelper.disableTableCdc(connection, "DT_TABLE");
            TestHelper.disableTableCdc(connection, "TRUNCATED_COLUMN_TABLE");
            TestHelper.disableTableCdc(connection, "MASKED_HASHED_COLUMN_TABLE");
            TestHelper.disableTableCdc(connection, "TABLEB");
            TestHelper.disableTableCdc(connection, "TABLEA");
            connection.execute(
                    "DROP TABLE tablea",
                    "DROP TABLE tableb",
                    "DROP TABLE masked_hashed_column_table",
                    "DROP TABLE truncated_column_table",
                    "DROP TABLE dt_table");
            connection.execute("DELETE FROM ASNCDC.IBMSNAP_REGISTER");
            connection.execute("DELETE FROM ASNCDC.IBMQREP_COLVERSION");
            connection.execute("DELETE FROM ASNCDC.IBMQREP_TABVERSION");
        }
        finally {
            // Always release the connection, even if one of the cleanup steps above threw.
            connection.close();
        }
    }

    /**
     * Verifies that deleting rows with {@code TOMBSTONES_ON_DELETE} set to {@code false}
     * produces exactly one delete event per row and no additional tombstone records.
     *
     * <p>Rows are inserted into both {@code tablea} and {@code tableb}; afterwards all rows of
     * {@code tableb} are deleted. The test expects no records for {@code tablea} and one delete
     * event per deleted {@code tableb} row, each carrying the old row in {@code before} and
     * {@code null} in {@code after}.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    public void deleteWithoutTombstone() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        final int TABLES = 2;
        final int ID_START = 10;
        final Configuration config = TestHelper.defaultConfig()
                .with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL)
                .with(Db2ConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .build();

        start(Db2Connector.class, config);
        assertConnectorIsRunning();

        // Drain the single record available before CDC is enabled
        // (presumably the snapshot of the tablea row inserted in before() - TODO confirm).
        consumeRecordsByTopic(1);

        TestHelper.enableDbCdc(connection);
        connection.execute("UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'");
        TestHelper.refreshAndWait(connection);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_START + i;
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        TestHelper.refreshAndWait(connection);

        // Drain the insert events of both tables before triggering the deletes.
        consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);

        connection.execute("DELETE FROM tableB");
        TestHelper.refreshAndWait(connection);

        final AbstractConnectorTest.SourceRecords deleteRecords = consumeRecordsByTopic(RECORDS_PER_TABLE);
        final List<SourceRecord> deleteTableA = deleteRecords.recordsForTopic("testdb.DB2INST1.TABLEA");
        final List<SourceRecord> deleteTableB = deleteRecords.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(deleteTableA).isNullOrEmpty();
        Assertions.assertThat(deleteTableB).hasSize(RECORDS_PER_TABLE);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final SourceRecord deleteRecord = deleteTableB.get(i);
            final List<SchemaAndValueField> expectedDeleteRow = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, ID_START + i),
                    new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            final Struct deleteValue = (Struct) deleteRecord.value();
            assertRecord((Struct) deleteValue.get("before"), expectedDeleteRow);
            Assert.assertNull(deleteValue.get("after"));
        }

        stopConnector();
    }

    /**
     * Verifies that changing a primary key is emitted as a delete event for the old key,
     * a tombstone for the old key and an insert event for the new key.
     *
     * <p>The key of the single row in {@code tablea} and in {@code tableb} is changed from
     * {@code 1} to {@code 100}; three records are expected per table.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    public void updatePrimaryKey() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(Db2Connector.class, config);
        assertConnectorIsRunning();

        connection.execute("INSERT INTO tableb VALUES(1, 'b')");
        consumeRecordsByTopic(2);

        TestHelper.enableDbCdc(connection);
        connection.execute("UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'");
        TestHelper.refreshAndWait(connection);

        connection.setAutoCommit(false);
        connection.execute(
                "UPDATE tablea SET id=100 WHERE id=1",
                "UPDATE tableb SET id=100 WHERE id=1");
        TestHelper.refreshAndWait(connection);

        final AbstractConnectorTest.SourceRecords changes = consumeRecordsByTopic(6);
        final List<SourceRecord> changesA = changes.recordsForTopic("testdb.DB2INST1.TABLEA");
        final List<SourceRecord> changesB = changes.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(changesA).hasSize(3);
        Assertions.assertThat(changesB).hasSize(3);

        // Both tables are verified identically; only the text column name and content differ.
        final String[][] textColumns = { { "COLA", "a" }, { "COLB", "b" } };
        final List<List<SourceRecord>> changesPerTable = Arrays.asList(changesA, changesB);
        for (int t = 0; t < textColumns.length; t++) {
            final String columnName = textColumns[t][0];
            final String columnText = textColumns[t][1];
            final List<SourceRecord> tableChanges = changesPerTable.get(t);

            final List<SchemaAndValueField> oldRow = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 1),
                    new SchemaAndValueField(columnName, Schema.OPTIONAL_STRING_SCHEMA, columnText));
            final List<SchemaAndValueField> oldKey = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 1));
            final List<SchemaAndValueField> newRow = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 100),
                    new SchemaAndValueField(columnName, Schema.OPTIONAL_STRING_SCHEMA, columnText));
            final List<SchemaAndValueField> newKey = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 100));

            final SourceRecord deletion = tableChanges.get(0);
            final SourceRecord tombstone = tableChanges.get(1);
            final SourceRecord insertion = tableChanges.get(2);

            // Delete event: old row in "before", nothing in "after".
            final Struct deletionKey = (Struct) deletion.key();
            final Struct deletionValue = (Struct) deletion.value();
            assertRecord(deletionValue.getStruct("before"), oldRow);
            assertRecord(deletionKey, oldKey);
            Assert.assertNull(deletionValue.get("after"));

            // Tombstone: old key with a null value.
            final Struct tombstoneKey = (Struct) tombstone.key();
            final Struct tombstoneValue = (Struct) tombstone.value();
            assertRecord(tombstoneKey, oldKey);
            Assert.assertNull(tombstoneValue);

            // Insert event: new row in "after", nothing in "before".
            final Struct insertionKey = (Struct) insertion.key();
            final Struct insertionValue = (Struct) insertion.value();
            assertRecord(insertionValue.getStruct("after"), newRow);
            assertRecord(insertionKey, newKey);
            Assert.assertNull(insertionValue.get("before"));
        }

        stopConnector();
    }

    /**
     * Verifies the delete / tombstone / insert sequence of a primary key change when the
     * connector is stopped and started again after only part of the records were consumed.
     *
     * <p>Two records are consumed before the restart and four after it. The {@code tablea}
     * records of both batches are combined, while the {@code tableb} records are taken from
     * the second batch only.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor("DBZ-1152")
    public void updatePrimaryKeyWithRestartInMiddle() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL)
                .build();
        start(Db2Connector.class, config);
        assertConnectorIsRunning();

        connection.execute("INSERT INTO tableb VALUES(1, 'b')");
        TestHelper.enableDbCdc(connection);
        connection.execute("UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'");
        TestHelper.refreshAndWait(connection);
        consumeRecordsByTopic(2);

        connection.setAutoCommit(false);
        connection.execute(
                "UPDATE tablea SET id=100 WHERE id=1",
                "UPDATE tableb SET id=100 WHERE id=1");
        TestHelper.refreshAndWait(connection);

        final AbstractConnectorTest.SourceRecords beforeRestart = consumeRecordsByTopic(2);
        stopConnector();
        start(Db2Connector.class, config);
        assertConnectorIsRunning();
        final AbstractConnectorTest.SourceRecords afterRestart = consumeRecordsByTopic(4);

        final List<SourceRecord> changesA = beforeRestart.recordsForTopic("testdb.DB2INST1.TABLEA");
        changesA.addAll(afterRestart.recordsForTopic("testdb.DB2INST1.TABLEA"));
        final List<SourceRecord> changesB = afterRestart.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(changesA).hasSize(3);
        Assertions.assertThat(changesB).hasSize(3);

        // The same checks apply to both tables; only the text column name and content differ.
        final String[] columnNames = { "COLA", "COLB" };
        final String[] columnTexts = { "a", "b" };
        final List<List<SourceRecord>> changesPerTable = Arrays.asList(changesA, changesB);
        for (int t = 0; t < columnNames.length; t++) {
            final List<SourceRecord> tableChanges = changesPerTable.get(t);

            final List<SchemaAndValueField> rowBeforeUpdate = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 1),
                    new SchemaAndValueField(columnNames[t], Schema.OPTIONAL_STRING_SCHEMA, columnTexts[t]));
            final List<SchemaAndValueField> keyBeforeUpdate = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 1));
            final List<SchemaAndValueField> rowAfterUpdate = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 100),
                    new SchemaAndValueField(columnNames[t], Schema.OPTIONAL_STRING_SCHEMA, columnTexts[t]));
            final List<SchemaAndValueField> keyAfterUpdate = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, 100));

            final SourceRecord removed = tableChanges.get(0);
            final SourceRecord marker = tableChanges.get(1);
            final SourceRecord added = tableChanges.get(2);

            // Delete event for the old key.
            final Struct removedKey = (Struct) removed.key();
            final Struct removedValue = (Struct) removed.value();
            assertRecord(removedValue.getStruct("before"), rowBeforeUpdate);
            assertRecord(removedKey, keyBeforeUpdate);
            Assert.assertNull(removedValue.get("after"));

            // Tombstone for the old key: the record value must be null.
            final Struct markerKey = (Struct) marker.key();
            final Struct markerValue = (Struct) marker.value();
            assertRecord(markerKey, keyBeforeUpdate);
            Assert.assertNull(markerValue);

            // Insert event for the new key.
            final Struct addedKey = (Struct) added.key();
            final Struct addedValue = (Struct) added.value();
            assertRecord(addedValue.getStruct("after"), rowAfterUpdate);
            assertRecord(addedKey, keyAfterUpdate);
            Assert.assertNull(addedValue.get("before"));
        }

        stopConnector();
    }

    /**
     * Verifies the content of the source offsets during the snapshot and after a restart.
     *
     * <p>Rows are inserted before the connector is started. Every checked snapshot record must
     * carry {@code snapshot = true}; {@code snapshot_completed} must be {@code false} on all
     * but the last record and {@code true} on the last one. After the connector is stopped,
     * further rows are inserted and the connector is started again; the resulting records
     * must not contain the snapshot markers and must contain a {@code change_lsn}.
     *
     * @throws Exception if database access, waiting or record consumption fails
     */
    @Test
    @FixFor("DBZ-1069")
    public void verifyOffsets() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        final int TABLES = 2;
        final int ID_START = 10;
        final int ID_RESTART = 100;
        // Number of one-second polls allowed before the test gives up waiting for the max LSN.
        final int MAX_LSN_POLLS = 30;
        final Configuration config = TestHelper.defaultConfig()
                .with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL)
                .build();

        TestHelper.enableDbCdc(connection);
        connection.execute("UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'");
        TestHelper.refreshAndWait(connection);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_START + i;
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        TestHelper.refreshAndWait(connection);

        // Poll until the max LSN is reported as available, failing after MAX_LSN_POLLS polls.
        for (int poll = 0; !connection.getMaxLsn().isAvailable(); poll++) {
            if (poll == MAX_LSN_POLLS) {
                Assert.fail("Initial changes not written to CDC structures");
            }
            Testing.debug("Waiting for initial changes to be propagated to CDC structures");
            Thread.sleep(1000L);
        }

        start(Db2Connector.class, config);
        assertConnectorIsRunning();
        TestHelper.refreshAndWait(connection);

        List<SourceRecord> records = consumeRecordsByTopic(1 + RECORDS_PER_TABLE * TABLES).allRecordsInOrder();
        // The first consumed record is excluded from the offset checks below.
        records = records.subList(1, records.size());
        for (Iterator<SourceRecord> it = records.iterator(); it.hasNext();) {
            final SourceRecord record = it.next();
            Assertions.assertThat(record.sourceOffset().get("snapshot")).as("Snapshot phase").isEqualTo(true);
            if (it.hasNext()) {
                Assertions.assertThat(record.sourceOffset().get("snapshot_completed")).as("Snapshot in progress").isEqualTo(false);
            }
            else {
                Assertions.assertThat(record.sourceOffset().get("snapshot_completed")).as("Snapshot completed").isEqualTo(true);
            }
        }

        stopConnector();
        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_RESTART + i;
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }

        start(Db2Connector.class, config);
        assertConnectorIsRunning();
        TestHelper.refreshAndWait(connection);

        final AbstractConnectorTest.SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
        final List<SourceRecord> tableA = sourceRecords.recordsForTopic("testdb.DB2INST1.TABLEA");
        final List<SourceRecord> tableB = sourceRecords.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_RESTART + i;
            final SourceRecord recordA = tableA.get(i);
            final SourceRecord recordB = tableB.get(i);
            final List<SchemaAndValueField> expectedRowA = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("COLA", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            final Struct valueA = (Struct) recordA.value();
            assertRecord((Struct) valueA.get("after"), expectedRowA);
            Assert.assertNull(valueA.get("before"));

            final Struct valueB = (Struct) recordB.value();
            assertRecord((Struct) valueB.get("after"), expectedRowB);
            Assert.assertNull(valueB.get("before"));

            Assertions.assertThat(recordA.sourceOffset().get("snapshot")).as("Streaming phase").isNull();
            Assertions.assertThat(recordA.sourceOffset().get("snapshot_completed")).as("Streaming phase").isNull();
            Assertions.assertThat(recordA.sourceOffset().get("change_lsn")).as("LSN present").isNotNull();

            Assertions.assertThat(recordB.sourceOffset().get("snapshot")).as("Streaming phase").isNull();
            Assertions.assertThat(recordB.sourceOffset().get("snapshot_completed")).as("Streaming phase").isNull();
            Assertions.assertThat(recordB.sourceOffset().get("change_lsn")).as("LSN present").isNotNull();
        }
    }

    /**
     * Verifies that only tables matched by {@code TABLE_INCLUDE_LIST} are captured.
     *
     * <p>The connector is configured with the include list {@code db2inst1.tableb}. Rows are
     * inserted into both {@code tablea} and {@code tableb}; records are expected for
     * {@code tableb} only.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    public void testTableIncludeList() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        // Only one of the two tables is expected to produce records.
        final int TABLES = 1;
        final int ID_START = 10;
        final Configuration config = TestHelper.defaultConfig()
                .with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.tableb")
                .build();

        connection.execute("INSERT INTO tableb VALUES(1, 'b')");
        start(Db2Connector.class, config);
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);

        TestHelper.enableDbCdc(connection);
        connection.execute("UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'");
        TestHelper.refreshAndWait(connection);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_START + i;
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        TestHelper.refreshAndWait(connection);

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
        final List<SourceRecord> tableA = records.recordsForTopic("testdb.DB2INST1.TABLEA");
        final List<SourceRecord> tableB = records.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(tableA == null || tableA.isEmpty()).isTrue();
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);

        stopConnector();
    }

    /**
     * Verifies that tables matched by {@code TABLE_EXCLUDE_LIST} are not captured.
     *
     * <p>The connector is configured with the exclude list {@code db2inst1.tablea}. Rows are
     * inserted into both {@code tablea} and {@code tableb}; records are expected for
     * {@code tableb} only.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    public void testTableExcludeList() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        // Only one of the two tables is expected to produce records.
        final int TABLES = 1;
        final int ID_START = 10;
        final Configuration config = TestHelper.defaultConfig()
                .with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL)
                .with(Db2ConnectorConfig.TABLE_EXCLUDE_LIST, "db2inst1.tablea")
                .build();

        connection.execute("INSERT INTO tableb VALUES(1, 'b')");
        start(Db2Connector.class, config);
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);

        TestHelper.enableDbCdc(connection);
        connection.execute("UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'");
        TestHelper.refreshAndWait(connection);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_START + i;
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        TestHelper.refreshAndWait(connection);

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
        final List<SourceRecord> tableA = records.recordsForTopic("testdb.DB2INST1.TABLEA");
        final List<SourceRecord> tableB = records.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(tableA == null || tableA.isEmpty()).isTrue();
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);

        stopConnector();
    }

    /**
     * Shared scenario: the connector is stopped while it is part-way through the records of
     * one transaction and must continue with the remaining records after being started again.
     *
     * <p>A single transaction inserts {@code RECORDS_PER_TABLE} rows into each of
     * {@code tablea} and {@code tableb}. The connector is started with a stop predicate that
     * matches the {@code tablea} record whose {@code ID} equals {@code HALF_ID}. After the
     * restart the remaining rows of both tables are expected, followed by the rows of a
     * second batch of individually committed inserts.
     *
     * @param restartJustAfterSnapshot when {@code true}, the connector is first started,
     *            one record is consumed, the connector is stopped and one extra row is
     *            inserted into {@code tablea} before the main scenario starts
     * @param afterStreaming when {@code true}, one extra row is inserted into {@code tablea}
     *            and its change record is consumed and verified before the transaction is
     *            executed
     * @throws Exception if database access or record consumption fails
     */
    private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean afterStreaming) throws Exception {
        final int RECORDS_PER_TABLE = 30;
        final int TABLES = 2;
        final int ID_START = 10;
        final int ID_RESTART = 1000;
        final int HALF_ID = ID_START + RECORDS_PER_TABLE / 2;
        final Configuration config = TestHelper.defaultConfig()
                .with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL)
                .build();

        if (restartJustAfterSnapshot) {
            start(Db2Connector.class, config);
            assertConnectorIsRunning();
            consumeRecordsByTopic(1);
            stopConnector();
            connection.execute("INSERT INTO tablea VALUES(-1, '-a')");
            TestHelper.refreshAndWait(connection);
        }

        // Stop predicate: matches the tablea record with ID == HALF_ID and COLA == "a".
        // Locals inside the lambda must not reuse names of locals of the enclosing method.
        start(Db2Connector.class, config, record -> {
            if (!"testdb.DB2INST1.TABLEA.Envelope".equals(record.valueSchema().name())) {
                return false;
            }
            final Struct envelope = (Struct) record.value();
            final Struct after = envelope.getStruct("after");
            final Integer rowId = after.getInt32("ID");
            final String rowText = after.getString("COLA");
            return rowId != null && rowId == HALF_ID && "a".equals(rowText);
        });
        assertConnectorIsRunning();
        consumeRecordsByTopic(1);

        TestHelper.enableDbCdc(connection);
        connection.execute("UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'");
        TestHelper.refreshAndWait(connection);

        if (afterStreaming) {
            connection.execute("INSERT INTO tablea VALUES(-2, '-a')");
            TestHelper.refreshAndWait(connection);
            final AbstractConnectorTest.SourceRecords streamed = consumeRecordsByTopic(1);
            final List<SchemaAndValueField> expectedRow = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, -2),
                    new SchemaAndValueField("COLA", Schema.OPTIONAL_STRING_SCHEMA, "-a"));
            assertRecord(((Struct) streamed.allRecordsInOrder().get(0).value()).getStruct("after"), expectedRow);
        }

        // All rows of the first batch are inserted within one transaction.
        connection.setAutoCommit(false);
        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_START + i;
            connection.executeWithoutCommitting("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.executeWithoutCommitting("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        connection.connection().commit();
        TestHelper.refreshAndWait(connection);

        List<SourceRecord> records = consumeRecordsByTopic(RECORDS_PER_TABLE).allRecordsInOrder();
        Assertions.assertThat(records).hasSize(RECORDS_PER_TABLE);
        final SourceRecord lastRecordForOffset = records.get(RECORDS_PER_TABLE - 1);
        final Struct value = (Struct) lastRecordForOffset.value();
        final List<SchemaAndValueField> expectedLastRow = Arrays.asList(
                new SchemaAndValueField("ID", Schema.INT32_SCHEMA, HALF_ID - 1),
                new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        assertRecord((Struct) value.get("after"), expectedLastRow);

        stopConnector();
        start(Db2Connector.class, config);
        assertConnectorIsRunning();
        TestHelper.refreshAndWait(connection);

        AbstractConnectorTest.SourceRecords sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE);
        records = sourceRecords.allRecordsInOrder();
        Assertions.assertThat(records).hasSize(RECORDS_PER_TABLE);
        List<SourceRecord> tableA = sourceRecords.recordsForTopic("testdb.DB2INST1.TABLEA");
        List<SourceRecord> tableB = sourceRecords.recordsForTopic("testdb.DB2INST1.TABLEB");

        // Second half of the transaction: rows HALF_ID .. HALF_ID + RECORDS_PER_TABLE / 2 - 1.
        for (int i = 0; i < RECORDS_PER_TABLE / 2; i++) {
            final int id = HALF_ID + i;
            final SourceRecord recordA = tableA.get(i);
            final SourceRecord recordB = tableB.get(i);
            final List<SchemaAndValueField> expectedRowA = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("COLA", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            final Struct valueA = (Struct) recordA.value();
            assertRecord((Struct) valueA.get("after"), expectedRowA);
            Assert.assertNull(valueA.get("before"));

            final Struct valueB = (Struct) recordB.value();
            assertRecord((Struct) valueB.get("after"), expectedRowB);
            Assert.assertNull(valueB.get("before"));
        }

        // Second batch: every pair of inserts is committed on its own.
        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_RESTART + i;
            connection.executeWithoutCommitting("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.executeWithoutCommitting("INSERT INTO tableb VALUES(" + id + ", 'b')");
            connection.connection().commit();
        }
        TestHelper.refreshAndWait(connection);

        sourceRecords = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
        tableA = sourceRecords.recordsForTopic("testdb.DB2INST1.TABLEA");
        tableB = sourceRecords.recordsForTopic("testdb.DB2INST1.TABLEB");
        Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_RESTART + i;
            final SourceRecord recordA = tableA.get(i);
            final SourceRecord recordB = tableB.get(i);
            final List<SchemaAndValueField> expectedRowA = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("COLA", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("COLB", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            final Struct valueA = (Struct) recordA.value();
            assertRecord((Struct) valueA.get("after"), expectedRowA);
            Assert.assertNull(valueA.get("before"));

            final Struct valueB = (Struct) recordB.value();
            assertRecord((Struct) valueB.get("after"), expectedRowB);
            Assert.assertNull(valueB.get("before"));
        }
    }

    /**
     * Runs the restart-in-the-middle-of-transaction scenario with the additional
     * start/stop cycle directly after the first consumed record.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @FixFor("DBZ-1128")
    public void restartInTheMiddleOfTxAfterSnapshot() throws Exception {
        final boolean restartJustAfterSnapshot = true;
        final boolean afterStreaming = false;
        restartInTheMiddleOfTx(restartJustAfterSnapshot, afterStreaming);
    }

    /**
     * Runs the restart-in-the-middle-of-transaction scenario with one extra row inserted
     * and verified before the transaction is executed.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @FixFor("DBZ-1128")
    public void restartInTheMiddleOfTxAfterCompletedTx() throws Exception {
        final boolean restartJustAfterSnapshot = false;
        final boolean afterStreaming = true;
        restartInTheMiddleOfTx(restartJustAfterSnapshot, afterStreaming);
    }

    /**
     * Runs the restart-in-the-middle-of-transaction scenario with both optional
     * preparation steps disabled.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    public void restartInTheMiddleOfTx() throws Exception {
        final boolean restartJustAfterSnapshot = false;
        final boolean afterStreaming = false;
        restartInTheMiddleOfTx(restartJustAfterSnapshot, afterStreaming);
    }

    /**
     * Verifies that a warning is logged when the table include list is {@code my_products}.
     *
     * <p>The warning is looked up through a {@link LogInterceptor} registered for
     * {@link RelationalDatabaseSchema}; the check runs in the callback passed to
     * {@code stopConnector}.
     *
     * @throws Exception if starting, waiting for or stopping the connector fails
     */
    @Test
    @FixFor("DBZ-1242")
    public void testEmptySchemaWarningAfterApplyingFilters() throws Exception {
        final LogInterceptor interceptor = new LogInterceptor(RelationalDatabaseSchema.class);
        final Configuration config = TestHelper.defaultConfig()
                .with(Db2ConnectorConfig.SNAPSHOT_MODE, Db2ConnectorConfig.SnapshotMode.INITIAL)
                .with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "my_products")
                .build();

        start(Db2Connector.class, config);
        assertConnectorIsRunning();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);

        stopConnector(stopped -> {
            final boolean warned = interceptor.containsWarnMessage(
                    "After applying the include/exclude list filters, no changes will be captured. Please check your configuration!");
            Assertions.assertThat(warned).isTrue();
        });
    }

    /**
     * Regression test for DBZ-775: verifies that column masking, salted SHA-256 hashing and
     * truncation are applied to the {@code after} image of inserted rows.
     *
     * <p>The connector is configured to
     * <ul>
     *   <li>mask {@code MASKED_HASHED_COLUMN_TABLE.NAME} with 12 characters,</li>
     *   <li>hash {@code MASKED_HASHED_COLUMN_TABLE.NAME2} and {@code NAME3} with SHA-256 and the
     *       salt {@code CzQMA0cB5K},</li>
     *   <li>truncate {@code TRUNCATED_COLUMN_TABLE.NAME} to 4 characters.</li>
     * </ul>
     *
     * <p>The {@code after} struct of each insert event is asserted to be present before its fields
     * are checked, so the masking/truncation assertions can never be skipped silently.
     *
     * @throws Exception if the connector or the database interaction fails
     */
    @Test
    @FixFor(value={"DBZ-775"})
    public void shouldConsumeEventsWithMaskedAndTruncatedColumns() throws Exception {
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(Db2ConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)Db2ConnectorConfig.SnapshotMode.SCHEMA_ONLY))
                .with("column.mask.with.12.chars", "DB2INST1.MASKED_HASHED_COLUMN_TABLE.NAME")
                .with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K", "DB2INST1.MASKED_HASHED_COLUMN_TABLE.NAME2,DB2INST1.MASKED_HASHED_COLUMN_TABLE.NAME3")
                .with("column.truncate.to.4.chars", "DB2INST1.TRUNCATED_COLUMN_TABLE.NAME")
                .build();
        this.start(Db2Connector.class, config);
        this.assertConnectorIsRunning();

        // Drain the single record produced right after start-up before enabling CDC.
        this.consumeRecordsByTopic(1);

        TestHelper.enableDbCdc(this.connection);
        this.connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(this.connection);

        this.connection.setAutoCommit(false);
        this.connection.execute(new String[]{"INSERT INTO masked_hashed_column_table (id, name, name2, name3) VALUES (10, 'some_name', 'test', 'test')"});
        this.connection.execute(new String[]{"INSERT INTO truncated_column_table VALUES(11, 'some_name')"});
        TestHelper.refreshAndWait(this.connection);

        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
        List<SourceRecord> maskedRecords = records.recordsForTopic("testdb.DB2INST1.MASKED_HASHED_COLUMN_TABLE");
        List<SourceRecord> truncatedRecords = records.recordsForTopic("testdb.DB2INST1.TRUNCATED_COLUMN_TABLE");

        // Masked / hashed table: exactly one insert whose "after" image carries the transformed values.
        Assertions.assertThat(maskedRecords).hasSize(1);
        SourceRecord maskedRecord = maskedRecords.get(0);
        VerifyRecord.isValidInsert((SourceRecord)maskedRecord, (String)"ID", (int)10);
        Struct maskedAfter = ((Struct)maskedRecord.value()).getStruct("after");
        // Fail loudly instead of skipping the checks when the "after" image is missing.
        Assertions.assertThat((Object)maskedAfter).isNotNull();
        Assertions.assertThat((String)maskedAfter.getString("NAME")).isEqualTo((Object)"************");
        Assertions.assertThat((String)maskedAfter.getString("NAME2")).isEqualTo((Object)"8e68c68edbbac316dfe2f6ada6b0d2d3e2002b487a985d4b7c7c82dd83b0f4d7");
        // NAME3 is declared varchar(20), so only the first 20 characters of the hash are expected.
        Assertions.assertThat((String)maskedAfter.getString("NAME3")).isEqualTo((Object)"8e68c68edbbac316dfe2");

        // Truncated table: exactly one insert whose NAME is cut down to 4 characters.
        Assertions.assertThat(truncatedRecords).hasSize(1);
        SourceRecord truncatedRecord = truncatedRecords.get(0);
        VerifyRecord.isValidInsert((SourceRecord)truncatedRecord, (String)"ID", (int)11);
        Struct truncatedAfter = ((Struct)truncatedRecord.value()).getStruct("after");
        Assertions.assertThat((Object)truncatedAfter).isNotNull();
        Assertions.assertThat((String)truncatedAfter.getString("NAME")).isEqualTo((Object)"some");

        this.stopConnector();
    }

    /**
     * Regression test for DBZ-775: configures {@code MSG_KEY_COLUMNS} as
     * {@code (.*).tablea:id,cola}, inserts a row into {@code tablea} and checks that the key of the
     * resulting record exposes non-null {@code ID} and {@code COLA} fields.
     *
     * @throws Exception if the connector or the database interaction fails
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldRewriteIdentityKey() throws Exception {
        final Configuration.Builder schemaOnly = (Configuration.Builder) TestHelper.defaultConfig()
                .with(Db2ConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue) Db2ConnectorConfig.SnapshotMode.SCHEMA_ONLY);
        final Configuration.Builder withKeyColumns = (Configuration.Builder) schemaOnly
                .with(Db2ConnectorConfig.MSG_KEY_COLUMNS, "(.*).tablea:id,cola");
        final Configuration config = withKeyColumns.build();

        start(Db2Connector.class, config);
        // Drain the single record produced right after start-up before enabling CDC.
        consumeRecordsByTopic(1);

        TestHelper.enableDbCdc(connection);
        connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(connection);

        connection.setAutoCommit(false);
        connection.execute(new String[]{"INSERT INTO tablea (id, cola) values (100, 'hundred')"});

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        final List<SourceRecord> tableaRecords = consumed.recordsForTopic("testdb.DB2INST1.TABLEA");
        final SourceRecord firstRecord = tableaRecords.get(0);

        // The key must exist and contain both configured key columns.
        final Object rawKey = firstRecord.key();
        Assertions.assertThat((Object) rawKey).isNotNull();
        final Struct keyStruct = (Struct) rawKey;
        Assertions.assertThat((Object) keyStruct.get("ID")).isNotNull();
        Assertions.assertThat((Object) keyStruct.get("COLA")).isNotNull();

        stopConnector();
    }

    /**
     * Regression test for DBZ-1916 and DBZ-1830: with {@code datatype.propagate.source.type} set to
     * match NUMERIC, VARCHAR, DECIMAL and REAL columns, checks the schema parameters of the
     * {@code before} field of a {@code dt_table} insert event.
     *
     * <p>{@code ID}, {@code C1} and {@code C2} are expected to carry no parameters, while
     * {@code C3A}, {@code C3B}, {@code F2} and {@code F1} are expected to carry the source column
     * type and length (plus scale for the decimal columns).
     *
     * @throws Exception if the connector or the database interaction fails
     */
    @Test
    @FixFor({"DBZ-1916", "DBZ-1830"})
    public void shouldPropagateSourceTypeByDatatype() throws Exception {
        // Schema parameter keys under which the source column metadata is looked up.
        final String typeKey = "__debezium.source.column.type";
        final String lengthKey = "__debezium.source.column.length";
        final String scaleKey = "__debezium.source.column.scale";

        final Configuration.Builder schemaOnly = (Configuration.Builder) TestHelper.defaultConfig()
                .with(Db2ConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue) Db2ConnectorConfig.SnapshotMode.SCHEMA_ONLY);
        final Configuration config = schemaOnly
                .with("datatype.propagate.source.type", ".+\\.NUMERIC,.+\\.VARCHAR,.+\\.DECIMAL,.+\\.REAL")
                .build();

        start(Db2Connector.class, config);
        // Two single-record reads are performed before CDC is enabled.
        consumeRecordsByTopic(1);
        consumeRecordsByTopic(1);

        TestHelper.enableDbCdc(connection);
        connection.execute(new String[]{"UPDATE ASNCDC.IBMSNAP_REGISTER SET STATE = 'A' WHERE SOURCE_OWNER = 'DB2INST1'"});
        TestHelper.refreshAndWait(connection);

        connection.setAutoCommit(false);
        connection.execute(new String[]{"INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)"});

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        final List<SourceRecord> dtTableRecords = consumed.recordsForTopic("testdb.DB2INST1.DT_TABLE");
        Assertions.assertThat((List) dtTableRecords).hasSize(1);

        final SourceRecord insertEvent = dtTableRecords.get(0);
        final Field before = insertEvent.valueSchema().field("before");
        final Schema beforeSchema = before.schema();

        // Integer columns are not matched by the propagation pattern: no parameters expected.
        Assertions.assertThat((Map) beforeSchema.field("ID").schema().parameters()).isNull();
        Assertions.assertThat((Map) beforeSchema.field("C1").schema().parameters()).isNull();
        Assertions.assertThat((Map) beforeSchema.field("C2").schema().parameters()).isNull();

        // Matched columns expose the source type, length and (for decimals) scale.
        Assertions.assertThat((Map) beforeSchema.field("C3A").schema().parameters()).contains(new Map.Entry[]{
                Assertions.entry((Object) typeKey, (Object) "DECIMAL"),
                Assertions.entry((Object) lengthKey, (Object) "5"),
                Assertions.entry((Object) scaleKey, (Object) "2")});
        Assertions.assertThat((Map) beforeSchema.field("C3B").schema().parameters()).contains(new Map.Entry[]{
                Assertions.entry((Object) typeKey, (Object) "VARCHAR"),
                Assertions.entry((Object) lengthKey, (Object) "128")});
        Assertions.assertThat((Map) beforeSchema.field("F2").schema().parameters()).contains(new Map.Entry[]{
                Assertions.entry((Object) typeKey, (Object) "DECIMAL"),
                Assertions.entry((Object) lengthKey, (Object) "8"),
                Assertions.entry((Object) scaleKey, (Object) "4")});
        Assertions.assertThat((Map) beforeSchema.field("F1").schema().parameters()).contains(new Map.Entry[]{
                Assertions.entry((Object) typeKey, (Object) "REAL"),
                Assertions.entry((Object) lengthKey, (Object) "24")});
    }

    /**
     * Regression test for DBZ-3668: checks that records captured from {@code tablea} can be
     * converted to CloudEvents in JSON, JSON-with-Avro-data and Avro form.
     *
     * <p>One row is inserted before the connector starts and one after; each time a single record
     * is consumed from topic {@code testdb.DB2INST1.TABLEA} and passed through the three
     * {@link CloudEventsConverterTest} checks. For the second record the CloudEvents {@code id} is
     * additionally required to contain {@code commit_lsn:}.
     *
     * NOTE(review): presumably the first record originates from the snapshot and the second from
     * streaming - confirm against the default snapshot mode of {@code TestHelper.defaultConfig()}.
     *
     * @throws Exception if the connector or the database interaction fails
     */
    @Test
    @FixFor(value={"DBZ-3668"})
    public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(Db2ConnectorConfig.TABLE_INCLUDE_LIST, "db2inst1.tablea")).build();

        // Row inserted before the connector is started.
        this.connection.execute(new String[]{"INSERT INTO tablea (id,cola) values (1001, 'DBZ3668')"});
        this.start(Db2Connector.class, config);
        this.assertConnectorIsRunning();

        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        // Typed as List<SourceRecord> so the enhanced for loops below are well-typed.
        List<SourceRecord> tablea = records.recordsForTopic("testdb.DB2INST1.TABLEA");
        Assertions.assertThat(tablea).hasSize(1);
        for (SourceRecord record : tablea) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson((SourceRecord)record, (boolean)false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro((SourceRecord)record, (boolean)false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro((SourceRecord)record, (String)"db2", (String)"testdb", (boolean)false);
        }

        // Row inserted while the connector is running.
        this.connection.execute(new String[]{"INSERT INTO tablea (id,cola) values (1002, 'DBZ3668')"});
        records = this.consumeRecordsByTopic(1);
        tablea = records.recordsForTopic("testdb.DB2INST1.TABLEA");
        Assertions.assertThat(tablea).hasSize(1);
        for (SourceRecord record : tablea) {
            // The CloudEvents id of this record must embed the commit LSN marker.
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson((SourceRecord)record, (boolean)false, jsonNode -> Assertions.assertThat((String)jsonNode.get("id").asText()).contains(new CharSequence[]{"commit_lsn:"}));
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro((SourceRecord)record, (boolean)false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro((SourceRecord)record, (String)"db2", (String)"testdb", (boolean)false);
        }
    }

    /**
     * Applies every expected field check to the given struct, in list order.
     *
     * @param record the struct to verify
     * @param expected the field expectations; each one is run via {@code assertFor(record)}
     */
    private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
        for (SchemaAndValueField expectedField : expected) {
            expectedField.assertFor(record);
        }
    }
}

