/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleConnectorTask;
import io.debezium.connector.oracle.OracleDatabaseVersion;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.junit.SkipOnDatabaseOption;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipTestDependingOnDatabaseOptionRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.logminer.LogMinerAdapter;
import io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource;
import io.debezium.connector.oracle.logminer.processor.AbstractLogMinerEventProcessor;
import io.debezium.connector.oracle.logminer.processor.memory.MemoryLogMinerEventProcessor;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.Envelope;
import io.debezium.data.SchemaAndValueField;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.heartbeat.DatabaseHeartbeatImpl;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.history.MemorySchemaHistory;
import io.debezium.storage.file.history.FileSchemaHistory;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
import java.lang.management.ManagementFactory;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OracleConnectorIT
extends AbstractConnectorTest {
    /** Logger for this integration test class. */
    private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnectorIT.class);
    /** Number of microseconds in one second. */
    private static final long MICROS_PER_SECOND = TimeUnit.SECONDS.toMicros(1L);
    /** Source-offset key the snapshot assertions read to check whether the snapshot has finished. */
    private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed";
    // NOTE(review): the two message fragments below are not referenced in the visible part of this
    // file; presumably they are matched against intercepted log output further down - confirm.
    private static final String ERROR_PROCESSING_FAIL_MESSAGE = "Oracle LogMiner is unable to re-construct the SQL for '";
    private static final String ERROR_PROCESSING_WARN_MESSAGE = "cannot be parsed. This event will be ignored and skipped.";
    // NOTE(review): judging by the rule class names, these two rules presumably skip tests depending
    // on the configured adapter name and on database options - confirm against the rule classes.
    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();
    @Rule
    public final TestRule skipOptionRule = new SkipTestDependingOnDatabaseOptionRule();
    /** Connection shared by all tests; assigned in {@code beforeClass()} and closed in {@code closeConnection()}. */
    private static OracleConnection connection;

    /**
     * Opens the shared test connection and (re)creates the four tables used by the tests.
     *
     * <p>All four tables are dropped first; afterwards each table is created, SELECT is granted to
     * the connector user and supplemental logging for all columns is enabled.
     *
     * @throws SQLException if any of the statements fails
     */
    @BeforeClass
    public static void beforeClass() throws SQLException {
        connection = TestHelper.testConnection();

        // Qualified table names; createStatements below is index-aligned with this array.
        final String[] tableNames = {
                "debezium.customer",
                "debezium.masked_hashed_column_table",
                "debezium.truncated_column_table",
                "debezium.dt_table"
        };
        final String[] createStatements = {
                "create table debezium.customer (  id numeric(9,0) not null,   name varchar2(1000),   score decimal(6, 2),   registered timestamp,   primary key (id))",
                "create table debezium.masked_hashed_column_table (  id numeric(9,0) not null,   name varchar2(255),   name2 varchar2(255),   name3 varchar2(20),  primary key (id))",
                "create table debezium.truncated_column_table (  id numeric(9,0) not null,   name varchar2(20),   primary key (id))",
                "create table dt_table (  id numeric(9,0) not null,   c1 int,   c2 int,   c3a numeric(5,2),   c3b varchar(128),   f1 float(10),   f2 decimal(8,4),   primary key (id))"
        };

        for (String tableName : tableNames) {
            TestHelper.dropTable(connection, tableName);
        }

        for (int idx = 0; idx < tableNames.length; idx++) {
            final String tableName = tableNames[idx];
            connection.execute(new String[]{createStatements[idx]});
            connection.execute(new String[]{"GRANT SELECT ON " + tableName + " to  " + TestHelper.getConnectorUserName()});
            connection.execute(new String[]{"ALTER TABLE " + tableName + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"});
        }
    }

    /**
     * Drops the tables used by this test class and closes the shared connection.
     *
     * <p>The connection is closed in a {@code finally} block so that a failing drop does not leave
     * the shared connection open.
     *
     * @throws SQLException if dropping a table or closing the connection fails
     */
    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection == null) {
            // Nothing was opened, so there is nothing to clean up.
            return;
        }
        try {
            TestHelper.dropTable(connection, "debezium.customer2");
            TestHelper.dropTable(connection, "customer");
            TestHelper.dropTable(connection, "masked_hashed_column_table");
            TestHelper.dropTable(connection, "truncated_column_table");
            TestHelper.dropTable(connection, "dt_table");
        }
        finally {
            connection.close();
        }
    }

    /**
     * Resets database and framework state before each test: drops the two {@code dbz800} tables,
     * empties the four shared tables, sets the consume timeout, initializes the connector test
     * framework and deletes the schema history file.
     *
     * @throws SQLException if a drop or delete statement fails
     */
    @Before
    public void before() throws SQLException {
        for (String leftover : new String[]{"debezium.dbz800a", "debezium.dbz800b"}) {
            TestHelper.dropTable(connection, leftover);
        }

        final String[] sharedTables = {"customer", "masked_hashed_column_table", "truncated_column_table", "dt_table"};
        for (String table : sharedTables) {
            connection.execute(new String[]{"delete from debezium." + table});
        }

        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete((Path)TestHelper.SCHEMA_HISTORY_PATH);
    }

    /**
     * Creates a table with a hyphen in its quoted name, inserts one row before the connector starts
     * and one row after streaming is running, and checks that both rows arrive on the
     * {@code server1.DEBEZIUM.my-table} topic (the first as a read, the second as an insert).
     * The table is dropped again in a {@code finally} block.
     */
    @Test
    @FixFor(value={"DBZ-2452"})
    public void shouldSnapshotAndStreamWithHyphenedTableName() throws Exception {
        final String quotedTable = "debezium.\"my-table\"";
        TestHelper.dropTable(connection, quotedTable);
        try {
            connection.execute(new String[]{"create table \"my-table\" ( id numeric(9,0) not null,  c1 int,  c2 varchar(128),  primary key (id))"});
            connection.execute(new String[]{"GRANT SELECT ON " + quotedTable + " to " + TestHelper.getConnectorUserName()});
            connection.execute(new String[]{"ALTER TABLE " + quotedTable + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS"});
            connection.execute(new String[]{"INSERT INTO " + quotedTable + " VALUES (1, 25, 'Test')"});
            connection.execute(new String[]{"COMMIT"});

            Configuration.Builder builder = (Configuration.Builder) TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.MY-TABLE");
            builder = (Configuration.Builder) builder.with(OracleConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, "avro");
            final Configuration config = builder.build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            connection.execute(new String[]{"INSERT INTO " + quotedTable + " VALUES (2, 50, 'Test2')"});
            connection.execute(new String[]{"COMMIT"});

            final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(2);
            final List<SourceRecord> tableRecords = consumed.recordsForTopic("server1.DEBEZIUM.my-table");
            Assertions.assertThat(tableRecords).hasSize(2);

            // Row inserted before start-up: expected as the one and only snapshot record.
            final SourceRecord snapshotRecord = tableRecords.get(0);
            VerifyRecord.isValidRead(snapshotRecord, "ID", 1);
            final Struct snapshotAfter = (Struct) ((Struct) snapshotRecord.value()).get("after");
            Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotAfter.get("C1")).isEqualTo(BigDecimal.valueOf(25L));
            Assertions.assertThat(snapshotAfter.get("C2")).isEqualTo("Test");
            Assertions.assertThat(snapshotRecord.sourceOffset().get("snapshot")).isEqualTo(true);
            Assertions.assertThat(snapshotRecord.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);

            // Row inserted while streaming.
            final SourceRecord streamedRecord = tableRecords.get(1);
            VerifyRecord.isValidInsert(streamedRecord, "ID", 2);
            final Struct streamedAfter = (Struct) ((Struct) streamedRecord.value()).get("after");
            Assertions.assertThat(streamedAfter.get("ID")).isEqualTo(2);
            Assertions.assertThat(streamedAfter.get("C1")).isEqualTo(BigDecimal.valueOf(50L));
            Assertions.assertThat(streamedAfter.get("C2")).isEqualTo("Test2");
        }
        finally {
            TestHelper.dropTable(connection, quotedTable);
        }
    }

    /**
     * Inserts two customers before starting the connector and checks that both are emitted as
     * snapshot reads: the first marked {@code "first"} with the snapshot not yet completed, the
     * second marked {@code "last"} with the snapshot completed.
     */
    @Test
    public void shouldTakeSnapshot() throws Exception {
        final Configuration config = ((Configuration.Builder) TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")).build();
        final int snapshotRecordCount = 2;

        connection.execute(new String[]{"INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))"});
        connection.execute(new String[]{"INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)"});
        connection.execute(new String[]{"COMMIT"});

        start(OracleConnector.class, config);
        assertConnectorIsRunning();

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(snapshotRecordCount);
        final List<SourceRecord> customerRecords = consumed.recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(customerRecords).hasSize(snapshotRecordCount);

        // First snapshot row.
        final SourceRecord firstRecord = customerRecords.get(0);
        VerifyRecord.isValidRead(firstRecord, "ID", 1);
        final Struct firstAfter = (Struct) ((Struct) firstRecord.value()).get("after");
        Assertions.assertThat(firstAfter.get("ID")).isEqualTo(1);
        Assertions.assertThat(firstAfter.get("NAME")).isEqualTo("Billie-Bob");
        Assertions.assertThat(firstAfter.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56));
        Assertions.assertThat(firstAfter.get("REGISTERED")).isEqualTo(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0)));
        Assertions.assertThat(firstRecord.sourceOffset().get("snapshot")).isEqualTo(true);
        Assertions.assertThat(firstRecord.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false);
        final Struct firstSource = (Struct) ((Struct) firstRecord.value()).get("source");
        Assertions.assertThat(firstSource.get("snapshot")).isEqualTo("first");

        // Second (and last) snapshot row.
        final SourceRecord lastRecord = customerRecords.get(1);
        VerifyRecord.isValidRead(lastRecord, "ID", 2);
        final Struct lastAfter = (Struct) ((Struct) lastRecord.value()).get("after");
        Assertions.assertThat(lastAfter.get("ID")).isEqualTo(2);
        Assertions.assertThat(lastAfter.get("NAME")).isEqualTo("Bruce");
        Assertions.assertThat(lastAfter.get("SCORE")).isEqualTo(BigDecimal.valueOf(2345.67));
        Assertions.assertThat(lastAfter.get("REGISTERED")).isNull();
        Assertions.assertThat(lastRecord.sourceOffset().get("snapshot")).isEqualTo(true);
        Assertions.assertThat(lastRecord.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);
        final Struct lastSource = (Struct) ((Struct) lastRecord.value()).get("source");
        Assertions.assertThat(lastSource.get("snapshot")).isEqualTo("last");
    }

    /**
     * Runs the connector with snapshot mode {@code INITIAL_ONLY} and the transaction snapshot
     * boundary mode {@code SKIP}, waits for the snapshot to complete, stops the connector and
     * expects the task to have logged the archive-log-check warning.
     * Currently ignored; see the {@code @Ignore} reason.
     */
    @Test
    @Ignore(value="Requires database to be configured without ARCHIVELOG_MODE enabled; which conflicts with dbz-oracle images")
    @FixFor(value={"DBZ-6276"})
    public void shouldSkipCheckingArchiveLogIfNoCdc() throws Exception {
        Configuration.Builder builder = (Configuration.Builder) TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue) OracleConnectorConfig.SnapshotMode.INITIAL_ONLY);
        builder = (Configuration.Builder) builder.with(
                OracleConnectorConfig.LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE,
                (EnumeratedValue) OracleConnectorConfig.TransactionSnapshotBoundaryMode.SKIP);
        builder = (Configuration.Builder) builder.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER");
        final Configuration config = builder.build();

        // The interceptor is created before start-up so the warning is captured when the connector starts.
        final LogInterceptor taskLog = new LogInterceptor(OracleConnectorTask.class);

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("oracle", "server1");
        stopConnector();

        final boolean warned = taskLog.containsWarnMessage("Failed the archive log check but continuing as redo log isn't strictly required");
        Assertions.assertThat(warned).isTrue();
    }

    /**
     * Runs the snapshot-then-stream scenario of {@code continueStreamingAfterSnapshot} with the
     * default configuration restricted to the {@code DEBEZIUM.CUSTOMER} table.
     */
    @Test
    public void shouldContinueWithStreamingAfterSnapshot() throws Exception {
        final Configuration.Builder builder = (Configuration.Builder) TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER");
        continueStreamingAfterSnapshot(builder.build());
    }

    /**
     * Inserts two customers, starts the connector with the given configuration and verifies the
     * two snapshot records; then inserts a third customer and verifies it arrives as a streamed
     * insert without snapshot markers in its offset.
     *
     * @param config connector configuration to start with
     * @throws Exception if a database statement fails or consuming records is interrupted
     */
    private void continueStreamingAfterSnapshot(Configuration config) throws Exception {
        final String customerTopic = "server1.DEBEZIUM.CUSTOMER";
        final int snapshotRecordCount = 2;
        final int streamedRecordCount = 1;

        connection.execute(new String[]{"INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))"});
        connection.execute(new String[]{"INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)"});
        connection.execute(new String[]{"COMMIT"});

        start(OracleConnector.class, config);
        assertConnectorIsRunning();

        // --- snapshot phase ---
        List<SourceRecord> customerRecords = consumeRecordsByTopic(snapshotRecordCount).recordsForTopic(customerTopic);
        Assertions.assertThat(customerRecords).hasSize(snapshotRecordCount);

        final SourceRecord firstRecord = customerRecords.get(0);
        VerifyRecord.isValidRead(firstRecord, "ID", 1);
        final Struct firstAfter = (Struct) ((Struct) firstRecord.value()).get("after");
        Assertions.assertThat(firstAfter.get("ID")).isEqualTo(1);
        final Struct firstSource = (Struct) ((Struct) firstRecord.value()).get("source");
        Assertions.assertThat(firstSource.get("snapshot")).isEqualTo("first");
        Assertions.assertThat(firstSource.get("scn")).isNotNull();
        Assertions.assertThat(firstSource.get("name")).isEqualTo("server1");
        Assertions.assertThat(firstSource.get("version")).isNotNull();
        Assertions.assertThat(firstSource.get("txId")).isNull();
        Assertions.assertThat(firstSource.get("ts_ms")).isNotNull();
        Assertions.assertThat(firstRecord.sourceOffset().get("snapshot")).isEqualTo(true);
        Assertions.assertThat(firstRecord.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false);

        final SourceRecord lastRecord = customerRecords.get(1);
        VerifyRecord.isValidRead(lastRecord, "ID", 2);
        final Struct lastAfter = (Struct) ((Struct) lastRecord.value()).get("after");
        Assertions.assertThat(lastAfter.get("ID")).isEqualTo(2);
        Assertions.assertThat(lastRecord.sourceOffset().get("snapshot")).isEqualTo(true);
        Assertions.assertThat(lastRecord.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);
        final Struct lastSource = (Struct) ((Struct) lastRecord.value()).get("source");
        Assertions.assertThat(lastSource.get("snapshot")).isEqualTo("last");

        // --- streaming phase ---
        connection.execute(new String[]{"INSERT INTO debezium.customer VALUES (3, 'Brian', 2345.67, null)"});
        connection.execute(new String[]{"COMMIT"});

        customerRecords = consumeRecordsByTopic(streamedRecordCount).recordsForTopic(customerTopic);
        Assertions.assertThat(customerRecords).hasSize(streamedRecordCount);

        final SourceRecord streamedRecord = customerRecords.get(0);
        VerifyRecord.isValidInsert(streamedRecord, "ID", 3);
        final Struct streamedAfter = (Struct) ((Struct) streamedRecord.value()).get("after");
        Assertions.assertThat(streamedAfter.get("ID")).isEqualTo(3);
        Assertions.assertThat(streamedRecord.sourceOffset().containsKey("snapshot")).isFalse();
        Assertions.assertThat(streamedRecord.sourceOffset().containsKey(SNAPSHOT_COMPLETED_KEY)).isFalse();
        final Struct streamedSource = (Struct) ((Struct) streamedRecord.value()).get("source");
        Assertions.assertThat(streamedSource.get("snapshot")).isEqualTo("false");
        Assertions.assertThat(streamedSource.get("scn")).isNotNull();
        Assertions.assertThat(streamedSource.get("name")).isEqualTo("server1");
        Assertions.assertThat(streamedSource.get("version")).isNotNull();
        Assertions.assertThat(streamedSource.get("txId")).isNotNull();
        Assertions.assertThat(streamedSource.get("ts_ms")).isNotNull();
    }

    /**
     * Verifies the two snapshot records for the customer table, then disables auto-commit on the
     * shared connection and sends two batches of 30 inserts (ids starting at 100 and 200), each
     * batch being committed and verified by {@code sendTxBatch}.
     */
    @Test
    @FixFor(value={"DBZ-1223"})
    public void shouldStreamTransaction() throws Exception {
        final Configuration config = ((Configuration.Builder) TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")).build();
        final int snapshotRecordCount = 2;
        final int batchSize = 30;

        connection.execute(new String[]{"INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))"});
        connection.execute(new String[]{"INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)"});
        connection.execute(new String[]{"COMMIT"});

        start(OracleConnector.class, config);
        assertConnectorIsRunning();

        final List<SourceRecord> customerRecords = consumeRecordsByTopic(snapshotRecordCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(customerRecords).hasSize(snapshotRecordCount);

        final SourceRecord firstRecord = customerRecords.get(0);
        VerifyRecord.isValidRead(firstRecord, "ID", 1);
        final Struct firstAfter = (Struct) ((Struct) firstRecord.value()).get("after");
        Assertions.assertThat(firstAfter.get("ID")).isEqualTo(1);
        final Struct firstSource = (Struct) ((Struct) firstRecord.value()).get("source");
        Assertions.assertThat(firstSource.get("snapshot")).isEqualTo("first");
        Assertions.assertThat(firstSource.get("scn")).isNotNull();
        Assertions.assertThat(firstSource.get("name")).isEqualTo("server1");
        Assertions.assertThat(firstSource.get("version")).isNotNull();
        Assertions.assertThat(firstSource.get("txId")).isNull();
        Assertions.assertThat(firstSource.get("ts_ms")).isNotNull();
        Assertions.assertThat(firstRecord.sourceOffset().get("snapshot")).isEqualTo(true);
        Assertions.assertThat(firstRecord.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(false);

        final SourceRecord lastRecord = customerRecords.get(1);
        VerifyRecord.isValidRead(lastRecord, "ID", 2);
        final Struct lastAfter = (Struct) ((Struct) lastRecord.value()).get("after");
        Assertions.assertThat(lastAfter.get("ID")).isEqualTo(2);
        Assertions.assertThat(lastRecord.sourceOffset().get("snapshot")).isEqualTo(true);
        Assertions.assertThat(lastRecord.sourceOffset().get(SNAPSHOT_COMPLETED_KEY)).isEqualTo(true);

        // Streaming phase: two multi-row transactions.
        connection.setAutoCommit(false);
        sendTxBatch(config, batchSize, 100);
        sendTxBatch(config, batchSize, 200);
    }

    /**
     * Inserts {@code expectedRecordCount} customer rows with ids {@code offset} (inclusive) to
     * {@code offset + expectedRecordCount} (exclusive) in a single transaction, commits it and then
     * verifies the emitted change events via {@code assertTxBatch}.
     *
     * <p>Auto-commit is switched off for the duration of the batch if it was on, and is restored
     * afterwards even when an insert or the commit fails. On failure the pending rows are rolled
     * back first, so that re-enabling auto-commit cannot commit a partial batch.
     *
     * @param config connector configuration, passed on to {@code assertTxBatch}
     * @param expectedRecordCount number of rows to insert and number of events to expect
     * @param offset id of the first inserted row
     * @throws SQLException if an insert, the commit or an auto-commit change fails
     * @throws InterruptedException if consuming the emitted records is interrupted
     */
    private void sendTxBatch(Configuration config, int expectedRecordCount, int offset) throws SQLException, InterruptedException {
        final boolean wasAutoCommit = connection.connection().getAutoCommit();
        if (wasAutoCommit) {
            connection.connection().setAutoCommit(false);
        }
        try {
            for (int id = offset; id < expectedRecordCount + offset; ++id) {
                connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO debezium.customer VALUES (%s, 'Brian%s', 2345.67, null)", id, id)});
            }
            connection.connection().commit();
        }
        catch (SQLException | RuntimeException e) {
            // Discard the partial batch; keep the original failure as the primary exception.
            try {
                connection.connection().rollback();
            }
            catch (SQLException rollbackFailure) {
                e.addSuppressed(rollbackFailure);
            }
            throw e;
        }
        finally {
            if (wasAutoCommit) {
                connection.connection().setAutoCommit(true);
            }
        }
        this.assertTxBatch(config, expectedRecordCount, offset);
    }

    /**
     * Consumes {@code expectedRecordCount} records and checks that the customer topic holds exactly
     * that many inserts with consecutive ids starting at {@code offset}, none of them carrying
     * snapshot markers in the offset. For adapters other than LogMiner, the offset and source
     * block are additionally checked for {@code lcr_position}.
     *
     * @param config connector configuration used to determine the adapter
     * @param expectedRecordCount number of records to consume and verify
     * @param offset id expected on the first record
     * @throws InterruptedException if consuming records is interrupted
     */
    private void assertTxBatch(Configuration config, int expectedRecordCount, int offset) throws InterruptedException {
        final List<SourceRecord> customerRecords = consumeRecordsByTopic(expectedRecordCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(customerRecords).hasSize(expectedRecordCount);

        final boolean notLogMiner = TestHelper.getAdapter(config) != OracleConnectorConfig.ConnectorAdapter.LOG_MINER;

        for (int position = 0; position < expectedRecordCount; position++) {
            final int expectedId = position + offset;
            final SourceRecord insert = customerRecords.get(position);
            final Map<String, ?> recordOffset = insert.sourceOffset();

            VerifyRecord.isValidInsert(insert, "ID", expectedId);
            final Struct after = (Struct) ((Struct) insert.value()).get("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(expectedId);

            Assertions.assertThat(recordOffset.containsKey("snapshot")).isFalse();
            Assertions.assertThat(recordOffset.containsKey(SNAPSHOT_COMPLETED_KEY)).isFalse();
            if (notLogMiner) {
                Assertions.assertThat(recordOffset.containsKey("lcr_position")).isTrue();
                Assertions.assertThat(recordOffset.containsKey("scn")).isFalse();
            }

            final Struct source = (Struct) ((Struct) insert.value()).get("source");
            Assertions.assertThat(source.get("snapshot")).isEqualTo("false");
            Assertions.assertThat(source.get("scn")).isNotNull();
            if (notLogMiner) {
                Assertions.assertThat(source.get("lcr_position")).isNotNull();
            }
            Assertions.assertThat(source.get("name")).isEqualTo("server1");
            Assertions.assertThat(source.get("version")).isNotNull();
            Assertions.assertThat(source.get("txId")).isNotNull();
            Assertions.assertThat(source.get("ts_ms")).isNotNull();
        }
    }

    /**
     * Snapshots two customers, streams two batches of 30 inserts, stops the connector, inserts a
     * further batch (ids from 300) while it is down, restarts the connector and checks that this
     * batch is delivered before two more batches (ids from 400 and 500) are streamed.
     */
    @Test
    public void shouldStreamAfterRestart() throws Exception {
        final Configuration config = ((Configuration.Builder) TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")).build();
        final int snapshotRecordCount = 2;
        final int batchSize = 30;

        connection.execute(new String[]{"INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))"});
        connection.execute(new String[]{"INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)"});
        connection.execute(new String[]{"COMMIT"});

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("oracle", "server1");

        final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(snapshotRecordCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(snapshotRecords).hasSize(snapshotRecordCount);

        connection.setAutoCommit(false);
        sendTxBatch(config, batchSize, 100);
        sendTxBatch(config, batchSize, 200);

        stopConnector();

        // Rows written while the connector is stopped; they are verified after the restart.
        final int OFFSET = 300;
        for (int id = OFFSET; id < batchSize + OFFSET; ++id) {
            connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO debezium.customer VALUES (%s, 'Brian%s', 2345.67, null)", id, id)});
        }
        connection.connection().commit();

        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForStreamingRunning("oracle", "server1");

        assertTxBatch(config, batchSize, OFFSET);
        sendTxBatch(config, batchSize, 400);
        sendTxBatch(config, batchSize, 500);
    }

    /**
     * Snapshots two customers, stops the connector, inserts rows (ids from 100) while it is down,
     * restarts the connector and checks that those rows are delivered, followed by one more batch
     * (ids from 200). Auto-commit is enabled for the second run and disabled again in a
     * {@code finally} block.
     */
    @Test
    public void shouldStreamAfterRestartAfterSnapshot() throws Exception {
        final Configuration config = ((Configuration.Builder) TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")).build();
        // Used both as the snapshot record count and as the size of the later batches.
        final int expectedRecordCount = 2;

        connection.execute(new String[]{"INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))"});
        connection.execute(new String[]{"INSERT INTO debezium.customer VALUES (2, 'Bruce', 2345.67, null)"});
        connection.execute(new String[]{"COMMIT"});

        start(OracleConnector.class, config);
        assertConnectorIsRunning();

        final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(expectedRecordCount).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(snapshotRecords).hasSize(expectedRecordCount);

        stopConnector();

        // Rows written while the connector is stopped.
        connection.setAutoCommit(false);
        final int OFFSET = 100;
        for (int id = OFFSET; id < expectedRecordCount + OFFSET; ++id) {
            connection.executeWithoutCommitting(new String[]{String.format("INSERT INTO debezium.customer VALUES (%s, 'Brian%s', 2345.67, null)", id, id)});
        }
        connection.connection().commit();

        try {
            connection.setAutoCommit(true);
            Testing.print((Object)"=== Starting connector second time ===");
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            assertTxBatch(config, expectedRecordCount, OFFSET);
            sendTxBatch(config, expectedRecordCount, 200);
        }
        finally {
            // The shared connection is left with auto-commit disabled.
            connection.setAutoCommit(false);
        }
    }

    @Test
    public void shouldReadChangeStreamForExistingTable() throws Exception {
        Configuration config = ((Configuration.Builder)((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")).with(OracleConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        OracleConnectorIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
        int expectedRecordCount = 0;
        connection.execute(new String[]{"INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))"});
        connection.execute(new String[]{"COMMIT"});
        ++expectedRecordCount;
        connection.execute(new String[]{"UPDATE debezium.customer SET name = 'Bruce', score = 2345.67, registered = TO_DATE('2018-03-23', 'yyyy-mm-dd') WHERE id = 1"});
        connection.execute(new String[]{"COMMIT"});
        ++expectedRecordCount;
        connection.execute(new String[]{"UPDATE debezium.customer SET id = 2 WHERE id = 1"});
        connection.execute(new String[]{"COMMIT"});
        expectedRecordCount += 3;
        connection.execute(new String[]{"DELETE debezium.customer WHERE id = 2"});
        connection.execute(new String[]{"COMMIT"});
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(expectedRecordCount += 2);
        List testTableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat((List)testTableRecords).hasSize(expectedRecordCount);
        VerifyRecord.isValidInsert((SourceRecord)((SourceRecord)testTableRecords.get(0)), (String)"ID", (int)1);
        Struct after = (Struct)((Struct)((SourceRecord)testTableRecords.get(0)).value()).get("after");
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("NAME")).isEqualTo((Object)"Billie-Bob");
        Assertions.assertThat((Object)after.get("SCORE")).isEqualTo((Object)BigDecimal.valueOf(1234.56));
        Assertions.assertThat((Object)after.get("REGISTERED")).isEqualTo((Object)this.toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0)));
        Map offset = ((SourceRecord)testTableRecords.get(0)).sourceOffset();
        Assertions.assertThat(offset.get("snapshot")).isNull();
        Assertions.assertThat(offset.get(SNAPSHOT_COMPLETED_KEY)).isNull();
        VerifyRecord.isValidUpdate((SourceRecord)((SourceRecord)testTableRecords.get(1)), (String)"ID", (int)1);
        Struct before = (Struct)((Struct)((SourceRecord)testTableRecords.get(1)).value()).get("before");
        Assertions.assertThat((Object)before.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)before.get("NAME")).isEqualTo((Object)"Billie-Bob");
        Assertions.assertThat((Object)before.get("SCORE")).isEqualTo((Object)BigDecimal.valueOf(1234.56));
        Assertions.assertThat((Object)before.get("REGISTERED")).isEqualTo((Object)this.toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0)));
        after = (Struct)((Struct)((SourceRecord)testTableRecords.get(1)).value()).get("after");
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("NAME")).isEqualTo((Object)"Bruce");
        Assertions.assertThat((Object)after.get("SCORE")).isEqualTo((Object)BigDecimal.valueOf(2345.67));
        Assertions.assertThat((Object)after.get("REGISTERED")).isEqualTo((Object)this.toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 3, 23, 0, 0, 0)));
        VerifyRecord.isValidDelete((SourceRecord)((SourceRecord)testTableRecords.get(2)), (String)"ID", (int)1);
        before = (Struct)((Struct)((SourceRecord)testTableRecords.get(2)).value()).get("before");
        Assertions.assertThat((Object)before.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)before.get("NAME")).isEqualTo((Object)"Bruce");
        Assertions.assertThat((Object)before.get("SCORE")).isEqualTo((Object)BigDecimal.valueOf(2345.67));
        Assertions.assertThat((Object)before.get("REGISTERED")).isEqualTo((Object)this.toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 3, 23, 0, 0, 0)));
        VerifyRecord.isValidTombstone((SourceRecord)((SourceRecord)testTableRecords.get(3)));
        VerifyRecord.isValidInsert((SourceRecord)((SourceRecord)testTableRecords.get(4)), (String)"ID", (int)2);
        after = (Struct)((Struct)((SourceRecord)testTableRecords.get(4)).value()).get("after");
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)2);
        Assertions.assertThat((Object)after.get("NAME")).isEqualTo((Object)"Bruce");
        Assertions.assertThat((Object)after.get("SCORE")).isEqualTo((Object)BigDecimal.valueOf(2345.67));
        Assertions.assertThat((Object)after.get("REGISTERED")).isEqualTo((Object)this.toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 3, 23, 0, 0, 0)));
        VerifyRecord.isValidDelete((SourceRecord)((SourceRecord)testTableRecords.get(5)), (String)"ID", (int)2);
        before = (Struct)((Struct)((SourceRecord)testTableRecords.get(5)).value()).get("before");
        Assertions.assertThat((Object)before.get("ID")).isEqualTo((Object)2);
        Assertions.assertThat((Object)before.get("NAME")).isEqualTo((Object)"Bruce");
        Assertions.assertThat((Object)before.get("SCORE")).isEqualTo((Object)BigDecimal.valueOf(2345.67));
        Assertions.assertThat((Object)before.get("REGISTERED")).isEqualTo((Object)this.toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 3, 23, 0, 0, 0)));
        VerifyRecord.isValidTombstone((SourceRecord)((SourceRecord)testTableRecords.get(6)));
    }

    /**
     * Streams an insert, a delete and a second insert on {@code debezium.customer} while
     * {@code TOMBSTONES_ON_DELETE} is set to {@code false}, and verifies that exactly three
     * change events arrive: the delete event is directly followed by the next insert.
     */
    @Test
    @FixFor("DBZ-835")
    public void deleteWithoutTombstone() throws Exception {
        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(OracleConnectorConfig.TOMBSTONES_ON_DELETE, false)
                .build();
        start(OracleConnector.class, connectorConfig);
        assertConnectorIsRunning();
        OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

        // Each committed statement below is expected to yield exactly one change event.
        int eventTotal = 0;

        connection.execute("INSERT INTO debezium.customer VALUES (1, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");
        eventTotal += 1;

        // The delete contributes one event only, since tombstones are disabled.
        connection.execute("DELETE debezium.customer WHERE id = 1");
        connection.execute("COMMIT");
        eventTotal += 1;

        connection.execute("INSERT INTO debezium.customer VALUES (2, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");
        eventTotal += 1;

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(eventTotal);
        final List<SourceRecord> customerEvents = consumed.recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(customerEvents).hasSize(eventTotal);

        // Second event: the delete, carrying the full "before" image of row 1.
        final SourceRecord deleteEvent = customerEvents.get(1);
        VerifyRecord.isValidDelete(deleteEvent, "ID", 1);
        final Struct deletedRow = ((Struct) deleteEvent.value()).getStruct("before");
        Assertions.assertThat(deletedRow.get("ID")).isEqualTo(1);
        Assertions.assertThat(deletedRow.get("NAME")).isEqualTo("Billie-Bob");
        Assertions.assertThat(deletedRow.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56));
        Assertions.assertThat(deletedRow.get("REGISTERED"))
                .isEqualTo(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0)));

        // Third event: the insert of row 2 rather than a tombstone.
        VerifyRecord.isValidInsert(customerEvents.get(2), "ID", 2);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Starts the connector with an include list naming {@code DEBEZIUM.CUSTOMER2} after that
     * table has been dropped, creates the table once the snapshot has completed, and verifies
     * that a subsequent insert is captured as a single insert event with the expected values.
     * The table is dropped again in a {@code finally} block.
     */
    @Test
    public void shouldReadChangeStreamForTableCreatedWhileStreaming() throws Exception {
        TestHelper.dropTable(connection, "debezium.customer2");
        try {
            final Configuration connectorConfig = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER2")
                    .build();
            start(OracleConnector.class, connectorConfig);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            // The captured table is created only after the snapshot has completed.
            connection.execute("create table debezium.customer2 (  id numeric(9,0) not null,   name varchar2(1000),   score decimal(6, 2),   registered timestamp,   primary key (id))");
            TestHelper.streamTable(connection, "debezium.customer2");

            connection.execute("INSERT INTO debezium.customer2 VALUES (2, 'Billie-Bob', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
            connection.execute("COMMIT");

            final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
            final List<SourceRecord> customerEvents = consumed.recordsForTopic("server1.DEBEZIUM.CUSTOMER2");
            Assertions.assertThat(customerEvents).hasSize(1);

            final SourceRecord insertEvent = customerEvents.get(0);
            VerifyRecord.isValidInsert(insertEvent, "ID", 2);
            final Struct insertedRow = (Struct) ((Struct) insertEvent.value()).get("after");
            Assertions.assertThat(insertedRow.get("ID")).isEqualTo(2);
            Assertions.assertThat(insertedRow.get("NAME")).isEqualTo("Billie-Bob");
            Assertions.assertThat(insertedRow.get("SCORE")).isEqualTo(BigDecimal.valueOf(1234.56));
            Assertions.assertThat(insertedRow.get("REGISTERED"))
                    .isEqualTo(toMicroSecondsSinceEpoch(LocalDateTime.of(2018, 2, 22, 0, 0, 0)));
        }
        finally {
            TestHelper.dropTable(connection, "debezium.customer2");
        }
    }

    /**
     * Runs the connector with a heartbeat interval of {@code "1"} and an include list that
     * only matches {@code DEBEZIUM.DBZ800B}, writes to both {@code dbz800a} and
     * {@code dbz800b}, and verifies that heartbeat records are emitted, that nothing is
     * emitted for the excluded table, and that the included table yields its single insert.
     */
    @Test
    @FixFor("DBZ-800")
    public void shouldReceiveHeartbeatAlsoWhenChangingTableIncludeListTables() throws Exception {
        TestHelper.dropTable(connection, "debezium.dbz800a");
        TestHelper.dropTable(connection, "debezium.dbz800b");

        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(Heartbeat.HEARTBEAT_INTERVAL, "1")
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ800B")
                .build();
        start(OracleConnector.class, connectorConfig);
        assertConnectorIsRunning();
        OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

        connection.execute("CREATE TABLE debezium.dbz800a (id NUMBER(9) NOT NULL, aaa VARCHAR2(100), PRIMARY KEY (id) )");
        connection.execute("CREATE TABLE debezium.dbz800b (id NUMBER(9) NOT NULL, bbb VARCHAR2(100), PRIMARY KEY (id) )");
        connection.execute("INSERT INTO debezium.dbz800a VALUES (1, 'AAA')");
        connection.execute("INSERT INTO debezium.dbz800b VALUES (2, 'BBB')");
        connection.execute("COMMIT");

        // Keep consuming one record at a time, accumulating everything seen so far, until a
        // record for the included table shows up (or the 60 second limit is reached).
        final AtomicReference<AbstractConnectorTest.SourceRecords> collected = new AtomicReference<>();
        Awaitility.await().atMost(Duration.ofSeconds(60L)).until(() -> {
            final AbstractConnectorTest.SourceRecords batch = consumeRecordsByTopic(1);
            final AbstractConnectorTest.SourceRecords accumulated = collected.get();
            if (accumulated == null) {
                collected.set(batch);
            }
            else {
                batch.allRecordsInOrder().forEach(accumulated::add);
            }
            return collected.get().recordsForTopic("server1.DEBEZIUM.DBZ800B") != null;
        });

        final AbstractConnectorTest.SourceRecords all = collected.get();
        final List<SourceRecord> heartbeatEvents = all.recordsForTopic("__debezium-heartbeat.server1");
        final List<SourceRecord> excludedTableEvents = all.recordsForTopic("server1.DEBEZIUM.DBZ800A");
        final List<SourceRecord> includedTableEvents = all.recordsForTopic("server1.DEBEZIUM.DBZ800B");

        // The number of heartbeats is not pinned down; at least one must be present.
        Assertions.assertThat(heartbeatEvents).isNotEmpty();
        Assertions.assertThat(excludedTableEvents).isNull();
        Assertions.assertThat(includedTableEvents).hasSize(1);
        VerifyRecord.isValidInsert(includedTableEvents.get(0), "ID", 2);
    }

    /**
     * Runs the masked/hashed/truncated column scenario with {@code useDatabaseName = true}.
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldConsumeEventsWithMaskedAndTruncatedColumnsWithDatabaseName() throws Exception {
        final boolean useDatabaseName = true;
        shouldConsumeEventsWithMaskedAndTruncatedColumns(useDatabaseName);
    }

    /**
     * Runs the masked/hashed/truncated column scenario with {@code useDatabaseName = false}.
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldConsumeEventsWithMaskedAndTruncatedColumnsWithoutDatabaseName() throws Exception {
        final boolean useDatabaseName = false;
        shouldConsumeEventsWithMaskedAndTruncatedColumns(useDatabaseName);
    }

    /**
     * Shared scenario for the column masking, hashing and truncation tests.
     *
     * <p>Configures a 12-character mask on {@code NAME}, a salted SHA-256 hash on
     * {@code NAME2}/{@code NAME3} and a 4-character truncation on the truncated table's
     * {@code NAME}, inserts one row into each table and checks the emitted values.
     * The connector is stopped at the end.
     *
     * @param useDatabaseName when {@code true}, every configured column name is prefixed with
     *            the value of {@code TestHelper.getDatabaseName()}; otherwise the names start
     *            at the schema
     */
    public void shouldConsumeEventsWithMaskedAndTruncatedColumns(boolean useDatabaseName) throws Exception {
        final String maskedColumns;
        final String hashedColumns;
        final String truncatedColumns;
        if (useDatabaseName) {
            final String dbName = TestHelper.getDatabaseName();
            maskedColumns = dbName + ".DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME";
            hashedColumns = dbName + ".DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME2," + dbName + ".DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME3";
            truncatedColumns = dbName + ".DEBEZIUM.TRUNCATED_COLUMN_TABLE.NAME";
        }
        else {
            maskedColumns = "DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME";
            hashedColumns = "DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME2,DEBEZIUM.MASKED_HASHED_COLUMN_TABLE.NAME3";
            truncatedColumns = "DEBEZIUM.TRUNCATED_COLUMN_TABLE.NAME";
        }

        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with("column.mask.with.12.chars", maskedColumns)
                .with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K", hashedColumns)
                .with("column.truncate.to.4.chars", truncatedColumns)
                .build();
        start(OracleConnector.class, connectorConfig);
        assertConnectorIsRunning();
        OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

        connection.execute("INSERT INTO debezium.masked_hashed_column_table (id, name, name2, name3) VALUES (10, 'some_name', 'test', 'test')");
        connection.execute("INSERT INTO debezium.truncated_column_table VALUES(11, 'some_name')");
        connection.execute("COMMIT");

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(2);
        final List<SourceRecord> maskedEvents = consumed.recordsForTopic("server1.DEBEZIUM.MASKED_HASHED_COLUMN_TABLE");
        final List<SourceRecord> truncatedEvents = consumed.recordsForTopic("server1.DEBEZIUM.TRUNCATED_COLUMN_TABLE");

        // Masked / hashed table: the value assertions only run when an "after" image exists.
        Assertions.assertThat(maskedEvents).hasSize(1);
        final SourceRecord maskedEvent = maskedEvents.get(0);
        VerifyRecord.isValidInsert(maskedEvent, "ID", 10);
        final Struct maskedAfter = ((Struct) maskedEvent.value()).getStruct("after");
        if (maskedAfter != null) {
            Assertions.assertThat(maskedAfter.getString("NAME")).isEqualTo("************");
            Assertions.assertThat(maskedAfter.getString("NAME2")).isEqualTo("8e68c68edbbac316dfe2f6ada6b0d2d3e2002b487a985d4b7c7c82dd83b0f4d7");
            Assertions.assertThat(maskedAfter.getString("NAME3")).isEqualTo("8e68c68edbbac316dfe2");
        }

        // Truncated table: 'some_name' is expected to be cut down to its first four characters.
        Assertions.assertThat(truncatedEvents).hasSize(1);
        final SourceRecord truncatedEvent = truncatedEvents.get(0);
        VerifyRecord.isValidInsert(truncatedEvent, "ID", 11);
        final Struct truncatedAfter = ((Struct) truncatedEvent.value()).getStruct("after");
        if (truncatedAfter != null) {
            Assertions.assertThat(truncatedAfter.getString("NAME")).isEqualTo("some");
        }

        stopConnector();
    }

    /**
     * Runs the message-key rewrite scenario with {@code useDatabaseName = true}.
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldRewriteIdentityKeyWithDatabaseName() throws Exception {
        final boolean useDatabaseName = true;
        shouldRewriteIdentityKey(useDatabaseName);
    }

    /**
     * Runs the message-key rewrite scenario with {@code useDatabaseName = false}.
     */
    @Test
    @FixFor("DBZ-775")
    public void shouldRewriteIdentityKeyWithoutDatabaseName() throws Exception {
        final boolean useDatabaseName = false;
        shouldRewriteIdentityKey(useDatabaseName);
    }

    /**
     * Shared scenario for the message-key rewrite tests: configures {@code MSG_KEY_COLUMNS}
     * so that {@code debezium.customer} is keyed by {@code id,name}, inserts one row and
     * checks that the emitted record key exposes non-null {@code ID} and {@code NAME} fields.
     * The connector is stopped at the end.
     *
     * @param useDatabaseName when {@code true}, the table in the key mapping is prefixed with
     *            a {@code (.*)} pattern segment; otherwise it starts at the schema
     */
    private void shouldRewriteIdentityKey(boolean useDatabaseName) throws Exception {
        final String keyMapping = useDatabaseName
                ? "(.*).debezium.customer:id,name"
                : "debezium.customer:id,name";
        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with(OracleConnectorConfig.MSG_KEY_COLUMNS, keyMapping)
                .build();
        start(OracleConnector.class, connectorConfig);
        assertConnectorIsRunning();
        OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

        connection.execute("INSERT INTO debezium.customer VALUES (3, 'Nest', 1234.56, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
        connection.execute("COMMIT");

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        final List<SourceRecord> customerEvents = consumed.recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        final SourceRecord insertEvent = customerEvents.get(0);
        Assertions.assertThat(insertEvent.key()).isNotNull();

        final Struct rewrittenKey = (Struct) insertEvent.key();
        Assertions.assertThat(rewrittenKey.get("ID")).isNotNull();
        Assertions.assertThat(rewrittenKey.get("NAME")).isNotNull();

        stopConnector();
    }

    /**
     * Configures {@code datatype.propagate.source.type} for {@code NUMBER}, {@code VARCHAR2}
     * and {@code FLOAT}, inserts one row into {@code debezium.dt_table} and verifies that the
     * field schemas of the {@code before} struct carry the expected
     * {@code __debezium.source.column.*} parameters (type, length and, where asserted, scale).
     */
    @Test
    @FixFor({"DBZ-1916", "DBZ-1830"})
    public void shouldPropagateSourceTypeByDatatype() throws Exception {
        final Configuration connectorConfig = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                .with("datatype.propagate.source.type", ".+\\.NUMBER,.+\\.VARCHAR2,.+\\.FLOAT")
                .build();
        start(OracleConnector.class, connectorConfig);
        assertConnectorIsRunning();
        OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

        connection.execute("INSERT INTO debezium.dt_table (id,c1,c2,c3a,c3b,f1,f2) values (1,123,456,789.01,'test',1.228,234.56)");
        connection.execute("COMMIT");

        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
        final List<SourceRecord> tableEvents = consumed.recordsForTopic("server1.DEBEZIUM.DT_TABLE");
        Assertions.assertThat(tableEvents).hasSize(1);

        // Parameters are read from the schema of the "before" field of the value schema.
        final Field beforeField = tableEvents.get(0).valueSchema().field("before");

        Assertions.assertThat(beforeField.schema().field("ID").schema().parameters()).contains(
                Assertions.entry("__debezium.source.column.type", "NUMBER"),
                Assertions.entry("__debezium.source.column.length", "9"),
                Assertions.entry("__debezium.source.column.scale", "0"));
        Assertions.assertThat(beforeField.schema().field("C1").schema().parameters()).contains(
                Assertions.entry("__debezium.source.column.type", "NUMBER"),
                Assertions.entry("__debezium.source.column.length", "38"),
                Assertions.entry("__debezium.source.column.scale", "0"));
        Assertions.assertThat(beforeField.schema().field("C2").schema().parameters()).contains(
                Assertions.entry("__debezium.source.column.type", "NUMBER"),
                Assertions.entry("__debezium.source.column.length", "38"),
                Assertions.entry("__debezium.source.column.scale", "0"));
        Assertions.assertThat(beforeField.schema().field("C3A").schema().parameters()).contains(
                Assertions.entry("__debezium.source.column.type", "NUMBER"),
                Assertions.entry("__debezium.source.column.length", "5"),
                Assertions.entry("__debezium.source.column.scale", "2"));
        Assertions.assertThat(beforeField.schema().field("C3B").schema().parameters()).contains(
                Assertions.entry("__debezium.source.column.type", "VARCHAR2"),
                Assertions.entry("__debezium.source.column.length", "128"));
        Assertions.assertThat(beforeField.schema().field("F2").schema().parameters()).contains(
                Assertions.entry("__debezium.source.column.type", "NUMBER"),
                Assertions.entry("__debezium.source.column.length", "8"),
                Assertions.entry("__debezium.source.column.scale", "4"));
        Assertions.assertThat(beforeField.schema().field("F1").schema().parameters()).contains(
                Assertions.entry("__debezium.source.column.type", "FLOAT"),
                Assertions.entry("__debezium.source.column.length", "10"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * With {@code SKIPPED_OPERATIONS} set to {@code "none"}, verifies that a
     * {@code TRUNCATE TABLE} issued while streaming is emitted as an event with op
     * {@code "t"}, and that a following insert is emitted with op {@code "c"}.
     * The table is dropped again in a {@code finally} block.
     */
    @Test
    @FixFor("DBZ-4385")
    public void shouldTruncate() throws Exception {
        TestHelper.dropTable(connection, "debezium.truncate_ddl");
        try {
            final String ddl = "create table debezium.truncate_ddl (id NUMERIC(6), name VARCHAR(100), primary key(id))";
            connection.execute(ddl);
            TestHelper.streamTable(connection, "debezium.truncate_ddl");

            // One row exists before the connector starts, so the snapshot yields one record.
            connection.execute("INSERT INTO debezium.truncate_ddl (id, name) values (1, 'Acme')");
            connection.commit();

            final Configuration connectorConfig = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.TRUNCATE_DDL")
                    .with(OracleConnectorConfig.SKIPPED_OPERATIONS, "none")
                    .build();
            start(OracleConnector.class, connectorConfig);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            final AbstractConnectorTest.SourceRecords snapshotBatch = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshotBatch.recordsForTopic("server1.DEBEZIUM.TRUNCATE_DDL")).hasSize(1);

            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // The truncate itself must surface as a single "t" event.
            connection.execute("TRUNCATE TABLE debezium.truncate_ddl");
            final AbstractConnectorTest.SourceRecords truncateBatch = consumeRecordsByTopic(1);
            final List<SourceRecord> truncateEvents = truncateBatch.recordsForTopic("server1.DEBEZIUM.TRUNCATE_DDL");
            Assertions.assertThat(truncateEvents).hasSize(1);
            final String truncateOp = ((Struct) truncateEvents.get(0).value()).getString("op");
            Assertions.assertThat(truncateOp).isEqualTo("t");

            // Streaming continues normally after the truncate.
            connection.execute("INSERT INTO debezium.truncate_ddl (id, name) values (2, 'Roadrunner')");
            connection.commit();
            final AbstractConnectorTest.SourceRecords insertBatch = consumeRecordsByTopic(1);
            final List<SourceRecord> insertEvents = insertBatch.recordsForTopic("server1.DEBEZIUM.TRUNCATE_DDL");
            Assertions.assertThat(insertEvents).hasSize(1);
            final String insertOp = ((Struct) insertEvents.get(0).value()).getString("op");
            Assertions.assertThat(insertOp).isEqualTo("c");
        }
        finally {
            TestHelper.dropTable(connection, "debezium.truncate_ddl");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Without setting {@code SKIPPED_OPERATIONS}, issues a {@code TRUNCATE TABLE} followed by
     * an insert while streaming and verifies that the only record consumed afterwards is the
     * insert (op {@code "c"}), i.e. no truncate event precedes it.
     * The table is dropped again in a {@code finally} block.
     */
    @Test
    @FixFor("DBZ-4385")
    public void shouldNotTruncateWhenSkipped() throws Exception {
        TestHelper.dropTable(connection, "debezium.truncate_ddl");
        try {
            final String ddl = "create table debezium.truncate_ddl (id NUMERIC(6), name VARCHAR(100), primary key(id))";
            connection.execute(ddl);
            TestHelper.streamTable(connection, "debezium.truncate_ddl");

            // One row exists before the connector starts, so the snapshot yields one record.
            connection.execute("INSERT INTO debezium.truncate_ddl (id, name) values (1, 'Acme')");
            connection.commit();

            // SKIPPED_OPERATIONS is deliberately left unset here.
            final Configuration connectorConfig = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.TRUNCATE_DDL")
                    .build();
            start(OracleConnector.class, connectorConfig);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            final AbstractConnectorTest.SourceRecords snapshotBatch = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshotBatch.recordsForTopic("server1.DEBEZIUM.TRUNCATE_DDL")).hasSize(1);

            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            connection.execute("TRUNCATE TABLE debezium.truncate_ddl");
            connection.execute("INSERT INTO debezium.truncate_ddl (id, name) values (2, 'Roadrunner')");
            connection.commit();

            // The first (and only requested) streaming record must be the insert.
            final AbstractConnectorTest.SourceRecords streamingBatch = consumeRecordsByTopic(1);
            final List<SourceRecord> streamedEvents = streamingBatch.recordsForTopic("server1.DEBEZIUM.TRUNCATE_DDL");
            Assertions.assertThat(streamedEvents).hasSize(1);
            final String firstOp = ((Struct) streamedEvents.get(0).value()).getString("op");
            Assertions.assertThat(firstOp).isEqualTo("c");
        }
        finally {
            TestHelper.dropTable(connection, "debezium.truncate_ddl");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that Oracle {@code INTERVAL YEAR TO MONTH} and {@code INTERVAL DAY TO SECOND}
     * columns are emitted as INT64 values when no interval handling mode is configured,
     * both for rows captured by the snapshot (ids 1 and 2) and for rows captured while
     * streaming (ids 3 and 4). The table is dropped again in a {@code finally} block.
     *
     * <p>The {@code @Test} annotation is required: without it JUnit never executes this
     * method, even though it carries {@code @FixFor}.
     */
    @Test
    @FixFor(value={"DBZ-1539"})
    public void shouldHandleIntervalTypesAsInt64() throws Exception {
        TestHelper.dropTable(connection, "debezium.interval");
        try {
            final String ddl = "create table debezium.interval ( id numeric(6) constraint interval_id_nn not null,  intYM interval year to month, intYM2 interval year(9) to month, intDS interval day to second,  intDS2 interval day(9) to second(9),  constraint interval_pk primary key(id))";
            connection.execute(ddl);
            TestHelper.streamTable(connection, "debezium.interval");

            // Rows 1 and 2 exist before the connector starts and are picked up by the snapshot.
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (1, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))");
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (2, INTERVAL '0' YEAR, INTERVAL '0' MONTH, INTERVAL '0' DAY, INTERVAL '0' SECOND)");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.INTERVAL")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            // --- snapshot phase ---
            final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(2);
            Assertions.assertThat(snapshotRecords.allRecordsInOrder()).hasSize(2);
            Assertions.assertThat(snapshotRecords.topics()).contains("server1.DEBEZIUM.INTERVAL");
            List<SourceRecord> records = snapshotRecords.recordsForTopic("server1.DEBEZIUM.INTERVAL");
            Assertions.assertThat(records).hasSize(2);

            Struct after = ((Struct) records.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.getInt64("INTYM")).isEqualTo(63115200000000L);
            Assertions.assertThat(after.getInt64("INTYM2")).isEqualTo(17524987200000000L);
            Assertions.assertThat(after.getInt64("INTDS")).isEqualTo(259200000000L);
            Assertions.assertThat(after.getInt64("INTDS2")).isEqualTo(9627503444333L);

            after = ((Struct) records.get(1).value()).getStruct("after");
            // Pin the row identity as well, matching the checks made for the streamed rows.
            Assertions.assertThat(after.get("ID")).isEqualTo(2);
            Assertions.assertThat(after.getInt64("INTYM")).isEqualTo(0L);
            Assertions.assertThat(after.getInt64("INTYM2")).isEqualTo(0L);
            Assertions.assertThat(after.getInt64("INTDS")).isEqualTo(0L);
            Assertions.assertThat(after.getInt64("INTDS2")).isEqualTo(0L);

            // --- streaming phase ---
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (3, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))");
            connection.execute("INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (4, INTERVAL '0' YEAR, INTERVAL '0' MONTH, INTERVAL '0' DAY, INTERVAL '0' SECOND)");
            connection.commit();

            final AbstractConnectorTest.SourceRecords streamingRecords = consumeRecordsByTopic(2);
            Assertions.assertThat(streamingRecords.allRecordsInOrder()).hasSize(2);
            Assertions.assertThat(streamingRecords.topics()).contains("server1.DEBEZIUM.INTERVAL");
            records = streamingRecords.recordsForTopic("server1.DEBEZIUM.INTERVAL");
            Assertions.assertThat(records).hasSize(2);

            after = ((Struct) records.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(3);
            Assertions.assertThat(after.getInt64("INTYM")).isEqualTo(63115200000000L);
            Assertions.assertThat(after.getInt64("INTYM2")).isEqualTo(17524987200000000L);
            Assertions.assertThat(after.getInt64("INTDS")).isEqualTo(259200000000L);
            Assertions.assertThat(after.getInt64("INTDS2")).isEqualTo(9627503444333L);

            after = ((Struct) records.get(1).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(4);
            Assertions.assertThat(after.getInt64("INTYM")).isEqualTo(0L);
            Assertions.assertThat(after.getInt64("INTYM2")).isEqualTo(0L);
            Assertions.assertThat(after.getInt64("INTDS")).isEqualTo(0L);
            Assertions.assertThat(after.getInt64("INTDS2")).isEqualTo(0L);

            // Nothing beyond the four rows above may be pending.
            assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, "debezium.interval");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @FixFor(value={"DBZ-1539"})
    public void shouldHandleIntervalTypesAsString() throws Exception {
        TestHelper.dropTable(connection, "debezium.interval");
        try {
            String ddl = "create table debezium.interval ( id numeric(6) constraint interval_id_nn not null,  intYM interval year to month, intYM2 interval year(9) to month, intDS interval day to second,  intDS2 interval day(9) to second(9),  constraint interval_pk primary key(id))";
            connection.execute(new String[]{"create table debezium.interval ( id numeric(6) constraint interval_id_nn not null,  intYM interval year to month, intYM2 interval year(9) to month, intDS interval day to second,  intDS2 interval day(9) to second(9),  constraint interval_pk primary key(id))"});
            TestHelper.streamTable(connection, "debezium.interval");
            connection.execute(new String[]{"INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (1, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))"});
            connection.execute(new String[]{"INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (2, INTERVAL '0' YEAR, INTERVAL '0' MONTH, INTERVAL '0' DAY, INTERVAL '0' SECOND)"});
            connection.commit();
            Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.INTERVAL")).with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")).with(OracleConnectorConfig.INTERVAL_HANDLING_MODE, OracleConnectorConfig.IntervalHandlingMode.STRING.getValue())).build();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted((String)"oracle", (String)"server1");
            AbstractConnectorTest.SourceRecords snapshotRecords = this.consumeRecordsByTopic(2);
            Assertions.assertThat((List)snapshotRecords.allRecordsInOrder()).hasSize(2);
            Assertions.assertThat((Iterable)snapshotRecords.topics()).contains((Object[])new String[]{"server1.DEBEZIUM.INTERVAL"});
            List records = snapshotRecords.recordsForTopic("server1.DEBEZIUM.INTERVAL");
            Assertions.assertThat((List)records).hasSize(2);
            Struct after = ((Struct)((SourceRecord)records.get(0)).value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
            Assertions.assertThat((String)after.getString("INTYM")).isEqualTo((Object)"P2Y0M0DT0H0M0S");
            Assertions.assertThat((String)after.getString("INTYM2")).isEqualTo((Object)"P555Y4M0DT0H0M0S");
            Assertions.assertThat((String)after.getString("INTDS")).isEqualTo((Object)"P0Y0M3DT0H0M0S");
            Assertions.assertThat((String)after.getString("INTDS2")).isEqualTo((Object)"P0Y0M111DT10H9M563.444333S");
            after = ((Struct)((SourceRecord)records.get(1)).value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)2);
            Assertions.assertThat((String)after.getString("INTYM")).isEqualTo((Object)"P0Y0M0DT0H0M0S");
            Assertions.assertThat((String)after.getString("INTYM2")).isEqualTo((Object)"P0Y0M0DT0H0M0S");
            Assertions.assertThat((String)after.getString("INTDS")).isEqualTo((Object)"P0Y0M0DT0H0M0S");
            Assertions.assertThat((String)after.getString("INTDS2")).isEqualTo((Object)"P0Y0M0DT0H0M0S");
            OracleConnectorIT.waitForStreamingRunning((String)"oracle", (String)"server1");
            connection.execute(new String[]{"INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (3, INTERVAL '2' YEAR, INTERVAL '555-4' YEAR(3) TO MONTH, INTERVAL '3' DAY, INTERVAL '111 10:09:08.555444333' DAY(3) TO SECOND(9))"});
            connection.execute(new String[]{"INSERT INTO debezium.interval (id, intYM, intYM2, intDS, intDS2) values (4, INTERVAL '0' YEAR, INTERVAL '0' MONTH, INTERVAL '0' DAY, INTERVAL '0' SECOND)"});
            connection.commit();
            AbstractConnectorTest.SourceRecords streamingRecords = this.consumeRecordsByTopic(2);
            Assertions.assertThat((List)streamingRecords.allRecordsInOrder()).hasSize(2);
            Assertions.assertThat((Iterable)streamingRecords.topics()).contains((Object[])new String[]{"server1.DEBEZIUM.INTERVAL"});
            records = streamingRecords.recordsForTopic("server1.DEBEZIUM.INTERVAL");
            Assertions.assertThat((List)records).hasSize(2);
            after = ((Struct)((SourceRecord)records.get(0)).value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)3);
            Assertions.assertThat((String)after.getString("INTYM")).isEqualTo((Object)"P2Y0M0DT0H0M0S");
            Assertions.assertThat((String)after.getString("INTYM2")).isEqualTo((Object)"P555Y4M0DT0H0M0S");
            Assertions.assertThat((String)after.getString("INTDS")).isEqualTo((Object)"P0Y0M3DT0H0M0S");
            Assertions.assertThat((String)after.getString("INTDS2")).isEqualTo((Object)"P0Y0M111DT10H9M563.444333S");
            after = ((Struct)((SourceRecord)records.get(1)).value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)4);
            Assertions.assertThat((String)after.getString("INTYM")).isEqualTo((Object)"P0Y0M0DT0H0M0S");
            Assertions.assertThat((String)after.getString("INTYM2")).isEqualTo((Object)"P0Y0M0DT0H0M0S");
            Assertions.assertThat((String)after.getString("INTDS")).isEqualTo((Object)"P0Y0M0DT0H0M0S");
            Assertions.assertThat((String)after.getString("INTDS2")).isEqualTo((Object)"P0Y0M0DT0H0M0S");
            this.assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, "debezium.interval");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that a table declaring a numeric column default ({@code value numeric default 1})
     * is captured both by the initial snapshot and while streaming.
     *
     * <p>One row is inserted before the connector starts and one after streaming is running;
     * each phase must deliver exactly one record on the table's topic. Both inserts omit the
     * {@code value} column. The table is dropped again in {@code finally}.
     *
     * @throws Exception if database setup or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-2624"})
    public void shouldSnapshotAndStreamChangesFromTableWithNumericDefaultValues() throws Exception {
        if (VerifyRecord.isApucurioAvailable()) {
            skipAvroValidation();
        }
        TestHelper.dropTable(connection, "debezium.complex_ddl");
        try {
            // The DDL is defined once and executed from the variable so it cannot drift from a copy.
            final String ddl = "create table debezium.complex_ddl ( id numeric(6) constraint customers_id_nn not null,  name varchar2(100), value numeric default 1,  constraint customers_pk primary key(id))";
            connection.execute(ddl);
            connection.execute("GRANT SELECT ON debezium.complex_ddl to " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.complex_ddl ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.complex_ddl (id, name) values (1, 'Acme')");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.COMPLEX_DDL")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            // Snapshot phase: the row inserted before start-up.
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");
            final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.COMPLEX_DDL").size()).isEqualTo(1);

            // Streaming phase: a second row inserted while the connector is streaming.
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            connection.execute("INSERT INTO debezium.complex_ddl (id, name)values (2, 'Acme2')");
            connection.commit();
            final AbstractConnectorTest.SourceRecords streamingRecords = consumeRecordsByTopic(1);
            Assertions.assertThat(streamingRecords.recordsForTopic("server1.DEBEZIUM.COMPLEX_DDL").size()).isEqualTo(1);
        }
        finally {
            TestHelper.dropTable(connection, "debezium.complex_ddl");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that rows of a range-partitioned table are captured by the snapshot and by streaming.
     *
     * <p>The table is partitioned on {@code birth_date} into {@code p2019} and {@code p2020}.
     * One row exists before start-up (snapshot); two more are inserted while streaming, with
     * birth dates in 2019 and 2020 respectively. The table is dropped again in {@code finally}.
     *
     * @throws Exception if database setup or record consumption fails
     */
    @Test
    @SkipOnDatabaseOption(value="Partitioning", enabled=false)
    @FixFor(value={"DBZ-2683"})
    public void shouldSnapshotAndStreamChangesFromPartitionedTable() throws Exception {
        TestHelper.dropTable(connection, "players");
        try {
            // The DDL is defined once and executed from the variable so it cannot drift from a copy.
            final String ddl = "CREATE TABLE players (id NUMERIC(6), name VARCHAR(100), birth_date DATE,primary key(id)) PARTITION BY RANGE (birth_date) (PARTITION p2019 VALUES LESS THAN (TO_DATE('2020-01-01', 'yyyy-mm-dd')), PARTITION p2020 VALUES LESS THAN (TO_DATE('2021-01-01', 'yyyy-mm-dd')))";
            connection.execute(ddl);
            connection.execute("GRANT SELECT ON debezium.players to " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.players ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (1, 'Roger Rabbit', TO_DATE('2019-05-01', 'yyyy-mm-dd'))");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.PLAYERS")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            // Snapshot phase: the single pre-existing row.
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");
            final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.PLAYERS").size()).isEqualTo(1);

            // Streaming phase: one row per partition range, committed together.
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (2, 'Bugs Bunny', TO_DATE('2019-06-26', 'yyyy-mm-dd'))");
            connection.execute("INSERT INTO debezium.players (id, name, birth_date) VALUES (3, 'Elmer Fud', TO_DATE('2020-11-01', 'yyyy-mm-dd'))");
            connection.commit();
            final AbstractConnectorTest.SourceRecords streamRecords = consumeRecordsByTopic(2);
            Assertions.assertThat(streamRecords.recordsForTopic("server1.DEBEZIUM.PLAYERS").size()).isEqualTo(2);
        }
        finally {
            TestHelper.dropTable(connection, "players");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a column whose name contains {@code $} ({@code amount$}) is emitted under the
     * adjusted field name {@code AMOUNT_} when the field-name adjustment mode is set to Avro,
     * for both the snapshot record and a streamed insert.
     *
     * @throws Exception if database setup or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-2849"})
    public void shouldAvroSerializeColumnsWithSpecialCharacters() throws Exception {
        final String topic = "server1.DEBEZIUM.COLUMNS_TEST";
        TestHelper.dropTable(connection, "columns_test");
        try {
            connection.execute("CREATE TABLE columns_test (id NUMERIC(6), amount$ number not null, primary key(id))");
            connection.execute("GRANT SELECT ON debezium.columns_test to " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.columns_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.columns_test (id, amount$) values (1, 12345.67)");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.COLUMNS_TEST")
                    .with(OracleConnectorConfig.FIELD_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            // Snapshot record for id=1.
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");
            final AbstractConnectorTest.SourceRecords snapshots = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshots.recordsForTopic(topic).size()).isEqualTo(1);
            final SourceRecord snapshot = (SourceRecord) snapshots.recordsForTopic(topic).get(0);
            VerifyRecord.isValidRead(snapshot, "ID", 1);
            final Struct snapshotAfter = ((Struct) snapshot.value()).getStruct("after");
            Assertions.assertThat(snapshotAfter.getInt32("ID")).isEqualTo(1);
            final Schema snapshotAmountSchema = snapshotAfter.schema().field("AMOUNT_").schema();
            Assertions.assertThat(snapshotAfter.get("AMOUNT_"))
                    .isEqualTo(VariableScaleDecimal.fromLogical(snapshotAmountSchema, BigDecimal.valueOf(12345.67)));

            // Streamed insert for id=2.
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            connection.execute("INSERT INTO debezium.columns_test (id, amount$) values (2, 23456.78)");
            connection.commit();
            final AbstractConnectorTest.SourceRecords streams = consumeRecordsByTopic(1);
            Assertions.assertThat(streams.recordsForTopic(topic).size()).isEqualTo(1);
            final SourceRecord stream = (SourceRecord) streams.recordsForTopic(topic).get(0);
            VerifyRecord.isValidInsert(stream, "ID", 2);
            final Struct streamAfter = ((Struct) stream.value()).getStruct("after");
            Assertions.assertThat(streamAfter.getInt32("ID")).isEqualTo(2);
            final Schema streamAmountSchema = streamAfter.schema().field("AMOUNT_").schema();
            Assertions.assertThat(streamAfter.get("AMOUNT_"))
                    .isEqualTo(VariableScaleDecimal.fromLogical(streamAmountSchema, BigDecimal.valueOf(23456.78)));
        }
        finally {
            TestHelper.dropTable(connection, "columns_test");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that changes are still captured after the connector is restarted across a call to
     * {@code TestHelper.forceFlushOfRedoLogsToArchiveLogs()}.
     *
     * <p>Flow: snapshot one row, stop the connector, force the redo-to-archive flush, restart the
     * connector with the same configuration, insert a second row and expect exactly that row.
     * The table is dropped again in {@code finally}.
     *
     * @throws Exception if database setup or record consumption fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Tests archive log support for LogMiner only")
    @FixFor(value={"DBZ-2825"})
    public void testArchiveLogScnBoundariesAreIncluded() throws Exception {
        TestHelper.dropTable(connection, "alog_test");
        try {
            // The DDL is defined once and executed from the variable so it cannot drift from a copy.
            final String ddl = "CREATE TABLE alog_test (id numeric, name varchar2(50), primary key(id))";
            connection.execute(ddl);
            connection.execute("GRANT SELECT ON debezium.alog_test TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.alog_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.commit();
            connection.execute("INSERT INTO debezium.alog_test (id, name) VALUES (1, 'Test')");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.ALOG_TEST")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // First run: the pre-existing row is delivered.
            final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshotRecords.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").size()).isEqualTo(1);
            SourceRecord record = (SourceRecord) snapshotRecords.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").get(0);
            Struct after = (Struct) ((Struct) record.value()).get("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(1L));
            Assertions.assertThat(after.get("NAME")).isEqualTo("Test");

            // Restart around the forced redo-to-archive flush.
            stopConnector();
            TestHelper.forceFlushOfRedoLogsToArchiveLogs();
            start(OracleConnector.class, config);
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // Second run: only the newly inserted row is expected.
            connection.execute("INSERT INTO debezium.alog_test (id, name) values (2, 'Home')");
            connection.execute("COMMIT");
            final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").size()).isEqualTo(1);
            record = (SourceRecord) records.recordsForTopic("server1.DEBEZIUM.ALOG_TEST").get(0);
            after = (Struct) ((Struct) record.value()).get("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(BigDecimal.valueOf(2L));
            Assertions.assertThat(after.get("NAME")).isEqualTo("Home");
        }
        finally {
            TestHelper.dropTable(connection, "alog_test");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that a {@code DATE} value written as a string literal ({@code '22-FEB-2018'}) in the
     * snapshot row and one written via {@code TO_DATE(...)} in the streamed row both arrive as the
     * same {@code ORDER_DATE} value, {@code 1519257600000L}.
     *
     * <p>The table is dropped again in {@code finally}.
     *
     * @throws Exception if database setup or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-2784"})
    public void shouldConvertDatesSpecifiedAsStringInSQL() throws Exception {
        try {
            TestHelper.dropTable(connection, "orders");
            // The DDL is defined once and executed from the variable so it cannot drift from a copy.
            final String ddl = "CREATE TABLE orders (id NUMERIC(6), order_date date not null,primary key(id))";
            connection.execute(ddl);
            connection.execute("GRANT SELECT ON debezium.orders TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.orders ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.orders VALUES (9, '22-FEB-2018')");
            connection.execute("COMMIT");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium.orders")
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();

            // Snapshot phase: the row inserted with a string date literal.
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");
            final AbstractConnectorTest.SourceRecords snapshotRecords = consumeRecordsByTopic(1);
            final List<SourceRecord> snapshotOrders = snapshotRecords.recordsForTopic("server1.DEBEZIUM.ORDERS");
            Assertions.assertThat(snapshotOrders.size()).isEqualTo(1);
            final Struct snapshotAfter = ((Struct) snapshotOrders.get(0).value()).getStruct("after");
            Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(9);
            Assertions.assertThat(snapshotAfter.get("ORDER_DATE")).isEqualTo(1519257600000L);

            // Streaming phase: the same calendar date supplied through TO_DATE.
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            connection.execute("INSERT INTO debezium.orders VALUES (10, TO_DATE('2018-02-22', 'yyyy-mm-dd'))");
            connection.execute("COMMIT");
            final AbstractConnectorTest.SourceRecords streamRecords = consumeRecordsByTopic(1);
            final List<SourceRecord> orders = streamRecords.recordsForTopic("server1.DEBEZIUM.ORDERS");
            Assertions.assertThat(orders).hasSize(1);
            final Struct after = ((Struct) orders.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(10);
            Assertions.assertThat(after.get("ORDER_DATE")).isEqualTo(1519257600000L);
        }
        finally {
            TestHelper.dropTable(connection, "orders");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that with {@code DECIMAL_HANDLING_MODE} set to {@code "string"} the {@code NUMBER}
     * columns {@code ID} and {@code AGE} are emitted with string schemas and string values, for
     * both the snapshot row and a streamed insert.
     *
     * <p>The table is dropped again in {@code finally}.
     *
     * @throws Exception if database setup or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-2733"})
    public void shouldConvertNumericAsStringDecimalHandlingMode() throws Exception {
        TestHelper.dropTable(connection, "table_number_pk");
        try {
            // The DDL is defined once and executed from the variable so it cannot drift from a copy.
            final String ddl = "CREATE TABLE table_number_pk (id NUMBER, name varchar2(255), age number, primary key (id))";
            connection.execute(ddl);
            connection.execute("GRANT SELECT ON debezium.table_number_pk TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.table_number_pk ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO debezium.table_number_pk (id, name, age) values (1, 'Bob', 25)");
            connection.execute("COMMIT");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium.table_number_pk")
                    .with(OracleConnectorConfig.DECIMAL_HANDLING_MODE, "string")
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();

            // Snapshot phase.
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");
            AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.recordsForTopic("server1.DEBEZIUM.TABLE_NUMBER_PK")).hasSize(1);
            SourceRecord record = (SourceRecord) records.recordsForTopic("server1.DEBEZIUM.TABLE_NUMBER_PK").get(0);
            List<SchemaAndValueField> expected = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.STRING_SCHEMA, "1"),
                    new SchemaAndValueField("NAME", Schema.OPTIONAL_STRING_SCHEMA, "Bob"),
                    new SchemaAndValueField("AGE", Schema.OPTIONAL_STRING_SCHEMA, "25"));
            assertRecordSchemaAndValues(expected, record, "after");

            // Streaming phase.
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            connection.execute("INSERT INTO debezium.table_number_pk (id, name, age) values (2, 'Sue', 30)");
            connection.execute("COMMIT");
            records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.recordsForTopic("server1.DEBEZIUM.TABLE_NUMBER_PK")).hasSize(1);
            record = (SourceRecord) records.recordsForTopic("server1.DEBEZIUM.TABLE_NUMBER_PK").get(0);
            expected = Arrays.asList(
                    new SchemaAndValueField("ID", Schema.STRING_SCHEMA, "2"),
                    new SchemaAndValueField("NAME", Schema.OPTIONAL_STRING_SCHEMA, "Sue"),
                    new SchemaAndValueField("AGE", Schema.OPTIONAL_STRING_SCHEMA, "30"));
            assertRecordSchemaAndValues(expected, record, "after");
        }
        finally {
            TestHelper.dropTable(connection, "table_number_pk");
        }
    }

    /**
     * Asserts the content of one envelope field of a record against per-column expectations.
     *
     * @param expectedByColumn expectations to apply to the envelope struct; when {@code null},
     *                         the envelope field itself is asserted to be {@code null}
     * @param record the record whose value struct is inspected
     * @param envelopeFieldName name of the struct field to read from the record value, e.g. {@code "after"}
     */
    protected void assertRecordSchemaAndValues(List<SchemaAndValueField> expectedByColumn, SourceRecord record, String envelopeFieldName) {
        final Struct envelopeContent = ((Struct) record.value()).getStruct(envelopeFieldName);

        // No expectations means the envelope field must be absent.
        if (expectedByColumn == null) {
            Assertions.assertThat((Object) envelopeContent).isNull();
            return;
        }

        Assertions.assertThat((Object) envelopeContent)
                .as("expected there to be content in Envelope under " + envelopeFieldName, new Object[0])
                .isNotNull();
        for (SchemaAndValueField expectedField : expectedByColumn) {
            expectedField.assertFor(envelopeContent);
        }
    }

    /**
     * Verifies that inserts carrying two 4000-character {@code varchar2} values are captured
     * intact, once by the snapshot and ten times while streaming.
     *
     * <p>Cleanup runs in {@code finally}: the connector is stopped and the table is dropped from
     * the stop callback, so a failed assertion no longer leaves {@code large_dml} behind. This
     * matches the sibling tests in this class, which drop their tables in {@code finally}.
     *
     * @throws Exception if database setup or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-2920"})
    public void shouldStreamDdlThatExceeds4000() throws Exception {
        TestHelper.dropTable(connection, "large_dml");
        try {
            // The DDL is defined once and executed from the variable so it cannot drift from a copy.
            final String ddl = "CREATE TABLE large_dml (id NUMERIC(6), value varchar2(4000), value2 varchar2(4000), primary key(id))";
            connection.execute(ddl);
            connection.execute("GRANT SELECT ON debezium.large_dml TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.large_dml ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");

            final String largeValue = generateAlphaNumericStringColumn(4000);
            final String largeValue2 = generateAlphaNumericStringColumn(4000);
            connection.execute("INSERT INTO large_dml (id, value, value2) values (1, '" + largeValue + "', '" + largeValue2 + "')");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium.large_dml")
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();

            // Snapshot phase: the single pre-existing row.
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");
            AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.topics()).hasSize(1);
            Assertions.assertThat(records.recordsForTopic("server1.DEBEZIUM.LARGE_DML")).hasSize(1);
            final Struct snapshotAfter = ((Struct) ((SourceRecord) records.recordsForTopic("server1.DEBEZIUM.LARGE_DML").get(0)).value()).getStruct("after");
            Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotAfter.get("VALUE")).isEqualTo(largeValue);
            Assertions.assertThat(snapshotAfter.get("VALUE2")).isEqualTo(largeValue2);

            // Streaming phase: ten rows (ids 2..11) in one transaction, remembering what was written.
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            final List<String> largeValues = new ArrayList<>();
            final List<String> largeValues2 = new ArrayList<>();
            for (int i = 0; i < 10; ++i) {
                final String first = generateAlphaNumericStringColumn(4000);
                largeValues.add(first);
                final String second = generateAlphaNumericStringColumn(4000);
                largeValues2.add(second);
                connection.execute("INSERT INTO large_dml (id, value, value2) values (" + (2 + i) + ", '" + first + "', '" + second + "')");
            }
            connection.commit();

            records = consumeRecordsByTopic(10);
            Assertions.assertThat(records.topics()).hasSize(1);
            Assertions.assertThat(records.recordsForTopic("server1.DEBEZIUM.LARGE_DML")).hasSize(10);
            final List<SourceRecord> entries = records.recordsForTopic("server1.DEBEZIUM.LARGE_DML");
            for (int i = 0; i < 10; ++i) {
                final Struct after = ((Struct) entries.get(i).value()).getStruct("after");
                Assertions.assertThat(after.get("ID")).isEqualTo(2 + i);
                Assertions.assertThat(after.get("VALUE")).isEqualTo(largeValues.get(i));
                Assertions.assertThat(after.get("VALUE2")).isEqualTo(largeValues2.get(i));
            }
        }
        finally {
            // Same stop-then-drop order as before, but now also on the failure path.
            stopConnector(r -> TestHelper.dropTable(connection, "large_dml"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that ten individually committed inserts are all delivered when the connector runs
     * with {@code MAX_QUEUE_SIZE} of 2 and {@code MAX_BATCH_SIZE} of 1.
     *
     * <p>The poll timeout is raised to 20 seconds for the duration of the test and restored in
     * {@code finally}, where the table is also dropped.
     *
     * @throws Exception if database setup or record consumption fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.XSTREAM, reason="Only applies to Xstreams")
    @FixFor(value={"DBZ-2891"})
    public void shouldNotObserveDeadlockWhileStreamingWithXstream() throws Exception {
        final long oldPollTimeInMs = this.pollTimeoutInMs;
        TestHelper.dropTable(connection, "deadlock_test");
        try {
            // The DDL is defined once and executed from the variable so it cannot drift from a copy.
            final String ddl = "CREATE TABLE deadlock_test (id numeric(9), name varchar2(50), primary key(id))";
            connection.execute(ddl);
            connection.execute("GRANT SELECT ON debezium.deadlock_test TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.deadlock_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");

            this.pollTimeoutInMs = TimeUnit.SECONDS.toMillis(20L);

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "debezium.deadlock_test")
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.MAX_QUEUE_SIZE, 2)
                    .with(RelationalDatabaseConnectorConfig.MAX_BATCH_SIZE, 1)
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            // Each insert is committed on its own, producing ten separate transactions.
            for (int i = 0; i < 10; ++i) {
                connection.execute("INSERT INTO deadlock_test (id, name) values (" + i + ", 'Test " + i + "')");
                connection.execute("COMMIT");
            }

            final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(10, 24);
            Assertions.assertThat(records.topics()).hasSize(1);
            Assertions.assertThat(records.recordsForTopic("server1.DEBEZIUM.DEADLOCK_TEST")).hasSize(10);
        }
        finally {
            this.pollTimeoutInMs = oldPollTimeInMs;
            TestHelper.dropTable(connection, "deadlock_test");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that the connector starts and reaches streaming when the included table has a name
     * that must be quoted ({@code "#T70_Sid:582003931_1_ConnConne"}).
     *
     * <p>The test passes once streaming is reported as running; no records are consumed.
     * The table is dropped again in {@code finally}.
     *
     * @throws Exception if database setup or connector start-up fails
     */
    @Test
    @FixFor(value={"DBZ-3057"})
    public void shouldReadTableUniqueIndicesWithCharactersThatRequireExplicitQuotes() throws Exception {
        // Single definition of the quoted name; every statement below is built from it.
        final String tableName = "debezium.\"#T70_Sid:582003931_1_ConnConne\"";
        try {
            TestHelper.dropTable(connection, tableName);
            final String ddl = "CREATE GLOBAL TEMPORARY TABLE " + tableName + " (id number, name varchar2(50))";
            connection.execute(ddl);
            connection.execute("GRANT SELECT ON " + tableName + " TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE " + tableName + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");

            // The include list is a regular expression, hence the escaped '.', '#' and ':'.
            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.\\#T70_Sid\\:582003931_1_ConnConne")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
        }
        finally {
            TestHelper.dropTable(connection, tableName);
        }
    }

    /**
     * Verifies that the connector reaches streaming for a table created as
     * {@code CREATE TABLE XML_TABLE of XMLTYPE}, which declares no primary key.
     *
     * <p>One row is inserted before start-up; the test asserts that nothing is available to
     * consume right after start and then waits for streaming to be running. The table is dropped
     * again in {@code finally}.
     *
     * @throws Exception if database setup or connector start-up fails
     */
    @Test
    @FixFor(value={"DBZ-3151"})
    public void testSnapshotCompletesWithSystemGeneratedUniqueIndexOnKeylessTable() throws Exception {
        TestHelper.dropTable(connection, "XML_TABLE");
        try {
            // The DDL is defined once and executed from the variable so it cannot drift from a copy.
            final String ddl = "CREATE TABLE XML_TABLE of XMLTYPE";
            connection.execute(ddl);
            connection.execute("GRANT SELECT ON DEBEZIUM.XML_TABLE TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE DEBEZIUM.XML_TABLE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("INSERT INTO DEBEZIUM.XML_TABLE values (xmltype('<?xml version=\"1.0\"?><tab><name>Hi</name></tab>'))");
            connection.execute("COMMIT");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.XML_TABLE")
                    .build();
            start(OracleConnector.class, config);
            assertNoRecordsToConsume();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
        }
        finally {
            TestHelper.dropTable(connection, "XML_TABLE");
        }
    }

    /**
     * Checks that the shared connection reports a non-null database version whose major
     * component is greater than zero.
     *
     * @throws Exception if the version cannot be read from the connection
     */
    @Test
    @FixFor(value={"DBZ-3001"})
    public void shouldGetOracleDatabaseVersion() throws Exception {
        final OracleDatabaseVersion databaseVersion = connection.getOracleVersion();
        Assertions.assertThat((Object) databaseVersion).isNotNull();

        final int majorVersion = databaseVersion.getMajor();
        Assertions.assertThat(majorVersion).isGreaterThan(0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that a streamed insert is captured for a table that has both an
     * {@code (ALL) COLUMNS} and a {@code (PRIMARY KEY) COLUMNS} supplemental log group.
     *
     * <p>The table is dropped again in {@code finally}.
     *
     * @throws Exception if database setup or record consumption fails
     */
    @Test
    @FixFor(value={"DBZ-3109"})
    public void shouldStreamChangesForTableWithMultipleLogGroupTypes() throws Exception {
        try {
            TestHelper.dropTable(connection, "log_group_test");
            // The DDL is defined once and executed from the variable so it cannot drift from a copy.
            final String ddl = "CREATE TABLE log_group_test (id numeric(9,0) primary key, name varchar2(50))";
            connection.execute(ddl);
            connection.execute("GRANT SELECT ON debezium.log_group_test TO " + TestHelper.getConnectorUserName());
            // Two different supplemental log group types on the same table.
            connection.execute("ALTER TABLE debezium.log_group_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");
            connection.execute("ALTER TABLE debezium.log_group_test ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.LOG_GROUP_TEST")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            connection.execute("INSERT INTO debezium.log_group_test (id, name) values (1,'Test')");
            connection.execute("COMMIT");
            final AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.recordsForTopic("server1.DEBEZIUM.LOG_GROUP_TEST")).hasSize(1);
        }
        finally {
            TestHelper.dropTable(connection, "log_group_test");
        }
    }

    /**
     * Starts the connector in schema-only snapshot mode, streams one insert, stops and restarts
     * the connector with the same configuration, then streams a second insert. After each start
     * exactly one record is expected, carrying the row inserted in that phase.
     */
    @Test
    @FixFor("DBZ-2875")
    public void shouldResumeStreamingAtCorrectScnOffset() throws Exception {
        TestHelper.dropTable(connection, "offset_test");
        try {
            Testing.Debug.enable();

            connection.execute("CREATE TABLE offset_test (id numeric(9,0) primary key, name varchar2(50))");
            connection.execute("GRANT SELECT ON debezium.offset_test TO " + TestHelper.getConnectorUserName());
            connection.execute("ALTER TABLE debezium.offset_test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS");

            final Configuration connectorConfig = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.OFFSET_TEST")
                    .build();

            // First run: nothing to consume until a row is inserted while streaming.
            this.start(OracleConnector.class, connectorConfig);
            this.assertNoRecordsToConsume();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            connection.execute("INSERT INTO debezium.offset_test (id, name) values (1, 'Bob')");

            final AbstractConnectorTest.SourceRecords firstBatch = this.consumeRecordsByTopic(1);
            Assertions.assertThat(firstBatch.recordsForTopic("server1.DEBEZIUM.OFFSET_TEST")).hasSize(1);

            final Struct firstValue = (Struct) firstBatch.allRecordsInOrder().get(0).value();
            Struct after = (Struct) firstValue.get("after");
            Testing.print(after);
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("NAME")).isEqualTo("Bob");

            // Second run: restart with the same configuration and expect only the new row.
            this.stopConnector();
            this.start(OracleConnector.class, connectorConfig);
            this.assertNoRecordsToConsume();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            connection.execute("INSERT INTO debezium.offset_test (id, name) values (2, 'Bill')");

            final AbstractConnectorTest.SourceRecords secondBatch = this.consumeRecordsByTopic(1);
            Assertions.assertThat(secondBatch.recordsForTopic("server1.DEBEZIUM.OFFSET_TEST")).hasSize(1);

            final Struct secondValue = (Struct) secondBatch.allRecordsInOrder().get(0).value();
            after = (Struct) secondValue.get("after");
            Testing.print(after);
            Assertions.assertThat(after.get("ID")).isEqualTo(2);
            Assertions.assertThat(after.get("NAME")).isEqualTo("Bill");
        }
        finally {
            TestHelper.dropTable(connection, "offset_test");
        }
    }

    /**
     * Creates an index-organized table with an overflow segment, snapshots one pre-existing row
     * and then streams a second row, checking the key and the DESCRIPTION column of each record.
     * The table is dropped and the recycle bin purged afterwards.
     */
    @Test
    @FixFor("DBZ-3036")
    public void shouldHandleParentChildIndexOrganizedTables() throws Exception {
        final String topic = "server1.DEBEZIUM.TEST_IOT";

        TestHelper.dropTable(connection, "test_iot");
        try {
            final String createTableDdl = "CREATE TABLE test_iot (id numeric(9,0), description varchar2(50) not null, primary key(id)) ORGANIZATION INDEX INCLUDING description OVERFLOW";
            connection.execute(createTableDdl);
            TestHelper.streamTable(connection, "debezium.test_iot");

            // Row that must be picked up by the snapshot.
            connection.executeWithoutCommitting("INSERT INTO debezium.test_iot VALUES ('1', 'Hello World')");
            connection.execute("COMMIT");

            final Configuration connectorConfig = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM")
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "(.)*IOT(.)*")
                    .with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "regex")
                    .build();

            this.start(OracleConnector.class, connectorConfig);
            this.assertNoRecordsToConsume();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
            Assertions.assertThat(consumed.recordsForTopic(topic)).hasSize(1);

            SourceRecord current = consumed.recordsForTopic(topic).get(0);
            Struct after = (Struct) ((Struct) current.value()).get("after");
            VerifyRecord.isValidRead(current, "ID", 1);
            Assertions.assertThat(after.get("DESCRIPTION")).isEqualTo("Hello World");

            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // Row that must be picked up by streaming.
            connection.executeWithoutCommitting("INSERT INTO debezium.test_iot VALUES ('2', 'Goodbye')");
            connection.execute("COMMIT");

            consumed = this.consumeRecordsByTopic(1);
            Assertions.assertThat(consumed.recordsForTopic(topic)).hasSize(1);

            current = consumed.recordsForTopic(topic).get(0);
            after = (Struct) ((Struct) current.value()).get("after");
            VerifyRecord.isValidInsert(current, "ID", 2);
            Assertions.assertThat(after.get("DESCRIPTION")).isEqualTo("Goodbye");
        }
        finally {
            TestHelper.dropTable(connection, "test_iot");
            TestHelper.purgeRecycleBin(connection);
        }
    }

    /**
     * Snapshots a row containing CLOB and NCLOB values with LOB support enabled, then streams an
     * update of both columns and checks the "after" values of each emitted record.
     */
    @Test
    @FixFor("DBZ-3257")
    public void shouldSnapshotAndStreamClobDataTypes() throws Exception {
        final String topic = "server1.DEBEZIUM.CLOB_TEST";

        TestHelper.dropTable(connection, "clob_test");
        try {
            final String createTableDdl = "CREATE TABLE clob_test(id numeric(9,0) primary key, val_clob clob, val_nclob nclob)";
            connection.execute(createTableDdl);
            TestHelper.streamTable(connection, "clob_test");
            connection.execute("INSERT INTO clob_test values (1, 'TestClob', 'TestNClob')");

            final Configuration connectorConfig = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CLOB_TEST")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();

            this.start(OracleConnector.class, connectorConfig);
            this.assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            // Snapshot phase: the pre-existing row is emitted as a read.
            AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
            Assertions.assertThat(consumed.recordsForTopic(topic)).hasSize(1);

            List<SourceRecord> tableRecords = consumed.recordsForTopic(topic);
            SourceRecord first = tableRecords.get(0);
            VerifyRecord.isValidRead(first, "ID", 1);

            Struct after = (Struct) ((Struct) first.value()).get("after");
            Assertions.assertThat(after.get("VAL_CLOB")).isEqualTo("TestClob");
            Assertions.assertThat(after.get("VAL_NCLOB")).isEqualTo("TestNClob");

            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // Streaming phase: both LOB columns are updated in a single statement.
            connection.execute("UPDATE clob_test SET val_clob = 'TestClob2', val_nclob = 'TestNClob2' WHERE ID = 1");

            consumed = this.consumeRecordsByTopic(1);
            Assertions.assertThat(consumed.recordsForTopic(topic)).hasSize(1);

            tableRecords = consumed.recordsForTopic(topic);
            first = tableRecords.get(0);
            VerifyRecord.isValidUpdate(first, "ID", 1);

            after = (Struct) ((Struct) first.value()).get("after");
            Assertions.assertThat(after.get("VAL_CLOB")).isEqualTo("TestClob2");
            Assertions.assertThat(after.get("VAL_NCLOB")).isEqualTo("TestNClob2");
        }
        finally {
            TestHelper.dropTable(connection, "clob_test");
        }
    }

    /**
     * Starts the connector with schema change events enabled and checks that the first record
     * on the "server1" topic has a source partition equal to {@code {"server": "server1"}}.
     */
    @Test
    @FixFor("DBZ-3347")
    public void shouldContainPartitionInSchemaChangeEvent() throws Exception {
        TestHelper.dropTable(connection, "dbz3347");
        try {
            connection.execute("create table dbz3347 (id number primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3347");

            final Configuration connectorConfig = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3347")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                    .build();

            this.start(OracleConnector.class, connectorConfig);
            this.assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
            final List<SourceRecord> schemaChangeRecords = consumed.recordsForTopic("server1");
            final SourceRecord schemaChange = schemaChangeRecords.get(0);

            final Map<String, String> expectedPartition = Collections.singletonMap("server", "server1");
            Assertions.assertThat((Map) schemaChange.sourcePartition()).isEqualTo(expectedPartition);
        }
        finally {
            TestHelper.dropTable(connection, "dbz3347");
        }
    }

    /**
     * Snapshots and then streams rows of a table that has no primary key, checking that each
     * emitted record has a {@code null} key and the expected "after" column values.
     */
    @Test
    @FixFor("DBZ-832")
    public void shouldSnapshotAndStreamTablesWithNoPrimaryKey() throws Exception {
        final String topic = "server1.DEBEZIUM.DBZ832";

        TestHelper.dropTable(connection, "dbz832");
        try {
            connection.execute("create table dbz832 (id numeric(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz832");
            connection.execute("INSERT INTO dbz832 values (1, 'Test')");

            final Configuration connectorConfig = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ832")
                    .build();

            this.start(OracleConnector.class, connectorConfig);
            this.assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            // Snapshot phase.
            AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
            Assertions.assertThat(consumed.recordsForTopic(topic)).hasSize(1);

            SourceRecord current = consumed.recordsForTopic(topic).get(0);
            Assertions.assertThat(current.key()).isNull();

            Struct after = ((Struct) current.value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Test");

            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // Streaming phase.
            connection.execute("INSERT INTO dbz832 values (2, 'Test2')");

            consumed = this.consumeRecordsByTopic(1);
            Assertions.assertThat(consumed.recordsForTopic(topic)).hasSize(1);

            current = consumed.recordsForTopic(topic).get(0);
            Assertions.assertThat(current.key()).isNull();

            after = ((Struct) current.value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(2);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Test2");
        }
        finally {
            TestHelper.dropTable(connection, "dbz832");
        }
    }

    /**
     * Snapshots and streams a parent table (primary key plus an additional unique constraint)
     * and a child table (foreign key plus a unique constraint, no primary key).
     * <p>
     * Parent records are expected to be keyed by ID only; child records are expected to be keyed
     * by both ID and DATA. Both tables are dropped in a {@code finally} block, so cleanup runs
     * once whether the test passes or fails.
     */
    @Test
    @FixFor("DBZ-1211")
    public void shouldSnapshotAndStreamTablesWithUniqueIndexPrimaryKey() throws Exception {
        final String parentTopic = "server1.DEBEZIUM.DBZ1211";
        final String childTopic = "server1.DEBEZIUM.DBZ1211_CHILD";

        TestHelper.dropTables(connection, "dbz1211_child", "dbz1211");
        try {
            connection.execute("create table dbz1211 (id numeric(9,0), data varchar2(50), constraint pkdbz1211 primary key (id) using index)");
            connection.execute("alter table dbz1211 add constraint xdbz1211 unique (id,data) using index");
            connection.execute("create table dbz1211_child (id numeric(9,0), data varchar2(50), constraint fk1211 foreign key (id) references dbz1211 on delete cascade)");
            connection.execute("alter table dbz1211_child add constraint ydbz1211 unique (id,data) using index");
            TestHelper.streamTable(connection, "dbz1211");
            TestHelper.streamTable(connection, "dbz1211_child");

            // Rows that must be picked up by the snapshot, committed together.
            connection.executeWithoutCommitting("INSERT INTO dbz1211 values (1, 'Test')");
            connection.executeWithoutCommitting("INSERT INTO dbz1211_child values (1, 'Child')");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ1211,DEBEZIUM\\.DBZ1211\\_CHILD")
                    .build();

            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
            Assertions.assertThat(records.recordsForTopic(parentTopic)).hasSize(1);
            Assertions.assertThat(records.recordsForTopic(childTopic)).hasSize(1);

            // Snapshot, parent: key holds ID only.
            SourceRecord record = records.recordsForTopic(parentTopic).get(0);
            Struct key = (Struct) record.key();
            Assertions.assertThat(key).isNotNull();
            Assertions.assertThat(key.get("ID")).isEqualTo(1);
            Assertions.assertThat(key.schema().field("DATA")).isNull();
            Struct after = ((Struct) record.value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Test");

            // Snapshot, child: key holds both ID and DATA.
            record = records.recordsForTopic(childTopic).get(0);
            key = (Struct) record.key();
            Assertions.assertThat(key).isNotNull();
            Assertions.assertThat(key.get("ID")).isEqualTo(1);
            Assertions.assertThat(key.get("DATA")).isEqualTo("Child");
            after = ((Struct) record.value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Child");

            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // Streaming: one parent row committed on its own, two child rows committed together.
            connection.execute("INSERT INTO dbz1211 values (2, 'Test2')");
            connection.executeWithoutCommitting("INSERT INTO dbz1211_child values (1, 'Child1-2')");
            connection.executeWithoutCommitting("INSERT INTO dbz1211_child values (2, 'Child2-1')");
            connection.commit();

            records = this.consumeRecordsByTopic(3);
            Assertions.assertThat(records.recordsForTopic(parentTopic)).hasSize(1);
            Assertions.assertThat(records.recordsForTopic(childTopic)).hasSize(2);

            // Streaming, parent.
            record = records.recordsForTopic(parentTopic).get(0);
            key = (Struct) record.key();
            Assertions.assertThat(key).isNotNull();
            Assertions.assertThat(key.get("ID")).isEqualTo(2);
            Assertions.assertThat(key.schema().field("DATA")).isNull();
            after = ((Struct) record.value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(2);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Test2");

            // Streaming, first child row.
            record = records.recordsForTopic(childTopic).get(0);
            key = (Struct) record.key();
            Assertions.assertThat(key).isNotNull();
            Assertions.assertThat(key.get("ID")).isEqualTo(1);
            Assertions.assertThat(key.get("DATA")).isEqualTo("Child1-2");
            after = ((Struct) record.value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Child1-2");

            // Streaming, second child row.
            record = records.recordsForTopic(childTopic).get(1);
            key = (Struct) record.key();
            Assertions.assertThat(key).isNotNull();
            Assertions.assertThat(key.get("ID")).isEqualTo(2);
            Assertions.assertThat(key.get("DATA")).isEqualTo("Child2-1");
            after = ((Struct) record.value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(2);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Child2-1");
        }
        finally {
            TestHelper.dropTables(connection, "dbz1211_child", "dbz1211");
        }
    }

    /**
     * Inserts two rows with the same ID into a table with a unique index on ID, inside one
     * transaction. The second insert is expected to fail with ORA-00001; after the commit only
     * the first row ("Test1") is expected as a change event and nothing else.
     * <p>
     * The transaction is committed exactly once in a {@code finally} block, regardless of
     * whether the inserts succeeded, hit the expected violation, or failed for another reason.
     */
    @Test
    @FixFor("DBZ-3322")
    public void shouldNotEmitEventsOnConstraintViolations() throws Exception {
        TestHelper.dropTable(connection, "dbz3322");
        try {
            connection.execute("CREATE TABLE dbz3322 (id number(9,0), data varchar2(50))");
            connection.execute("CREATE UNIQUE INDEX uk_dbz3322 ON dbz3322 (id)");
            TestHelper.streamTable(connection, "dbz3322");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3322")
                    .build();

            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            try {
                connection.executeWithoutCommitting("INSERT INTO dbz3322 (id,data) values (1, 'Test1')");
                connection.executeWithoutCommitting("INSERT INTO dbz3322 (id,data) values (1, 'Test2')");
            }
            catch (SQLException e) {
                // Only the unique constraint violation is tolerated; a missing message or any
                // other error is rethrown unchanged.
                final String message = e.getMessage();
                if (message == null || !message.startsWith("ORA-00001")) {
                    throw e;
                }
            }
            finally {
                connection.executeWithoutCommitting("COMMIT");
            }

            final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
            final List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ3322");
            Assertions.assertThat(tableRecords).hasSize(1);

            final Struct after = ((Struct) tableRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Test1");
            this.assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, "dbz3322");
        }
    }

    /**
     * Exercises unique-constraint violations across three concurrent sessions.
     * <p>
     * The main connection inserts ID 1 without committing. Two worker sessions each insert a
     * distinct row (IDs 2 and 3) and then try to insert ID 1 as well; on failure they wait for
     * {@code latchB} and retry, returning {@code false} if the retry also raises a
     * {@link SQLException}. Both workers are asserted to return {@code false}, after which three
     * change events are expected, the first being the insert of ID 1 with data "Test1".
     * <p>
     * The executor is shut down in a {@code finally} block so its worker threads do not outlive
     * the test.
     */
    @Test
    @FixFor("DBZ-5090")
    public void shouldNotEmitEventsOnConstraintViolationsAcrossSessions() throws Exception {
        TestHelper.dropTable(connection, "dbz5090");
        try {
            connection.execute("CREATE TABLE dbz5090 (id number(9,0), data varchar2(50))");
            connection.execute("CREATE UNIQUE INDEX uk_dbz5090 ON dbz5090 (id)");
            TestHelper.streamTable(connection, "dbz5090");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5090")
                    .build();

            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            final ExecutorService executorService = Executors.newFixedThreadPool(2);
            try (OracleConnection connection2 = TestHelper.testConnection();
                 OracleConnection connection3 = TestHelper.testConnection()) {
                // Uncommitted insert of ID 1 on the main session.
                connection.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test1')");

                // latchA: both workers have done their first insert.
                // latchB: the main session has committed ID 1.
                final CountDownLatch latchA = new CountDownLatch(2);
                final CountDownLatch latchB = new CountDownLatch(1);

                final List<Future<Boolean>> futures = new ArrayList<>();
                futures.add(executorService.submit(() -> {
                    try {
                        connection2.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (2,'Test2')");
                        latchA.countDown();
                        try {
                            connection2.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test2')");
                        }
                        catch (SQLException firstAttemptFailure) {
                            latchB.await();
                            connection2.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test2')");
                        }
                        return true;
                    }
                    catch (SQLException e) {
                        return false;
                    }
                }));
                futures.add(executorService.submit(() -> {
                    try {
                        connection3.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (3,'Test3')");
                        latchA.countDown();
                        try {
                            connection3.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test3')");
                        }
                        catch (SQLException firstAttemptFailure) {
                            latchB.await();
                            connection3.executeWithoutCommitting("INSERT INTO dbz5090 (id,data) values (1,'Test3b')");
                        }
                        return true;
                    }
                    catch (SQLException e) {
                        return false;
                    }
                }));

                latchA.await();
                // Pause before committing so the workers have time to issue their conflicting inserts.
                Thread.sleep(5000L);
                connection.commit();
                latchB.countDown();

                Assertions.assertThat(futures.get(0).get()).isFalse();
                Assertions.assertThat(futures.get(1).get()).isFalse();

                connection2.commit();
                connection3.commit();
            }
            finally {
                // Release the pool's threads even when an assertion or SQL call above fails.
                executorService.shutdownNow();
            }

            final AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(3);
            final List<SourceRecord> records = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ5090");
            Assertions.assertThat(records).hasSize(3);

            VerifyRecord.isValidInsert(records.get(0), "ID", 1);
            final Struct after = ((Struct) records.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Test1");
            this.assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, "dbz5090");
        }
    }

    /**
     * Inserts two rows and rolls them back, then inserts and commits a third row. Exactly one
     * change event, for ID 3, is expected and nothing else.
     */
    @Test
    @FixFor("DBZ-3322")
    public void shouldNotEmitEventsInRollbackTransaction() throws Exception {
        TestHelper.dropTable(connection, "dbz3322");
        try {
            connection.execute("CREATE TABLE dbz3322 (id number(9,0), data varchar2(50))");
            connection.execute("CREATE UNIQUE INDEX uk_dbz3322 ON dbz3322 (id)");
            TestHelper.streamTable(connection, "dbz3322");

            final Configuration connectorConfig = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3322")
                    .build();

            this.start(OracleConnector.class, connectorConfig);
            this.assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // Rolled-back work.
            connection.executeWithoutCommitting("INSERT INTO dbz3322 (id,data) values (1, 'Test')");
            connection.executeWithoutCommitting("INSERT INTO dbz3322 (id,data) values (2, 'Test')");
            connection.executeWithoutCommitting("ROLLBACK");

            // Committed work.
            connection.executeWithoutCommitting("INSERT INTO dbz3322 (id,data) values (3, 'Test')");
            connection.executeWithoutCommitting("COMMIT");

            final AbstractConnectorTest.SourceRecords consumed = this.consumeRecordsByTopic(1);
            Assertions.assertThat(consumed.recordsForTopic("server1.DEBEZIUM.DBZ3322")).hasSize(1);

            final SourceRecord only = consumed.recordsForTopic("server1.DEBEZIUM.DBZ3322").get(0);
            final Struct envelope = (Struct) only.value();
            Assertions.assertThat(envelope.getStruct("after").get("ID")).isEqualTo(3);
            this.assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, "dbz3322");
        }
    }

    /**
     * Captures two tables but restricts the snapshot to one of them through
     * {@code SNAPSHOT_MODE_TABLES}. The snapshot is expected to emit a record only for
     * DBZ3062A, while streaming afterwards is expected to emit one record for each table.
     * <p>
     * Both tables are dropped in a {@code finally} block, so cleanup runs once whether the test
     * passes or fails.
     */
    @Test
    @FixFor("DBZ-3062")
    public void shouldSelectivelySnapshotTables() throws Exception {
        final String topicA = "server1.DEBEZIUM.DBZ3062A";
        final String topicB = "server1.DEBEZIUM.DBZ3062B";

        TestHelper.dropTables(connection, "dbz3062a", "dbz3062b");
        try {
            connection.execute("CREATE TABLE dbz3062a (id number(9,0), data varchar2(50))");
            connection.execute("CREATE TABLE dbz3062b (id number(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3062a");
            TestHelper.streamTable(connection, "dbz3062b");
            connection.execute("INSERT INTO dbz3062a VALUES (1, 'Test1')");
            connection.execute("INSERT INTO dbz3062b VALUES (2, 'Test2')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3062.*")
                    .with(OracleConnectorConfig.SNAPSHOT_MODE_TABLES, "[A-z].*DEBEZIUM\\.DBZ3062A")
                    .with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "regex")
                    .build();

            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            // Snapshot phase: only table A produces a record; table B has no topic entry at all.
            AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
            List<SourceRecord> tableA = records.recordsForTopic(topicA);
            List<SourceRecord> tableB = records.recordsForTopic(topicB);
            Assertions.assertThat(tableA).hasSize(1);
            Assertions.assertThat(tableB).isNull();

            Struct after = ((Struct) tableA.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Test1");
            this.assertNoRecordsToConsume();

            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // Streaming phase: both tables are captured.
            connection.executeWithoutCommitting("INSERT INTO dbz3062a VALUES (3, 'Test3')");
            connection.executeWithoutCommitting("INSERT INTO dbz3062b VALUES (4, 'Test4')");
            connection.commit();

            records = this.consumeRecordsByTopic(2);
            tableA = records.recordsForTopic(topicA);
            tableB = records.recordsForTopic(topicB);
            Assertions.assertThat(tableA).hasSize(1);
            Assertions.assertThat(tableB).hasSize(1);

            after = ((Struct) tableA.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(3);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Test3");

            after = ((Struct) tableB.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(4);
            Assertions.assertThat(after.get("DATA")).isEqualTo("Test4");
        }
        finally {
            TestHelper.dropTables(connection, "dbz3062a", "dbz3062b");
        }
    }

    /**
     * Opens a transaction on a second connection, commits a separate transaction on the main
     * connection, waits, and only then commits the second connection. Two change events are
     * expected in commit order: ID 2 (main connection) followed by ID 1 (second connection).
     * <p>
     * The second connection is managed by try-with-resources so it is always closed, and the
     * table is dropped in a {@code finally} block.
     * <p>
     * NOTE(review): despite the method name, no assertion on logged warnings is made here.
     */
    @Test
    @FixFor("DBZ-3616")
    public void shouldNotLogWarningsAboutCommittedTransactionsWhileStreamingNormally() throws Exception {
        TestHelper.dropTable(connection, "dbz3616");
        try {
            connection.execute("CREATE TABLE dbz3616 (id number(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3616");
            connection.commit();

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.INITIAL)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3616.*")
                    .with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "regex")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build();

            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            try (OracleConnection connection2 = TestHelper.testConnection()) {
                // The second session's transaction starts first but is committed last.
                connection2.executeWithoutCommitting("INSERT INTO dbz3616 (id,data) values (1,'Conn2')");
                connection.executeWithoutCommitting("INSERT INTO dbz3616 (id,data) values (2,'Conn1')");
                connection.commit();

                // Fixed one-minute delay while the second transaction stays open.
                Awaitility.await().pollDelay(Durations.ONE_MINUTE).timeout(Durations.TWO_MINUTES).until(() -> true);
                connection2.commit();

                final AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
                final List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ3616");
                Assertions.assertThat(tableRecords).hasSize(2);

                Assertions.assertThat(((Struct) tableRecords.get(0).value()).getStruct("after").get("ID")).isEqualTo(2);
                Assertions.assertThat(((Struct) tableRecords.get(1).value()).getStruct("after").get("ID")).isEqualTo(1);
            }
        }
        finally {
            TestHelper.dropTable(connection, "dbz3616");
        }
    }

    /**
     * Captures two inserts into the {@code CUSTOMER} table and runs every captured record through
     * the CloudEvents converter checks (JSON, JSON with Avro data, and Avro).
     *
     * <p>The first row is inserted before the connector starts, the second after the first record
     * has been consumed. For the second record the CloudEvents {@code id} must contain {@code "scn:"}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-3668"})
    public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        connection.execute("INSERT INTO customer (id,name,score) values (1001, 'DBZ3668', 100)");
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

        // First record: the row inserted before the connector was started.
        List<SourceRecord> customers = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(customers).hasSize(1);
        for (SourceRecord customer : customers) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(customer, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(customer, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(customer, "oracle", "server1", false);
        }

        // Second record: a row inserted while the connector is running.
        connection.execute("INSERT INTO customer (id,name,score) values (1002, 'DBZ3668', 95)");
        customers = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(customers).hasSize(1);
        for (SourceRecord customer : customers) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(customer, false,
                    jsonNode -> Assertions.assertThat(jsonNode.get("id").asText()).contains("scn:"));
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(customer, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(customer, "oracle", "server1", false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates {@code dbz3896} while the connector is already streaming, using three separate DDL
     * statements (table, unique index, primary key constraint on that index), and verifies that the
     * first captured insert carries a key made of {@code ID} and {@code NAME}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-3896"})
    public void shouldCaptureTableMetadataWithMultipleStatements() throws Exception {
        try {
            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3896")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            connection.execute(
                    "CREATE TABLE dbz3896 (id number(9,0), name varchar2(50), data varchar2(50))",
                    "CREATE UNIQUE INDEX dbz3896_pk ON dbz3896 (\"ID\", \"NAME\")",
                    "ALTER TABLE dbz3896 ADD CONSTRAINT idx_dbz3896 PRIMARY KEY (\"ID\", \"NAME\") USING INDEX \"DBZ3896_PK\"");
            TestHelper.streamTable(connection, "dbz3896");
            connection.execute("INSERT INTO dbz3896 (id,name,data) values (1,'First','Test')");

            final List<SourceRecord> captured = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ3896");
            Assertions.assertThat(captured).hasSize(1);

            final SourceRecord inserted = captured.get(0);
            Assertions.assertThat(inserted.key()).isNotNull();
            Assertions.assertThat(inserted.keySchema().field("ID")).isNotNull();
            Assertions.assertThat(inserted.keySchema().field("NAME")).isNotNull();

            final Struct key = (Struct) inserted.key();
            Assertions.assertThat(key.get("ID")).isEqualTo(1);
            Assertions.assertThat(key.get("NAME")).isEqualTo("First");
        }
        finally {
            TestHelper.dropTable(connection, "dbz3896");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the connector with the {@code CATALOG_IN_REDO} mining strategy and no table include list,
     * inserts one row into {@code dbz3898}, and verifies that exactly one record is emitted and that
     * nothing else is available once the connector's {@code CurrentScn} metric has moved past the
     * SCN read right after the insert.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Tests specific LogMiner features")
    @SkipOnDatabaseOption(value="Real Application Clusters", enabled=true, reason="Performance w/CATALOG_IN_REDO on Oracle RAC")
    @FixFor(value={"DBZ-3898"})
    public void shouldIgnoreAllTablesInExcludedSchemas() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz3898");
            connection.execute("CREATE TABLE dbz3898 (id number(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3898");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            connection.execute("INSERT INTO dbz3898 (id,data) values (1,'Test')");
            final Scn insertScn = TestHelper.getCurrentScn();

            final AbstractConnectorTest.SourceRecords captured = consumeRecordsByTopic(1);
            Assertions.assertThat(captured.recordsForTopic("server1.DEBEZIUM.DBZ3898")).hasSize(1);

            // Wait (up to three minutes) until the connector reports an SCN beyond the insert.
            Awaitility.await().atMost(Duration.ofMinutes(3L)).until(() -> {
                final String currentScn = getStreamingMetric("CurrentScn");
                return Scn.valueOf(currentScn).compareTo(insertScn) > 0;
            });
            assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, "dbz3898");
        }
    }

    /**
     * Starts the connector in archive-log-only mode and verifies that nothing is emitted up front,
     * and that each subsequent insert yields exactly one record once a redo log switch has been
     * observed or forced.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Tests archive log support for LogMiner only")
    @FixFor(value={"DBZ-3712", "DBZ-4879"})
    public void shouldStartWithArchiveLogOnlyModeAndStreamWhenRecordsBecomeAvailable() throws Exception {
        TestHelper.dropTable(connection, "dbz3712");
        try {
            connection.execute("CREATE TABLE dbz3712 (id number(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3712");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_MODE, true)
                    .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_SCN_POLL_INTERVAL_MS, 2000)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3712")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // Nothing has been written yet, so nothing may be emitted.
            waitForAvailableRecords(OracleConnectorIT.waitTimeForRecords(), TimeUnit.SECONDS);
            assertNoRecordsToConsume();

            connection.execute("INSERT INTO dbz3712 (id,data) values (1, 'Test')");
            waitForLogSwitchOrForceOneAfterTimeout();
            AbstractConnectorTest.SourceRecords captured = consumeRecordsByTopic(1);
            Assertions.assertThat(captured.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);

            connection.execute("INSERT INTO dbz3712 (id,data) values (2, 'Test2')");
            waitForLogSwitchOrForceOneAfterTimeout();
            captured = consumeRecordsByTopic(1);
            Assertions.assertThat(captured.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);
        }
        finally {
            TestHelper.dropTable(connection, "dbz3712");
        }
    }

    /**
     * Runs the connector first without archive-log-only mode, stops it, and restarts it with the
     * mode enabled. After the restart no record may be pending, and each later insert must yield
     * exactly one record once a redo log switch has been observed or forced.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Tests archive log support for LogMiner only")
    @FixFor(value={"DBZ-3712"})
    public void shouldPermitChangingToArchiveLogOnlyModeOnExistingConnector() throws Exception {
        TestHelper.dropTable(connection, "dbz3712");
        try {
            connection.execute("CREATE TABLE dbz3712 (id number(9,0), data varchar2(50))");
            TestHelper.streamTable(connection, "dbz3712");

            // Phase 1: archive-log-only mode is not configured.
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_SCN_POLL_INTERVAL_MS, 2000)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3712")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            connection.execute("INSERT INTO dbz3712 (id,data) values (1, 'Test1')");
            AbstractConnectorTest.SourceRecords captured = consumeRecordsByTopic(1);
            Assertions.assertThat(captured.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);
            stopConnector();

            // Phase 2: restart with archive-log-only mode enabled.
            config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_MODE, true)
                    .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_SCN_POLL_INTERVAL_MS, 2000)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3712")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            waitForAvailableRecords(OracleConnectorIT.waitTimeForRecords(), TimeUnit.SECONDS);
            assertNoRecordsToConsume();

            connection.execute("INSERT INTO dbz3712 (id,data) values (2, 'Test2')");
            waitForLogSwitchOrForceOneAfterTimeout();
            captured = consumeRecordsByTopic(1);
            Assertions.assertThat(captured.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);

            connection.execute("INSERT INTO dbz3712 (id,data) values (3, 'Test2')");
            waitForLogSwitchOrForceOneAfterTimeout();
            captured = consumeRecordsByTopic(1);
            Assertions.assertThat(captured.recordsForTopic("server1.DEBEZIUM.DBZ3712")).hasSize(1);
        }
        finally {
            TestHelper.dropTable(connection, "dbz3712");
        }
    }

    /**
     * Waits up to 20 seconds, polling every 5 seconds, for the current redo log sequences to differ
     * from the ones read on entry. While they are unchanged, each poll asserts that no records are
     * available. If the wait times out, a log file switch is forced.
     *
     * @throws SQLException if reading the redo log sequences or forcing the switch fails
     */
    private void waitForLogSwitchOrForceOneAfterTimeout() throws SQLException {
        final List<BigInteger> initialSequences = TestHelper.getCurrentRedoLogSequences();
        try {
            Awaitility.await()
                    .pollInterval(Duration.ofSeconds(5L))
                    .atMost(Duration.ofSeconds(20L))
                    .until(() -> {
                        if (!TestHelper.getCurrentRedoLogSequences().equals(initialSequences)) {
                            return true;
                        }
                        // Still on the same redo logs: nothing may have been emitted yet.
                        assertNoRecordsToConsume();
                        return false;
                    });
        }
        catch (ConditionTimeoutException timedOut) {
            // No switch happened on its own within the window, so trigger one explicitly.
            TestHelper.forceLogfileSwitch();
        }
    }

    /**
     * Configures the connector with {@code DEBEZIUM} in the LogMiner username exclude list, inserts
     * two rows into {@code dbz3978} in a single transaction, and verifies that no records become
     * available within 10 seconds while the {@code TotalCapturedDmlCount} metric is at least 2.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Specific to only LogMiner")
    @FixFor(value={"DBZ-3978"})
    public void shouldFilterUser() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz3978");
            connection.execute("CREATE TABLE dbz3978 (id number(9,0), data varchar2(50), primary key (id))");
            TestHelper.streamTable(connection, "dbz3978");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3978")
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                    .with(OracleConnectorConfig.LOG_MINING_USERNAME_EXCLUDE_LIST, "DEBEZIUM")
                    .with(OracleConnectorConfig.LOG_MINING_QUERY_FILTER_MODE, "none")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // Both inserts belong to one transaction that is committed explicitly afterwards.
            connection.executeWithoutCommitting("INSERT INTO debezium.dbz3978 VALUES (1, 'Test1')");
            connection.executeWithoutCommitting("INSERT INTO debezium.dbz3978 VALUES (2, 'Test2')");
            connection.execute("COMMIT");

            final boolean recordsAvailable = waitForAvailableRecords(10L, TimeUnit.SECONDS);
            Assertions.assertThat(recordsAvailable).isFalse();

            final Long capturedDmlCount = (Long) getStreamingMetric("TotalCapturedDmlCount");
            Assertions.assertThat(capturedDmlCount).isGreaterThanOrEqualTo(2L);
        }
        finally {
            TestHelper.dropTable(connection, "dbz3978");
        }
    }

    /**
     * Creates and populates {@code CMP3$12345} before the connector starts, inserts another row once
     * streaming is running, and then asserts for ten seconds that no record is ever available, even
     * though the table matches the include list {@code DEBEZIUM\.CMP.*}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-5756"})
    public void testShouldIgnoreCompressionAdvisorTablesDuringSnapshotAndStreaming() throws Exception {
        TestHelper.dropTable(connection, "CMP3$12345");
        try {
            connection.execute("CREATE TABLE CMP3$12345 (id numeric(9,0), id2 numeric(9,0), data varchar2(50), primary key(id, id2))");
            TestHelper.streamTable(connection, "CMP3$12345");
            connection.execute("INSERT INTO CMP3$12345 (id,id2,data) values (1, 1, 'data')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CMP.*")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            connection.execute("INSERT INTO CMP3$12345 (id,id2,data) values (2, 2, 'data')");
            try {
                // The condition never becomes true; every poll re-asserts that nothing was emitted.
                Awaitility.await().atMost(Duration.ofSeconds(10L)).until(() -> {
                    assertNoRecordsToConsume();
                    return false;
                });
            }
            catch (ConditionTimeoutException ignored) {
                // Timing out is the expected outcome: no record showed up during the whole window.
            }
        }
        finally {
            TestHelper.dropTable(connection, "CMP3$12345");
        }
    }

    /**
     * Starts the connector first, then creates and populates {@code CMP3$12345} while streaming, and
     * asserts for ten seconds that no record is ever available, even though the table matches the
     * include list {@code DEBEZIUM\.CMP.*}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-5756"})
    public void testShouldIgnoreCompressionAdvisorTablesDuringStreaming() throws Exception {
        TestHelper.dropTable(connection, "CMP3$12345");
        try {
            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CMP.*")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            connection.execute("CREATE TABLE CMP3$12345 (id numeric(9,0), id2 numeric(9,0), data varchar2(50), primary key(id, id2))");
            TestHelper.streamTable(connection, "CMP3$12345");
            connection.execute("INSERT INTO CMP3$12345 (id,id2,data) values (1, 1, 'data')");

            try {
                // The condition never becomes true; every poll re-asserts that nothing was emitted.
                Awaitility.await().atMost(Duration.ofSeconds(10L)).until(() -> {
                    assertNoRecordsToConsume();
                    return false;
                });
            }
            catch (ConditionTimeoutException ignored) {
                // Timing out is the expected outcome: no record showed up during the whole window.
            }
        }
        finally {
            TestHelper.dropTable(connection, "CMP3$12345");
        }
    }

    /**
     * Reads one attribute of the streaming metrics MBean registered for connector {@code "oracle"}
     * and server {@code "server1"} from the platform MBean server.
     *
     * @param metricName name of the MBean attribute to read
     * @param <T> type the caller expects; the value is cast without any check
     * @return the attribute value
     * @throws JMException if the object name cannot be built or the attribute cannot be read
     */
    private <T> T getStreamingMetric(String metricName) throws JMException {
        final MBeanServer platformServer = ManagementFactory.getPlatformMBeanServer();
        final ObjectName streamingMetrics = OracleConnectorIT.getStreamingMetricsObjectName("oracle", "server1");
        final Object attributeValue = platformServer.getAttribute(streamingMetrics, metricName);
        return (T) attributeValue;
    }

    /**
     * Builds a random string of the requested length from the characters {@code A-Z}, {@code 0-9}
     * and {@code a-z}, choosing each character with {@link Math#random()}.
     *
     * @param size number of characters to generate
     * @return the generated string
     */
    private String generateAlphaNumericStringColumn(int size) {
        final String alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyz";
        final StringBuilder column = new StringBuilder(size);
        while (column.length() < size) {
            final int position = (int) ((double) alphabet.length() * Math.random());
            column.append(alphabet.charAt(position));
        }
        return column.toString();
    }

    /**
     * Asserts that the record was sent to topic {@code __debezium-heartbeat.server1} and that its
     * key carries {@code serverName = "server1"}.
     *
     * @param heartbeat the record to check; its key must be a {@code Struct}
     */
    private void verifyHeartbeatRecord(SourceRecord heartbeat) {
        final String topic = heartbeat.topic();
        TestCase.assertEquals("__debezium-heartbeat.server1", topic);

        final Struct heartbeatKey = (Struct) heartbeat.key();
        final Object serverName = heartbeatKey.get("serverName");
        Assertions.assertThat(serverName).isEqualTo("server1");
    }

    /**
     * Converts a local date-time, interpreted at UTC, to microseconds since the epoch.
     *
     * <p>Only whole epoch seconds are used, so any sub-second part of the input is not reflected in
     * the result.
     *
     * @param localDateTime the date-time to convert
     * @return epoch seconds at UTC multiplied by {@code MICROS_PER_SECOND}
     */
    private long toMicroSecondsSinceEpoch(LocalDateTime localDateTime) {
        final long epochSeconds = localDateTime.toEpochSecond(ZoneOffset.UTC);
        return epochSeconds * MICROS_PER_SECOND;
    }

    /**
     * Starts the connector in {@code SCHEMA_ONLY_RECOVERY} snapshot mode with a file-based schema
     * history path named {@code missing-history.txt}, and rethrows the error reported to the
     * completion callback. The test passes when that error is a {@link DebeziumException}.
     */
    @Test(expected=DebeziumException.class)
    @FixFor(value={"DBZ-3986"})
    public void shouldCreateSnapshotSchemaOnlyRecoveryExceptionWithoutOffset() {
        final Path historyFile = Testing.Files.createTestingPath("missing-history.txt").toAbsolutePath();
        final Configuration config = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY)
                .with(FileSchemaHistory.FILE_PATH, (Object) historyFile)
                .build();

        // The completion callback stores whatever error the engine reports.
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        start(OracleConnector.class, config, (success, message, error) -> failure.set(error));

        Testing.Files.delete(historyFile);
        throw (RuntimeException) failure.get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the connector once in {@code SCHEMA_ONLY} mode via {@code consumeRecords}, inserts row 3,
     * then restarts with {@code SCHEMA_ONLY_RECOVERY} on the same builder and verifies that exactly
     * one record is emitted, carrying {@code ID = 3} and {@code DATA = "asuka"}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-3986"})
    public void shouldCreateSnapshotSchemaOnlyRecovery() throws Exception {
        try {
            final Configuration.Builder builder = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986")
                    .with(OracleConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName())
                    .with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class.getName());
            Configuration config = builder.build();
            consumeRecords(config);

            connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (3, 'asuka')");

            // Same settings as before, only the snapshot mode is switched to recovery.
            builder.with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY_RECOVERY);
            config = builder.build();
            start(OracleConnector.class, config);

            final int expectedRecords = 1;
            final AbstractConnectorTest.SourceRecords captured = consumeRecordsByTopic(expectedRecords);
            Assertions.assertThat(captured.allRecordsInOrder()).hasSize(expectedRecords);

            final SourceRecord first = captured.allRecordsInOrder().get(0);
            final Struct after = (Struct) ((Struct) first.value()).get("after");
            TestCase.assertEquals((Object) 3, after.get("ID"));
            TestCase.assertEquals((Object) "asuka", after.get("DATA"));
        }
        finally {
            TestHelper.dropTable(connection, "DBZ3986");
        }
    }

    /**
     * Runs the connector once in {@code SCHEMA_ONLY} mode via {@code consumeRecords}, starts it a
     * second time with the same configuration, and rethrows the error reported to the completion
     * callback. The test passes when that error is a {@link DebeziumException}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test(expected=DebeziumException.class)
    @FixFor(value={"DBZ-3986"})
    public void shouldCreateSnapshotSchemaOnlyExceptionWithoutHistory() throws Exception {
        try {
            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986")
                    .with(OracleConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName())
                    .with(EmbeddedEngine.OFFSET_STORAGE, FileOffsetBackingStore.class.getName())
                    .build();
            consumeRecords(config);

            // The completion callback stores whatever error the engine reports.
            final AtomicReference<Throwable> failure = new AtomicReference<>();
            start(OracleConnector.class, config, (success, message, error) -> failure.set(error));
            throw (RuntimeException) failure.get();
        }
        finally {
            // The try block always exits by throwing, so cleanup has to happen here.
            TestHelper.dropTable(connection, "DBZ3986");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs the connector once in {@code SCHEMA_ONLY} mode via {@code consumeRecords}, inserts row 3
     * while it is not being consumed, restarts with the same configuration (in-memory offsets), and
     * inserts row 4 once streaming is running. Exactly one record is expected, carrying
     * {@code ID = 4} and {@code DATA = "debezium"}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-3986"})
    public void shouldSkipDataOnSnapshotSchemaOnly() throws Exception {
        try {
            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.SCHEMA_ONLY)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3986")
                    .with(OracleConnectorConfig.SCHEMA_HISTORY, MemorySchemaHistory.class.getName())
                    .with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class.getName())
                    .build();
            consumeRecords(config);

            connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (3, 'asuka')");
            start(OracleConnector.class, config);
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            connection.execute("INSERT INTO DBZ3986 (ID, DATA) values (4, 'debezium')");

            final int expectedRecords = 1;
            final AbstractConnectorTest.SourceRecords captured = consumeRecordsByTopic(expectedRecords);
            Assertions.assertThat(captured.allRecordsInOrder()).hasSize(expectedRecords);

            final SourceRecord first = captured.allRecordsInOrder().get(0);
            final Struct after = (Struct) ((Struct) first.value()).get("after");
            TestCase.assertEquals((Object) 4, after.get("ID"));
            TestCase.assertEquals((Object) "debezium", after.get("DATA"));
        }
        finally {
            TestHelper.dropTable(connection, "DBZ3986");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Uses a table whose name is longer than 30 characters. The pre-existing row must be emitted by
     * the snapshot; a row inserted during streaming must produce no record, and a warning stating
     * that the table won't be captured by Oracle LogMiner must have been logged.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Applies to LogMiner only")
    @FixFor(value={"DBZ-4161"})
    public void shouldWarnAboutTableNameLengthExceeded() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz4161_with_a_name_that_is_greater_than_30");
            connection.execute("CREATE TABLE dbz4161_with_a_name_that_is_greater_than_30 (id numeric(9,0), data varchar2(30))");
            TestHelper.streamTable(connection, "dbz4161_with_a_name_that_is_greater_than_30");
            connection.execute("INSERT INTO dbz4161_with_a_name_that_is_greater_than_30 values (1, 'snapshot')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4161_WITH_A_NAME_THAT_IS_GREATER_THAN_30")
                    .build();

            // The interceptor is registered before the connector starts.
            final LogInterceptor warnings = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1)
                    .recordsForTopic("server1.DEBEZIUM.DBZ4161_WITH_A_NAME_THAT_IS_GREATER_THAN_30");
            Assertions.assertThat(snapshotRecords).hasSize(1);
            final Struct after = ((Struct) snapshotRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("snapshot");

            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            connection.execute("INSERT INTO dbz4161_with_a_name_that_is_greater_than_30 values (2, 'streaming')");
            waitForCurrentScnToHaveBeenSeenByConnector();
            assertNoRecordsToConsume();

            final boolean warned = warnings.containsWarnMessage("Table 'DBZ4161_WITH_A_NAME_THAT_IS_GREATER_THAN_30' won't be captured by Oracle LogMiner");
            Assertions.assertThat(warned).isTrue();
        }
        finally {
            TestHelper.dropTable(connection, "dbz4161_with_a_name_that_is_greater_than_30");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Uses a table with a column whose name is longer than 30 characters. The pre-existing row must
     * be emitted by the snapshot; a row inserted during streaming must produce no record, and a
     * warning stating that the table won't be captured by Oracle LogMiner must have been logged.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Applies to LogMiner only")
    @FixFor(value={"DBZ-4161"})
    public void shouldWarnAboutColumnNameLengthExceeded() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz4161");
            connection.execute("CREATE TABLE dbz4161 (id numeric(9,0), a_very_long_column_name_that_is_greater_than_30 varchar2(30))");
            TestHelper.streamTable(connection, "dbz4161");
            connection.execute("INSERT INTO dbz4161 values (1, 'snapshot')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4161")
                    .build();

            // The interceptor is registered before the connector starts.
            final LogInterceptor warnings = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ4161");
            Assertions.assertThat(snapshotRecords).hasSize(1);
            final Struct after = ((Struct) snapshotRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("A_VERY_LONG_COLUMN_NAME_THAT_IS_GREATER_THAN_30")).isEqualTo("snapshot");

            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            connection.execute("INSERT INTO dbz4161 values (2, 'streaming')");
            waitForCurrentScnToHaveBeenSeenByConnector();
            assertNoRecordsToConsume();

            final boolean warned = warnings.containsWarnMessage("Table 'DBZ4161' won't be captured by Oracle LogMiner");
            Assertions.assertThat(warned).isTrue();
        }
        finally {
            TestHelper.dropTable(connection, "dbz4161");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Configures the database include list with the test database name and verifies that
     * {@code dbz3611} yields one snapshot record (row 1) and one streaming record (row 2), with
     * nothing left to consume afterwards.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-3611"})
    public void shouldSafelySnapshotAndStreamWithDatabaseIncludeList() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz3611");
            connection.execute("CREATE TABLE dbz3611 (id numeric(9,0), data varchar2(30))");
            TestHelper.streamTable(connection, "dbz3611");
            connection.execute("INSERT INTO dbz3611 values (1, 'snapshot')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.DATABASE_INCLUDE_LIST, TestHelper.getDatabaseName())
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.dbz3611")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            // Snapshot phase: the row that existed before the connector started.
            final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ3611");
            Assertions.assertThat(snapshotRecords).hasSize(1);
            final Struct after = ((Struct) snapshotRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("snapshot");

            // Streaming phase: a row inserted while the connector is running.
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            connection.execute("INSERT INTO dbz3611 values (2, 'streaming')");
            final List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ3611");
            Assertions.assertThat(streamedRecords).hasSize(1);
            assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, "dbz3611");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Configures the database exclude list with the test database name suffixed by {@code "2"} and
     * verifies that {@code dbz3611} still yields one snapshot record (row 1) and one streaming
     * record (row 2), with nothing left to consume afterwards.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-3611"})
    public void shouldSafelySnapshotAndStreamWithDatabaseExcludeList() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz3611");
            connection.execute("CREATE TABLE dbz3611 (id numeric(9,0), data varchar2(30))");
            TestHelper.streamTable(connection, "dbz3611");
            connection.execute("INSERT INTO dbz3611 values (1, 'snapshot')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.DATABASE_EXCLUDE_LIST, TestHelper.getDatabaseName() + "2")
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.dbz3611")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            // Snapshot phase: the row that existed before the connector started.
            final List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ3611");
            Assertions.assertThat(snapshotRecords).hasSize(1);
            final Struct after = ((Struct) snapshotRecords.get(0).value()).getStruct("after");
            Assertions.assertThat(after.get("ID")).isEqualTo(1);
            Assertions.assertThat(after.get("DATA")).isEqualTo("snapshot");

            // Streaming phase: a row inserted while the connector is running.
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            connection.execute("INSERT INTO dbz3611 values (2, 'streaming')");
            final List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ3611");
            Assertions.assertThat(streamedRecords).hasSize(1);
            assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, "dbz3611");
        }
    }

    /**
     * Starts the connector with the database name (or the PDB name when
     * {@code TestHelper.isUsingPdb()} is true) supplied in lower case and verifies that the
     * row inserted into {@code debezium.customer} is emitted with ID 1.
     */
    @Test
    @FixFor(value={"DBZ-4376"})
    public void shouldNotRaiseNullPointerExceptionWithNonUppercaseDatabaseName() throws Exception {
        // Lower-case with a fixed locale so the configured name does not depend on the JVM's
        // default locale (e.g. the dotless-i mapping applied under a Turkish locale).
        final String lowerCaseName = TestHelper.getDatabaseName().toLowerCase(java.util.Locale.ROOT);
        final Configuration.Builder builder = TestHelper.isUsingPdb()
                ? TestHelper.defaultConfig().with(OracleConnectorConfig.PDB_NAME, lowerCaseName)
                : TestHelper.defaultConfig().with(OracleConnectorConfig.DATABASE_NAME, lowerCaseName);
        final Configuration config = builder
                .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.CUSTOMER")
                .build();

        connection.execute("INSERT INTO debezium.customer (id,name) values (1, 'Bugs Bunny')");
        start(OracleConnector.class, config);
        OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

        final List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        // Fail with a clear assertion message instead of an NPE / index error when nothing was captured.
        Assertions.assertThat(records).hasSize(1);
        final Struct after = ((Struct) records.get(0).value()).getStruct("after");
        Assertions.assertThat(after.get("ID")).isEqualTo(1);
    }

    /**
     * Recreates table {@code DBZ3986}, starts the connector with the supplied configuration,
     * inserts two rows once streaming is running, asserts that exactly two records are
     * consumed, and finally stops the connector.
     *
     * @param config the connector configuration to start with
     * @throws SQLException if one of the SQL statements fails
     * @throws InterruptedException if interrupted while waiting for the connector or records
     */
    @FixFor(value={"DBZ-3986"})
    private void consumeRecords(Configuration config) throws SQLException, InterruptedException {
        final String tableName = "DBZ3986";
        TestHelper.dropTable(connection, tableName);
        connection.execute("CREATE TABLE DBZ3986 (ID number(9,0), DATA varchar2(50))");
        TestHelper.streamTable(connection, tableName);

        start(OracleConnector.class, config);
        OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

        // Each insert is issued through its own execute call, exactly as before.
        final String[] inserts = {
                "INSERT INTO DBZ3986 (ID, DATA) values (1, 'Test')",
                "INSERT INTO DBZ3986 (ID, DATA) values (2, 'ashlin')"
        };
        for (String insert : inserts) {
            connection.execute(insert);
        }

        final int expectedCount = 2;
        final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(expectedCount);
        Assertions.assertThat(consumed.allRecordsInOrder()).hasSize(expectedCount);
        stopConnector();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * With the snapshot boundary mode set to {@code TRANSACTION_VIEW_ONLY}, leaves a transaction
     * open on {@code DBZ4367} while the connector starts, commits it after streaming is running,
     * and expects rows 1 to 4 to be emitted exactly once each.
     */
    @Test
    @FixFor(value={"DBZ-4367"})
    public void shouldCaptureChangesForTransactionsAcrossSnapshotBoundary() throws Exception {
        final String tableName = "DBZ4367";
        TestHelper.dropTable(connection, tableName);
        try {
            connection.execute("CREATE TABLE DBZ4367 (ID number(9, 0), DATA varchar2(50))");
            TestHelper.streamTable(connection, tableName);
            connection.execute("INSERT INTO DBZ4367 (ID, DATA) VALUES (1, 'pre-snapshot pre TX')");
            // Row 2 stays uncommitted while the connector starts.
            connection.executeWithoutCommitting("INSERT INTO DBZ4367 (ID, DATA) VALUES (2, 'pre-snapshot in TX')");

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4367")
                    .with(OracleConnectorConfig.LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE,
                            OracleConnectorConfig.TransactionSnapshotBoundaryMode.TRANSACTION_VIEW_ONLY)
                    .build();
            start(OracleConnector.class, config);
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            connection.executeWithoutCommitting("INSERT INTO DBZ4367 (ID, DATA) VALUES (3, 'post-snapshot in TX')");
            connection.executeWithoutCommitting("COMMIT");
            connection.execute("INSERT INTO DBZ4367 (ID, DATA) VALUES (4, 'post snapshot post TX')");

            final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(4);
            final List<Integer> capturedIds = consumed.recordsForTopic("server1.DEBEZIUM.DBZ4367")
                    .stream()
                    .map(record -> getAfter(record).getInt32("ID"))
                    .collect(Collectors.toList());
            Assertions.assertThat(capturedIds).containsOnly(1, 2, 3, 4);
            Assertions.assertThat(capturedIds).doesNotHaveDuplicates();
            Assertions.assertThat(capturedIds).hasSize(4);
        }
        finally {
            stopConnector();
            TestHelper.dropTable(connection, tableName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * With the snapshot boundary mode set to {@code TRANSACTION_VIEW_ONLY}, verifies that rows
     * committed before the connector starts (IDs 1 and 3) are emitted first and only once, and
     * that the row from the transaction left open across the start (ID 2) arrives later together
     * with the rows inserted afterwards (IDs 4 and 5).
     */
    @Test
    @FixFor(value={"DBZ-4367"})
    public void shouldCaptureChangesForTransactionsAcrossSnapshotBoundaryWithoutDuplicatingSnapshottedChanges() throws Exception {
        final String topicName = "server1.DEBEZIUM.DBZ4367";
        // try-with-resources guarantees the second connection is closed even when the initial
        // dropTable, stopConnector, or the final dropTable throws.
        try (OracleConnection secondConnection = TestHelper.testConnection()) {
            TestHelper.dropTable(connection, "DBZ4367");
            try {
                connection.execute("CREATE TABLE DBZ4367 (ID number(9, 0), DATA varchar2(50))");
                TestHelper.streamTable(connection, "DBZ4367");
                connection.execute("INSERT INTO DBZ4367 (ID, DATA) VALUES (1, 'pre-snapshot pre TX')");
                // Row 2 stays uncommitted on the main connection while row 3 is inserted on the second one.
                connection.executeWithoutCommitting("INSERT INTO DBZ4367 (ID, DATA) VALUES (2, 'pre-snapshot in TX')");
                secondConnection.execute("INSERT INTO DBZ4367 (ID, DATA) VALUES (3, 'pre-snapshot in another TX')");

                Configuration config = TestHelper.defaultConfig()
                        .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4367")
                        .with(OracleConnectorConfig.LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE,
                                OracleConnectorConfig.TransactionSnapshotBoundaryMode.TRANSACTION_VIEW_ONLY)
                        .build();
                start(OracleConnector.class, config);
                OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

                AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(2);
                List<Integer> ids = records.recordsForTopic(topicName).stream()
                        .map(r -> getAfter(r).getInt32("ID"))
                        .collect(Collectors.toList());
                Assertions.assertThat(ids).containsOnly(1, 3);
                Assertions.assertThat(ids).doesNotHaveDuplicates();
                Assertions.assertThat(ids).hasSize(2);

                connection.executeWithoutCommitting("INSERT INTO DBZ4367 (ID, DATA) VALUES (4, 'post-snapshot in TX')");
                connection.executeWithoutCommitting("COMMIT");
                connection.execute("INSERT INTO DBZ4367 (ID, DATA) VALUES (5, 'post snapshot post TX')");

                records = consumeRecordsByTopic(3);
                ids = records.recordsForTopic(topicName).stream()
                        .map(r -> getAfter(r).getInt32("ID"))
                        .collect(Collectors.toList());
                Assertions.assertThat(ids).containsOnly(2, 4, 5);
                Assertions.assertThat(ids).doesNotHaveDuplicates();
                Assertions.assertThat(ids).hasSize(3);
            }
            finally {
                stopConnector();
                TestHelper.dropTable(connection, "DBZ4367");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * With schema changes included and the snapshot boundary mode set to
     * {@code TRANSACTION_VIEW_ONLY}, alters {@code DBZ4367_EXTRA} on a second connection while a
     * transaction is left open on the main connection across the connector start.
     * <p>
     * The first nine consumed records must contain six schema change records, each containing
     * {@code CREATE TABLE}, plus rows 1 and 100/150. The following six records must carry no
     * schema changes and hold rows 2, 3, 4 and 200, 300, 400.
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="User-defined types not supported")
    @FixFor(value={"DBZ-4367"})
    public void shouldCaptureChangesForTransactionsAcrossSnapshotBoundaryWithoutReemittingDDLChanges() throws Exception {
        final String mainTopic = "server1.DEBEZIUM.DBZ4367";
        final String extraTopic = "server1.DEBEZIUM.DBZ4367_EXTRA";
        // try-with-resources guarantees the second connection is closed even when the initial
        // table drops, stopConnector, or the final table drops throw.
        try (OracleConnection secondConnection = TestHelper.testConnection()) {
            TestHelper.dropTable(connection, "DBZ4367");
            TestHelper.dropTable(connection, "DBZ4367_EXTRA");
            try {
                connection.execute("CREATE TABLE DBZ4367 (ID number(9, 0), DATA varchar2(50))");
                TestHelper.streamTable(connection, "DBZ4367");
                connection.execute("CREATE TABLE DBZ4367_EXTRA (ID number(9, 0), DATA varchar2(50))");
                TestHelper.streamTable(connection, "DBZ4367_EXTRA");
                connection.execute("INSERT INTO DBZ4367 (ID, DATA) VALUES (1, 'pre-snapshot pre TX')");
                connection.execute("INSERT INTO DBZ4367_EXTRA (ID, DATA) VALUES (100, 'second table, pre-snapshot pre TX')");
                // Row 2 stays uncommitted on the main connection while the second connection changes the schema.
                connection.executeWithoutCommitting("INSERT INTO DBZ4367 (ID, DATA) VALUES (2, 'pre-snapshot in TX')");
                secondConnection.execute("ALTER TABLE DBZ4367_EXTRA ADD DATA2 VARCHAR2(50) DEFAULT 'default2'");
                secondConnection.execute("ALTER TABLE DBZ4367_EXTRA ADD DATA3 VARCHAR2(50) DEFAULT 'default3'");
                secondConnection.execute("INSERT INTO DBZ4367_EXTRA (ID, DATA, DATA2, DATA3) VALUES (150, 'second table, with outdated schema', 'something', 'something')");
                secondConnection.execute("ALTER TABLE DBZ4367_EXTRA DROP COLUMN DATA3");
                connection.executeWithoutCommitting("INSERT INTO DBZ4367_EXTRA (ID, DATA, DATA2) VALUES (200, 'second table, pre-snapshot in TX', 'something')");

                Configuration config = TestHelper.defaultConfig()
                        .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4367,DEBEZIUM\\.DBZ4367_EXTRA")
                        .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
                        .with(OracleConnectorConfig.LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE,
                                OracleConnectorConfig.TransactionSnapshotBoundaryMode.TRANSACTION_VIEW_ONLY)
                        .build();
                start(OracleConnector.class, config);
                OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

                // First batch: schema change records plus the rows committed before the start.
                AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(9);
                List<SourceRecord> ddls = records.ddlRecordsForDatabase(TestHelper.getDatabaseName());
                ddls.forEach(r -> Assertions.assertThat(((Struct) r.value()).getString("ddl")).contains("CREATE TABLE"));
                Assertions.assertThat(ddls).hasSize(6);

                List<Integer> ids = records.recordsForTopic(mainTopic).stream()
                        .map(r -> getAfter(r).getInt32("ID"))
                        .collect(Collectors.toList());
                Assertions.assertThat(ids).containsExactly(1);

                ids = records.recordsForTopic(extraTopic).stream()
                        .map(r -> getAfter(r).getInt32("ID"))
                        .collect(Collectors.toList());
                Assertions.assertThat(ids).containsOnly(100, 150);
                Assertions.assertThat(ids).doesNotHaveDuplicates();
                Assertions.assertThat(ids).hasSize(2);

                connection.executeWithoutCommitting("INSERT INTO DBZ4367 (ID, DATA) VALUES (3, 'post-snapshot in TX')");
                connection.executeWithoutCommitting("INSERT INTO DBZ4367_EXTRA (ID, DATA, DATA2) VALUES (300, 'second table, post-snapshot in TX', 'something')");
                connection.executeWithoutCommitting("COMMIT");
                connection.execute("INSERT INTO DBZ4367 (ID, DATA) VALUES (4, 'post snapshot post TX')");
                connection.execute("INSERT INTO DBZ4367_EXTRA (ID, DATA, DATA2) VALUES (400, 'second table, post-snapshot post TX', 'something')");

                // Second batch: only data changes are expected, no schema change may be emitted again.
                records = consumeRecordsByTopic(6);
                ddls = records.ddlRecordsForDatabase(TestHelper.getDatabaseName());
                if (ddls != null) {
                    Assertions.assertThat(ddls).isEmpty();
                }

                ids = records.recordsForTopic(mainTopic).stream()
                        .map(r -> getAfter(r).getInt32("ID"))
                        .collect(Collectors.toList());
                Assertions.assertThat(ids).containsOnly(2, 3, 4);
                Assertions.assertThat(ids).doesNotHaveDuplicates();
                Assertions.assertThat(ids).hasSize(3);

                ids = records.recordsForTopic(extraTopic).stream()
                        .map(r -> getAfter(r).getInt32("ID"))
                        .collect(Collectors.toList());
                Assertions.assertThat(ids).containsOnly(200, 300, 400);
                Assertions.assertThat(ids).doesNotHaveDuplicates();
                Assertions.assertThat(ids).hasSize(3);
            }
            finally {
                stopConnector();
                TestHelper.dropTable(connection, "DBZ4367");
                TestHelper.dropTable(connection, "DBZ4367_EXTRA");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Inserts 50 rows into {@code dbz5085}, alternating between {@code execute} (even ids) and
     * {@code executeWithoutCommitting} (odd ids), starts the connector with the snapshot
     * boundary mode {@code ALL}, commits the remaining work once the log reports a pending
     * transaction, and expects all 50 rows in id order with both snapshot reads and streamed
     * inserts present.
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Only applies to LogMiner")
    @FixFor(value={"DBZ-5085"})
    public void shouldSnapshotAndStreamAllRecordsThatSpanAcrossSnapshotStreamingBoundarySmallTrxs() throws Exception {
        TestHelper.dropTable(connection, "dbz5085");
        try {
            final LogInterceptor logInterceptor = new LogInterceptor(LogMinerAdapter.class);
            setConsumeTimeout(10L, TimeUnit.SECONDS);
            connection.execute("CREATE TABLE dbz5085 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz5085");
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5085")
                    .with(OracleConnectorConfig.LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE,
                            OracleConnectorConfig.TransactionSnapshotBoundaryMode.ALL)
                    .build();

            // Single source of truth for the row count; previously declared but never used.
            final int expected = 50;
            LOGGER.info("Inserting {} records", expected);
            for (int i = 0; i < expected; ++i) {
                final String insert = "INSERT INTO dbz5085 (id,data) values (" + i + ", 'Test-" + i + "')";
                if (i % 2 == 0) {
                    connection.execute(insert);
                } else {
                    connection.executeWithoutCommitting(insert);
                }
                Thread.sleep(100L);
            }

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            // Hold the open transaction until the adapter has logged that it saw a pending transaction.
            Awaitility.await().atMost(Duration.ofMinutes(3L)).until(() -> logInterceptor.containsMessage("Pending Transaction '"));
            connection.commit();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            AbstractConnectorTest.SourceRecords sourceRecords = consumeRecordsByTopic(expected);
            List<SourceRecord> records = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ5085");
            Assertions.assertThat(records).hasSize(expected);

            boolean snapshotFound = false;
            boolean streamingFound = false;
            for (int i = 0; i < expected; ++i) {
                final SourceRecord record = records.get(i);
                final Struct value = (Struct) record.value();
                if (value.getString("op").equals(Envelope.Operation.READ.code())) {
                    snapshotFound = true;
                    VerifyRecord.isValidRead(record, "ID", i);
                } else {
                    streamingFound = true;
                    VerifyRecord.isValidInsert(record, "ID", i);
                }
            }
            Assertions.assertThat(snapshotFound).isTrue();
            Assertions.assertThat(streamingFound).isTrue();
        }
        finally {
            TestHelper.dropTable(connection, "dbz5085");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Inserts 50 rows into {@code dbz5085}, using {@code execute} for every tenth id and
     * {@code executeWithoutCommitting} for the others, starts the connector with the snapshot
     * boundary mode {@code ALL}, commits the remaining work once the log reports a pending
     * transaction, and expects all 50 rows in id order with both snapshot reads and streamed
     * inserts present.
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Only applies to LogMiner")
    @FixFor(value={"DBZ-5085"})
    public void shouldSnapshotAndStreamAllRecordsThatSpanAcrossSnapshotStreamingBoundaryLargeTrxs() throws Exception {
        TestHelper.dropTable(connection, "dbz5085");
        try {
            final LogInterceptor logInterceptor = new LogInterceptor(LogMinerAdapter.class);
            setConsumeTimeout(10L, TimeUnit.SECONDS);
            connection.execute("CREATE TABLE dbz5085 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz5085");
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5085")
                    .with(OracleConnectorConfig.LOG_MINING_TRANSACTION_SNAPSHOT_BOUNDARY_MODE,
                            OracleConnectorConfig.TransactionSnapshotBoundaryMode.ALL)
                    .build();

            // Single source of truth for the row count; previously declared but never used.
            final int expected = 50;
            LOGGER.info("Inserting {} records", expected);
            for (int i = 0; i < expected; ++i) {
                final String insert = "INSERT INTO dbz5085 (id,data) values (" + i + ", 'Test-" + i + "')";
                if (i % 10 == 0) {
                    connection.execute(insert);
                } else {
                    connection.executeWithoutCommitting(insert);
                }
                Thread.sleep(100L);
            }

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            // Hold the open transaction until the adapter has logged that it saw a pending transaction.
            Awaitility.await().atMost(Duration.ofMinutes(3L)).until(() -> logInterceptor.containsMessage("Pending Transaction '"));
            connection.commit();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            AbstractConnectorTest.SourceRecords sourceRecords = consumeRecordsByTopic(expected);
            List<SourceRecord> records = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ5085");
            Assertions.assertThat(records).hasSize(expected);

            boolean snapshotFound = false;
            boolean streamingFound = false;
            for (int i = 0; i < expected; ++i) {
                final SourceRecord record = records.get(i);
                final Struct value = (Struct) record.value();
                if (value.getString("op").equals(Envelope.Operation.READ.code())) {
                    snapshotFound = true;
                    VerifyRecord.isValidRead(record, "ID", i);
                } else {
                    streamingFound = true;
                    VerifyRecord.isValidInsert(record, "ID", i);
                }
            }
            Assertions.assertThat(snapshotFound).isTrue();
            Assertions.assertThat(streamingFound).isTrue();
        }
        finally {
            TestHelper.dropTable(connection, "dbz5085");
        }
    }

    /**
     * Streams one insert from {@code dbz4842}, stops the connector, inserts another row and
     * drops the table while the connector is down, then restarts the connector and expects
     * streaming to be running with no records to consume after a ten second delay.
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Only applies to LogMiner")
    @FixFor(value={"DBZ-4842"})
    public void shouldRestartAfterCapturedTableIsDroppedWhileConnectorDown() throws Exception {
        final String tableName = "dbz4842";
        TestHelper.dropTable(connection, tableName);
        try {
            connection.execute("CREATE TABLE dbz4842 (id numeric(9,0) primary key, name varchar2(50))");
            TestHelper.streamTable(connection, tableName);

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4842")
                    .build();

            // First run: capture a single streamed insert.
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            connection.execute("INSERT INTO dbz4842 (id,name) values (1,'Test')");
            final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
            Assertions.assertThat(consumed.recordsForTopic("server1.DEBEZIUM.DBZ4842")).hasSize(1);

            // While the connector is down, change and then drop the captured table.
            stopConnector(running -> Assertions.assertThat(running).isFalse());
            connection.execute("INSERT INTO dbz4842 (id,name) values (2,'Test')");
            TestHelper.dropTable(connection, tableName);

            // Second run: the connector must come back up and emit nothing.
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            Awaitility.await()
                    .pollDelay(10L, TimeUnit.SECONDS)
                    .timeout(11L, TimeUnit.SECONDS)
                    .until(() -> {
                        assertNoRecordsToConsume();
                        return true;
                    });
        }
        finally {
            TestHelper.dropTable(connection, tableName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates table {@code dbz4852} with a {@code bfile} column while the connector is streaming
     * and verifies that the insert, update and delete events are emitted for ID 1 with no
     * {@code FILENAME} field in the before/after schemas.
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="User-defined types not supported")
    @FixFor(value={"DBZ-4852"})
    public void shouldCaptureChangeForTableWithUnsupportedColumnType() throws Exception {
        final String tableName = "dbz4852";
        final String topicName = "server1.DEBEZIUM.DBZ4852";
        TestHelper.dropTable(connection, tableName);
        try {
            // The directory object referenced by the bfilename(...) calls below.
            try (OracleConnection admin = TestHelper.adminConnection(false)) {
                admin.execute("CREATE OR REPLACE DIRECTORY DIR_DBZ4852 AS '/home/oracle'");
            }

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4852")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            connection.execute("CREATE TABLE dbz4852 (id numeric(9,0) primary key, filename bfile)");
            TestHelper.streamTable(connection, tableName);
            connection.execute("INSERT INTO dbz4852 (id,filename) values (1,bfilename('DIR_DBZ4852','test.txt'))");
            connection.execute("UPDATE dbz4852 set filename = bfilename('DIR_DBZ4852','test2.txt') WHERE id = 1");
            connection.execute("DELETE FROM dbz4852 where id = 1");

            final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(3);
            final List<SourceRecord> tableRecords = consumed.recordsForTopic(topicName);
            Assertions.assertThat(tableRecords).hasSize(3);

            final SourceRecord insertRecord = tableRecords.get(0);
            VerifyRecord.isValidInsert(insertRecord, "ID", 1);
            final Struct insertAfter = ((Struct) insertRecord.value()).getStruct("after");
            Assertions.assertThat(insertAfter.schema().field("FILENAME")).isNull();

            final SourceRecord updateRecord = tableRecords.get(1);
            VerifyRecord.isValidUpdate(updateRecord, "ID", 1);
            final Struct updateBefore = ((Struct) updateRecord.value()).getStruct("before");
            Assertions.assertThat(updateBefore.schema().field("FILENAME")).isNull();
            final Struct updateAfter = ((Struct) updateRecord.value()).getStruct("after");
            Assertions.assertThat(updateAfter.schema().field("FILENAME")).isNull();

            final SourceRecord deleteRecord = tableRecords.get(2);
            VerifyRecord.isValidDelete(deleteRecord, "ID", 1);
            final Struct deleteBefore = ((Struct) deleteRecord.value()).getStruct("before");
            Assertions.assertThat(deleteBefore.schema().field("FILENAME")).isNull();
        }
        finally {
            TestHelper.dropTable(connection, tableName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates table {@code dbz4853} with a {@code long} column while the connector is streaming
     * and verifies that the insert, update and delete events are emitted for ID 1 with no
     * {@code LONG_VAL} field in the before/after schemas.
     */
    @Test
    @FixFor(value={"DBZ-4853"})
    public void shouldCaptureChangeForTableWithUnsupportedColumnTypeLong() throws Exception {
        final String tableName = "dbz4853";
        final String topicName = "server1.DEBEZIUM.DBZ4853";
        TestHelper.dropTable(connection, tableName);
        try {
            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4853")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            connection.execute("CREATE TABLE dbz4853 (id numeric(9,0) primary key, long_val long)");
            TestHelper.streamTable(connection, tableName);
            connection.execute("INSERT INTO dbz4853 (id,long_val) values (1,'test.txt')");
            connection.execute("UPDATE dbz4853 set long_val = 'test2.txt' WHERE id = 1");
            connection.execute("DELETE FROM dbz4853 where id = 1");

            final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(3);
            final List<SourceRecord> tableRecords = consumed.recordsForTopic(topicName);
            Assertions.assertThat(tableRecords).hasSize(3);

            final SourceRecord insertRecord = tableRecords.get(0);
            VerifyRecord.isValidInsert(insertRecord, "ID", 1);
            final Struct insertAfter = ((Struct) insertRecord.value()).getStruct("after");
            Assertions.assertThat(insertAfter.schema().field("LONG_VAL")).isNull();

            final SourceRecord updateRecord = tableRecords.get(1);
            VerifyRecord.isValidUpdate(updateRecord, "ID", 1);
            final Struct updateBefore = ((Struct) updateRecord.value()).getStruct("before");
            Assertions.assertThat(updateBefore.schema().field("LONG_VAL")).isNull();
            final Struct updateAfter = ((Struct) updateRecord.value()).getStruct("after");
            Assertions.assertThat(updateAfter.schema().field("LONG_VAL")).isNull();

            final SourceRecord deleteRecord = tableRecords.get(2);
            VerifyRecord.isValidDelete(deleteRecord, "ID", 1);
            final Struct deleteBefore = ((Struct) deleteRecord.value()).getStruct("before");
            Assertions.assertThat(deleteBefore.schema().field("LONG_VAL")).isNull();
        }
        finally {
            TestHelper.dropTable(connection, tableName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * After snapshotting the single row of {@code dbz4907}, makes no further changes to the
     * table and waits up to 60 seconds for both the {@code OffsetScn} and {@code CommittedScn}
     * streaming metrics to differ from the values read right after streaming started.
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Only LogMiner performs flushes")
    @FixFor(value={"DBZ-4907"})
    public void shouldContinueToUpdateOffsetsEvenWhenTableIsNotChanged() throws Exception {
        final String tableName = "dbz4907";
        TestHelper.dropTable(connection, tableName);
        try {
            connection.execute("CREATE TABLE dbz4907 (id numeric(9,0) primary key, state varchar2(50))");
            connection.execute("INSERT INTO dbz4907 (id,state) values (1, 'snapshot')");
            TestHelper.streamTable(connection, tableName);

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4907")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");
            final AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
            final List<SourceRecord> snapshotRecords = consumed.recordsForTopic("server1.DEBEZIUM.DBZ4907");
            Assertions.assertThat(snapshotRecords).hasSize(1);

            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            // Baseline metric values taken right after streaming started.
            final String initialOffsetScn = (String) getStreamingMetric("OffsetScn");
            final String initialCommittedScn = (String) getStreamingMetric("CommittedScn");
            Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
                final String currentOffsetScn = (String) getStreamingMetric("OffsetScn");
                final String currentCommittedScn = (String) getStreamingMetric("CommittedScn");
                // Done only once neither metric still equals its baseline.
                return !(currentOffsetScn.equals(initialOffsetScn) || currentCommittedScn.equals(initialCommittedScn));
            });
        }
        finally {
            TestHelper.dropTable(connection, tableName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Keeps a transaction open on a separate connection while an insert and an update of row 2
     * are committed and consumed, restarts the connector, then commits the open transaction and
     * expects only the insert of row 1 to be emitted, with nothing else left to consume.
     */
    @Test
    @FixFor(value={"DBZ-4936"})
    public void shouldNotEmitLastCommittedTransactionEventsUponRestart() throws Exception {
        final String tableName = "dbz4936";
        final String topicName = "server1.DEBEZIUM.DBZ4936";
        TestHelper.dropTable(connection, tableName);
        try {
            connection.execute("CREATE TABLE dbz4936 (id numeric(9,0) primary key, name varchar2(100))");
            TestHelper.streamTable(connection, tableName);

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4936")
                    .with(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE, "memory")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            try (OracleConnection pendingTx = TestHelper.testConnection()) {
                OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

                // Row 1 is written on the separate connection and left uncommitted for now.
                pendingTx.setAutoCommit(false);
                pendingTx.executeWithoutCommitting("INSERT INTO dbz4936 (id,name) values (1,'In-Progress')");

                OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

                connection.execute("INSERT INTO dbz4936 (id,name) values (2, 'committed')");
                AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(1);
                List<SourceRecord> tableRecords = consumed.recordsForTopic(topicName);
                Assertions.assertThat(tableRecords).hasSize(1);
                VerifyRecord.isValidInsert(tableRecords.get(0), "ID", 2);

                connection.execute("UPDATE dbz4936 set name = 'updated' WHERE id = 2");
                consumed = consumeRecordsByTopic(1);
                tableRecords = consumed.recordsForTopic(topicName);
                Assertions.assertThat(tableRecords).hasSize(1);
                VerifyRecord.isValidUpdate(tableRecords.get(0), "ID", 2);

                // Restart the connector while the separate transaction is still open.
                stopConnector();
                start(OracleConnector.class, config);
                assertConnectorIsRunning();
                OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

                pendingTx.commit();
                consumed = consumeRecordsByTopic(1);
                tableRecords = consumed.recordsForTopic(topicName);
                Assertions.assertThat(tableRecords).hasSize(1);
                VerifyRecord.isValidInsert(tableRecords.get(0), "ID", 1);
                assertNoRecordsToConsume();
            }
        }
        finally {
            TestHelper.dropTable(connection, tableName);
        }
    }

    /**
     * Rebuilds the default configuration with the {@code HOSTNAME} property removed and the
     * {@code URL} property set to {@code TestHelper.getOracleConnectionUrlDescriptor()}, then
     * verifies that the connector snapshots the inserted customer row (ID 1001) and reaches
     * the streaming phase.
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Applies only to LogMiner")
    @FixFor(value={"DBZ-3318"})
    public void shouldSuccessfullyConnectAndStreamWithDatabaseUrl() throws Exception {
        connection.execute("INSERT INTO customer (id,name,score) values (1001, 'DBZ3668', 100)");

        // Swap the hostname-based connection settings for a URL-based one.
        Map<String, String> defaultConfig = TestHelper.defaultConfig().build().asMap();
        defaultConfig.remove(OracleConnectorConfig.HOSTNAME.name());
        defaultConfig.put(OracleConnectorConfig.URL.name(), TestHelper.getOracleConnectionUrlDescriptor());

        Configuration.Builder builder = Configuration.create();
        for (Map.Entry<String, String> entry : defaultConfig.entrySet()) {
            builder.with(entry.getKey(), entry.getValue());
        }

        start(OracleConnector.class, builder.build());
        assertConnectorIsRunning();
        OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
        List<SourceRecord> tableRecords = records.recordsForTopic("server1.DEBEZIUM.CUSTOMER");
        Assertions.assertThat(tableRecords).hasSize(1);
        VerifyRecord.isValidRead(tableRecords.get(0), "ID", 1001);

        OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
    }

    /**
     * Checks that truncating a captured table produces a truncate event when the connector runs
     * with {@code LOB_ENABLED} set to {@code true} and {@code SKIPPED_OPERATIONS} set to
     * {@code "none"}.
     * <p>
     * The row inserted before start-up is expected as one snapshot read; the later
     * {@code TRUNCATE TABLE} is expected as exactly one further record.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @FixFor("DBZ-4953")
    public void shouldStreamTruncateEventWhenLobIsEnabled() throws Exception {
        final String topicName = "server1.DEBEZIUM.DBZ4953";

        TestHelper.dropTable(connection, "dbz4953");
        try {
            connection.execute("CREATE TABLE dbz4953 (id numeric(9,0) primary key, col2 varchar2(100))");
            TestHelper.streamTable(connection, "dbz4953");
            connection.execute("INSERT INTO dbz4953 (id,col2) values (1, 'Daffy Duck')");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4953")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .with(OracleConnectorConfig.SKIPPED_OPERATIONS, "none")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            // Snapshot phase: the row that existed before the connector started.
            List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(snapshotRecords).hasSize(1);
            VerifyRecord.isValidRead(snapshotRecords.get(0), "ID", 1);

            // Streaming phase: the truncate itself.
            connection.execute("TRUNCATE TABLE dbz4953");
            List<SourceRecord> truncateRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(truncateRecords).hasSize(1);
            VerifyRecord.isValidTruncate(truncateRecords.get(0));

            assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, "dbz4953");
        }
    }

    /**
     * Verifies that, with {@code LOG_MINING_SESSION_MAX_MS} set to 10 seconds, the LogMiner
     * streaming source logs "LogMiner session has exceeded maximum session time".
     * <p>
     * The test waits up to 60 seconds for that message after streaming has started.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
    @FixFor("DBZ-4963")
    public void shouldRestartLogMiningSessionAfterMaxSessionElapses() throws Exception {
        TestHelper.dropTable(connection, "dbz4963");
        try {
            connection.execute("CREATE TABLE dbz4963 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz4963");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4963")
                    .with(OracleConnectorConfig.LOG_MINING_SESSION_MAX_MS, 10000L)
                    .build();

            // The interceptor is created before start-up so no message can be missed.
            LogInterceptor streamingLog = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            Awaitility.await()
                    .atMost(60L, TimeUnit.SECONDS)
                    .until(() -> streamingLog.containsMessage("LogMiner session has exceeded maximum session time"));

            stopConnector();
        }
        finally {
            TestHelper.dropTable(connection, "dbz4963");
        }
    }

    /**
     * Verifies that the "LogMiner session has exceeded maximum session time" message is NOT
     * logged when the configuration does not set {@code LOG_MINING_SESSION_MAX_MS}.
     * <p>
     * The test passes when waiting for the message times out after 60 seconds, and fails if the
     * message is observed. It is ignored by default because of that wait.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @Ignore("Waits 60 seconds by default, so disabled by default")
    @FixFor("DBZ-4963")
    public void shouldNotRestartLogMiningSessionWithMaxSessionZero() throws Exception {
        TestHelper.dropTable(connection, "dbz4963");
        try {
            connection.execute("CREATE TABLE dbz4963 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz4963");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4963")
                    .build();

            LogInterceptor logInterceptor = new LogInterceptor(LogMinerStreamingChangeEventSource.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            try {
                Awaitility.await()
                        .atMost(60L, TimeUnit.SECONDS)
                        .until(() -> logInterceptor.containsMessage("LogMiner session has exceeded maximum session time"));
            }
            catch (ConditionTimeoutException expected) {
                // Timing out is the success path: the message never appeared.
                // The table is dropped once, by the finally block below.
                stopConnector();
                return;
            }
            Assert.fail("Expected a ConditionTimeoutException, LogMiner session max session message should not have been written.");
        }
        finally {
            TestHelper.dropTable(connection, "dbz4963");
        }
    }

    /**
     * Runs the forward-slash table scenario for several table names that contain {@code /} at
     * the start, in the middle, at the end, more than once, and doubled.
     * <p>
     * Each pair holds the table name and the table-name part expected in the topic name.
     *
     * @throws Exception if any of the individual scenarios fails
     */
    @Test
    @FixFor("DBZ-5006")
    public void shouldSupportTablesWithForwardSlashes() throws Exception {
        final String[][] tableAndTopicNames = {
                { "/dbz5006", "_dbz5006" },
                { "dbz/5006", "dbz_5006" },
                { "dbz5006/", "dbz5006_" },
                { "db/z50/06", "db_z50_06" },
                { "dbz//5006", "dbz__5006" },
        };
        for (String[] names : tableAndTopicNames) {
            testTableWithForwardSlashes(names[0], names[1]);
        }
    }

    /**
     * Verifies that the configured heartbeat action query produces change events on the
     * heartbeat table's topic.
     * <p>
     * The connector is started with a one-second heartbeat interval and an action query that
     * updates {@code debezium.heartbeat}; the test then waits up to 60 seconds for at least one
     * record on {@code server1.DEBEZIUM.HEARTBEAT}.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @FixFor("DBZ-5119")
    public void shouldExecuteHeartbeatActionQuery() throws Exception {
        TestHelper.dropTable(connection, "dbz5119");
        TestHelper.dropTable(connection, "heartbeat");
        try {
            connection.execute("CREATE TABLE heartbeat (data timestamp)");
            connection.execute("INSERT INTO heartbeat values (sysdate)");
            TestHelper.grantRole("INSERT,UPDATE", "debezium.heartbeat", TestHelper.getConnectorUserName());

            connection.execute("CREATE TABLE dbz5119 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz5119");
            TestHelper.streamTable(connection, "heartbeat");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, "schema_only")
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5119,DEBEZIUM\\.HEARTBEAT")
                    .with(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY, "UPDATE debezium.heartbeat set data = sysdate WHERE ROWNUM = 1")
                    .with(DatabaseHeartbeatImpl.HEARTBEAT_INTERVAL, 1000)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // Poll one record at a time until something shows up on the heartbeat table topic.
            Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
                AbstractConnectorTest.SourceRecords polled = consumeRecordsByTopic(1);
                List<SourceRecord> heartbeatEvents = polled.recordsForTopic("server1.DEBEZIUM.HEARTBEAT");
                if (heartbeatEvents == null) {
                    return false;
                }
                return !heartbeatEvents.isEmpty();
            });

            // On stop, consume and discard whatever records are still available.
            stopConnector(success -> consumeAvailableRecords(r -> {}));
        }
        finally {
            TestHelper.dropTable(connection, "heartbeat");
            TestHelper.dropTable(connection, "dbz5119");
        }
    }

    /**
     * Verifies that, when no event-processing failure handling mode is configured, the LogMiner
     * event processor logs {@code ERROR_PROCESSING_FAIL_MESSAGE} as an error and no records are
     * emitted.
     * <p>
     * Scenario: snapshot one row, stop the connector, add a column, insert a row, drop the
     * column, insert another row, then restart with the same configuration.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applies to Oracle LogMiner implementation")
    @FixFor("DBZ-5147")
    public void shouldStopWhenErrorProcessingFailureHandlingModeIsDefault() throws Exception {
        TestHelper.dropTable(connection, "dbz5147");
        try {
            connection.execute("CREATE TABLE dbz5147 (id numeric(9,0) primary key, data varchar2(50))");
            connection.execute("INSERT INTO dbz5147 VALUES (1, 'test1')");
            TestHelper.streamTable(connection, "dbz5147");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5147")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ5147");
            Assertions.assertThat(snapshotRecords).hasSize(1);
            VerifyRecord.isValidRead(snapshotRecords.get(0), "ID", 1);
            stopConnector();

            // Change the table shape and insert rows while the connector is down.
            connection.execute("ALTER TABLE dbz5147 add data2 varchar2(50)");
            connection.execute("INSERT INTO dbz5147 values (2, 'test2a', 'test2b')");
            connection.execute("ALTER TABLE dbz5147 drop column data2");
            connection.execute("INSERT INTO dbz5147 values (3, 'test3')");

            LogInterceptor processorLog = new LogInterceptor(AbstractLogMinerEventProcessor.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();

            Awaitility.await()
                    .atMost(60L, TimeUnit.SECONDS)
                    .until(() -> processorLog.containsErrorMessage(ERROR_PROCESSING_FAIL_MESSAGE));
            assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, "dbz5147");
        }
    }

    /**
     * Verifies that, with {@code EVENT_PROCESSING_FAILURE_HANDLING_MODE} set to {@code "warn"},
     * the LogMiner event processor logs {@code ERROR_PROCESSING_WARN_MESSAGE} as a warning and
     * the connector still delivers the insert for id 3.
     * <p>
     * Scenario: snapshot one row, stop the connector, add a column, insert a row, drop the
     * column, insert another row, then restart with the same configuration.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applies to Oracle LogMiner implementation")
    @FixFor("DBZ-5147")
    public void shouldLogWarningAndSkipWhenErrorProcessingFailureHandlingModeIsWarn() throws Exception {
        final String topicName = "server1.DEBEZIUM.DBZ5147";

        TestHelper.dropTable(connection, "dbz5147");
        try {
            connection.execute("CREATE TABLE dbz5147 (id numeric(9,0) primary key, data varchar2(50))");
            connection.execute("INSERT INTO dbz5147 VALUES (1, 'test1')");
            TestHelper.streamTable(connection, "dbz5147");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5147")
                    .with(OracleConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE, "warn")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(snapshotRecords).hasSize(1);
            VerifyRecord.isValidRead(snapshotRecords.get(0), "ID", 1);
            stopConnector();

            // Change the table shape and insert rows while the connector is down.
            connection.execute("ALTER TABLE dbz5147 add data2 varchar2(50)");
            connection.execute("INSERT INTO dbz5147 values (2, 'test2a', 'test2b')");
            connection.execute("ALTER TABLE dbz5147 drop column data2");
            connection.execute("INSERT INTO dbz5147 values (3, 'test3')");

            LogInterceptor processorLog = new LogInterceptor(AbstractLogMinerEventProcessor.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            Awaitility.await()
                    .atMost(60L, TimeUnit.SECONDS)
                    .until(() -> processorLog.containsWarnMessage(ERROR_PROCESSING_WARN_MESSAGE));

            // Only the row inserted after the column was dropped is expected.
            List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(streamedRecords).hasSize(1);
            VerifyRecord.isValidInsert(streamedRecords.get(0), "ID", 3);
        }
        finally {
            TestHelper.dropTable(connection, "dbz5147");
        }
    }

    /**
     * Verifies that, with {@code EVENT_PROCESSING_FAILURE_HANDLING_MODE} set to {@code "skip"},
     * the connector delivers the insert for id 3 and the LogMiner event processor logs neither
     * {@code ERROR_PROCESSING_FAIL_MESSAGE} nor {@code ERROR_PROCESSING_WARN_MESSAGE}.
     * <p>
     * Scenario: snapshot one row, stop the connector, add a column, insert a row, drop the
     * column, insert another row, then restart with the same configuration.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Only applies to Oracle LogMiner implementation")
    @FixFor("DBZ-5147")
    public void shouldSilentlySkipWhenErrorProcessingFailureHandlingModeIsSkip() throws Exception {
        final String topicName = "server1.DEBEZIUM.DBZ5147";

        TestHelper.dropTable(connection, "dbz5147");
        try {
            connection.execute("CREATE TABLE dbz5147 (id numeric(9,0) primary key, data varchar2(50))");
            connection.execute("INSERT INTO dbz5147 VALUES (1, 'test1')");
            TestHelper.streamTable(connection, "dbz5147");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5147")
                    .with(OracleConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE, "skip")
                    .with(OracleConnectorConfig.LOG_MINING_STRATEGY, "online_catalog")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(snapshotRecords).hasSize(1);
            VerifyRecord.isValidRead(snapshotRecords.get(0), "ID", 1);
            stopConnector();

            // Change the table shape and insert rows while the connector is down.
            connection.execute("ALTER TABLE dbz5147 add data2 varchar2(50)");
            connection.execute("INSERT INTO dbz5147 values (2, 'test2a', 'test2b')");
            connection.execute("ALTER TABLE dbz5147 drop column data2");
            connection.execute("INSERT INTO dbz5147 values (3, 'test3')");

            LogInterceptor processorLog = new LogInterceptor(AbstractLogMinerEventProcessor.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // Only the row inserted after the column was dropped is expected.
            List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(streamedRecords).hasSize(1);
            VerifyRecord.isValidInsert(streamedRecords.get(0), "ID", 3);

            // Skip mode must stay silent: neither the error nor the warning was logged.
            boolean errorLogged = processorLog.containsErrorMessage(ERROR_PROCESSING_FAIL_MESSAGE);
            Assertions.assertThat(errorLogged).isFalse();
            boolean warningLogged = processorLog.containsWarnMessage(ERROR_PROCESSING_WARN_MESSAGE);
            Assertions.assertThat(warningLogged).isFalse();
        }
        finally {
            TestHelper.dropTable(connection, "dbz5147");
        }
    }

    /**
     * Verifies that a transaction with more events than
     * {@code LOG_MINING_BUFFER_TRANSACTION_EVENTS_THRESHOLD} (set to 100 here) produces no
     * records, while a later, smaller transaction is delivered in full.
     * <p>
     * The first transaction inserts 101 rows (ids 0..100); the second inserts 25 rows
     * (ids 200..224). Only the 25 rows of the second transaction are expected.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
    @FixFor("DBZ-5139")
    public void shouldDiscardTransactionThatExceedsEventThreshold() throws Exception {
        TestHelper.dropTable(connection, "dbz5139");
        try {
            connection.execute("CREATE TABLE dbz5139 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz5139");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5139")
                    .with(OracleConnectorConfig.LOG_MINING_BUFFER_TRANSACTION_EVENTS_THRESHOLD, 100)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            assertNoRecordsToConsume();

            // First transaction: 101 events, one more than the configured threshold.
            for (int id = 0; id < 101; id++) {
                connection.executeWithoutCommitting("INSERT INTO dbz5139 (id,data) values (" + id + ", 'Test" + id + "')");
            }
            connection.commit();

            // Second transaction: 25 events, below the threshold.
            for (int id = 200; id < 225; id++) {
                connection.executeWithoutCommitting("INSERT INTO dbz5139 (id,data) values (" + id + ", 'Test" + id + "')");
            }
            connection.commit();

            List<SourceRecord> delivered = consumeRecordsByTopic(25).recordsForTopic("server1.DEBEZIUM.DBZ5139");
            Assertions.assertThat(delivered).hasSize(25);
            for (int offset = 0; offset < 25; offset++) {
                VerifyRecord.isValidInsert(delivered.get(offset), "ID", 200 + offset);
            }
            assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, "dbz5139");
        }
    }

    /**
     * Verifies that both transactions are delivered in full when the configuration does not set
     * a transaction event threshold.
     * <p>
     * The first transaction inserts 101 rows (ids 0..100) and the second 25 rows
     * (ids 200..224); all 126 inserts are expected, in that order.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Applies only to LogMiner")
    @FixFor("DBZ-5139")
    public void shouldNotDiscardTransactionWhenNoEventThresholdSet() throws Exception {
        TestHelper.dropTable(connection, "dbz5139");
        try {
            connection.execute("CREATE TABLE dbz5139 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, "dbz5139");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5139")
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            assertNoRecordsToConsume();

            // First transaction: 101 inserts.
            for (int id = 0; id < 101; id++) {
                connection.executeWithoutCommitting("INSERT INTO dbz5139 (id,data) values (" + id + ", 'Test" + id + "')");
            }
            connection.commit();

            // Second transaction: 25 inserts.
            for (int id = 200; id < 225; id++) {
                connection.executeWithoutCommitting("INSERT INTO dbz5139 (id,data) values (" + id + ", 'Test" + id + "')");
            }
            connection.commit();

            List<SourceRecord> delivered = consumeRecordsByTopic(126).recordsForTopic("server1.DEBEZIUM.DBZ5139");
            Assertions.assertThat(delivered).hasSize(126);

            // Records 0..100 belong to the first transaction, 101..125 to the second.
            for (int position = 0; position < 101; position++) {
                VerifyRecord.isValidInsert(delivered.get(position), "ID", position);
            }
            for (int offset = 0; offset < 25; offset++) {
                VerifyRecord.isValidInsert(delivered.get(101 + offset), "ID", 200 + offset);
            }
            assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, "dbz5139");
        }
    }

    /**
     * Verifies that the connector starts and reaches the streaming phase for a table whose only
     * key candidate is a unique index defined on {@code (upper(data), id)}.
     * <p>
     * No records are consumed; reaching streaming without a failure is the whole check.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @FixFor("DBZ-5356")
    public void shouldUniqueIndexWhenAtLeastOneColumnIsExcluded() throws Exception {
        TestHelper.dropTable(connection, "dbz5356");
        try {
            connection.execute("CREATE TABLE dbz5356 (id numeric(9,0), data varchar2(50))");
            connection.execute("CREATE UNIQUE INDEX dbz5356_idx ON dbz5356 (upper(data), id)");
            TestHelper.streamTable(connection, "dbz5356");

            Configuration.Builder settings = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5356");
            start(OracleConnector.class, settings.build());
            assertConnectorIsRunning();

            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            stopConnector();
        }
        finally {
            TestHelper.dropTable(connection, "dbz5356");
        }
    }

    /**
     * Runs a snapshot-plus-streaming round trip against a table whose name contains forward
     * slashes.
     * <p>
     * The offset store and schema history files are deleted first. The scenario then expects one
     * snapshot read (id 1), followed by an insert (id 2), an update (id 2), and a delete (id 1)
     * with its tombstone. The connector is stopped and the table dropped afterwards in all cases.
     *
     * @param tableName the unquoted table name; it is wrapped in double quotes for SQL
     * @param topicTableName the table-name part expected in the topic, after {@code server1.DEBEZIUM.}
     * @throws Exception declared for callers; any {@link Exception} raised by the scenario is
     *             rethrown wrapped in a {@link RuntimeException} that names the table
     */
    private void testTableWithForwardSlashes(String tableName, String topicTableName) throws Exception {
        final String sqlName = "\"" + tableName + "\"";
        final String topicName = "server1.DEBEZIUM." + topicTableName;

        TestHelper.dropTable(connection, sqlName);
        try {
            Testing.Files.delete((Path) OFFSET_STORE_PATH);
            Testing.Files.delete((Path) TestHelper.SCHEMA_HISTORY_PATH);

            connection.execute("CREATE TABLE " + sqlName + " (id numeric(9,0) primary key, data varchar2(50))");
            connection.execute("INSERT INTO " + sqlName + " (id,data) values (1, 'Record1')");
            TestHelper.streamTable(connection, sqlName);

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\." + tableName)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForSnapshotToBeCompleted("oracle", "server1");

            // Snapshot: the row inserted before start-up.
            List<SourceRecord> events = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(events).hasSize(1);
            VerifyRecord.isValidRead(events.get(0), "ID", 1);

            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            assertNoRecordsToConsume();

            // Streaming: insert.
            connection.execute("INSERT INTO " + sqlName + " (id,data) values (2,'Record2')");
            events = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(events).hasSize(1);
            VerifyRecord.isValidInsert(events.get(0), "ID", 2);

            // Streaming: update.
            connection.execute("UPDATE " + sqlName + " SET data = 'Record2u' WHERE id = 2");
            events = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(events).hasSize(1);
            VerifyRecord.isValidUpdate(events.get(0), "ID", 2);

            // Streaming: delete, which is followed by a tombstone record.
            connection.execute("DELETE " + sqlName + " WHERE id = 1");
            events = consumeRecordsByTopic(2).recordsForTopic(topicName);
            Assertions.assertThat(events).hasSize(2);
            VerifyRecord.isValidDelete(events.get(0), "ID", 1);
            VerifyRecord.isValidTombstone(events.get(1));

            assertNoRecordsToConsume();
        }
        catch (Exception cause) {
            throw new RuntimeException("Forward-slash test failed for table: " + tableName, cause);
        }
        finally {
            stopConnector();
            TestHelper.dropTable(connection, sqlName);
        }
    }

    /**
     * Verifies that an object-based table ({@code CREATE TABLE ... OF <type>}) is skipped by both
     * the snapshot and the streaming phase without emitting records.
     * <p>
     * The snapshot is checked through the log message "Locking captured tables []"; streaming is
     * checked by waiting up to 120 seconds for "is not a relational table and will be skipped"
     * after inserting a row. The table, the type and the granted role are cleaned up in a
     * single {@code finally} block, whether or not the test body succeeds.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @FixFor("DBZ-5441")
    public void shouldGracefullySkipObjectBasedTables() throws Exception {
        TestHelper.dropTable(connection, "dbz5441");
        try {
            TestHelper.grantRole("CREATE ANY TYPE");
            LogInterceptor logInterceptor = new LogInterceptor(RelationalSnapshotChangeEventSource.class);

            connection.execute("CREATE TYPE DEBEZIUM.DBZ5441_TYPE AS OBJECT (id number, lvl number)");
            connection.execute("CREATE TABLE DEBEZIUM.DBZ5441 of DEBEZIUM.DBZ5441_TYPE (primary key(id))");
            TestHelper.streamTable(connection, "DBZ5441");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5441")
                    .build();

            // The class that logs the skip message depends on the adapter in use.
            final LogInterceptor streamInterceptor;
            switch (TestHelper.getAdapter(config)) {
                case XSTREAM: {
                    streamInterceptor = new LogInterceptor("io.debezium.connector.oracle.xstream.LcrEventHandler");
                    break;
                }
                default: {
                    streamInterceptor = new LogInterceptor(AbstractLogMinerEventProcessor.class);
                }
            }

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");
            assertNoRecordsToConsume();
            Assertions.assertThat(logInterceptor.containsMessage("Locking captured tables []")).isTrue();

            connection.execute("INSERT INTO DEBEZIUM.DBZ5441 (id,lvl) values (1,1)");
            Awaitility.await()
                    .atMost(120L, TimeUnit.SECONDS)
                    .until(() -> streamInterceptor.containsMessage("is not a relational table and will be skipped"));
            assertNoRecordsToConsume();
            stopConnector();
        }
        finally {
            // Same order as before: the table first, then its type, then the role.
            TestHelper.dropTable(connection, "dbz5441");
            connection.execute("DROP TYPE DEBEZIUM.DBZ5441_TYPE");
            TestHelper.revokeRole("CREATE ANY TYPE");
        }
    }

    /**
     * Verifies that rows are captured with LOB support enabled for a table whose only key is a
     * unique constraint over two columns, one of which holds {@code NULL}.
     * <p>
     * One row is inserted before start-up and one after streaming is running; for each, the
     * {@code after} struct is checked column by column.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @FixFor("DBZ-5682")
    public void shouldCaptureChangesForTableUniqueIndexWithNullColumnValuesWhenLobEnabled() throws Exception {
        final String topicName = "server1.DEBEZIUM.DBZ5682";

        TestHelper.dropTable(connection, "dbz5682");
        try {
            connection.execute("CREATE TABLE dbz5682 (col_bpchar varchar2(30), col_varchar varchar2(30), col_int4 number(5), constraint uk_dbz5862 unique (col_bpchar, col_varchar))");
            TestHelper.streamTable(connection, "dbz5682");
            connection.execute("INSERT INTO dbz5682 values ('1', null, 1)");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5682")
                    .with(OracleConnectorConfig.LOB_ENABLED, true)
                    .build();
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            // Row that existed before the connector started.
            List<SourceRecord> events = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(events).hasSize(1);
            Struct firstValue = (Struct) events.get(0).value();
            Struct after = firstValue.getStruct("after");
            Assertions.assertThat(after.get("COL_BPCHAR")).isEqualTo("1");
            Assertions.assertThat(after.get("COL_VARCHAR")).isNull();
            Assertions.assertThat(after.get("COL_INT4")).isEqualTo(1);

            // Row inserted while streaming.
            connection.execute("INSERT INTO dbz5682 values ('2', null, 2)");
            events = consumeRecordsByTopic(1).recordsForTopic(topicName);
            Assertions.assertThat(events).hasSize(1);
            Struct secondValue = (Struct) events.get(0).value();
            after = secondValue.getStruct("after");
            Assertions.assertThat(after.get("COL_BPCHAR")).isEqualTo("2");
            Assertions.assertThat(after.get("COL_VARCHAR")).isNull();
            Assertions.assertThat(after.get("COL_INT4")).isEqualTo(2);
        }
        finally {
            TestHelper.dropTable(connection, "dbz5682");
        }
    }

    /**
     * Verifies that, with {@code SNAPSHOT_MODE} set to {@code ALWAYS}, a restarted connector
     * emits records for the table's current contents.
     * <p>
     * First run: rows 1 and 2 exist, two records are expected and the first carries id 1.
     * Between runs row 1 is deleted and row 3 inserted. Second run: two records are expected
     * again and the first carries id 2.
     *
     * @throws Exception if a database statement or a connector operation fails
     */
    @Test
    @FixFor("DBZ-5626")
    public void shouldNotUseOffsetScnWhenSnapshotIsAlways() throws Exception {
        try {
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.SNAPSHOT_MODE, OracleConnectorConfig.SnapshotMode.ALWAYS)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5626")
                    .build();

            TestHelper.dropTable(connection, "DBZ5626");
            connection.execute("CREATE TABLE DBZ5626 (ID number(9,0), DATA varchar2(50))");
            TestHelper.streamTable(connection, "DBZ5626");
            connection.execute(
                    "INSERT INTO DBZ5626 (ID, DATA) values (1, 'Test1')",
                    "INSERT INTO DBZ5626 (ID, DATA) values (2, 'Test2')");

            // First run.
            start(OracleConnector.class, config);
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            int expectedRecordCount = 2;
            List<SourceRecord> ordered = consumeRecordsByTopic(expectedRecordCount).allRecordsInOrder();
            Assertions.assertThat(ordered).hasSize(expectedRecordCount);
            Struct firstValue = (Struct) ordered.get(0).value();
            Struct after = (Struct) firstValue.get("after");
            TestCase.assertEquals((Object) 1, (Object) after.get("ID"));
            TestCase.assertEquals((Object) "Test1", (Object) after.get("DATA"));
            stopConnector();

            // Change the table contents while the connector is down.
            connection.execute("DELETE FROM DBZ5626 WHERE ID=1");
            connection.execute("INSERT INTO DBZ5626 (ID, DATA) values (3, 'Test3')");

            // Second run with the same configuration.
            start(OracleConnector.class, config);
            OracleConnectorIT.waitForStreamingRunning("oracle", "server1");

            ordered = consumeRecordsByTopic(expectedRecordCount).allRecordsInOrder();
            Assertions.assertThat(ordered).hasSize(expectedRecordCount);
            firstValue = (Struct) ordered.get(0).value();
            after = (Struct) firstValue.get("after");
            TestCase.assertEquals((Object) 2, (Object) after.get("ID"));
            TestCase.assertEquals((Object) "Test2", (Object) after.get("DATA"));
        }
        finally {
            TestHelper.dropTable(connection, "DBZ5626");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a table with a nested-table column, starts the connector and asserts that no
     * records are produced and that the snapshot log reports an empty set of captured tables
     * (DBZ-5738).
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-5738"})
    public void shouldSkipSnapshotOfNestedTable() throws Exception {
        final String table = "DBZ5738";
        LogInterceptor snapshotLog = new LogInterceptor(RelationalSnapshotChangeEventSource.class);
        TestHelper.dropTable(connection, table);
        TestHelper.grantRole("CREATE ANY TYPE");
        try {
            // The collection type has to exist before the table that uses it.
            connection.execute("CREATE OR REPLACE TYPE my_tab_t AS TABLE OF VARCHAR2(128);");
            connection.execute("create table DBZ5738 ( id numeric(9,0) not null,  c1 int,  c2 my_tab_t,  primary key (id))  nested table c2 store as nested_table");
            TestHelper.streamTable(connection, table);

            connection.execute("INSERT INTO DBZ5738 VALUES (1, 25, my_tab_t('test1'))");
            connection.execute("INSERT INTO DBZ5738 VALUES (2, 50, my_tab_t('test2'))");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5738")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            // Neither of the two inserted rows may show up as a record.
            assertNoRecordsToConsume();
            Assertions.assertThat(snapshotLog.containsMessage("Locking captured tables []")).isTrue();

            stopConnector();
        }
        finally {
            TestHelper.dropTable(connection, table);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Executes a transaction that rolls back to a savepoint and asserts that only the changes
     * made before the savepoint are emitted (DBZ-5907).
     *
     * <p>The transaction updates row 1, inserts row 2, sets savepoint {@code a}, updates row 1
     * again and rolls back to {@code a}. Exactly two records are expected: the first update
     * and the insert.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="LogMiner only")
    @FixFor(value={"DBZ-5907"})
    public void shouldUndoOnlyLastEventWithSavepoint() throws Exception {
        final String table = "dbz5907";
        final String topic = "server1.DEBEZIUM.DBZ5907";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE dbz5907 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, table);

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5907")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            // Seed row 1 and confirm it is streamed as an insert.
            connection.execute("INSERT INTO dbz5907 (id,data) values (1, 'insert')");
            List<SourceRecord> seedRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(seedRecords).hasSize(1);
            SourceRecord seed = seedRecords.get(0);
            VerifyRecord.isValidInsert(seed, "ID", 1);
            Assertions.assertThat(getAfter(seed).get("DATA")).isEqualTo("insert");

            // Only the second update of row 1 happens after the savepoint and is rolled back.
            connection.execute("BEGIN "
                    + "UPDATE dbz5907 SET data = 'update' WHERE id = 1;"
                    + "INSERT INTO dbz5907 (id,data) values (2, 'insert');"
                    + "SAVEPOINT a;"
                    + "UPDATE dbz5907 SET data = 'updateb' WHERE id = 1;"
                    + "ROLLBACK TO SAVEPOINT a;"
                    + "COMMIT;"
                    + "END;");

            List<SourceRecord> changes = consumeRecordsByTopic(2).recordsForTopic(topic);
            Assertions.assertThat(changes).hasSize(2);

            SourceRecord update = changes.get(0);
            VerifyRecord.isValidUpdate(update, "ID", 1);
            Assertions.assertThat(getAfter(update).get("DATA")).isEqualTo("update");

            SourceRecord insert = changes.get(1);
            VerifyRecord.isValidInsert(insert, "ID", 2);
            Assertions.assertThat(getAfter(insert).get("DATA")).isEqualTo("insert");

            assertNoRecordsToConsume();
            stopConnector();
        }
        finally {
            TestHelper.dropTable(connection, table);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Executes a transaction that rolls back to two nested savepoints and asserts that only the
     * change made after the last rollback is emitted (DBZ-5907).
     *
     * <p>After rolling back to savepoints {@code b} and then {@code a}, the only surviving
     * statement is the final update of row 1 to {@code 'updatea'}. The test also asserts that
     * the event processor never logged "Cannot undo change on table".
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="LogMiner only")
    @FixFor(value={"DBZ-5907"})
    public void shouldCorrectlyUndoWithMultipleSavepoints() throws Exception {
        final String table = "dbz5907";
        final String topic = "server1.DEBEZIUM.DBZ5907";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE dbz5907 (id numeric(9,0) primary key, data varchar2(50))");
            TestHelper.streamTable(connection, table);

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ5907")
                    .build();

            // Intercept the logger of whichever event processor the configured buffer type selects.
            String bufferType = config.getString(OracleConnectorConfig.LOG_MINING_BUFFER_TYPE);
            String processorLoggerName = bufferType.equals("memory")
                    ? MemoryLogMinerEventProcessor.class.getName()
                    : AbstractLogMinerEventProcessor.class.getName();
            LogInterceptor interceptor = new LogInterceptor(processorLoggerName);

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            // Seed row 1 and confirm it is streamed as an insert.
            connection.execute("INSERT INTO dbz5907 (id,data) values (1, 'insert')");
            List<SourceRecord> seedRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(seedRecords).hasSize(1);
            SourceRecord seed = seedRecords.get(0);
            VerifyRecord.isValidInsert(seed, "ID", 1);
            Assertions.assertThat(getAfter(seed).get("DATA")).isEqualTo("insert");

            // Everything between SAVEPOINT a and the last rollback is undone.
            connection.execute("BEGIN "
                    + "SAVEPOINT a;"
                    + "UPDATE dbz5907 SET data = 'update' WHERE id = 1;"
                    + "INSERT INTO dbz5907 (id,data) values (2, 'insert');"
                    + "SAVEPOINT b;"
                    + "UPDATE dbz5907 SET data = 'updateb' WHERE id = 1;"
                    + "ROLLBACK TO SAVEPOINT b;"
                    + "ROLLBACK TO SAVEPOINT a;"
                    + "UPDATE dbz5907 SET data = 'updatea' WHERE id = 1;"
                    + "COMMIT;"
                    + "END;");

            List<SourceRecord> changes = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(changes).hasSize(1);
            SourceRecord survivingUpdate = changes.get(0);
            VerifyRecord.isValidUpdate(survivingUpdate, "ID", 1);
            Assertions.assertThat(getAfter(survivingUpdate).get("DATA")).isEqualTo("updatea");

            assertNoRecordsToConsume();
            stopConnector();

            Assertions.assertThat(interceptor.containsMessage("Cannot undo change on table"))
                    .as("Unable to correctly undo operation within transaction")
                    .isFalse();
        }
        finally {
            TestHelper.dropTable(connection, table);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * With LOB support enabled, a single UPDATE statement that touches two distinct rows of a
     * table without LOB columns must produce two separate update records (DBZ-6107).
     *
     * <p>Ten rows are inserted and then rows 1 and 2 are updated by one statement; the test
     * expects ten insert records followed by two update records, in that order.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-6107"})
    public void shouldNotConsolidateBulkUpdateWhenLobEnabledIfUpdatesAreDifferentLogicalRowsWithoutLobColumns() throws Exception {
        final String table = "dbz6107";
        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE dbz6107 (a numeric(9,0), b varchar2(25))");
            TestHelper.streamTable(connection, table);

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6107")
                    .with(OracleConnectorConfig.LOB_ENABLED, "true")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            for (int row = 1; row <= 10; row++) {
                connection.execute("INSERT INTO dbz6107 (a,b) values (" + row + ",'t" + row + "')");
            }
            connection.execute("UPDATE dbz6107 SET a=12 WHERE a=1 OR a=2");

            AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(12);
            List<SourceRecord> events = consumed.recordsForTopic("server1.DEBEZIUM.DBZ6107");
            Assertions.assertThat(events).hasSize(12);

            // Events 0..9 are the inserts of rows 1..10.
            for (int index = 0; index < 10; index++) {
                int row = index + 1;
                SourceRecord insert = events.get(index);
                VerifyRecord.isValidInsert(insert);
                Struct after = ((Struct) insert.value()).getStruct("after");
                Assertions.assertThat(after.get("A")).isEqualTo(row);
                Assertions.assertThat(after.get("B")).isEqualTo("t" + row);
            }

            // Events 10..11 are the updates of rows 1 and 2; only column A changes.
            for (int index = 10; index < 12; index++) {
                int row = index - 9;
                SourceRecord update = events.get(index);
                VerifyRecord.isValidUpdate(update);
                Struct before = ((Struct) update.value()).getStruct("before");
                Assertions.assertThat(before.get("A")).isEqualTo(row);
                Assertions.assertThat(before.get("B")).isEqualTo("t" + row);
                Struct after = ((Struct) update.value()).getStruct("after");
                Assertions.assertThat(after.get("A")).isEqualTo(12);
                Assertions.assertThat(after.get("B")).isEqualTo("t" + row);
            }

            assertNoRecordsToConsume();
        }
        finally {
            stopConnector();
            TestHelper.dropTable(connection, table);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * With LOB support enabled, UPDATE statements that each touch two distinct rows of a table
     * with CLOB columns must produce one update record per row (DBZ-6107).
     *
     * <p>Ten rows are inserted, followed by three two-row updates: one that leaves column C
     * untouched, one that sets C to {@code 'Updated'} and one that sets C to {@code NULL}.
     * The test expects ten insert records followed by six update records, in that order.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-6107"})
    public void shouldNotConsolidateBulkUpdateWhenLobEnabledIfUpdatesAreDifferentLogicalRowsWithLobColumns() throws Exception {
        final String table = "dbz6107";
        // Placeholder asserted below for CLOB values that are not part of the change event.
        final String unavailable = "__debezium_unavailable_value";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE dbz6107 (a numeric(9,0), b varchar2(25), d clob, c clob)");
            TestHelper.streamTable(connection, table);

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6107")
                    .with(OracleConnectorConfig.LOB_ENABLED, "true")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            for (int row = 1; row <= 10; row++) {
                connection.execute("INSERT INTO dbz6107 (a,b,c,d) values (" + row + ",'t" + row + "', 'data" + row + "','x')");
            }
            connection.execute("UPDATE dbz6107 SET a=12 WHERE a=1 OR a=2");
            connection.execute("UPDATE dbz6107 SET a=13, c = 'Updated' WHERE a=3 OR a=4");
            connection.execute("UPDATE dbz6107 SET a=14, c = NULL WHERE a=5 OR a=6");

            // 10 inserts plus 2 rows for each of the 3 updates; used for both the consume
            // call and the size assertion so the two cannot drift apart.
            final int expectedCount = 10 + 2 + 2 + 2;
            AbstractConnectorTest.SourceRecords consumed = consumeRecordsByTopic(expectedCount);
            List<SourceRecord> events = consumed.recordsForTopic("server1.DEBEZIUM.DBZ6107");
            Assertions.assertThat(events).hasSize(expectedCount);

            // Events 0..9 are the inserts of rows 1..10.
            for (int index = 0; index < 10; index++) {
                int row = index + 1;
                SourceRecord insert = events.get(index);
                VerifyRecord.isValidInsert(insert);
                Struct after = ((Struct) insert.value()).getStruct("after");
                Assertions.assertThat(after.get("A")).isEqualTo(row);
                Assertions.assertThat(after.get("B")).isEqualTo("t" + row);
                Assertions.assertThat(after.get("C")).isEqualTo("data" + row);
            }

            // Events 10..11: rows 1 and 2, column C not modified by the statement.
            for (int index = 10; index < 12; index++) {
                int row = index - 9;
                SourceRecord update = events.get(index);
                VerifyRecord.isValidUpdate(update);
                Struct before = ((Struct) update.value()).getStruct("before");
                Assertions.assertThat(before.get("A")).isEqualTo(row);
                Assertions.assertThat(before.get("B")).isEqualTo("t" + row);
                Assertions.assertThat(before.get("C")).isEqualTo(unavailable);
                Struct after = ((Struct) update.value()).getStruct("after");
                Assertions.assertThat(after.get("A")).isEqualTo(12);
                Assertions.assertThat(after.get("B")).isEqualTo("t" + row);
                Assertions.assertThat(after.get("C")).isEqualTo(unavailable);
            }

            // Events 12..13: rows 3 and 4, column C set to 'Updated'.
            for (int index = 12; index < 14; index++) {
                int row = index - 9;
                SourceRecord update = events.get(index);
                VerifyRecord.isValidUpdate(update);
                Struct before = ((Struct) update.value()).getStruct("before");
                Assertions.assertThat(before.get("A")).isEqualTo(row);
                Assertions.assertThat(before.get("B")).isEqualTo("t" + row);
                Assertions.assertThat(before.get("C")).isEqualTo(unavailable);
                Struct after = ((Struct) update.value()).getStruct("after");
                Assertions.assertThat(after.get("A")).isEqualTo(13);
                Assertions.assertThat(after.get("B")).isEqualTo("t" + row);
                Assertions.assertThat(after.get("C")).isEqualTo("Updated");
            }

            // Events 14..15: rows 5 and 6, column C set to NULL.
            for (int index = 14; index < 16; index++) {
                int row = index - 9;
                SourceRecord update = events.get(index);
                VerifyRecord.isValidUpdate(update);
                Struct before = ((Struct) update.value()).getStruct("before");
                Assertions.assertThat(before.get("A")).isEqualTo(row);
                Assertions.assertThat(before.get("B")).isEqualTo("t" + row);
                Assertions.assertThat(before.get("C")).isEqualTo(unavailable);
                Struct after = ((Struct) update.value()).getStruct("after");
                Assertions.assertThat(after.get("A")).isEqualTo(14);
                Assertions.assertThat(after.get("B")).isEqualTo("t" + row);
                Assertions.assertThat(after.get("C")).isNull();
            }

            assertNoRecordsToConsume();
        }
        finally {
            stopConnector();
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Captures one snapshot record and one streamed record from a table whose quoted name
     * contains an exclamation mark, a space and a non-ASCII character (DBZ-6120).
     *
     * <p>Both records are expected on the topic {@code server1.DEBEZIUM.Q1___}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-6120"})
    public void testCapturingChangesForTableWithSapcesInName() throws Exception {
        final String quotedTable = "\"Q1! \u8868\"";
        final String topic = "server1.DEBEZIUM.Q1___";

        TestHelper.dropTable(connection, quotedTable);
        try {
            connection.execute("CREATE TABLE " + quotedTable + " (a int)");
            connection.execute("INSERT INTO " + quotedTable + " (a) values (1)");
            TestHelper.streamTable(connection, quotedTable);

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.Q1! \u8868")
                    .build();

            start(OracleConnector.class, config);
            // The row inserted above is expected as a snapshot record below, so asserting
            // "no records to consume" here would race with the snapshot. Check that the
            // connector is running instead, as the sibling tests do.
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            // Snapshot record for the pre-existing row.
            AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.recordsForTopic(topic)).hasSize(1);

            // Streamed record for a row inserted after the snapshot.
            connection.execute("INSERT INTO " + quotedTable + " (a) values (2)");
            records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.recordsForTopic(topic)).hasSize(1);
        }
        finally {
            TestHelper.dropTable(connection, quotedTable);
        }
    }

    /**
     * Captures one snapshot record and one streamed record from a table whose quoted name
     * contains an exclamation mark and a non-ASCII character (DBZ-6120).
     *
     * <p>Both records are expected on the topic {@code server1.DEBEZIUM.Q1__}.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-6120"})
    public void testCapturingChangesForTableWithSpecialCharactersInName() throws Exception {
        final String quotedTable = "\"Q1!\u8868\"";
        final String topic = "server1.DEBEZIUM.Q1__";

        TestHelper.dropTable(connection, quotedTable);
        try {
            connection.execute("CREATE TABLE " + quotedTable + " (a int)");
            connection.execute("INSERT INTO " + quotedTable + " (a) values (1)");
            TestHelper.streamTable(connection, quotedTable);

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.Q1!\u8868")
                    .build();

            start(OracleConnector.class, config);
            // The row inserted above is expected as a snapshot record below, so asserting
            // "no records to consume" here would race with the snapshot. Check that the
            // connector is running instead, as the sibling tests do.
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            // Snapshot record for the pre-existing row.
            AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.recordsForTopic(topic)).hasSize(1);

            // Streamed record for a row inserted after the snapshot.
            connection.execute("INSERT INTO " + quotedTable + " (a) values (2)");
            records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.recordsForTopic(topic)).hasSize(1);
        }
        finally {
            TestHelper.dropTable(connection, quotedTable);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * With the driver property {@code oracle.jdbc.timestampTzInGmt} set to {@code true}, a
     * {@code TIMESTAMP WITH TIME ZONE} value must be emitted with the same text in the
     * snapshot record and in the streamed record (DBZ-6143).
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-6143"})
    public void testTimestampWithTimeZoneFormatConsistentUsingDriverEnabledTimestampTzInGmt() throws Exception {
        final String table = "tz_test";
        final String topic = "server1.DEBEZIUM.TZ_TEST";
        final String insertSql = "INSERT INTO tz_test values (to_timestamp_tz('2010-12-01 23:12:56.788 -12:44', 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'))";
        final String expectedValue = "2010-12-01T23:12:56.788000-12:44";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE tz_test (a timestamp with time zone)");
            connection.execute(insertSql);
            TestHelper.streamTable(connection, table);

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.TZ_TEST")
                    .with("driver.oracle.jdbc.timestampTzInGmt", "true")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            // Snapshot record for the row inserted before the connector started.
            List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(snapshotRecords).hasSize(1);
            Assertions.assertThat(getAfter(snapshotRecords.get(0)).get("A")).isEqualTo(expectedValue);

            // Streamed record for an identical row inserted afterwards.
            connection.execute(insertSql);
            List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(streamedRecords).hasSize(1);
            Assertions.assertThat(getAfter(streamedRecords.get(0)).get("A")).isEqualTo(expectedValue);
        }
        finally {
            TestHelper.dropTable(connection, table);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * With the driver property {@code oracle.jdbc.timestampTzInGmt} set to {@code false}, a
     * {@code TIMESTAMP WITH TIME ZONE} value must be emitted with the same text in the
     * snapshot record and in the streamed record (DBZ-6143).
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-6143"})
    public void testTimestampWithTimeZoneFormatConsistentUsingDriverDisabledTimestampTzInGmt() throws Exception {
        final String table = "tz_test";
        final String topic = "server1.DEBEZIUM.TZ_TEST";
        final String insertSql = "INSERT INTO tz_test values (to_timestamp_tz('2010-12-01 23:12:56.788 -12:44', 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM'))";
        final String expectedValue = "2010-12-01T23:12:56.788000-12:44";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE tz_test (a timestamp with time zone)");
            connection.execute(insertSql);
            TestHelper.streamTable(connection, table);

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.TZ_TEST")
                    .with("driver.oracle.jdbc.timestampTzInGmt", "false")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            // Snapshot record for the row inserted before the connector started.
            List<SourceRecord> snapshotRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(snapshotRecords).hasSize(1);
            Assertions.assertThat(getAfter(snapshotRecords.get(0)).get("A")).isEqualTo(expectedValue);

            // Streamed record for an identical row inserted afterwards.
            connection.execute(insertSql);
            List<SourceRecord> streamedRecords = consumeRecordsByTopic(1).recordsForTopic(topic);
            Assertions.assertThat(streamedRecords).hasSize(1);
            Assertions.assertThat(getAfter(streamedRecords.get(0)).get("A")).isEqualTo(expectedValue);
        }
        finally {
            TestHelper.dropTable(connection, table);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Creates a table with {@code character}, {@code character(5)} and
     * {@code character varying(5)} columns while the connector is already streaming and
     * asserts that all three columns are emitted as strings (DBZ-6221).
     *
     * <p>The {@code character(5)} value is expected padded to five characters.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @FixFor(value={"DBZ-6221"})
    public void testShouldProperlyMapCharacterColumnTypesAsCharWhenTableCreatedDuringStreamingPhase() throws Exception {
        final String table = "dbz6221";
        TestHelper.dropTable(connection, table);
        try {
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.DBZ6221")
                    .build();

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            // The table is created only after streaming has begun.
            connection.execute("CREATE TABLE dbz6221 (data0 character, data1 character(5), data2 character varying(5))");
            TestHelper.streamTable(connection, table);
            connection.execute("INSERT INTO dbz6221 values ('a', 'abc', 'abc')");

            List<SourceRecord> events = consumeRecordsByTopic(1).recordsForTopic("server1.DEBEZIUM.DBZ6221");
            Assertions.assertThat(events).hasSize(1);

            Struct after = ((Struct) events.get(0).value()).getStruct("after");
            Schema afterSchema = after.schema();
            Assertions.assertThat(afterSchema.fields()).hasSize(3);
            Assertions.assertThat(afterSchema.field("DATA0").schema().type()).isEqualTo(Schema.Type.STRING);
            Assertions.assertThat(afterSchema.field("DATA1").schema().type()).isEqualTo(Schema.Type.STRING);
            Assertions.assertThat(afterSchema.field("DATA2").schema().type()).isEqualTo(Schema.Type.STRING);

            Assertions.assertThat(after.get("DATA0")).isEqualTo("a");
            Assertions.assertThat(after.get("DATA1")).isEqualTo("abc  ");
            Assertions.assertThat(after.get("DATA2")).isEqualTo("abc");
        }
        finally {
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * With LOB support enabled and no changes made to the captured table, the
     * {@code OffsetScn} streaming metric must still move forward (DBZ-5395).
     *
     * <p>The test first waits for the metric to report a usable value, remembers it, and then
     * waits for the metric to report a strictly greater value.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Applies to LogMiner only")
    @FixFor(value={"DBZ-5395"})
    public void testShouldAdvanceStartScnWhenNoActiveTransactionsBetweenIterationsWhenLobEnabled() throws Exception {
        final String table = "dbz5395";
        TestHelper.dropTable(connection, table);
        try {
            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM.DBZ5395")
                    .with(OracleConnectorConfig.LOB_ENABLED, "true")
                    .build();

            connection.execute("CREATE TABLE dbz5395 (data0 character, data1 character(5), data2 character varying(5))");
            TestHelper.streamTable(connection, table);

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            // Wait until the metric carries a real SCN and remember it as the baseline.
            final AtomicReference<Scn> baselineScn = new AtomicReference<Scn>(Scn.NULL);
            Awaitility.await().atMost(Duration.ofMinutes(5L)).until(() -> {
                String metric = (String) getStreamingMetric("OffsetScn");
                if (Strings.isNullOrBlank(metric) || "null".equals(metric)) {
                    return false;
                }
                baselineScn.set(Scn.valueOf(metric));
                return true;
            });

            // The metric has to advance past the baseline without any table activity.
            Awaitility.await()
                    .atMost(Duration.ofMinutes(5L))
                    .pollInterval(Duration.ofSeconds(2L))
                    .until(() -> Scn.valueOf((String) getStreamingMetric("OffsetScn")).compareTo(baselineScn.get()) > 0);
        }
        finally {
            TestHelper.dropTable(connection, table);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Exercises a transaction that stays uncommitted for longer than the configured
     * transaction retention of 60000 ms while the connector is stopped (DBZ-6355).
     *
     * <p>Sequence: snapshot row 1 and stop the connector; insert row 2 on a second connection
     * without committing; wait 70000 ms; restart the connector; insert and commit row 3 on the
     * main connection; wait until the {@code FetchingQueryCount} metric has grown by at least 5;
     * then commit row 2. Afterwards exactly one record, for row 3, must be available.
     *
     * @throws Exception if a database operation or the connector fails
     */
    @Test
    @SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="User-defined types not supported")
    @FixFor(value={"DBZ-6355"})
    public void testBacklogTransactionShouldNotBeAbandon() throws Exception {
        final String table = "dbz6355";
        final String topic = "server1.DEBEZIUM.DBZ6355";

        TestHelper.dropTable(connection, table);
        try {
            connection.execute("CREATE TABLE dbz6355 (id numeric(9,0) primary key, name varchar2(50))");
            TestHelper.streamTable(connection, table);

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.LOG_MINING_TRANSACTION_RETENTION_MS, 60000L)
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ6355")
                    .build();

            connection.execute("INSERT INTO dbz6355 (id,name) values (1, 'Gerald Jinx Mouse')");

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            // Row 1 arrives through the snapshot.
            AbstractConnectorTest.SourceRecords snapshot = consumeRecordsByTopic(1);
            Assertions.assertThat(snapshot.allRecordsInOrder()).hasSize(1);
            List<SourceRecord> snapshotRows = snapshot.recordsForTopic(topic);
            Assertions.assertThat(snapshotRows).hasSize(1);
            Struct snapshotAfter = ((Struct) snapshotRows.get(0).value()).getStruct("after");
            Assertions.assertThat(snapshotAfter.get("ID")).isEqualTo(1);
            Assertions.assertThat(snapshotAfter.get("NAME")).isEqualTo("Gerald Jinx Mouse");

            stopConnector();

            try (OracleConnection pendingTxConnection = TestHelper.testConnection()) {
                // Leave row 2 uncommitted for longer than the retention period.
                pendingTxConnection.executeWithoutCommitting("INSERT INTO dbz6355 (id,name) values (2, 'Minnie Mouse')");
                final long agingMillis = 70000L;
                LOGGER.info("Waiting {}ms for second change to age; should not be captured.", agingMillis);
                Thread.sleep(agingMillis);

                start(OracleConnector.class, config);
                assertConnectorIsRunning();
                waitForStreamingRunning("oracle", "server1");

                Long queryCountAtRestart = (Long) getStreamingMetric("FetchingQueryCount");
                connection.execute("INSERT INTO dbz6355 (id,name) VALUES (3, 'Donald Duck')");

                // Let the connector run several more fetch queries before the old transaction commits.
                Awaitility.waitAtMost(Duration.ofSeconds(60L))
                        .until(() -> queryCountAtRestart + 5L <= (Long) getStreamingMetric("FetchingQueryCount"));
                pendingTxConnection.commit();
            }

            // Only row 3 is expected after the restart.
            AbstractConnectorTest.SourceRecords streamed = consumeRecordsByTopic(1);
            Assertions.assertThat(streamed.allRecordsInOrder()).hasSize(1);
            List<SourceRecord> streamedRows = streamed.recordsForTopic(topic);
            Assertions.assertThat(streamedRows).hasSize(1);
            Struct streamedAfter = ((Struct) streamedRows.get(0).value()).getStruct("after");
            Assertions.assertThat(streamedAfter.get("ID")).isEqualTo(3);
            Assertions.assertThat(streamedAfter.get("NAME")).isEqualTo("Donald Duck");

            assertNoRecordsToConsume();
        }
        finally {
            TestHelper.dropTable(connection, table);
        }
    }

    /**
     * Blocks until the connector's {@code CurrentScn} streaming metric is strictly greater than
     * the database SCN read at the moment this method is called.
     *
     * <p>The wait is bounded by {@link TestHelper#defaultMessageConsumerPollTimeout()} seconds.
     * A metric value that is {@code null} or the literal string {@code "null"} counts as
     * "not yet seen".
     *
     * @throws SQLException if the admin connection cannot be used or closed
     */
    private void waitForCurrentScnToHaveBeenSeenByConnector() throws SQLException {
        try (OracleConnection adminConnection = TestHelper.adminConnection(true)) {
            final Scn databaseScn = adminConnection.getCurrentScn();
            Awaitility.await()
                    .atMost(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS)
                    .until(() -> {
                        String metric = (String) getStreamingMetric("CurrentScn");
                        return metric != null
                                && !"null".equals(metric)
                                && Scn.valueOf(metric).compareTo(databaseScn) > 0;
                    });
        }
    }

    /**
     * Extracts the {@code after} struct from a change event.
     *
     * @param record the change event; its value is cast to a {@link Struct}
     * @return the nested struct stored under the field name {@code "after"}
     */
    private Struct getAfter(SourceRecord record) {
        Struct envelope = (Struct) record.value();
        return envelope.getStruct("after");
    }
}

