/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.sqlserver;

import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.connector.sqlserver.SqlServerConnector;
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
import io.debezium.connector.sqlserver.util.TestHelper;
import io.debezium.document.Array;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.IoUtil;
import io.debezium.util.Testing;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class SqlServerChangeTableSetIT
extends AbstractConnectorTest {
    private SqlServerConnection connection;

    @Before
    public void before() throws SQLException {
        TestHelper.createTestDatabase();
        this.connection = TestHelper.testConnection();
        this.connection.execute(new String[]{"CREATE TABLE tablea (id int primary key, cola varchar(30))", "CREATE TABLE tableb (id int primary key, colb varchar(30))", "CREATE TABLE tablec (id int primary key, colc varchar(30))"});
        TestHelper.enableTableCdc(this.connection, "tablea");
        TestHelper.enableTableCdc(this.connection, "tableb");
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)TestHelper.DB_HISTORY_PATH);
    }

    @After
    public void after() throws SQLException {
        if (this.connection != null) {
            this.connection.close();
        }
    }

    @Test
    public void addTable() throws Exception {
        int RECORDS_PER_TABLE = 5;
        int TABLES = 2;
        int ID_START = 10;
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)).build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        for (int i = 0; i < 5; ++i) {
            int id = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        TestHelper.enableTableCdc(this.connection, "tablec");
        this.connection.execute(new String[]{"CREATE TABLE tabled (id int primary key, cold varchar(30))"});
        TestHelper.enableTableCdc(this.connection, "tabled");
        for (int i = 0; i < 5; ++i) {
            int id = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablec VALUES(" + id + ", 'c')"});
            this.connection.execute(new String[]{"INSERT INTO tabled VALUES(" + id + ", 'd')"});
        }
        records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablec")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tabled")).hasSize(5);
        records.recordsForTopic("server1.dbo.tablec").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tablec.Value").field("id", Schema.INT32_SCHEMA).field("colc", Schema.OPTIONAL_STRING_SCHEMA).build()));
        records.recordsForTopic("server1.dbo.tabled").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tabled.Value").field("id", Schema.INT32_SCHEMA).field("cold", Schema.OPTIONAL_STRING_SCHEMA).build()));
    }

    @Test
    public void removeTable() throws Exception {
        int RECORDS_PER_TABLE = 5;
        int TABLES = 2;
        int ID_START_1 = 10;
        int ID_START_2 = 100;
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)).build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        for (int i = 0; i < 5; ++i) {
            int id = 10 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        TestHelper.disableTableCdc(this.connection, "tableb");
        for (int i = 0; i < 5; ++i) {
            int id = 100 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a2')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ", 'b2')"});
        }
        records = this.consumeRecordsByTopic(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).isNullOrEmpty();
    }

    @Test
    public void addColumnToTableEndOfBatch() throws Exception {
        this.addColumnToTable(true);
    }

    @Test
    public void addColumnToTableMiddleOfBatch() throws Exception {
        this.addColumnToTable(false);
    }

    private void addColumnToTable(boolean pauseAfterCaptureChange) throws Exception {
        int id;
        int i;
        int RECORDS_PER_TABLE = 5;
        int TABLES = 2;
        int ID_START_1 = 10;
        int ID_START_2 = 100;
        int ID_START_3 = 1000;
        int ID_START_4 = 10000;
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)).build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        for (int i2 = 0; i2 < 5; ++i2) {
            int id2 = 10 + i2;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).field("colb", Schema.OPTIONAL_STRING_SCHEMA).build()));
        this.connection.execute(new String[]{"ALTER TABLE dbo.tableb ADD newcol INT NOT NULL DEFAULT 0"});
        for (i = 0; i < 5; ++i) {
            id = 100 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a2')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ", 'b2', 2)"});
        }
        records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).field("colb", Schema.OPTIONAL_STRING_SCHEMA).build()));
        TestHelper.enableTableCdc(this.connection, "tableb", "after_change");
        if (pauseAfterCaptureChange) {
            Thread.sleep(5000L);
        }
        for (i = 0; i < 5; ++i) {
            id = 1000 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a3')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ", 'b3', 3)"});
        }
        records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).field("colb", Schema.OPTIONAL_STRING_SCHEMA).field("newcol", Schema.INT32_SCHEMA).build()));
        for (i = 0; i < 5; ++i) {
            id = 10000 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a4')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ", 'b4', 4)"});
        }
        records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).field("colb", Schema.OPTIONAL_STRING_SCHEMA).field("newcol", Schema.INT32_SCHEMA).build()));
    }

    @Test
    public void removeColumnFromTable() throws Exception {
        int id;
        int i;
        int RECORDS_PER_TABLE = 5;
        int TABLES = 2;
        int ID_START_1 = 10;
        int ID_START_2 = 100;
        int ID_START_3 = 1000;
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)).build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        for (int i2 = 0; i2 < 5; ++i2) {
            int id2 = 10 + i2;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).field("colb", Schema.OPTIONAL_STRING_SCHEMA).build()));
        this.connection.execute(new String[]{"ALTER TABLE dbo.tableb DROP COLUMN colb"});
        TestHelper.enableTableCdc(this.connection, "tableb", "after_change");
        for (i = 0; i < 5; ++i) {
            id = 100 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a2')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ")"});
        }
        records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).build()));
        for (i = 0; i < 5; ++i) {
            id = 1000 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a3')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ")"});
        }
        records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).build()));
    }

    @Test
    public void readHistoryAfterRestart() throws Exception {
        int id;
        int i;
        boolean RECORDS_PER_TABLE = true;
        int TABLES = 2;
        int ID_START_1 = 10;
        int ID_START_2 = 100;
        int ID_START_3 = 1000;
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)).build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        for (int i2 = 0; i2 < 1; ++i2) {
            int id2 = 10 + i2;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(1);
        this.connection.execute(new String[]{"ALTER TABLE dbo.tableb DROP COLUMN colb"});
        TestHelper.enableTableCdc(this.connection, "tableb", "after_change");
        for (i = 0; i < 1; ++i) {
            id = 100 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a2')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ")"});
        }
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(1);
        this.stopConnector();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        for (i = 0; i < 1; ++i) {
            id = 1000 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a3')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ")"});
        }
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(1);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).build()));
        DocumentReader reader = DocumentReader.defaultReader();
        ArrayList changes = new ArrayList();
        IoUtil.readLines((Path)TestHelper.DB_HISTORY_PATH, line -> {
            try {
                changes.add(reader.read(line));
            }
            catch (IOException e) {
                throw new IllegalStateException(e);
            }
        });
        Assertions.assertThat(changes).hasSize(4);
        changes.subList(0, 3).forEach(change -> {
            Array changeArray = change.getArray((CharSequence)"tableChanges");
            Assertions.assertThat((int)changeArray.size()).isEqualTo(1);
            String type = changeArray.get(0).asDocument().getString((CharSequence)"type");
            Assertions.assertThat((String)type).isEqualTo((Object)"CREATE");
        });
        Array changeArray = ((Document)changes.get(3)).getArray((CharSequence)"tableChanges");
        Assertions.assertThat((int)changeArray.size()).isEqualTo(1);
        String type = changeArray.get(0).asDocument().getString((CharSequence)"type");
        String tableIid = changeArray.get(0).asDocument().getString((CharSequence)"id");
        Assertions.assertThat((String)type).isEqualTo((Object)"ALTER");
        Assertions.assertThat((String)tableIid).isEqualTo((Object)"\"testDB\".\"dbo\".\"tableb\"");
    }

    @Test
    public void renameColumn() throws Exception {
        int id;
        int i;
        int RECORDS_PER_TABLE = 5;
        int TABLES = 2;
        int ID_START_1 = 10;
        int ID_START_2 = 100;
        int ID_START_3 = 1000;
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)).build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        for (int i2 = 0; i2 < 5; ++i2) {
            int id2 = 10 + i2;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id2 + ", 'b')"});
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).field("colb", Schema.OPTIONAL_STRING_SCHEMA).build()));
        TestHelper.disableTableCdc(this.connection, "tableb");
        this.connection.execute(new String[]{"exec sp_rename 'tableb.colb', 'newcolb';"});
        TestHelper.enableTableCdc(this.connection, "tableb", "after_change");
        for (i = 0; i < 5; ++i) {
            id = 100 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a2')"});
            this.connection.execute(new String[]{"INSERT INTO tableb(id,newcolb) VALUES(" + id + ", 'b2')"});
        }
        records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).field("newcolb", Schema.OPTIONAL_STRING_SCHEMA).build()));
        for (i = 0; i < 5; ++i) {
            id = 1000 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a3')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ", 'b3')"});
        }
        records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).field("newcolb", Schema.OPTIONAL_STRING_SCHEMA).build()));
    }

    @Test
    public void changeColumn() throws Exception {
        int id;
        int i;
        int RECORDS_PER_TABLE = 5;
        int TABLES = 2;
        int ID_START_1 = 10;
        int ID_START_2 = 100;
        int ID_START_3 = 1000;
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(SqlServerConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)SqlServerConnectorConfig.SnapshotMode.SCHEMA_ONLY)).build();
        this.start(SqlServerConnector.class, config);
        this.assertConnectorIsRunning();
        TestHelper.waitForSnapshotToBeCompleted();
        for (int i2 = 0; i2 < 5; ++i2) {
            int id2 = 10 + i2;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id2 + ", 'a')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id2 + ", '" + id2 + "')"});
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> {
            this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).field("colb", Schema.OPTIONAL_STRING_SCHEMA).build());
            Struct value = ((Struct)record.value()).getStruct("after");
            int id = value.getInt32("id");
            String colb = value.getString("colb");
            Assertions.assertThat((String)Integer.toString(id)).isEqualTo((Object)colb);
        });
        this.connection.execute(new String[]{"ALTER TABLE dbo.tableb ALTER COLUMN colb INT"});
        TestHelper.enableTableCdc(this.connection, "tableb", "after_change");
        for (i = 0; i < 5; ++i) {
            id = 100 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a2')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ", '" + id + " ')"});
        }
        records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> {
            this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).field("colb", Schema.OPTIONAL_INT32_SCHEMA).build());
            Struct value = ((Struct)record.value()).getStruct("after");
            int id = value.getInt32("id");
            int colb = value.getInt32("colb");
            Assertions.assertThat((int)id).isEqualTo(colb);
        });
        for (i = 0; i < 5; ++i) {
            id = 1000 + i;
            this.connection.execute(new String[]{"INSERT INTO tablea VALUES(" + id + ", 'a3')"});
            this.connection.execute(new String[]{"INSERT INTO tableb VALUES(" + id + ", '" + id + " ')"});
        }
        records = this.consumeRecordsByTopic(10);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tablea")).hasSize(5);
        Assertions.assertThat((List)records.recordsForTopic("server1.dbo.tableb")).hasSize(5);
        records.recordsForTopic("server1.dbo.tableb").forEach(record -> {
            this.assertSchemaMatchesStruct((Struct)((Struct)record.value()).get("after"), SchemaBuilder.struct().optional().name("server1.dbo.tableb.Value").field("id", Schema.INT32_SCHEMA).field("colb", Schema.OPTIONAL_INT32_SCHEMA).build());
            Struct value = ((Struct)record.value()).getStruct("after");
            int id = value.getInt32("id");
            int colb = value.getInt32("colb");
            Assertions.assertThat((int)id).isEqualTo(colb);
        });
    }
}

