/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipTestDependingOnStrategyRule;
import io.debezium.connector.oracle.junit.SkipWhenLogMiningStrategyIs;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.processors.AbstractReselectProcessorTest;
import io.debezium.processors.reselect.ReselectColumnsPostProcessor;
import io.debezium.util.Testing;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.sql.Blob;
import java.sql.Clob;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

public class OracleReselectColumnsProcessorIT
extends AbstractReselectProcessorTest<OracleConnector> {
    @Rule
    public final TestRule skipStrategyRule = new SkipTestDependingOnStrategyRule();
    private OracleConnection connection;

    @Before
    public void beforeEach() throws Exception {
        this.connection = TestHelper.testConnection();
        this.setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)TestHelper.SCHEMA_HISTORY_PATH);
        super.beforeEach();
    }

    @After
    public void afterEach() throws Exception {
        super.afterEach();
        if (this.connection != null) {
            this.connection.close();
        }
    }

    protected Class<OracleConnector> getConnectorClass() {
        return OracleConnector.class;
    }

    protected JdbcConnection databaseConnection() {
        return this.connection;
    }

    protected Configuration.Builder getConfigurationBuilder() {
        return ((Configuration.Builder)((Configuration.Builder)TestHelper.defaultConfig().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4321")).with(OracleConnectorConfig.CUSTOM_POST_PROCESSORS, "reselector")).with("reselector.type", ReselectColumnsPostProcessor.class.getName());
    }

    protected String topicName() {
        return "server1.DEBEZIUM.DBZ4321";
    }

    protected String tableName() {
        return "DEBEZIUM.DBZ4321";
    }

    protected String reselectColumnsList() {
        return "DEBEZIUM.DBZ4321:DATA";
    }

    protected void createTable() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4321");
        this.connection.execute(new String[]{"CREATE TABLE dbz4321 (id numeric(9,0) primary key, data varchar2(50), data2 numeric(9,0))"});
        TestHelper.streamTable(this.connection, "dbz4321");
    }

    protected void dropTable() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4321");
    }

    protected String getInsertWithValue() {
        return "INSERT INTO dbz4321 (id,data,data2) values (1,'one',1)";
    }

    protected String getInsertWithNullValue() {
        return "INSERT INTO dbz4321 (id,data,data2) values (1,null,1)";
    }

    protected void waitForStreamingStarted() throws InterruptedException {
        OracleReselectColumnsProcessorIT.waitForStreamingRunning((String)"oracle", (String)"server1");
    }

    protected String fieldName(String fieldName) {
        return fieldName.toUpperCase();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @SkipWhenLogMiningStrategyIs(value=SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason="Cannot use lob.enabled with Hybrid")
    @FixFor(value={"DBZ-7729"})
    public void testColumnReselectionUsesPrimaryKeyColumnAndValuesDespiteMessageKeyColumnConfigs() throws Exception {
        TestHelper.dropTable(this.connection, "dbz7729");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz7729 (id numeric(9,0) primary key, data clob, data2 numeric(9,0), data3 varchar2(25))"});
            TestHelper.streamTable(this.connection, "dbz7729");
            Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)this.getConfigurationBuilder().with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ7729")).with(OracleConnectorConfig.MSG_KEY_COLUMNS, "(.*).DEBEZIUM.DBZ7729:DATA3")).with(OracleConnectorConfig.LOB_ENABLED, "true")).with("reselector.reselect.columns.include.list", "DEBEZIUM.DBZ7729:DATA").build();
            this.start(this.getConnectorClass(), config);
            this.assertConnectorIsRunning();
            this.waitForStreamingStarted();
            String clobData = RandomStringUtils.randomAlphabetic((int)10000);
            Clob clob = this.connection.connection().createClob();
            clob.setString(1L, clobData);
            this.connection.prepareQuery("INSERT INTO dbz7729 (id,data,data2,data3) values (1,?,1,'A')", ps -> ps.setClob(1, clob), null);
            this.connection.commit();
            this.connection.execute(new String[]{"UPDATE dbz7729 set data2=10 where id = 1"});
            AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(2);
            List tableRecords = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ7729");
            Assertions.assertThat((List)tableRecords).hasSize(2);
            SourceRecord update = (SourceRecord)tableRecords.get(1);
            VerifyRecord.isValidUpdate((SourceRecord)update, (boolean)true);
            Struct key = (Struct)update.key();
            Assertions.assertThat((List)key.schema().fields()).hasSize(1);
            Assertions.assertThat((Object)key.get("DATA3")).isEqualTo((Object)"A");
            Struct after = ((Struct)update.value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
            Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)clobData);
            Assertions.assertThat((Object)after.get("DATA2")).isEqualTo((Object)10);
            Assertions.assertThat((Object)after.get("DATA3")).isEqualTo((Object)"A");
        }
        finally {
            TestHelper.dropTable(this.connection, "dbz7729");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @SkipWhenLogMiningStrategyIs(value=SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason="Cannot use lob.enabled with Hybrid")
    @FixFor(value={"DBZ-4321"})
    public void testClobReselectedWhenValueIsUnavailable() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4321");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz4321 (id numeric(9,0) primary key, data clob, data2 numeric(9,0))"});
            TestHelper.streamTable(this.connection, "dbz4321");
            Configuration config = ((Configuration.Builder)this.getConfigurationBuilder().with(OracleConnectorConfig.LOB_ENABLED, "true")).with("reselector.reselect.columns.include.list", this.reselectColumnsList()).build();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            this.waitForStreamingStarted();
            String clobData = RandomStringUtils.randomAlphabetic((int)10000);
            Clob clob = this.connection.connection().createClob();
            clob.setString(1L, clobData);
            this.connection.prepareQuery("INSERT INTO dbz4321 (id,data,data2) values (1,?,1)", ps -> ps.setClob(1, clob), null);
            this.connection.commit();
            this.connection.execute(new String[]{"UPDATE dbz4321 set data2=10 where id = 1"});
            AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(2);
            List tableRecords = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ4321");
            Assertions.assertThat((List)tableRecords).hasSize(2);
            SourceRecord update = (SourceRecord)tableRecords.get(1);
            VerifyRecord.isValidUpdate((SourceRecord)update, (String)"ID", (int)1);
            Struct after = ((Struct)update.value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
            Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)clobData);
            Assertions.assertThat((Object)after.get("DATA2")).isEqualTo((Object)10);
        }
        finally {
            TestHelper.dropTable(this.connection, "dbz4321");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @SkipWhenLogMiningStrategyIs(value=SkipWhenLogMiningStrategyIs.Strategy.HYBRID, reason="Cannot use lob.enabled with Hybrid")
    @FixFor(value={"DBZ-4321"})
    public void testBlobReselectedWhenValueIsUnavailable() throws Exception {
        TestHelper.dropTable(this.connection, "dbz4321");
        try {
            this.connection.execute(new String[]{"CREATE TABLE dbz4321 (id numeric(9,0) primary key, data blob, data2 numeric(9,0))"});
            TestHelper.streamTable(this.connection, "dbz4321");
            Configuration config = ((Configuration.Builder)this.getConfigurationBuilder().with(OracleConnectorConfig.LOB_ENABLED, "true")).with("reselector.reselect.columns.include.list", this.reselectColumnsList()).build();
            this.start(OracleConnector.class, config);
            this.assertConnectorIsRunning();
            this.waitForStreamingStarted();
            byte[] blobData = RandomStringUtils.random((int)10000).getBytes(StandardCharsets.UTF_8);
            Blob blob = this.connection.connection().createBlob();
            blob.setBytes(1L, blobData);
            this.connection.prepareQuery("INSERT INTO dbz4321 (id,data,data2) values (1,?,1)", ps -> ps.setBlob(1, blob), null);
            this.connection.commit();
            this.connection.execute(new String[]{"UPDATE dbz4321 set data2=10 where id = 1"});
            AbstractConnectorTest.SourceRecords sourceRecords = this.consumeRecordsByTopic(2);
            List tableRecords = sourceRecords.recordsForTopic("server1.DEBEZIUM.DBZ4321");
            Assertions.assertThat((List)tableRecords).hasSize(2);
            SourceRecord update = (SourceRecord)tableRecords.get(1);
            VerifyRecord.isValidUpdate((SourceRecord)update, (String)"ID", (int)1);
            Struct after = ((Struct)update.value()).getStruct("after");
            Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
            Assertions.assertThat((Object)after.get("DATA")).isEqualTo((Object)ByteBuffer.wrap(blobData));
            Assertions.assertThat((Object)after.get("DATA2")).isEqualTo((Object)10);
        }
        finally {
            TestHelper.dropTable(this.connection, "dbz4321");
        }
    }
}

