/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.antlr.listener.AlterTableParserListener;
import io.debezium.connector.oracle.antlr.listener.CreateTableParserListener;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.relational.TableId;
import io.debezium.relational.history.SchemaHistory;
import io.debezium.util.Testing;
import java.math.BigDecimal;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class OracleSchemaMigrationIT
extends AbstractConnectorTest {
    private OracleConnection connection;

    @Before
    public void beforeEach() throws Exception {
        this.connection = TestHelper.testConnection();
        this.setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)TestHelper.SCHEMA_HISTORY_PATH);
        TestHelper.dropAllTables();
    }

    @After
    public void afterEach() throws Exception {
        if (this.connection != null) {
            TestHelper.dropAllTables();
            this.connection.close();
        }
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldStreamNewlyCreatedNotFilteredTable() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.createTable("debezium.tableb", "CREATE TABLE debezium.tableb (ID numeric(9,0) primary key, data varchar2(50))");
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(2);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEB");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEB");
        record = (SourceRecord)records.recordsForTopic("server1").get(1);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEB");
        this.connection.execute(new String[]{"INSERT INTO debezium.tableb (ID,DATA) values (1, 'Test')"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEB"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEB")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEB");
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldNotStreamNewlyCreatedTableDueToFilters() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        Configuration config = ((Configuration.Builder)((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.TABLEA")).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.createTable("debezium.tableb", "CREATE TABLE debezium.tableb (ID numeric(9,0) primary key, data varchar2(50))");
        this.assertNoRecordsToConsume();
        this.connection.execute(new String[]{"INSERT INTO debezium.tableb (ID,DATA) values (1, 'B')"});
        this.assertNoRecordsToConsume();
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'A')"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        Assertions.assertThat((Object)((Struct)record.value()).getStruct("after").get("DATA")).isEqualTo((Object)"A");
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldStreamAlterTableAddColumnSchemaChange() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        Struct after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test");
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea ADD data2 numeric"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA,DATA2) values (2, 'Test2', 100)"});
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)2);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(3);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)2);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test2");
        Assertions.assertThat((Object)after.get("DATA2")).isEqualTo((Object)BigDecimal.valueOf(100L));
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldStreamAlterTableAddMultipleColumnsSchemaChange() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        Struct after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test");
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea ADD (data2 numeric, data3 varchar2(25))"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA,DATA2,DATA3) values (2, 'Test2', 100, 'a')"});
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)2);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(4);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)2);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test2");
        Assertions.assertThat((Object)after.get("DATA2")).isEqualTo((Object)BigDecimal.valueOf(100L));
        Assertions.assertThat((Object)after.get("DATA3")).isEqualTo((Object)"a");
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldStreamAlterTableRenameColumnSchemaChange() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        Struct after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test");
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea RENAME COLUMN data TO data1"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA1) values (2, 'Test2')"});
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)2);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.schema().field("DATA")).isNull();
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)2);
        Assertions.assertThat((Object)after.get("DATA1")).isEqualTo((Object)"Test2");
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldStreamAlterTableDropColumnSchemaChange() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        Struct after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test");
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea DROP COLUMN data"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID) values (2)"});
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)2);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(1);
        Assertions.assertThat((Object)after.schema().field("DATA")).isNull();
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)2);
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldStreamAlterTableDropMultipleColumnsSchemaChange() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data1 varchar2(50), data2 numeric)");
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA1,DATA2) values (1, 'Test', 100)"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        Struct after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(3);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("DATA1")).isEqualTo((Object)"Test");
        Assertions.assertThat((Object)after.get("DATA2")).isEqualTo((Object)BigDecimal.valueOf(100L));
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea DROP (data1, data2)"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID) values (2)"});
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)2);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(1);
        Assertions.assertThat((Object)after.schema().field("DATA1")).isNull();
        Assertions.assertThat((Object)after.schema().field("DATA2")).isNull();
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)2);
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldStreamAlterTableRenameTableSchemaChange() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        Struct after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test");
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea RENAME TO tableb"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tableb (ID,DATA) values (2, 'Test2')"});
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEB"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA,TABLEB");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEB");
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEB")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)2);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEB");
        after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)2);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test2");
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldNotStreamAfterTableRenameToExcludedName() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        Configuration config = ((Configuration.Builder)((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.TABLEA")).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        Struct after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test");
        this.connection.executeWithoutCommitting(new String[]{"ALTER TABLE debezium.tablea RENAME TO tableb"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tableb (ID,DATA) values (2, 'Test2')"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA,TABLEB");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).isEmpty();
        this.assertNoRecordsToConsume();
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldStreamAlterTableChangeColumnDataType() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data numeric)");
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"ALTER TABLE debezium.tablea modify (data varchar2(50))"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (id, data) values (1, 'Test')"});
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        Struct after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test");
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldStreamAlterTableChangeColumnNullability() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50) not null)");
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID,DATA) values (1, 'Test')"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        Struct after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test");
        Schema dataSchema = after.schema().field("DATA").schema();
        Assertions.assertThat((boolean)dataSchema.isOptional()).isFalse();
        this.connection.execute(new String[]{"ALTER TABLE debezium.tablea modify (data varchar2(50) null)"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (id) values (2)"});
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)2);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)2);
        Assertions.assertThat((Object)after.get("DATA")).isNull();
        dataSchema = after.schema().field("DATA").schema();
        Assertions.assertThat((boolean)dataSchema.isOptional()).isTrue();
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldStreamAlterTableChangeColumnPrecisionAndScale() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data numeric(8,2) not null)");
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (ID, DATA) values (1, 12345.67)"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        Struct after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)BigDecimal.valueOf(12345.67));
        Schema dataSchema = after.schema().field("DATA").schema();
        Assertions.assertThat((String)((String)dataSchema.parameters().get("scale"))).isEqualTo((Object)"2");
        Assertions.assertThat((String)((String)dataSchema.parameters().get("connect.decimal.precision"))).isEqualTo((Object)"8");
        this.connection.execute(new String[]{"ALTER TABLE debezium.tablea modify (data numeric(10,3))"});
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (id, data) values (2, 234567.891)"});
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "TABLEA");
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)2);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((List)after.schema().fields()).hasSize(2);
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)2);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)BigDecimal.valueOf(234567.891));
        dataSchema = after.schema().field("DATA").schema();
        Assertions.assertThat((String)((String)dataSchema.parameters().get("scale"))).isEqualTo((Object)"3");
        Assertions.assertThat((String)((String)dataSchema.parameters().get("connect.decimal.precision"))).isEqualTo((Object)"10");
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldStreamDropTable() throws Exception {
        this.createTable("debezium.tablea", "CREATE TABLE debezium.tablea (ID numeric(9,0) primary key, data varchar2(50))");
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "TABLEA");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO debezium.tablea (id, data) values (1, 'Test')"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA"))).hasSize(1);
        record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "TABLEA")).get(0);
        VerifyRecord.isValidInsert((SourceRecord)record, (String)"ID", (int)1);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        Struct after = ((Struct)record.value()).getStruct("after");
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test");
        this.connection.execute(new String[]{"DROP TABLE debezium.tablea"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "TABLEA");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "DROP", "DEBEZIUM", "TABLEA");
        this.assertNoRecordsToConsume();
    }

    @Test
    @FixFor(value={"DBZ-2916"})
    public void shouldSnapshotAndStreamSchemaChangesUsingExplicitCasedNames() throws Exception {
        this.createTable("debezium.\"tableC\"", "CREATE TABLE debezium.\"tableC\" (\"id\" numeric(9,0) primary key, \"data\" varchar2(50))");
        Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        SourceRecord record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertSnapshotSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "tableC");
        List tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "tableC");
        OracleSchemaMigrationIT.assertTableChangePrimaryKeyNames((Struct)tableChanges.get(0), "id");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 0, "id");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 1, "data");
        OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"ALTER TABLE debezium.\"tableC\" add \"data2\" number(9,0)"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "tableC");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "tableC");
        OracleSchemaMigrationIT.assertTableChangePrimaryKeyNames((Struct)tableChanges.get(0), "id");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 0, "id");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 1, "data");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 2, "data2");
        this.connection.execute(new String[]{"ALTER TABLE debezium.\"tableC\" add (\"data3\" number(9,0), \"data4\" varchar2(25))"});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "tableC");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "tableC");
        OracleSchemaMigrationIT.assertTableChangePrimaryKeyNames((Struct)tableChanges.get(0), "id");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 0, "id");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 1, "data");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 2, "data2");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 3, "data3");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 4, "data4");
        this.connection.execute(new String[]{"ALTER TABLE debezium.\"tableC\" drop column \"data3\""});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "tableC");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "tableC");
        OracleSchemaMigrationIT.assertTableChangePrimaryKeyNames((Struct)tableChanges.get(0), "id");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 0, "id");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 1, "data");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 2, "data2");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 3, "data4");
        this.connection.execute(new String[]{"ALTER TABLE debezium.\"tableC\" rename column \"data4\" to \"Data3\""});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "tableC");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "tableC");
        OracleSchemaMigrationIT.assertTableChangePrimaryKeyNames((Struct)tableChanges.get(0), "id");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 0, "id");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 1, "data");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 2, "data2");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 3, "Data3");
        this.connection.execute(new String[]{"DROP TABLE debezium.\"tableC\""});
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.recordsForTopic("server1")).hasSize(1);
        record = (SourceRecord)records.recordsForTopic("server1").get(0);
        OracleSchemaMigrationIT.assertStreamingSchemaChange(record);
        OracleSchemaMigrationIT.assertSourceTableInfo(record, "DEBEZIUM", "tableC");
        tableChanges = ((Struct)record.value()).getArray("tableChanges");
        Assertions.assertThat((List)tableChanges).hasSize(1);
        OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "DROP", "DEBEZIUM", "tableC");
        OracleSchemaMigrationIT.assertTableChangePrimaryKeyNames((Struct)tableChanges.get(0), "id");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 0, "id");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 1, "data");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 2, "data2");
        OracleSchemaMigrationIT.assertTableChangeColumn((Struct)tableChanges.get(0), 3, "Data3");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Ignore(value="Test can be flaky and cannot reproduce locally, ignoring to stablize test suite")
    @FixFor(value={"DBZ-2916"})
    public void shouldNotEmitDdlEventsForNonTableObjects() throws Exception {
        try {
            LogInterceptor logminerlogInterceptor = new LogInterceptor(AbstractLogMinerEventProcessor.class);
            LogInterceptor errorLogInterceptor = new LogInterceptor(ErrorHandler.class);
            LogInterceptor xstreamLogInterceptor = new LogInterceptor("io.debezium.connector.oracle.xstream.LcrEventHandler");
            TestHelper.grantRole("CREATE PROCEDURE");
            TestHelper.grantRole("ALTER ANY PROCEDURE");
            Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
            int expected = 7;
            this.connection.execute(new String[]{"CREATE OR REPLACE FUNCTION mytestf() return number is x number(11,2); begin return x; END;"});
            this.connection.execute(new String[]{"DROP FUNCTION mytestf"});
            this.connection.execute(new String[]{"CREATE OR REPLACE PROCEDURE mytest() BEGIN select * from dual; END;"});
            this.connection.execute(new String[]{"DROP PROCEDURE mytest"});
            this.connection.execute(new String[]{"CREATE OR REPLACE PACKAGE pkgtest as function hire return number; END;"});
            this.connection.execute(new String[]{"CREATE OR REPLACE PACKAGE BODY pkgtest as function hire return number; begin return 0; end;"});
            this.connection.execute(new String[]{"DROP PACKAGE pkgtest"});
            String logText = OracleConnectorConfig.ConnectorAdapter.LOG_MINER.equals((Object)TestHelper.adapter()) ? "DDL: " : "Processing DDL event ";
            Awaitility.await().atMost((long)TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS).until(() -> logminerlogInterceptor.countOccurrences(logText) == 7L || xstreamLogInterceptor.countOccurrences(logText) == 7L);
            this.stopConnector();
            OracleSchemaMigrationIT.waitForConnectorShutdown((String)"oracle", (String)"server1");
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)errorLogInterceptor.containsMessage("Producer failure")).as("Connector failure", new Object[0])).isFalse();
            this.assertNoRecordsToConsume();
        }
        finally {
            TestHelper.revokeRole("ALTER ANY PROCEDURE");
            TestHelper.revokeRole("CREATE PROCEDURE");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @FixFor(value={"DBZ-4037"})
    public void shouldParseSchemaChangeWithoutErrorOnFilteredTableWithRawDataType() throws Exception {
        LogInterceptor createTableinterceptor = new LogInterceptor(CreateTableParserListener.class);
        LogInterceptor alterTableinterceptor = new LogInterceptor(AlterTableParserListener.class);
        try {
            TestHelper.dropTable(this.connection, "dbz4037a");
            TestHelper.dropTable(this.connection, "dbz4037b");
            this.connection.execute(new String[]{"CREATE TABLE dbz4037a (id number(9,0), data varchar2(50), primary key(id))"});
            TestHelper.streamTable(this.connection, "dbz4037a");
            Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4037A")).build();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
            this.assertNoRecordsToConsume();
            String ignoredTable = TestHelper.getDatabaseName() + ".DEBEZIUM.DBZ4037B";
            this.connection.execute(new String[]{"CREATE TABLE dbz4037b (id number(9,0), data raw(8), primary key(id))"});
            Awaitility.await().atMost((long)TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS).until(() -> createTableinterceptor.containsMessage(OracleSchemaMigrationIT.getIgnoreCreateTable(ignoredTable)));
            this.connection.execute(new String[]{"ALTER TABLE dbz4037b ADD data2 raw(10)"});
            Awaitility.await().atMost((long)TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS).until(() -> alterTableinterceptor.containsMessage(OracleSchemaMigrationIT.getIgnoreAlterTable(ignoredTable)));
            this.connection.execute(new String[]{"INSERT INTO dbz4037a (id,data) values (1, 'Test')"});
            AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
            Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "DBZ4037A"))).hasSize(1);
            SourceRecord record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "DBZ4037A")).get(0);
            Struct after = ((Struct)record.value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
            Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"Test");
            this.assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(this.connection, "dbz4037b");
            TestHelper.dropTable(this.connection, "dbz4037a");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @FixFor(value={"DBZ-4037"})
    public void shouldParseSchemaChangeOnTableWithRawDataType() throws Exception {
        try {
            TestHelper.dropTable(this.connection, "dbz4037");
            Configuration config = ((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4037")).build();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
            this.assertNoRecordsToConsume();
            this.connection.execute(new String[]{"CREATE TABLE dbz4037 (id number(9,0), data raw(8), name varchar(50), primary key(id))"});
            TestHelper.streamTable(this.connection, "dbz4037");
            this.connection.execute(new String[]{"ALTER TABLE dbz4037 ADD data2 raw(10)"});
            this.connection.prepareUpdate("INSERT INTO dbz4037 (id,data,name,data2) values (1,?,'Acme 123',?)", preparer -> {
                preparer.setBytes(1, "Test".getBytes());
                preparer.setBytes(2, "T".getBytes());
            });
            this.connection.commit();
            AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
            Assertions.assertThat((List)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "DBZ4037"))).hasSize(1);
            SourceRecord record = (SourceRecord)records.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "DBZ4037")).get(0);
            Struct after = ((Struct)record.value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
            Assertions.assertThat((Object)after.get("DATA")).isNull();
            Assertions.assertThat((Object)after.get("DATA2")).isNull();
            Assertions.assertThat((Object)after.get("NAME")).isEqualTo((Object)"Acme 123");
            this.assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(this.connection, "dbz4037");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @FixFor(value={"DBZ-4782"})
    public void shouldNotResendSchemaChangeIfLastEventReadBeforeRestart() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4782");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz4782 (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(this.connection, "dbz4782");
            Configuration config = ((Configuration.Builder)((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782")).with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
            this.connection.execute(new String[]{"ALTER TABLE dbz4782 add data2 varchar2(50)"});
            AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(2);
            List records = sourceRecords.recordsForTopic("server1");
            Assertions.assertThat((List)records).hasSize(2);
            OracleSchemaMigrationIT.assertSnapshotSchemaChange((SourceRecord)records.get(0));
            List tableChanges = ((Struct)((SourceRecord)records.get(0)).value()).getArray("tableChanges");
            Assertions.assertThat((List)tableChanges).hasSize(1);
            OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782");
            OracleSchemaMigrationIT.assertStreamingSchemaChange((SourceRecord)records.get(1));
            tableChanges = ((Struct)((SourceRecord)records.get(1)).value()).getArray("tableChanges");
            Assertions.assertThat((List)tableChanges).hasSize(1);
            OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ4782");
            this.stopConnector();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
            this.waitForAvailableRecords(20L, TimeUnit.SECONDS);
            this.assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(this.connection, "dbz4782");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @FixFor(value={"DBZ-4782"})
    public void shouldNotResendSchemaChangeIfLastEventReadBeforeRestartWithFollowupDml() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4782");
        try {
            this.createTable("dbz4782", "CREATE TABLE dbz4782 (id numeric(9,0) primary key, data varchar2(50))");
            Configuration config = ((Configuration.Builder)((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782")).with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
            this.connection.execute(new String[]{"ALTER TABLE dbz4782 add data2 varchar2(50)"});
            AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(2);
            List records = sourceRecords.recordsForTopic("server1");
            Assertions.assertThat((List)records).hasSize(2);
            OracleSchemaMigrationIT.assertSnapshotSchemaChange((SourceRecord)records.get(0));
            List tableChanges = ((Struct)((SourceRecord)records.get(0)).value()).getArray("tableChanges");
            Assertions.assertThat((List)tableChanges).hasSize(1);
            OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782");
            OracleSchemaMigrationIT.assertStreamingSchemaChange((SourceRecord)records.get(1));
            tableChanges = ((Struct)((SourceRecord)records.get(1)).value()).getArray("tableChanges");
            Assertions.assertThat((List)tableChanges).hasSize(1);
            OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ4782");
            this.stopConnector();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
            this.connection.execute(new String[]{"INSERT INTO dbz4782 values (1, 'data1', 'data2')"});
            sourceRecords = this.consumeRecordsByTopic(1);
            records = sourceRecords.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "DBZ4782"));
            Assertions.assertThat((List)records).hasSize(1);
            VerifyRecord.isValidInsert((SourceRecord)((SourceRecord)records.get(0)), (String)"ID", (int)1);
            this.assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(this.connection, "dbz4782");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @FixFor(value={"DBZ-4782"})
    public void shouldNotResendSchemaChangeWithInprogressTransactionOnSecondTable() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4782a");
        TestHelper.dropTable(this.connection, "dbz4782b");
        try {
            this.createTable("dbz4782a", "CREATE TABLE dbz4782a (id numeric(9,0) primary key, data varchar2(50))");
            this.createTable("dbz4782b", "CREATE TABLE dbz4782b (id numeric(9,0) primary key, data varchar2(50))");
            Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782[A|B]")).with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "regex")).build();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
            try (OracleConnection connection2 = TestHelper.testConnection();){
                connection2.executeWithoutCommitting(new String[]{"INSERT INTO dbz4782b values (2, 'connection2')"});
                this.connection.execute(new String[]{"ALTER TABLE dbz4782a add data2 varchar2(50)"});
                AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(3);
                List records = sourceRecords.recordsForTopic("server1");
                Assertions.assertThat((List)records).hasSize(3);
                OracleSchemaMigrationIT.assertSnapshotSchemaChange((SourceRecord)records.get(0));
                List tableChanges = ((Struct)((SourceRecord)records.get(0)).value()).getArray("tableChanges");
                Assertions.assertThat((List)tableChanges).hasSize(1);
                OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782A");
                OracleSchemaMigrationIT.assertSnapshotSchemaChange((SourceRecord)records.get(1));
                tableChanges = ((Struct)((SourceRecord)records.get(1)).value()).getArray("tableChanges");
                Assertions.assertThat((List)tableChanges).hasSize(1);
                OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782B");
                OracleSchemaMigrationIT.assertStreamingSchemaChange((SourceRecord)records.get(2));
                tableChanges = ((Struct)((SourceRecord)records.get(2)).value()).getArray("tableChanges");
                Assertions.assertThat((List)tableChanges).hasSize(1);
                OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ4782A");
                this.stopConnector();
                connection2.commit();
                this.start(OracleConnector.class, config);
                this.assertConnectorIsRunning();
                OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
                this.connection.execute(new String[]{"INSERT INTO dbz4782a values (1, 'data1', 'data2')"});
                sourceRecords = this.consumeRecordsByTopic(2);
                sourceRecords.allRecordsInOrder().forEach(System.out::println);
                records = sourceRecords.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "DBZ4782A"));
                Assertions.assertThat((List)records).hasSize(1);
                VerifyRecord.isValidInsert((SourceRecord)((SourceRecord)records.get(0)), (String)"ID", (int)1);
                records = sourceRecords.recordsForTopic(OracleSchemaMigrationIT.topicName("DEBEZIUM", "DBZ4782B"));
                Assertions.assertThat((List)records).hasSize(1);
                VerifyRecord.isValidInsert((SourceRecord)((SourceRecord)records.get(0)), (String)"ID", (int)2);
            }
            this.assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(this.connection, "dbz4782a");
            TestHelper.dropTable(this.connection, "dbz4782b");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @FixFor(value={"DBZ-5285"})
    public void shouldOnlyCaptureSchemaChangesForIncludedTables() throws Exception {
        TestHelper.dropTable(this.connection, "dbz5285a");
        TestHelper.dropTable(this.connection, "dbz5285b");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz5285a (id numeric(9,0) primary key, data varchar2(50))"});
            this.connection.execute(new String[]{"CREATE TABLE dbz5285b (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(this.connection, "dbz5285a");
            TestHelper.streamTable(this.connection, "dbz5285b");
            this.connection.execute(new String[]{"INSERT INTO dbz5285a (id,data) values (1, 'A')"});
            this.connection.execute(new String[]{"INSERT INTO dbz5285b (id,data) values (2, 'B')"});
            Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5285A")).with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true)).with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
            AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
            List schemaRecords = records.recordsForTopic("server1");
            Assertions.assertThat((List)schemaRecords).hasSize(1);
            OracleSchemaMigrationIT.assertSnapshotSchemaChange((SourceRecord)schemaRecords.get(0));
            List tableChanges = ((Struct)((SourceRecord)schemaRecords.get(0)).value()).getArray("tableChanges");
            Assertions.assertThat((List)tableChanges).hasSize(1);
            OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ5285A");
            List tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ5285A");
            Assertions.assertThat((List)tableRecords).hasSize(1);
            VerifyRecord.isValidRead((SourceRecord)((SourceRecord)tableRecords.get(0)), (String)"ID", (int)1);
            Struct after = ((Struct)((SourceRecord)tableRecords.get(0)).value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
            Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"A");
            this.assertNoRecordsToConsume();
            OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
            this.connection.execute(new String[]{"ALTER TABLE dbz5285a add data2 varchar2(50)"});
            this.connection.execute(new String[]{"ALTER TABLE dbz5285b add data2 varchar2(50)"});
            this.connection.execute(new String[]{"INSERT INTO dbz5285a (id,data,data2) values (3, 'A3', 'D1')"});
            this.connection.execute(new String[]{"INSERT INTO dbz5285b (id,data,data2) values (4, 'B4', 'D2')"});
            records = this.consumeRecordsByTopic(2);
            schemaRecords = records.recordsForTopic("server1");
            Assertions.assertThat((List)schemaRecords).hasSize(1);
            OracleSchemaMigrationIT.assertStreamingSchemaChange((SourceRecord)schemaRecords.get(0));
            tableChanges = ((Struct)((SourceRecord)schemaRecords.get(0)).value()).getArray("tableChanges");
            Assertions.assertThat((List)tableChanges).hasSize(1);
            OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ5285A");
            tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ5285A");
            Assertions.assertThat((List)tableRecords).hasSize(1);
            VerifyRecord.isValidInsert((SourceRecord)((SourceRecord)tableRecords.get(0)), (String)"ID", (int)3);
            after = ((Struct)((SourceRecord)tableRecords.get(0)).value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)3);
            Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"A3");
            Assertions.assertThat((Object)after.get("DATA2")).isEqualTo((Object)"D1");
            this.assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(this.connection, "dbz5285b");
            TestHelper.dropTable(this.connection, "dbz5285a");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @FixFor(value={"DBZ-5285"})
    public void shouldCaptureSchemaChangesForAllTablesRegardlessOfIncludeList() throws Exception {
        TestHelper.dropTable(this.connection, "dbz5285a");
        TestHelper.dropTable(this.connection, "dbz5285b");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz5285a (id numeric(9,0) primary key, data varchar2(50))"});
            this.connection.execute(new String[]{"CREATE TABLE dbz5285b (id numeric(9,0) primary key, data varchar2(50))"});
            TestHelper.streamTable(this.connection, "dbz5285a");
            TestHelper.streamTable(this.connection, "dbz5285b");
            this.connection.execute(new String[]{"INSERT INTO dbz5285a (id,data) values (1, 'A')"});
            this.connection.execute(new String[]{"INSERT INTO dbz5285b (id,data) values (2, 'B')"});
            Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5285A")).with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, false)).with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)).build();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleSchemaMigrationIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
            AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(3);
            List schemaRecords = records.recordsForTopic("server1");
            Assertions.assertThat((List)schemaRecords).hasSize(2);
            OracleSchemaMigrationIT.assertSnapshotSchemaChange((SourceRecord)schemaRecords.get(0));
            List tableChanges = ((Struct)((SourceRecord)schemaRecords.get(0)).value()).getArray("tableChanges");
            Assertions.assertThat((List)tableChanges).hasSize(1);
            OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ5285A");
            OracleSchemaMigrationIT.assertSnapshotSchemaChange((SourceRecord)schemaRecords.get(1));
            tableChanges = ((Struct)((SourceRecord)schemaRecords.get(1)).value()).getArray("tableChanges");
            Assertions.assertThat((List)tableChanges).hasSize(1);
            OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ5285B");
            List tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ5285A");
            Assertions.assertThat((List)tableRecords).hasSize(1);
            VerifyRecord.isValidRead((SourceRecord)((SourceRecord)tableRecords.get(0)), (String)"ID", (int)1);
            Struct after = ((Struct)((SourceRecord)tableRecords.get(0)).value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
            Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"A");
            this.assertNoRecordsToConsume();
            OracleSchemaMigrationIT.waitForStreamingRunning((String)"oracle", (String)"server1");
            this.connection.execute(new String[]{"ALTER TABLE dbz5285a add data2 varchar2(50)"});
            this.connection.execute(new String[]{"ALTER TABLE dbz5285b add data2 varchar2(50)"});
            this.connection.execute(new String[]{"INSERT INTO dbz5285a (id,data,data2) values (3, 'A3', 'D1')"});
            this.connection.execute(new String[]{"INSERT INTO dbz5285b (id,data,data2) values (4, 'B4', 'D2')"});
            records = this.consumeRecordsByTopic(3);
            schemaRecords = records.recordsForTopic("server1");
            schemaRecords.forEach(System.out::println);
            Assertions.assertThat((List)schemaRecords).hasSize(2);
            OracleSchemaMigrationIT.assertStreamingSchemaChange((SourceRecord)schemaRecords.get(0));
            tableChanges = ((Struct)((SourceRecord)schemaRecords.get(0)).value()).getArray("tableChanges");
            Assertions.assertThat((List)tableChanges).hasSize(1);
            OracleSchemaMigrationIT.assertTableChange((Struct)tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ5285A");
            OracleSchemaMigrationIT.assertStreamingSchemaChange((SourceRecord)schemaRecords.get(1));
            tableChanges = ((Struct)((SourceRecord)schemaRecords.get(1)).value()).getArray("tableChanges");
            Assertions.assertThat((List)tableChanges).isEmpty();
            OracleSchemaMigrationIT.assertSourceTableInfo((SourceRecord)schemaRecords.get(1), "DEBEZIUM", "DBZ5285B");
            tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ5285A");
            Assertions.assertThat((List)tableRecords).hasSize(1);
            VerifyRecord.isValidInsert((SourceRecord)((SourceRecord)tableRecords.get(0)), (String)"ID", (int)3);
            after = ((Struct)((SourceRecord)tableRecords.get(0)).value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)3);
            Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)"A3");
            Assertions.assertThat((Object)after.get("DATA2")).isEqualTo((Object)"D1");
            this.assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(this.connection, "dbz5285b");
            TestHelper.dropTable(this.connection, "dbz5285a");
        }
    }

    private static String getTableIdString(String schemaName, String tableName) {
        return new TableId(TestHelper.getDatabaseName(), schemaName, tableName).toDoubleQuotedString();
    }

    private void createTable(String tableName, String sql) throws SQLException {
        this.connection.execute(new String[]{sql});
        TestHelper.streamTable(this.connection, tableName);
    }

    private static void assertSnapshotSchemaChange(SourceRecord record) {
        Assertions.assertThat((String)record.topic()).isEqualTo((Object)"server1");
        Assertions.assertThat((String)((Struct)record.key()).getString("databaseName")).isEqualTo((Object)TestHelper.getDatabaseName());
        Assertions.assertThat(record.sourceOffset().get("snapshot")).isEqualTo((Object)true);
        Assertions.assertThat((String)((Struct)record.value()).getStruct("source").getString("snapshot")).isEqualTo((Object)"true");
    }

    private static void assertStreamingSchemaChange(SourceRecord record) {
        Assertions.assertThat((String)record.topic()).isEqualTo((Object)"server1");
        Assertions.assertThat((String)((Struct)record.key()).getString("databaseName")).isEqualTo((Object)TestHelper.getDatabaseName());
        Assertions.assertThat(record.sourceOffset().get("snapshot")).isNull();
        Assertions.assertThat((String)((Struct)record.value()).getStruct("source").getString("snapshot")).isNull();
    }

    private static void assertTableChange(Struct change, String type, String schema, String table) {
        Assertions.assertThat((Object)change.get("type")).isEqualTo((Object)type);
        Assertions.assertThat((Object)change.get("id")).isEqualTo((Object)OracleSchemaMigrationIT.getTableIdString(schema, table));
    }

    private static void assertTableChangePrimaryKeyNames(Struct change, String ... names) {
        Assertions.assertThat((List)change.getStruct("table").getArray("primaryKeyColumnNames")).isEqualTo(Arrays.asList(names));
    }

    private static void assertTableChangeColumn(Struct change, int index, String columnName) {
        List columns = change.getStruct("table").getArray("columns");
        Assertions.assertThat((int)columns.size()).isGreaterThan(index);
        Struct column = (Struct)columns.get(index);
        Assertions.assertThat((Object)column.get("name")).isEqualTo((Object)columnName);
    }

    private static void assertSourceTableInfo(SourceRecord record, String schema, String table) {
        Struct source = ((Struct)record.value()).getStruct("source");
        Assertions.assertThat((Object)source.get("db")).isEqualTo((Object)TestHelper.getDatabaseName());
        Assertions.assertThat((Object)source.get("schema")).isEqualTo((Object)schema);
        Assertions.assertThat((Object)source.get("table")).isEqualTo((Object)table);
    }

    private static String topicName(String schema, String table) {
        return "server1." + schema + "." + table;
    }

    private static String getIgnoreCreateTable(String tableName) {
        return "Ignoring CREATE TABLE statement for non-captured table " + tableName;
    }

    private static String getIgnoreAlterTable(String tableName) {
        return "Ignoring ALTER TABLE statement for non-captured table " + tableName;
    }
}

