/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.sqlserver;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.sqlserver.Lsn;
import io.debezium.connector.sqlserver.SqlServerChangeTable;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.SourceRecordAssert;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
import io.debezium.relational.history.DatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.FileDatabaseHistory;
import io.debezium.relational.history.HistoryRecordComparator;
import io.debezium.relational.history.TableChanges;
import io.debezium.util.Testing;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.fest.assertions.ObjectAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SqlServerConnectorIT
extends AbstractConnectorTest {
    private SqlServerConnection connection;

    /**
     * Per-test setup: obtains a test database and connection from {@link TestHelper}, creates
     * {@code tablea} and {@code tableb}, inserts one row into {@code tablea}, enables CDC on
     * both tables, initializes the connector test framework and deletes the database history
     * file.
     *
     * @throws SQLException if any of the database operations fails
     */
    @Before
    public void before() throws SQLException {
        TestHelper.createTestDatabase();
        connection = TestHelper.testConnection();

        final String[] setupStatements = {
                "CREATE TABLE tablea (id int primary key, cola varchar(30))",
                "CREATE TABLE tableb (id int primary key, colb varchar(30))",
                "INSERT INTO tablea VALUES(1, 'a')"
        };
        connection.execute(setupStatements);

        for (String table : new String[]{ "tablea", "tableb" }) {
            TestHelper.enableTableCdc(connection, table);
        }

        initializeConnectorTestFramework();
        Testing.Files.delete((Path) TestHelper.DB_HISTORY_PATH);
    }

    /**
     * Per-test teardown: closes the database connection opened in {@link #before()}, if any.
     *
     * @throws SQLException if closing the connection fails
     */
    @After
    public void after() throws SQLException {
        if (connection == null) {
            // Setup failed before a connection was obtained; nothing to release.
            return;
        }
        connection.close();
    }

    /**
     * Streams inserts into both tables and then deletes every row of {@code tableb}, checking
     * the emitted change events.
     * <p>
     * Asserted expectations: each insert yields a record whose {@code after} struct holds the
     * row and whose {@code before} is null; each delete yields a record whose {@code before}
     * struct holds the row and whose {@code after} is null, immediately followed by a record
     * with a null value (the tombstone). No records are expected for {@code tablea} during the
     * delete phase.
     *
     * @throws Exception if the connector, the database, or record consumption fails
     */
    @Test
    public void createAndDelete() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        final int TABLES = 2;
        final int ID_START = 10;
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .build();

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();

        // Drain the single record expected up front (presumably the snapshot of the row that
        // before() inserted into tablea) so it does not mix with the streamed records.
        consumeRecordsByTopic(1);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_START + i;
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
        final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
        final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final List<SchemaAndValueField> expectedRowA = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
                    new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            final Struct valueA = (Struct) tableA.get(i).value();
            assertRecord((Struct) valueA.get("after"), expectedRowA);
            Assert.assertNull(valueA.get("before"));

            final Struct valueB = (Struct) tableB.get(i).value();
            assertRecord((Struct) valueB.get("after"), expectedRowB);
            Assert.assertNull(valueB.get("before"));
        }

        connection.execute("DELETE FROM tableB");

        // Every deleted row is expected to produce two records: the delete event and a tombstone.
        final AbstractConnectorTest.SourceRecords deleteRecords = consumeRecordsByTopic(2 * RECORDS_PER_TABLE);
        final List<SourceRecord> deleteTableA = deleteRecords.recordsForTopic("server1.dbo.tablea");
        final List<SourceRecord> deleteTableB = deleteRecords.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(deleteTableA).isNullOrEmpty();
        Assertions.assertThat(deleteTableB).hasSize(2 * RECORDS_PER_TABLE);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final SourceRecord deleteRecord = deleteTableB.get(i * 2);
            final SourceRecord tombstoneRecord = deleteTableB.get(i * 2 + 1);
            final List<SchemaAndValueField> expectedDeleteRow = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            final Struct deleteValue = (Struct) deleteRecord.value();
            assertRecord((Struct) deleteValue.get("before"), expectedDeleteRow);
            Assert.assertNull(deleteValue.get("after"));

            Assert.assertNull(tombstoneRecord.value());
        }

        stopConnector();
    }

    /**
     * Runs the connector with {@code database.applicationIntent=ReadOnly} and a unique
     * {@code database.applicationName}, then checks that streamed inserts are captured, that
     * the log contains the "Schema locking was disabled" message, and that the connector's
     * session is observed in more than two distinct transactions.
     *
     * @throws Exception if the connector, the database, or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1642"})
    public void readOnlyApplicationIntent() throws Exception {
        final LogInterceptor logInterceptor = new LogInterceptor();
        // A random suffix lets the session lookup below single out this connector run.
        final String appId = "readOnlyApplicationIntent-" + UUID.randomUUID();

        final int RECORDS_PER_TABLE = 5;
        final int TABLES = 2;
        final int ID_START = 10;
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with("database.applicationIntent", "ReadOnly")
                .with("database.applicationName", appId)
                .build();

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();

        TestHelper.waitForSnapshotToBeCompleted();
        consumeRecordsByTopic(1);

        TestHelper.waitForStreamingStarted();
        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_START + i;
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES, 24);
        final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
        final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final List<SchemaAndValueField> expectedRowA = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
                    new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            final Struct valueA = (Struct) tableA.get(i).value();
            assertRecord((Struct) valueA.get("after"), expectedRowA);
            Assert.assertNull(valueA.get("before"));

            final Struct valueB = (Struct) tableB.get(i).value();
            assertRecord((Struct) valueB.get("after"), expectedRowB);
            Assert.assertNull(valueB.get("before"));
        }

        Assertions.assertThat(logInterceptor.containsMessage("Schema locking was disabled in connector configuration")).isTrue();

        // Poll the server until the connector's session has been seen in more than two
        // different transactions.
        try (SqlServerConnection admin = TestHelper.adminConnection()) {
            final HashSet<Long> txIds = new HashSet<>();
            Awaitility.await()
                    .atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS)
                    .pollInterval(100L, TimeUnit.MILLISECONDS)
                    .until(() -> {
                        admin.query("SELECT (SELECT transaction_id FROM sys.dm_tran_session_transactions AS t WHERE s.session_id=t.session_id) FROM sys.dm_exec_sessions AS s WHERE program_name='" + appId + "'", rs -> {
                            // The session may not be listed at this instant; keep polling
                            // instead of reading from an empty result set.
                            if (rs.next()) {
                                final long txId = rs.getLong(1);
                                // getLong() maps SQL NULL (no open transaction) to 0, which
                                // must not be counted as a transaction id.
                                if (!rs.wasNull()) {
                                    txIds.add(txId);
                                }
                            }
                        });
                        return txIds.size() > 2;
                    });
        }

        stopConnector();
    }

    /**
     * Checks that the {@code source.ts_ms} timestamp of streamed {@code tablea} records lies
     * within five minutes of the current instant while the JVM default time zone is set to
     * {@code Australia/Canberra}.
     * <p>
     * The previous default time zone is restored in a {@code finally} block so that a failure
     * here does not leak the changed zone into other tests.
     *
     * @throws Exception if the connector, the database, or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1643"})
    public void timestampAndTimezone() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        final int TABLES = 2;
        final int ID_START = 10;

        final TimeZone currentTimeZone = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone("Australia/Canberra"));
            final Configuration config = TestHelper.defaultConfig()
                    .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                    .build();

            start(SqlServerConnector.class, config);
            assertConnectorIsRunning();

            consumeRecordsByTopic(1);

            // Accept timestamps within +/- 5 minutes of "now".
            final Instant now = Instant.now();
            final Instant lowerBound = now.minusSeconds(300L);
            final Instant upperBound = now.plusSeconds(300L);

            for (int i = 0; i < RECORDS_PER_TABLE; i++) {
                final int id = ID_START + i;
                connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
                connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
            }

            final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
            final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
            final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
            Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
            Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);

            for (int i = 0; i < RECORDS_PER_TABLE; i++) {
                final SourceRecord recordA = tableA.get(i);
                final long timestamp = ((Struct) recordA.value()).getStruct("source").getInt64("ts_ms");
                final Instant instant = Instant.ofEpochMilli(timestamp);
                Assertions.assertThat(instant.isAfter(lowerBound) && instant.isBefore(upperBound)).isTrue();
            }

            stopConnector();
        }
        finally {
            TimeZone.setDefault(currentTimeZone);
        }
    }

    /**
     * Checks that, with {@code TOMBSTONES_ON_DELETE} set to {@code false}, deleting all rows
     * of {@code tableb} yields exactly one delete record per row and nothing for
     * {@code tablea}. Each delete record must carry the row in {@code before} and null in
     * {@code after}.
     *
     * @throws Exception if the connector, the database, or record consumption fails
     */
    @Test
    public void deleteWithoutTombstone() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        final int TABLES = 2;
        final int ID_START = 10;
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with(SqlServerConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .build();

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();

        consumeRecordsByTopic(1);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_START + i;
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }

        // The insert events are not inspected here; consume them so only deletes remain.
        consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);

        connection.execute("DELETE FROM tableB");

        final AbstractConnectorTest.SourceRecords deleteRecords = consumeRecordsByTopic(RECORDS_PER_TABLE);
        final List<SourceRecord> deleteTableA = deleteRecords.recordsForTopic("server1.dbo.tablea");
        final List<SourceRecord> deleteTableB = deleteRecords.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(deleteTableA).isNullOrEmpty();
        Assertions.assertThat(deleteTableB).hasSize(RECORDS_PER_TABLE);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final List<SchemaAndValueField> expectedDeleteRow = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            final Struct deleteValue = (Struct) deleteTableB.get(i).value();
            assertRecord((Struct) deleteValue.get("before"), expectedDeleteRow);
            Assert.assertNull(deleteValue.get("after"));
        }

        stopConnector();
    }

    /**
     * Inserts five rows into {@code tableb} as one batch, updates {@code colb} on all of them
     * and checks the resulting records: first five creates ({@code after} = inserted row,
     * {@code before} null), then five updates ({@code before} = row with {@code 'b'},
     * {@code after} = row with {@code 'z'}).
     *
     * @throws Exception if the connector, the database, or record consumption fails
     */
    @Test
    public void update() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        final int ID_START = 10;
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .build();

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();

        consumeRecordsByTopic(1);

        // Auto-commit is switched off around the batch of inserts and back on before the update.
        connection.setAutoCommit(false);
        final String[] tableBInserts = new String[RECORDS_PER_TABLE];
        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_START + i;
            tableBInserts[i] = "INSERT INTO tableb VALUES(" + id + ", 'b')";
        }
        connection.execute(tableBInserts);
        connection.setAutoCommit(true);

        connection.execute("UPDATE tableb SET colb='z'");

        // One create plus one update per row.
        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * 2);
        final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE * 2);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            final Struct valueB = (Struct) tableB.get(i).value();
            assertRecord((Struct) valueB.get("after"), expectedRowB);
            Assert.assertNull(valueB.get("before"));
        }

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final List<SchemaAndValueField> expectedBefore = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            final List<SchemaAndValueField> expectedAfter = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, i + ID_START),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "z"));

            final Struct valueB = (Struct) tableB.get(i + RECORDS_PER_TABLE).value();
            assertRecord((Struct) valueB.get("before"), expectedBefore);
            assertRecord((Struct) valueB.get("after"), expectedAfter);
        }

        stopConnector();
    }

    /**
     * Checks the events emitted when the primary key of a row changes in each table.
     * <p>
     * For both {@code tablea} and {@code tableb} three records are expected, in this order: a
     * delete carrying the old row and old key, a null-valued record with the old key, and a
     * create carrying the new row and new key. For {@code tableb} the test additionally
     * expects {@code source.event_serial_no} to be 1 on the delete and 2 on the create.
     *
     * @throws Exception if the connector, the database, or record consumption fails
     */
    @Test
    public void updatePrimaryKey() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .build();

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();

        consumeRecordsByTopic(1);

        connection.execute("INSERT INTO tableb VALUES(1, 'b')");
        consumeRecordsByTopic(1);

        connection.setAutoCommit(false);
        connection.execute(
                "UPDATE tablea SET id=100 WHERE id=1",
                "UPDATE tableb SET id=100 WHERE id=1");

        final AbstractConnectorTest.SourceRecords changes = consumeRecordsByTopic(6);
        final List<SourceRecord> eventsA = changes.recordsForTopic("server1.dbo.tablea");
        final List<SourceRecord> eventsB = changes.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(eventsA).hasSize(3);
        Assertions.assertThat(eventsB).hasSize(3);

        // ---- tablea: delete(old key) -> null value(old key) -> create(new key) ----
        final List<SchemaAndValueField> oldRowA = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1),
                new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
        final List<SchemaAndValueField> oldKeyA = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1));
        final List<SchemaAndValueField> newRowA = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100),
                new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
        final List<SchemaAndValueField> newKeyA = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100));

        final SourceRecord removedA = eventsA.get(0);
        final SourceRecord tombstoneA = eventsA.get(1);
        final SourceRecord createdA = eventsA.get(2);

        final Struct removedKeyA = (Struct) removedA.key();
        final Struct removedValueA = (Struct) removedA.value();
        assertRecord(removedValueA.getStruct("before"), oldRowA);
        assertRecord(removedKeyA, oldKeyA);
        Assert.assertNull(removedValueA.get("after"));

        final Struct tombstoneKeyA = (Struct) tombstoneA.key();
        final Struct tombstoneValueA = (Struct) tombstoneA.value();
        assertRecord(tombstoneKeyA, oldKeyA);
        Assert.assertNull(tombstoneValueA);

        final Struct createdKeyA = (Struct) createdA.key();
        final Struct createdValueA = (Struct) createdA.value();
        assertRecord(createdValueA.getStruct("after"), newRowA);
        assertRecord(createdKeyA, newKeyA);
        Assert.assertNull(createdValueA.get("before"));

        // ---- tableb: same sequence, plus the serial number of each event ----
        final List<SchemaAndValueField> oldRowB = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1),
                new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        final List<SchemaAndValueField> oldKeyB = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1));
        final List<SchemaAndValueField> newRowB = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100),
                new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        final List<SchemaAndValueField> newKeyB = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100));

        final SourceRecord removedB = eventsB.get(0);
        final SourceRecord tombstoneB = eventsB.get(1);
        final SourceRecord createdB = eventsB.get(2);

        final Struct removedKeyB = (Struct) removedB.key();
        final Struct removedValueB = (Struct) removedB.value();
        assertRecord(removedValueB.getStruct("before"), oldRowB);
        assertRecord(removedKeyB, oldKeyB);
        Assert.assertNull(removedValueB.get("after"));
        Assertions.assertThat(removedValueB.getStruct("source").getInt64("event_serial_no")).isEqualTo(1L);

        final Struct tombstoneKeyB = (Struct) tombstoneB.key();
        final Struct tombstoneValueB = (Struct) tombstoneB.value();
        assertRecord(tombstoneKeyB, oldKeyB);
        Assert.assertNull(tombstoneValueB);

        final Struct createdKeyB = (Struct) createdB.key();
        final Struct createdValueB = (Struct) createdB.value();
        assertRecord(createdValueB.getStruct("after"), newRowB);
        assertRecord(createdKeyB, newKeyB);
        Assert.assertNull(createdValueB.get("before"));
        Assertions.assertThat(createdValueB.getStruct("source").getInt64("event_serial_no")).isEqualTo(2L);

        stopConnector();
    }

    /**
     * Same scenario as a primary-key update on both tables, but the connector is started with
     * a predicate that matches the create event for id 100 and is then stopped and restarted
     * after the first two records have been consumed.
     * <p>
     * The {@code tablea} records are collected from both runs and the {@code tableb} records
     * from the second run; each table must end up with delete, null-valued record and create,
     * in that order.
     *
     * @throws Exception if the connector, the database, or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1152"})
    public void updatePrimaryKeyWithRestartInMiddle() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .build();

        // The third argument is a record predicate; presumably the framework uses it to decide
        // when to stop the connector - TODO confirm against AbstractConnectorTest.start().
        start(SqlServerConnector.class, config, record -> {
            final Struct envelope = (Struct) record.value();
            return envelope != null && "c".equals(envelope.get("op")) && envelope.getStruct("after").getInt32("id") == 100;
        });
        assertConnectorIsRunning();

        consumeRecordsByTopic(1);

        connection.execute("INSERT INTO tableb VALUES(1, 'b')");
        consumeRecordsByTopic(1);

        connection.setAutoCommit(false);
        connection.execute(
                "UPDATE tablea SET id=100 WHERE id=1",
                "UPDATE tableb SET id=100 WHERE id=1");

        final AbstractConnectorTest.SourceRecords records1 = consumeRecordsByTopic(2);
        stopConnector();

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();
        final AbstractConnectorTest.SourceRecords records2 = consumeRecordsByTopic(4);

        // Merge into a fresh list: this neither mutates the list owned by records1 nor fails
        // with a NullPointerException when one of the batches has nothing for the topic - a
        // missing record then shows up as a size assertion failure below.
        final List<SourceRecord> tableA = new ArrayList<>();
        for (AbstractConnectorTest.SourceRecords batch : Arrays.asList(records1, records2)) {
            final List<SourceRecord> forTopic = batch.recordsForTopic("server1.dbo.tablea");
            if (forTopic != null) {
                tableA.addAll(forTopic);
            }
        }
        final List<SourceRecord> tableB = records2.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableA).hasSize(3);
        Assertions.assertThat(tableB).hasSize(3);

        // ---- tablea ----
        final List<SchemaAndValueField> expectedDeleteRowA = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1),
                new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
        final List<SchemaAndValueField> expectedDeleteKeyA = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1));
        final List<SchemaAndValueField> expectedInsertRowA = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100),
                new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
        final List<SchemaAndValueField> expectedInsertKeyA = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100));

        final SourceRecord deleteRecordA = tableA.get(0);
        final SourceRecord tombstoneRecordA = tableA.get(1);
        final SourceRecord insertRecordA = tableA.get(2);

        final Struct deleteKeyA = (Struct) deleteRecordA.key();
        final Struct deleteValueA = (Struct) deleteRecordA.value();
        assertRecord(deleteValueA.getStruct("before"), expectedDeleteRowA);
        assertRecord(deleteKeyA, expectedDeleteKeyA);
        Assert.assertNull(deleteValueA.get("after"));

        final Struct tombstoneKeyA = (Struct) tombstoneRecordA.key();
        final Struct tombstoneValueA = (Struct) tombstoneRecordA.value();
        assertRecord(tombstoneKeyA, expectedDeleteKeyA);
        Assert.assertNull(tombstoneValueA);

        final Struct insertKeyA = (Struct) insertRecordA.key();
        final Struct insertValueA = (Struct) insertRecordA.value();
        assertRecord(insertValueA.getStruct("after"), expectedInsertRowA);
        assertRecord(insertKeyA, expectedInsertKeyA);
        Assert.assertNull(insertValueA.get("before"));

        // ---- tableb ----
        final List<SchemaAndValueField> expectedDeleteRowB = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1),
                new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        final List<SchemaAndValueField> expectedDeleteKeyB = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 1));
        final List<SchemaAndValueField> expectedInsertRowB = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100),
                new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        final List<SchemaAndValueField> expectedInsertKeyB = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, 100));

        final SourceRecord deleteRecordB = tableB.get(0);
        final SourceRecord tombstoneRecordB = tableB.get(1);
        final SourceRecord insertRecordB = tableB.get(2);

        final Struct deleteKeyB = (Struct) deleteRecordB.key();
        final Struct deleteValueB = (Struct) deleteRecordB.value();
        assertRecord(deleteValueB.getStruct("before"), expectedDeleteRowB);
        assertRecord(deleteKeyB, expectedDeleteKeyB);
        Assert.assertNull(deleteValueB.get("after"));

        final Struct tombstoneKeyB = (Struct) tombstoneRecordB.key();
        final Struct tombstoneValueB = (Struct) tombstoneRecordB.value();
        assertRecord(tombstoneKeyB, expectedDeleteKeyB);
        Assert.assertNull(tombstoneValueB);

        final Struct insertKeyB = (Struct) insertRecordB.key();
        final Struct insertValueB = (Struct) insertRecordB.value();
        assertRecord(insertValueB.getStruct("after"), expectedInsertRowB);
        assertRecord(insertKeyB, expectedInsertKeyB);
        Assert.assertNull(insertValueB.get("before"));

        stopConnector();
    }

    /**
     * Issues the same primary-key update statement twice against five {@code tableb} rows,
     * with tombstones disabled and a queue size of 2 / batch size of 1, restarting the
     * connector after 14 records have been consumed. Across both runs exactly 20
     * {@code tableb} records are expected.
     * <p>
     * The connector is first started with a predicate that matches the delete event whose
     * {@code before.id} is 305.
     *
     * @throws Exception if the connector, the database, or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-2329"})
    public void updatePrimaryKeyTwiceWithRestartInMiddleOfTx() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with(SqlServerConnectorConfig.MAX_QUEUE_SIZE, 2)
                .with(SqlServerConnectorConfig.MAX_BATCH_SIZE, 1)
                .with(SqlServerConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .build();

        // The third argument is a record predicate; presumably the framework uses it to decide
        // when to stop the connector - TODO confirm against AbstractConnectorTest.start().
        start(SqlServerConnector.class, config, record -> {
            final Struct envelope = (Struct) record.value();
            return envelope != null && "d".equals(envelope.get("op")) && envelope.getStruct("before").getInt32("id") == 305;
        });
        assertConnectorIsRunning();

        consumeRecordsByTopic(1);

        connection.setAutoCommit(false);
        connection.execute("INSERT INTO tableb (id, colb) values (1,'1')");
        connection.execute("INSERT INTO tableb (id, colb) values (2,'2')");
        connection.execute("INSERT INTO tableb (id, colb) values (3,'3')");
        connection.execute("INSERT INTO tableb (id, colb) values (4,'4')");
        connection.execute("INSERT INTO tableb (id, colb) values (5,'5')");
        consumeRecordsByTopic(5);

        // The statement is intentionally executed twice (see the test name).
        connection.execute("UPDATE tableb set id = colb + 300");
        connection.execute("UPDATE tableb set id = colb + 300");

        final AbstractConnectorTest.SourceRecords records1 = consumeRecordsByTopic(14);
        stopConnector();

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();
        final AbstractConnectorTest.SourceRecords records2 = consumeRecordsByTopic(6);

        // Merge into a fresh list: this neither mutates the list owned by records1 nor fails
        // with a NullPointerException when one of the batches has nothing for the topic - a
        // missing record then shows up as a size assertion failure below.
        final List<SourceRecord> tableB = new ArrayList<>();
        for (AbstractConnectorTest.SourceRecords batch : Arrays.asList(records1, records2)) {
            final List<SourceRecord> forTopic = batch.recordsForTopic("server1.dbo.tableb");
            if (forTopic != null) {
                tableB.addAll(forTopic);
            }
        }
        Assertions.assertThat(tableB).hasSize(20);

        stopConnector();
    }

    /**
     * Checks that rows inserted while the connector is stopped are delivered once it is
     * started again.
     * <p>
     * A first batch (ids from 10) is inserted and consumed while the connector runs; the
     * connector is stopped; a second batch (ids from 100) is inserted; after the restart
     * exactly the second batch is expected, five create records per table in id order.
     *
     * @throws Exception if the connector, the database, or record consumption fails
     */
    @Test
    public void streamChangesWhileStopped() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        final int TABLES = 2;
        final int ID_START = 10;
        final int ID_RESTART = 100;
        final Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .build();

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();

        consumeRecordsByTopic(1);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_START + i;
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }

        consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
        stopConnector();

        // These rows are written while no connector is running.
        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = ID_RESTART + i;
            connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }

        start(SqlServerConnector.class, config);
        assertConnectorIsRunning();

        final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
        final List<SourceRecord> tableA = records.recordsForTopic("server1.dbo.tablea");
        final List<SourceRecord> tableB = records.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);

        for (int i = 0; i < RECORDS_PER_TABLE; i++) {
            final int id = i + ID_RESTART;
            final List<SchemaAndValueField> expectedRowA = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            final Struct valueA = (Struct) tableA.get(i).value();
            assertRecord((Struct) valueA.get("after"), expectedRowA);
            Assert.assertNull(valueA.get("before"));

            final Struct valueB = (Struct) tableB.get(i).value();
            assertRecord((Struct) valueB.get("after"), expectedRowB);
            Assert.assertNull(valueB.get("before"));
        }
    }

    /**
     * Checks the {@code snapshot} / {@code snapshot_completed} markers carried in source offsets.
     *
     * <p>Rows are inserted into {@code tablea} and {@code tableb} and the test waits until they can
     * be read back from the matching CDC change tables. The connector is then started in
     * {@code INITIAL} snapshot mode: every examined record must carry {@code snapshot=true} and
     * only the last one may carry {@code snapshot_completed=true}. After a stop, a second batch of
     * inserts and a restart, the emitted records must carry neither marker but must expose a
     * {@code change_lsn}.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1069"})
    public void verifyOffsets() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        final int TABLES = 2;
        final int ID_START = 10;
        final int ID_RESTART = 100;
        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .build();

        // First batch: written before the connector starts, so it is picked up by the snapshot.
        List<Integer> expectedIds = new ArrayList<>();
        for (int offset = 0; offset < RECORDS_PER_TABLE; ++offset) {
            int id = ID_START + offset;
            this.connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            this.connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
            expectedIds.add(id);
        }

        String tableaCT = this.connection.getNameOfChangeTable("tablea");
        String tablebCT = this.connection.getNameOfChangeTable("tableb");

        // Block until a max LSN is available and no inspected change table reports ids that
        // differ from the ones inserted above.
        Awaitility.await().atMost(30L, TimeUnit.SECONDS).until(() -> {
            if (!this.connection.getMaxLsn().isAvailable()) {
                return false;
            }
            // Typed map: with a raw map the values are Objects and cannot be compared to a boolean.
            Map<String, Boolean> matchesByTable = new HashMap<>();
            this.connection.listOfChangeTables().forEach(ct -> {
                String tableName = ct.getChangeTableId().table();
                if (!tableName.endsWith("dbo_" + tableaCT) && !tableName.endsWith("dbo_" + tablebCT)) {
                    return;
                }
                try {
                    Lsn minLsn = this.connection.getMinLsn(tableName);
                    Lsn maxLsn = this.connection.getMaxLsn();
                    SqlServerChangeTable[] tables = Collections.singletonList(ct).toArray(new SqlServerChangeTable[0]);
                    List<Integer> ids = new ArrayList<>();
                    this.connection.getChangesForTables(tables, minLsn, maxLsn, resultsets -> {
                        ResultSet rs = resultsets[0];
                        while (rs.next()) {
                            ids.add(rs.getInt("id"));
                        }
                    });
                    matchesByTable.put(tableName, ids.equals(expectedIds));
                }
                catch (Exception e) {
                    Assert.fail("Failed to fetch changes for table " + tableName + ": " + e.getMessage());
                }
            });
            return !matchesByTable.containsValue(false);
        });

        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        List<SourceRecord> records = this.consumeRecordsByTopic(1 + RECORDS_PER_TABLE * TABLES).allRecordsInOrder();
        // The first consumed record is not examined - presumably it belongs to a row created by
        // fixture setup outside this method (TODO confirm against the test's setup code).
        records = records.subList(1, records.size());
        for (int idx = 0; idx < records.size(); ++idx) {
            SourceRecord record = records.get(idx);
            boolean isLast = idx == records.size() - 1;
            Assertions.assertThat(record.sourceOffset().get("snapshot")).as("Snapshot phase").isEqualTo(true);
            Assertions.assertThat(record.sourceOffset().get("snapshot_completed"))
                    .as(isLast ? "Snapshot completed" : "Snapshot in progress")
                    .isEqualTo(isLast);
        }
        this.stopConnector();

        // Second batch: written while the connector is down, so it is read after the restart.
        for (int offset = 0; offset < RECORDS_PER_TABLE; ++offset) {
            int id = ID_RESTART + offset;
            this.connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            this.connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
        List<SourceRecord> tableA = sourceRecords.recordsForTopic("server1.dbo.tablea");
        List<SourceRecord> tableB = sourceRecords.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);
        for (int offset = 0; offset < RECORDS_PER_TABLE; ++offset) {
            int id = ID_RESTART + offset;
            SourceRecord recordA = tableA.get(offset);
            SourceRecord recordB = tableB.get(offset);
            List<SchemaAndValueField> expectedRowA = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            Struct valueA = (Struct)recordA.value();
            this.assertRecord((Struct)valueA.get("after"), expectedRowA);
            Assert.assertNull(valueA.get("before"));

            Struct valueB = (Struct)recordB.value();
            this.assertRecord((Struct)valueB.get("after"), expectedRowB);
            Assert.assertNull(valueB.get("before"));

            // These offsets must not carry snapshot markers but must carry an LSN.
            Assertions.assertThat(recordA.sourceOffset().get("snapshot")).as("Streaming phase").isNull();
            Assertions.assertThat(recordA.sourceOffset().get("snapshot_completed")).as("Streaming phase").isNull();
            Assertions.assertThat(recordA.sourceOffset().get("change_lsn")).as("LSN present").isNotNull();
            Assertions.assertThat(recordB.sourceOffset().get("snapshot")).as("Streaming phase").isNull();
            Assertions.assertThat(recordB.sourceOffset().get("snapshot_completed")).as("Streaming phase").isNull();
            Assertions.assertThat(recordB.sourceOffset().get("change_lsn")).as("LSN present").isNotNull();
        }
    }

    /**
     * Verifies that a table whitelist restricts the emitted change events.
     *
     * <p>The connector runs in {@code SCHEMA_ONLY} snapshot mode with {@code dbo.tableb} as the only
     * whitelisted table. Rows are then inserted into both {@code tablea} and {@code tableb}; the
     * consumed batch must contain no {@code tablea} records and exactly one record per
     * {@code tableb} insert.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    public void whitelistTable() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        // Only one table (tableb) is expected to produce records.
        final int TABLES = 1;
        final int ID_START = 10;
        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.tableb")
                .build();
        this.connection.execute("INSERT INTO tableb VALUES(1, 'b')");
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        // Drain one record before producing the rows this test actually checks.
        this.consumeRecordsByTopic(1);

        for (int offset = 0; offset < RECORDS_PER_TABLE; ++offset) {
            int id = ID_START + offset;
            this.connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            this.connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }

        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
        List<?> tableA = records.recordsForTopic("server1.dbo.tablea");
        List<?> tableB = records.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableA == null || tableA.isEmpty()).isTrue();
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);
        this.stopConnector();
    }

    /**
     * Verifies that a table blacklist suppresses change events for the listed table.
     *
     * <p>The connector runs in {@code INITIAL} snapshot mode with {@code dbo.tablea} blacklisted.
     * Rows are then inserted into both {@code tablea} and {@code tableb}; the consumed batch must
     * contain no {@code tablea} records and exactly one record per {@code tableb} insert.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    public void blacklistTable() throws Exception {
        final int RECORDS_PER_TABLE = 5;
        // Only one table (tableb) is expected to produce records.
        final int TABLES = 1;
        final int ID_START = 10;
        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with(SqlServerConnectorConfig.TABLE_BLACKLIST, "dbo.tablea")
                .build();
        this.connection.execute("INSERT INTO tableb VALUES(1, 'b')");
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        // Drain one record before producing the rows this test actually checks.
        this.consumeRecordsByTopic(1);

        for (int offset = 0; offset < RECORDS_PER_TABLE; ++offset) {
            int id = ID_START + offset;
            this.connection.execute("INSERT INTO tablea VALUES(" + id + ", 'a')");
            this.connection.execute("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }

        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
        List<?> tableA = records.recordsForTopic("server1.dbo.tablea");
        List<?> tableB = records.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableA == null || tableA.isEmpty()).isTrue();
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);
        this.stopConnector();
    }

    /**
     * Blacklists a column that was added to {@code table_a} after CDC had been enabled on it.
     *
     * <p>After inserting a four-column row, the single emitted record must expose an
     * {@code after} value and schema made of {@code id}, {@code name} and {@code amount} only.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1617"})
    public void blacklistColumnWhenCdcColumnsDoNotMatchWithOriginalSnapshot() throws Exception {
        this.connection.execute("CREATE TABLE table_a (id int, name varchar(30), amount integer primary key(id))");
        TestHelper.enableTableCdc(this.connection, "table_a");
        // The extra column is added only after CDC was enabled for the table.
        this.connection.execute("ALTER TABLE table_a ADD blacklisted_column varchar(30)");

        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(SqlServerConnectorConfig.COLUMN_BLACKLIST, "dbo.table_a.blacklisted_column")
                .build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        this.consumeRecordsByTopic(1);

        this.connection.execute("INSERT INTO table_a VALUES(10, 'some_name', 120, 'some_string')");
        List<SourceRecord> tableA = this.consumeRecordsByTopic(1).recordsForTopic("server1.dbo.table_a");

        Schema expectedSchemaA = SchemaBuilder.struct()
                .optional()
                .name("server1.dbo.table_a.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("name", Schema.OPTIONAL_STRING_SCHEMA)
                .field("amount", Schema.OPTIONAL_INT32_SCHEMA)
                .build();
        Struct expectedValueA = new Struct(expectedSchemaA)
                .put("id", 10)
                .put("name", "some_name")
                .put("amount", 120);

        Assertions.assertThat(tableA).hasSize(1);
        SourceRecordAssert.assertThat(tableA.get(0))
                .valueAfterFieldIsEqualTo(expectedValueA)
                .valueAfterFieldSchemaIsEqualTo(expectedSchemaA);
        this.stopConnector();
    }

    /**
     * Verifies that a column blacklist applies only to the table it names.
     *
     * <p>Two identically shaped tables are created and CDC-enabled; {@code amount} is blacklisted
     * for {@code blacklist_column_table_a} only. The record for table A must lack {@code amount}
     * in both value and schema, while the record for table B must still contain it.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1067"})
    public void blacklistColumn() throws Exception {
        this.connection.execute(
                "CREATE TABLE blacklist_column_table_a (id int, name varchar(30), amount integer primary key(id))",
                "CREATE TABLE blacklist_column_table_b (id int, name varchar(30), amount integer primary key(id))");
        TestHelper.enableTableCdc(this.connection, "blacklist_column_table_a");
        TestHelper.enableTableCdc(this.connection, "blacklist_column_table_b");

        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(SqlServerConnectorConfig.COLUMN_BLACKLIST, "dbo.blacklist_column_table_a.amount")
                .build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        this.consumeRecordsByTopic(1);

        this.connection.execute("INSERT INTO blacklist_column_table_a VALUES(10, 'some_name', 120)");
        this.connection.execute("INSERT INTO blacklist_column_table_b VALUES(11, 'some_name', 447)");
        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(2);
        List<SourceRecord> tableA = consumed.recordsForTopic("server1.dbo.blacklist_column_table_a");
        List<SourceRecord> tableB = consumed.recordsForTopic("server1.dbo.blacklist_column_table_b");

        // Table A: the blacklisted column is absent.
        Schema expectedSchemaA = SchemaBuilder.struct()
                .optional()
                .name("server1.dbo.blacklist_column_table_a.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("name", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        Struct expectedValueA = new Struct(expectedSchemaA)
                .put("id", 10)
                .put("name", "some_name");

        // Table B: untouched by the blacklist, so all three columns are present.
        Schema expectedSchemaB = SchemaBuilder.struct()
                .optional()
                .name("server1.dbo.blacklist_column_table_b.Value")
                .field("id", Schema.INT32_SCHEMA)
                .field("name", Schema.OPTIONAL_STRING_SCHEMA)
                .field("amount", Schema.OPTIONAL_INT32_SCHEMA)
                .build();
        Struct expectedValueB = new Struct(expectedSchemaB)
                .put("id", 11)
                .put("name", "some_name")
                .put("amount", 447);

        Assertions.assertThat(tableA).hasSize(1);
        SourceRecordAssert.assertThat(tableA.get(0))
                .valueAfterFieldIsEqualTo(expectedValueA)
                .valueAfterFieldSchemaIsEqualTo(expectedSchemaA);
        Assertions.assertThat(tableB).hasSize(1);
        SourceRecordAssert.assertThat(tableB.get(0))
                .valueAfterFieldIsEqualTo(expectedValueB)
                .valueAfterFieldSchemaIsEqualTo(expectedSchemaB);
        this.stopConnector();
    }

    /**
     * Verifies the {@code column.mask.hash.SHA-256.with.salt.*} option on two tables.
     *
     * <p>The {@code name} column is {@code varchar(255)} in table A and {@code varchar(20)} in
     * table B; the expected value for B is the first 20 characters of the value expected for A.
     * The {@code name} check is only performed when the record has an {@code after} struct.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1692"})
    public void shouldConsumeEventsWithMaskedHashedColumns() throws Exception {
        this.connection.execute(
                "CREATE TABLE masked_hashed_column_table_a (id int, name varchar(255) primary key(id))",
                "CREATE TABLE masked_hashed_column_table_b (id int, name varchar(20), primary key(id))");
        TestHelper.enableTableCdc(this.connection, "masked_hashed_column_table_a");
        TestHelper.enableTableCdc(this.connection, "masked_hashed_column_table_b");

        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K", "testDB.dbo.masked_hashed_column_table_a.name, testDB.dbo.masked_hashed_column_table_b.name")
                .build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        this.consumeRecordsByTopic(1);

        this.connection.execute("INSERT INTO masked_hashed_column_table_a VALUES(10, 'some_name')");
        this.connection.execute("INSERT INTO masked_hashed_column_table_b VALUES(11, 'some_name')");
        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(2);
        List<SourceRecord> tableA = consumed.recordsForTopic("server1.dbo.masked_hashed_column_table_a");
        List<SourceRecord> tableB = consumed.recordsForTopic("server1.dbo.masked_hashed_column_table_b");

        // Table A: full-length hash.
        Assertions.assertThat(tableA).hasSize(1);
        SourceRecord recordA = tableA.get(0);
        VerifyRecord.isValidInsert(recordA, "id", 10);
        Struct afterA = ((Struct)recordA.value()).getStruct("after");
        if (afterA != null) {
            Assertions.assertThat(afterA.getString("name")).isEqualTo("3b225d0696535d66f2c0fb2e36b012c520d396af3dd8f18330b9c9cd23ca714e");
        }

        // Table B: expected value is 20 characters long, matching the varchar(20) column.
        Assertions.assertThat(tableB).hasSize(1);
        SourceRecord recordB = tableB.get(0);
        VerifyRecord.isValidInsert(recordB, "id", 11);
        Struct afterB = ((Struct)recordB.value()).getStruct("after");
        if (afterB != null) {
            Assertions.assertThat(afterB.getString("name")).isEqualTo("3b225d0696535d66f2c0");
        }
        this.stopConnector();
    }

    /**
     * Verifies the {@code column.mask.with.12.chars} and {@code column.truncate.to.4.chars} options.
     *
     * <p>The masked table's {@code name} is expected to be twelve asterisks; the truncated table's
     * {@code name} is expected to be the first four characters of the inserted value. Each
     * {@code name} check is only performed when the record has an {@code after} struct.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1972"})
    public void shouldConsumeEventsWithMaskedAndTruncatedColumns() throws Exception {
        this.connection.execute(
                "CREATE TABLE masked_hashed_column_table (id int, name varchar(255) primary key(id))",
                "CREATE TABLE truncated_column_table (id int, name varchar(20), primary key(id))");
        TestHelper.enableTableCdc(this.connection, "masked_hashed_column_table");
        TestHelper.enableTableCdc(this.connection, "truncated_column_table");

        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with("column.mask.with.12.chars", "testDB.dbo.masked_hashed_column_table.name")
                .with("column.truncate.to.4.chars", "testDB.dbo.truncated_column_table.name")
                .build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        this.consumeRecordsByTopic(1);

        this.connection.execute("INSERT INTO masked_hashed_column_table VALUES(10, 'some_name')");
        this.connection.execute("INSERT INTO truncated_column_table VALUES(11, 'some_name')");
        AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(2);
        List<SourceRecord> maskedRecords = consumed.recordsForTopic("server1.dbo.masked_hashed_column_table");
        List<SourceRecord> truncatedRecords = consumed.recordsForTopic("server1.dbo.truncated_column_table");

        // Masked column: replaced by a fixed run of asterisks.
        Assertions.assertThat(maskedRecords).hasSize(1);
        SourceRecord maskedRecord = maskedRecords.get(0);
        VerifyRecord.isValidInsert(maskedRecord, "id", 10);
        Struct maskedAfter = ((Struct)maskedRecord.value()).getStruct("after");
        if (maskedAfter != null) {
            Assertions.assertThat(maskedAfter.getString("name")).isEqualTo("************");
        }

        // Truncated column: only the leading characters survive.
        Assertions.assertThat(truncatedRecords).hasSize(1);
        SourceRecord truncatedRecord = truncatedRecords.get(0);
        VerifyRecord.isValidInsert(truncatedRecord, "id", 11);
        Struct truncatedAfter = ((Struct)truncatedRecord.value()).getStruct("after");
        if (truncatedAfter != null) {
            Assertions.assertThat(truncatedAfter.getString("name")).isEqualTo("some");
        }
        this.stopConnector();
    }

    /**
     * Verifies that a {@code database.*} configuration property reaches the database session.
     *
     * <p>The connector is configured with {@code database.applicationName}; after one record has
     * been consumed, {@code sys.dm_exec_sessions} must list at least one session whose
     * {@code program_name} equals that application name.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-964"})
    public void shouldPropagateDatabaseDriverProperties() throws Exception {
        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with("database.applicationName", "Debezium App DBZ-964")
                .build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();

        // Produce and consume one change before the session list is inspected.
        this.connection.execute("INSERT INTO tablea VALUES(964, 'a')");
        this.consumeRecordsByTopic(1);

        String sessionCountQuery = "select count(1) from sys.dm_exec_sessions where program_name = 'Debezium App DBZ-964'";
        this.connection.query(sessionCountQuery, resultSet -> {
            resultSet.next();
            int matchingSessions = resultSet.getInt(1);
            Assertions.assertThat(matchingSessions).isGreaterThanOrEqualTo(1);
        });
    }

    /**
     * Shared scenario: the connector stops half-way through one large transaction and must resume
     * from the exact row where it stopped.
     *
     * <p>Flow: optionally run and stop the connector once right after the snapshot; start it with a
     * stop predicate that fires on the {@code tablea} row whose id is {@code HALF_ID}; optionally
     * stream one standalone insert; write {@code RECORDS_PER_TABLE} rows per table in a single
     * transaction; verify the records delivered before the stop; restart and verify the remaining
     * half; finally write a second batch (one commit per row pair) and verify it completely.
     *
     * @param restartJustAfterSnapshot when {@code true}, the connector is started, one record is
     *        consumed, the connector is stopped and an extra {@code tablea} row is inserted before
     *        the main scenario begins
     * @param afterStreaming when {@code true}, one row is inserted and its change record verified
     *        before the large transaction is written
     * @throws Exception if database access or record consumption fails
     */
    private void restartInTheMiddleOfTx(boolean restartJustAfterSnapshot, boolean afterStreaming) throws Exception {
        final int RECORDS_PER_TABLE = 30;
        final int TABLES = 2;
        final int ID_START = 10;
        final int ID_RESTART = 1000;
        // Id of the tablea row on which the connector is told to stop.
        final int HALF_ID = ID_START + RECORDS_PER_TABLE / 2;
        // Last ids of the first and second batch; used to wait for CDC to catch up.
        final int LAST_ID = ID_START + RECORDS_PER_TABLE - 1;
        final int LAST_RESTART_ID = ID_RESTART + RECORDS_PER_TABLE - 1;

        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .build();

        if (restartJustAfterSnapshot) {
            this.start(SqlServerConnector.class, config);
            this.assertConnectorIsRunning();
            this.consumeRecordsByTopic(1);
            this.stopConnector();
            this.connection.execute("INSERT INTO tablea VALUES(-1, '-a')");
        }

        // Stop predicate: matches the tablea record with id == HALF_ID and cola == "a".
        // Its locals use their own names so they cannot clash with locals of this method.
        this.start(SqlServerConnector.class, config, record -> {
            if (!"server1.dbo.tablea.Envelope".equals(record.valueSchema().name())) {
                return false;
            }
            Struct envelope = (Struct)record.value();
            Struct after = envelope.getStruct("after");
            Integer rowId = after.getInt32("id");
            String cola = after.getString("cola");
            return rowId != null && rowId == HALF_ID && "a".equals(cola);
        });
        this.assertConnectorIsRunning();
        this.consumeRecordsByTopic(1);

        if (afterStreaming) {
            this.connection.execute("INSERT INTO tablea VALUES(-2, '-a')");
            AbstractConnectorTest.SourceRecords streamed = this.consumeRecordsByTopic(1);
            List<SchemaAndValueField> expectedRow = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, -2),
                    new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "-a"));
            this.assertRecord(((Struct)streamed.allRecordsInOrder().get(0).value()).getStruct("after"), expectedRow);
        }

        // First batch: every row of both tables goes into ONE transaction.
        this.connection.setAutoCommit(false);
        for (int offset = 0; offset < RECORDS_PER_TABLE; ++offset) {
            int id = ID_START + offset;
            this.connection.executeWithoutCommitting("INSERT INTO tablea VALUES(" + id + ", 'a')");
            this.connection.executeWithoutCommitting("INSERT INTO tableb VALUES(" + id + ", 'b')");
        }
        this.connection.connection().commit();
        TestHelper.waitForCdcRecord(this.connection, "tablea", rs -> rs.getInt("id") == LAST_ID);
        TestHelper.waitForCdcRecord(this.connection, "tableb", rs -> rs.getInt("id") == LAST_ID);

        // Before the stop: RECORDS_PER_TABLE records in total, the last being tableb's id HALF_ID - 1.
        List<SourceRecord> records = this.consumeRecordsByTopic(RECORDS_PER_TABLE).allRecordsInOrder();
        Assertions.assertThat(records).hasSize(RECORDS_PER_TABLE);
        SourceRecord lastRecordForOffset = records.get(RECORDS_PER_TABLE - 1);
        Struct lastValue = (Struct)lastRecordForOffset.value();
        List<SchemaAndValueField> expectedLastRow = Arrays.asList(
                new SchemaAndValueField("id", Schema.INT32_SCHEMA, HALF_ID - 1),
                new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
        this.assertRecord((Struct)lastValue.get("after"), expectedLastRow);

        // Restart without a stop predicate: the second half of the transaction must follow.
        this.stopConnector();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(RECORDS_PER_TABLE);
        records = sourceRecords.allRecordsInOrder();
        Assertions.assertThat(records).hasSize(RECORDS_PER_TABLE);
        List<SourceRecord> tableA = sourceRecords.recordsForTopic("server1.dbo.tablea");
        List<SourceRecord> tableB = sourceRecords.recordsForTopic("server1.dbo.tableb");
        for (int offset = 0; offset < RECORDS_PER_TABLE / 2; ++offset) {
            int id = HALF_ID + offset;
            SourceRecord recordA = tableA.get(offset);
            SourceRecord recordB = tableB.get(offset);
            List<SchemaAndValueField> expectedRowA = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct valueA = (Struct)recordA.value();
            this.assertRecord((Struct)valueA.get("after"), expectedRowA);
            Assert.assertNull(valueA.get("before"));
            Struct valueB = (Struct)recordB.value();
            this.assertRecord((Struct)valueB.get("after"), expectedRowB);
            Assert.assertNull(valueB.get("before"));
        }

        // Second batch: one commit per pair of rows.
        for (int offset = 0; offset < RECORDS_PER_TABLE; ++offset) {
            int id = ID_RESTART + offset;
            this.connection.executeWithoutCommitting("INSERT INTO tablea VALUES(" + id + ", 'a')");
            this.connection.executeWithoutCommitting("INSERT INTO tableb VALUES(" + id + ", 'b')");
            this.connection.connection().commit();
        }
        TestHelper.waitForCdcRecord(this.connection, "tablea", rs -> rs.getInt("id") == LAST_RESTART_ID);
        TestHelper.waitForCdcRecord(this.connection, "tableb", rs -> rs.getInt("id") == LAST_RESTART_ID);

        sourceRecords = this.consumeRecordsByTopic(RECORDS_PER_TABLE * TABLES);
        tableA = sourceRecords.recordsForTopic("server1.dbo.tablea");
        tableB = sourceRecords.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableA).hasSize(RECORDS_PER_TABLE);
        Assertions.assertThat(tableB).hasSize(RECORDS_PER_TABLE);
        for (int offset = 0; offset < RECORDS_PER_TABLE; ++offset) {
            int id = ID_RESTART + offset;
            SourceRecord recordA = tableA.get(offset);
            SourceRecord recordB = tableB.get(offset);
            List<SchemaAndValueField> expectedRowA = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("cola", Schema.OPTIONAL_STRING_SCHEMA, "a"));
            List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, id),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));
            Struct valueA = (Struct)recordA.value();
            this.assertRecord((Struct)valueA.get("after"), expectedRowA);
            Assert.assertNull(valueA.get("before"));
            Struct valueB = (Struct)recordB.value();
            this.assertRecord((Struct)valueB.get("after"), expectedRowB);
            Assert.assertNull(valueB.get("before"));
        }
    }

    /**
     * Runs the mid-transaction restart scenario with an additional start/stop cycle performed
     * right after the snapshot and without the standalone streamed insert.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @FixFor(value={"DBZ-1128"})
    public void restartInTheMiddleOfTxAfterSnapshot() throws Exception {
        boolean restartJustAfterSnapshot = true;
        boolean afterStreaming = false;
        this.restartInTheMiddleOfTx(restartJustAfterSnapshot, afterStreaming);
    }

    /**
     * Runs the mid-transaction restart scenario after one standalone insert has already been
     * streamed, and without the extra start/stop cycle after the snapshot.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @FixFor(value={"DBZ-1128"})
    public void restartInTheMiddleOfTxAfterCompletedTx() throws Exception {
        boolean restartJustAfterSnapshot = false;
        boolean afterStreaming = true;
        this.restartInTheMiddleOfTx(restartJustAfterSnapshot, afterStreaming);
    }

    /**
     * Runs the plain mid-transaction restart scenario: no extra start/stop cycle after the
     * snapshot and no standalone streamed insert beforehand.
     *
     * @throws Exception if the shared scenario fails
     */
    @Test
    @FixFor(value={"DBZ-1128"})
    public void restartInTheMiddleOfTx() throws Exception {
        boolean restartJustAfterSnapshot = false;
        boolean afterStreaming = false;
        this.restartInTheMiddleOfTx(restartJustAfterSnapshot, afterStreaming);
    }

    /**
     * Expects the "no tables to monitor" warning to be logged when the table whitelist is set to
     * {@code my_products}.
     *
     * @throws Exception if the connector cannot be started or stopped
     */
    @Test
    @FixFor(value={"DBZ-1242"})
    public void testEmptySchemaWarningAfterApplyingFilters() throws Exception {
        // Created before the connector starts and queried in the stop callback.
        LogInterceptor logInterceptor = new LogInterceptor();
        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with(SqlServerConnectorConfig.TABLE_WHITELIST, "my_products")
                .build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        this.stopConnector(value -> {
            boolean warned = logInterceptor.containsWarnMessage("After applying blacklist/whitelist filters there are no tables to monitor, please check your configuration");
            Assertions.assertThat(warned).isTrue();
        });
    }

    /**
     * Expects the "no tables to monitor" warning NOT to be logged when no table filter is
     * configured.
     *
     * @throws Exception if the connector cannot be started or stopped
     */
    @Test
    @FixFor(value={"DBZ-1242"})
    public void testNoEmptySchemaWarningAfterApplyingFilters() throws Exception {
        // Created before the connector starts and queried in the stop callback.
        LogInterceptor logInterceptor = new LogInterceptor();
        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        this.waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        this.stopConnector(value -> {
            boolean warned = logInterceptor.containsWarnMessage("After applying blacklist/whitelist filters there are no tables to monitor, please check your configuration");
            Assertions.assertThat(warned).isFalse();
        });
    }

    /**
     * Verifies that change events for a table without a primary key carry neither a key nor a key
     * schema.
     *
     * <p>The check is made for the snapshot record, an insert, an update (including its
     * {@code before}/{@code after} content) and a delete. For the delete, the second consumed
     * record must have a {@code null} value.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-916"})
    public void keylessTable() throws Exception {
        final String topic = "server1.dbo.keyless";
        this.connection.execute("CREATE TABLE keyless (id int, name varchar(30))", "INSERT INTO keyless VALUES(1, 'k')");
        TestHelper.enableTableCdc(this.connection, "keyless");
        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.keyless")
                .build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();

        // Row images expected around the UPDATE below.
        List<SchemaAndValueField> rowWithId2 = Arrays.asList(
                new SchemaAndValueField("id", Schema.OPTIONAL_INT32_SCHEMA, 2),
                new SchemaAndValueField("name", Schema.OPTIONAL_STRING_SCHEMA, "k"));
        List<SchemaAndValueField> rowWithId3 = Arrays.asList(
                new SchemaAndValueField("id", Schema.OPTIONAL_INT32_SCHEMA, 3),
                new SchemaAndValueField("name", Schema.OPTIONAL_STRING_SCHEMA, "k"));

        // Row that existed before the connector started.
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        SourceRecord snapshotRecord = records.recordsForTopic(topic).get(0);
        Assertions.assertThat(snapshotRecord.key()).isNull();
        Assertions.assertThat(snapshotRecord.keySchema()).isNull();

        // Insert: both the key and the key schema must be absent (the second assertion used to
        // re-check key() instead of keySchema()).
        this.connection.execute("INSERT INTO keyless VALUES(2, 'k')");
        records = this.consumeRecordsByTopic(1);
        SourceRecord insertRecord = records.recordsForTopic(topic).get(0);
        Assertions.assertThat(insertRecord.key()).isNull();
        Assertions.assertThat(insertRecord.keySchema()).isNull();

        // Update.
        this.connection.execute("UPDATE keyless SET id=3 WHERE ID=2");
        records = this.consumeRecordsByTopic(3);
        SourceRecord update1 = records.recordsForTopic(topic).get(0);
        Assertions.assertThat(update1.key()).isNull();
        Assertions.assertThat(update1.keySchema()).isNull();
        this.assertRecord(((Struct)update1.value()).getStruct("before"), rowWithId2);
        this.assertRecord(((Struct)update1.value()).getStruct("after"), rowWithId3);

        // Delete: first record is the delete event, the second one must have a null value.
        this.connection.execute("DELETE FROM keyless WHERE id=3");
        records = this.consumeRecordsByTopic(2, false);
        SourceRecord deleteRecord = records.recordsForTopic(topic).get(0);
        Assertions.assertThat(deleteRecord.key()).isNull();
        Assertions.assertThat(deleteRecord.keySchema()).isNull();
        Assert.assertNull(records.recordsForTopic(topic).get(1).value());
        this.stopConnector();
    }

    /**
     * Verifies that {@code MSG_KEY_COLUMNS} gives a primary-key-less table a record key.
     *
     * <p>With the mapping {@code (.*).keyless:id}, the first record for {@code keyless} must have
     * a non-null key containing a non-null {@code id} field.
     *
     * @throws InterruptedException if record consumption is interrupted
     * @throws SQLException if preparing the table fails
     */
    @Test
    @FixFor(value={"DBZ-1015"})
    public void shouldRewriteIdentityKey() throws InterruptedException, SQLException {
        this.connection.execute("CREATE TABLE keyless (id int, name varchar(30))", "INSERT INTO keyless VALUES(1, 'k')");
        TestHelper.enableTableCdc(this.connection, "keyless");
        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.keyless")
                .with(SqlServerConnectorConfig.MSG_KEY_COLUMNS, "(.*).keyless:id")
                .build();
        this.start(SqlServerConnector.class, config);

        List<SourceRecord> recordsForTopic = this.consumeRecordsByTopic(1).recordsForTopic("server1.dbo.keyless");
        SourceRecord first = recordsForTopic.get(0);
        Assertions.assertThat(first.key()).isNotNull();
        Struct key = (Struct)first.key();
        Assertions.assertThat(key.get("id")).isNotNull();
        this.stopConnector();
    }

    /**
     * Verifies the key schema and {@code after} value schema produced for a table with a composite
     * primary key, NOT NULL columns and column defaults.
     *
     * <p>Expected: required fields for the NOT NULL columns, a {@code ZonedTimestamp} string with
     * its default for {@code colb}, an optional string with its default for {@code colc}, and an
     * optional float64 for {@code cold}.
     *
     * @throws SQLException if preparing the table or inserting the row fails
     * @throws InterruptedException if waiting for the snapshot or for records is interrupted
     */
    @Test
    @FixFor(value={"DBZ-1491"})
    public void shouldCaptureTableSchema() throws SQLException, InterruptedException {
        this.connection.execute("CREATE TABLE table_schema_test (key_cola int not null,key_colb varchar(10) not null,cola int not null,colb datetimeoffset not null default ('2019-01-01 12:34:56.1234567+04:00'),colc varchar(20) default ('default_value'),cold float,primary key(key_cola, key_colb))");
        TestHelper.enableTableCdc(this.connection, "table_schema_test");
        Configuration config = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();

        this.connection.execute("INSERT INTO table_schema_test (key_cola, key_colb, cola, colb, colc, cold) VALUES(1, 'a', 100, '2019-01-01 10:20:39.1234567 +02:00', 'some_value', 100.20)");
        List<SourceRecord> records = this.consumeRecordsByTopic(1).recordsForTopic("server1.dbo.table_schema_test");
        Assertions.assertThat(records).hasSize(1);

        // Key: the two primary-key columns, both required.
        Schema expectedKeySchema = SchemaBuilder.struct()
                .name("server1.dbo.table_schema_test.Key")
                .field("key_cola", Schema.INT32_SCHEMA)
                .field("key_colb", Schema.STRING_SCHEMA)
                .build();

        // Column-level schemas that carry defaults.
        Schema colbSchema = SchemaBuilder.string()
                .name("io.debezium.time.ZonedTimestamp")
                .required()
                .defaultValue("2019-01-01T12:34:56.1234567+04:00")
                .version(1)
                .build();
        Schema colcSchema = SchemaBuilder.string()
                .optional()
                .defaultValue("default_value")
                .build();

        Schema expectedValueSchema = SchemaBuilder.struct()
                .optional()
                .name("server1.dbo.table_schema_test.Value")
                .field("key_cola", Schema.INT32_SCHEMA)
                .field("key_colb", Schema.STRING_SCHEMA)
                .field("cola", Schema.INT32_SCHEMA)
                .field("colb", colbSchema)
                .field("colc", colcSchema)
                .field("cold", Schema.OPTIONAL_FLOAT64_SCHEMA)
                .build();

        SourceRecordAssert.assertThat(records.get(0))
                .keySchemaIsEqualTo(expectedKeySchema)
                .valueAfterFieldSchemaIsEqualTo(expectedValueSchema);
        this.stopConnector();
    }

    /**
     * Runs an initial snapshot with {@link PurgableFileDatabaseHistory}, stops the
     * connector, deletes the database history file and verifies that a restart with the
     * same configuration leaves the connector not running and logs the
     * "history missing" error.
     *
     * @throws Exception if database access, record consumption or waiting fails
     */
    @Test
    @FixFor(value={"DBZ-1923"})
    public void shouldDetectPurgedHistory() throws Exception {
        final int recordsPerTable = 5;
        final int tableCount = 2;
        final int idStart = 10;
        final int idRestart = 100;

        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with(SqlServerConnectorConfig.DATABASE_HISTORY, PurgableFileDatabaseHistory.class)
                .build();

        // First batch of rows, written before the connector is started.
        for (int offset = 0; offset < recordsPerTable; offset++) {
            final int rowId = idStart + offset;
            this.connection.execute("INSERT INTO tablea VALUES(" + rowId + ", 'a')");
            this.connection.execute("INSERT INTO tableb VALUES(" + rowId + ", 'b')");
        }

        Awaitility.await()
                .atMost(30L, TimeUnit.SECONDS)
                .pollInterval(100L, TimeUnit.MILLISECONDS)
                .until(() -> {
                    Testing.debug("Waiting for initial changes to be propagated to CDC structures");
                    return this.connection.getMaxLsn().isAvailable();
                });

        this.start(SqlServerConnector.class, connectorConfig);
        this.assertConnectorIsRunning();

        final List<SourceRecord> consumed = this.consumeRecordsByTopic(1 + recordsPerTable * tableCount).allRecordsInOrder();
        // The very first record is excluded from the snapshot-flag checks below.
        final List<SourceRecord> checkedRecords = consumed.subList(1, consumed.size());
        final int lastIndex = checkedRecords.size() - 1;
        for (int index = 0; index <= lastIndex; index++) {
            final SourceRecord current = checkedRecords.get(index);
            Assertions.assertThat(current.sourceOffset().get("snapshot")).as("Snapshot phase").isEqualTo(true);
            if (index < lastIndex) {
                Assertions.assertThat(current.sourceOffset().get("snapshot_completed")).as("Snapshot in progress").isEqualTo(false);
            }
            else {
                // Only the final record is expected to flag the snapshot as completed.
                Assertions.assertThat(current.sourceOffset().get("snapshot_completed")).as("Snapshot completed").isEqualTo(true);
            }
        }
        this.stopConnector();

        // Second batch of rows, written while the connector is stopped.
        for (int offset = 0; offset < recordsPerTable; offset++) {
            final int rowId = idRestart + offset;
            this.connection.execute("INSERT INTO tablea VALUES(" + rowId + ", 'a')");
            this.connection.execute("INSERT INTO tableb VALUES(" + rowId + ", 'b')");
        }

        // Remove the history file before restarting with the same configuration.
        Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
        final LogInterceptor interceptor = new LogInterceptor();
        this.start(SqlServerConnector.class, connectorConfig);
        this.assertConnectorNotRunning();
        Assertions.assertThat(interceptor.containsStacktraceElement("The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot.")).isTrue();
    }

    /**
     * Starts the connector with the source timestamp mode set to {@code "processing"},
     * inserts one row and checks that the event-level {@code ts_ms} exceeds the
     * {@code source.ts_ms} by less than 100.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws SQLException if a statement against the test database fails
     */
    @Test
    @FixFor(value={"DBZ-1988"})
    public void shouldHonorSourceTimestampMode() throws InterruptedException, SQLException {
        this.connection.execute("CREATE TABLE source_timestamp_mode (id int, name varchar(30) primary key(id))");
        TestHelper.enableTableCdc(this.connection, "source_timestamp_mode");

        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.source_timestamp_mode")
                .with(SqlServerConnectorConfig.SOURCE_TIMESTAMP_MODE, "processing")
                .build();
        this.start(SqlServerConnector.class, connectorConfig);
        this.assertConnectorIsRunning();
        SqlServerConnectorIT.waitForSnapshotToBeCompleted("sql_server", "server1");

        this.connection.execute("INSERT INTO source_timestamp_mode VALUES(1, 'abc')");
        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
        final List<SourceRecord> topicRecords = consumed.recordsForTopic("server1.dbo.source_timestamp_mode");
        final Struct envelope = (Struct) topicRecords.get(0).value();
        final Struct sourceBlock = (Struct) envelope.get("source");

        final long eventTs = (Long) envelope.get("ts_ms");
        final long sourceTs = (Long) sourceBlock.get("ts_ms");
        final long delta = eventTs - sourceTs;
        Assertions.assertThat(delta).isLessThan(100L);
        this.stopConnector();
    }

    /**
     * Configures {@code column.mask.with.4.chars} with the schema-qualified column name
     * {@code dbo.tablea.cola} and checks that {@code cola} is emitted as {@code "****"}
     * for {@code tablea}, while {@code tableb} rows are emitted unchanged.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1312"})
    public void useShortTableNamesForColumnMapper() throws Exception {
        final int recordsPerTable = 5;
        final int tableCount = 2;
        final int idStart = 10;

        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with("column.mask.with.4.chars", "dbo.tablea.cola")
                .build();
        this.start(SqlServerConnector.class, connectorConfig);
        this.assertConnectorIsRunning();
        // One record is consumed and discarded before any rows are inserted here.
        this.consumeRecordsByTopic(1);

        for (int offset = 0; offset < recordsPerTable; offset++) {
            final int rowId = idStart + offset;
            this.connection.execute("INSERT INTO tablea VALUES(" + rowId + ", 'a')");
            this.connection.execute("INSERT INTO tableb VALUES(" + rowId + ", 'b')");
        }

        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(recordsPerTable * tableCount);
        final List<SourceRecord> tableARecords = consumed.recordsForTopic("server1.dbo.tablea");
        final List<SourceRecord> tableBRecords = consumed.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableARecords).hasSize(recordsPerTable);
        Assertions.assertThat(tableBRecords).hasSize(recordsPerTable);

        for (int offset = 0; offset < recordsPerTable; offset++) {
            final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, offset + idStart),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            // tablea: the configured column must arrive masked.
            final Struct envelopeA = (Struct) tableARecords.get(offset).value();
            Assertions.assertThat(envelopeA.getStruct("after").getString("cola")).isEqualTo("****");

            // tableb: row content is compared against the inserted values.
            final Struct envelopeB = (Struct) tableBRecords.get(offset).value();
            this.assertRecord((Struct) envelopeB.get("after"), expectedRowB);
            Assert.assertNull(envelopeB.get("before"));
        }
        this.stopConnector();
    }

    /**
     * Configures {@code column.mask.with.4.chars} with the database-qualified column name
     * {@code testDB.dbo.tablea.cola} and checks that {@code cola} is emitted as
     * {@code "****"} for {@code tablea}, while {@code tableb} rows are emitted unchanged.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1312"})
    public void useLongTableNamesForColumnMapper() throws Exception {
        final int recordsPerTable = 5;
        final int tableCount = 2;
        final int idStart = 10;

        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with("column.mask.with.4.chars", "testDB.dbo.tablea.cola")
                .build();
        this.start(SqlServerConnector.class, connectorConfig);
        this.assertConnectorIsRunning();
        // One record is consumed and discarded before any rows are inserted here.
        this.consumeRecordsByTopic(1);

        for (int offset = 0; offset < recordsPerTable; offset++) {
            final int rowId = idStart + offset;
            this.connection.execute("INSERT INTO tablea VALUES(" + rowId + ", 'a')");
            this.connection.execute("INSERT INTO tableb VALUES(" + rowId + ", 'b')");
        }

        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(recordsPerTable * tableCount);
        final List<SourceRecord> tableARecords = consumed.recordsForTopic("server1.dbo.tablea");
        final List<SourceRecord> tableBRecords = consumed.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableARecords).hasSize(recordsPerTable);
        Assertions.assertThat(tableBRecords).hasSize(recordsPerTable);

        for (int offset = 0; offset < recordsPerTable; offset++) {
            final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, offset + idStart),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            // tablea: the configured column must arrive masked.
            final Struct envelopeA = (Struct) tableARecords.get(offset).value();
            Assertions.assertThat(envelopeA.getStruct("after").getString("cola")).isEqualTo("****");

            // tableb: row content is compared against the inserted values.
            final Struct envelopeB = (Struct) tableBRecords.get(offset).value();
            this.assertRecord((Struct) envelopeB.get("after"), expectedRowB);
            Assert.assertNull(envelopeB.get("before"));
        }
        this.stopConnector();
    }

    /**
     * Configures the message key columns with the database-qualified table name
     * {@code testDB.dbo.tablea:cola} and checks that {@code tablea} record keys contain
     * {@code cola = "a"}, while {@code tableb} rows are emitted unchanged.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1312"})
    public void useLongTableNamesForKeyMapper() throws Exception {
        final int recordsPerTable = 5;
        final int tableCount = 2;
        final int idStart = 10;

        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with(SqlServerConnectorConfig.MSG_KEY_COLUMNS, "testDB.dbo.tablea:cola")
                .build();
        this.start(SqlServerConnector.class, connectorConfig);
        this.assertConnectorIsRunning();
        // One record is consumed and discarded before any rows are inserted here.
        this.consumeRecordsByTopic(1);

        for (int offset = 0; offset < recordsPerTable; offset++) {
            final int rowId = idStart + offset;
            this.connection.execute("INSERT INTO tablea VALUES(" + rowId + ", 'a')");
            this.connection.execute("INSERT INTO tableb VALUES(" + rowId + ", 'b')");
        }

        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(recordsPerTable * tableCount);
        final List<SourceRecord> tableARecords = consumed.recordsForTopic("server1.dbo.tablea");
        final List<SourceRecord> tableBRecords = consumed.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableARecords).hasSize(recordsPerTable);
        Assertions.assertThat(tableBRecords).hasSize(recordsPerTable);

        for (int offset = 0; offset < recordsPerTable; offset++) {
            final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, offset + idStart),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            // tablea: the key must be built from the configured column.
            final Struct keyOfA = (Struct) tableARecords.get(offset).key();
            Assertions.assertThat(keyOfA.getString("cola")).isEqualTo("a");

            // tableb: row content is compared against the inserted values.
            final Struct envelopeB = (Struct) tableBRecords.get(offset).value();
            this.assertRecord((Struct) envelopeB.get("after"), expectedRowB);
            Assert.assertNull(envelopeB.get("before"));
        }
        this.stopConnector();
    }

    /**
     * Configures the message key columns with the schema-qualified table name
     * {@code dbo.tablea:cola} and checks that {@code tablea} record keys contain
     * {@code cola = "a"}, while {@code tableb} rows are emitted unchanged.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1312"})
    public void useShortTableNamesForKeyMapper() throws Exception {
        final int recordsPerTable = 5;
        final int tableCount = 2;
        final int idStart = 10;

        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL)
                .with(SqlServerConnectorConfig.MSG_KEY_COLUMNS, "dbo.tablea:cola")
                .build();
        this.start(SqlServerConnector.class, connectorConfig);
        this.assertConnectorIsRunning();
        // One record is consumed and discarded before any rows are inserted here.
        this.consumeRecordsByTopic(1);

        for (int offset = 0; offset < recordsPerTable; offset++) {
            final int rowId = idStart + offset;
            this.connection.execute("INSERT INTO tablea VALUES(" + rowId + ", 'a')");
            this.connection.execute("INSERT INTO tableb VALUES(" + rowId + ", 'b')");
        }

        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(recordsPerTable * tableCount);
        final List<SourceRecord> tableARecords = consumed.recordsForTopic("server1.dbo.tablea");
        final List<SourceRecord> tableBRecords = consumed.recordsForTopic("server1.dbo.tableb");
        Assertions.assertThat(tableARecords).hasSize(recordsPerTable);
        Assertions.assertThat(tableBRecords).hasSize(recordsPerTable);

        for (int offset = 0; offset < recordsPerTable; offset++) {
            final List<SchemaAndValueField> expectedRowB = Arrays.asList(
                    new SchemaAndValueField("id", Schema.INT32_SCHEMA, offset + idStart),
                    new SchemaAndValueField("colb", Schema.OPTIONAL_STRING_SCHEMA, "b"));

            // tablea: the key must be built from the configured column.
            final Struct keyOfA = (Struct) tableARecords.get(offset).key();
            Assertions.assertThat(keyOfA.getString("cola")).isEqualTo("a");

            // tableb: row content is compared against the inserted values.
            final Struct envelopeB = (Struct) tableBRecords.get(offset).value();
            this.assertRecord((Struct) envelopeB.get("after"), expectedRowB);
            Assert.assertNull(envelopeB.get("before"));
        }
        this.stopConnector();
    }

    /**
     * Sets {@code datatype.propagate.source.type} for NUMERIC, VARCHAR, REAL and DECIMAL
     * and checks, on the {@code before} schema of a change event, that only the columns
     * with those types carry the {@code __debezium.source.column.*} schema parameters.
     *
     * @throws Exception if database access or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-1916", "DBZ-1830"})
    public void shouldPropagateSourceTypeByDatatype() throws Exception {
        final String typeKey = "__debezium.source.column.type";
        final String lengthKey = "__debezium.source.column.length";
        final String scaleKey = "__debezium.source.column.scale";

        this.connection.execute("CREATE TABLE dt_table (id int, c1 int, c2 int, c3a numeric(5,2), c3b varchar(128), f1 float(10), f2 decimal(8,4) primary key(id))");
        TestHelper.enableTableCdc(this.connection, "dt_table");

        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(SqlServerConnectorConfig.TABLE_WHITELIST, "dbo.dt_table")
                .with("datatype.propagate.source.type", ".+\\.NUMERIC,.+\\.VARCHAR,.+\\.REAL,.+\\.DECIMAL")
                .build();
        this.start(SqlServerConnector.class, connectorConfig);
        this.assertConnectorIsRunning();
        SqlServerConnectorIT.waitForSnapshotToBeCompleted("sql_server", "server1");

        this.connection.execute("INSERT INTO dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1, 123, 456, 789.01, 'test', 1.228, 234.56)");
        final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
        final List<SourceRecord> topicRecords = consumed.recordsForTopic("server1.dbo.dt_table");
        final Schema beforeSchema = topicRecords.get(0).valueSchema().field("before").schema();

        // The int columns are not matched by the configured type patterns.
        Assertions.assertThat(beforeSchema.field("id").schema().parameters()).isNull();
        Assertions.assertThat(beforeSchema.field("c1").schema().parameters()).isNull();
        Assertions.assertThat(beforeSchema.field("c2").schema().parameters()).isNull();

        Assertions.assertThat(beforeSchema.field("c3a").schema().parameters()).includes(
                MapAssert.entry(typeKey, "NUMERIC"),
                MapAssert.entry(lengthKey, "5"),
                MapAssert.entry(scaleKey, "2"));
        Assertions.assertThat(beforeSchema.field("c3b").schema().parameters()).includes(
                MapAssert.entry(typeKey, "VARCHAR"),
                MapAssert.entry(lengthKey, "128"));
        Assertions.assertThat(beforeSchema.field("f2").schema().parameters()).includes(
                MapAssert.entry(typeKey, "DECIMAL"),
                MapAssert.entry(lengthKey, "8"),
                MapAssert.entry(scaleKey, "4"));
        // f1 is declared float(10); the expected propagated type is REAL with length 24.
        Assertions.assertThat(beforeSchema.field("f1").schema().parameters()).includes(
                MapAssert.entry(typeKey, "REAL"),
                MapAssert.entry(lengthKey, "24"));
        this.stopConnector();
    }

    /**
     * Starts the connector in {@code INITIAL_ONLY} snapshot mode, consumes one record,
     * checks that nothing further is available and, in the callback passed to
     * {@code stopConnector}, asserts that the "streaming is not enabled" message was logged.
     *
     * @throws Exception if starting, consuming from or stopping the connector fails
     */
    @Test
    @FixFor(value={"DBZ-2379"})
    public void shouldNotStreamWhenUsingSnapshotModeInitialOnly() throws Exception {
        final String expectedLogMessage = "Streaming is not enabled in current configuration";
        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(SqlServerConnectorConfig.SNAPSHOT_MODE, SqlServerConnectorConfig.SnapshotMode.INITIAL_ONLY)
                .build();
        // The interceptor is created before the connector starts.
        final LogInterceptor interceptor = new LogInterceptor();

        this.start(SqlServerConnector.class, connectorConfig);
        this.assertConnectorIsRunning();
        this.consumeRecordsByTopic(1);
        this.assertNoRecordsToConsume();

        this.stopConnector(ignoredFlag -> {
            Assertions.assertThat(interceptor.containsMessage(expectedLogMessage)).isTrue();
        });
    }

    /**
     * Applies every expected field check to the given struct, in list order.
     *
     * @param record the struct to verify
     * @param expected the field expectations, each of which is asserted against {@code record}
     */
    private void assertRecord(Struct record, List<SchemaAndValueField> expected) {
        for (SchemaAndValueField expectedField : expected) {
            expectedField.assertFor(record);
        }
    }

    public static class PurgableFileDatabaseHistory
    implements DatabaseHistory {
        final DatabaseHistory delegate = new FileDatabaseHistory();

        public boolean exists() {
            try {
                return this.storageExists() && Files.size(TestHelper.DB_HISTORY_PATH) > 0L;
            }
            catch (IOException e) {
                throw new DatabaseHistoryException("File should exist");
            }
        }

        public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) {
            this.delegate.configure(config, comparator, listener, useCatalogBeforeSchema);
        }

        public void start() {
            this.delegate.start();
        }

        public void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String ddl) throws DatabaseHistoryException {
            this.delegate.record(source, position, databaseName, ddl);
        }

        public void record(Map<String, ?> source, Map<String, ?> position, String databaseName, String schemaName, String ddl, TableChanges changes) throws DatabaseHistoryException {
            this.delegate.record(source, position, databaseName, schemaName, ddl, changes);
        }

        public void recover(Map<String, ?> source, Map<String, ?> position, Tables schema, DdlParser ddlParser) {
            this.delegate.recover(source, position, schema, ddlParser);
        }

        public void stop() {
            this.delegate.stop();
        }

        public boolean storageExists() {
            return this.delegate.storageExists();
        }

        public void initializeStorage() {
            this.delegate.initializeStorage();
        }
    }
}

