/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer.processor;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

public abstract class AbstractProcessorTest
extends AbstractConnectorTest {
    private OracleConnection connection;
    @Rule
    public TestRule skipRule = new SkipTestDependingOnAdapterNameRule();

    @Before
    public void before() throws Exception {
        this.connection = TestHelper.testConnection();
        this.setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        this.initializeConnectorTestFramework();
        Testing.Files.delete((Path)TestHelper.SCHEMA_HISTORY_PATH);
        TestHelper.dropTable(this.connection, "dbz3752");
        this.connection.execute(new String[]{"CREATE TABLE dbz3752(id number(9,0) primary key, name varchar2(50))"});
        TestHelper.streamTable(this.connection, "dbz3752");
    }

    @After
    public void after() throws Exception {
        this.stopConnector();
        if (this.connection != null) {
            TestHelper.dropTable(this.connection, "dbz3752");
            this.connection.close();
        }
    }

    protected abstract Configuration.Builder getBufferImplementationConfig();

    protected boolean hasPersistedState() {
        return false;
    }

    @Test
    @FixFor(value={"DBZ-3752"})
    public void shouldResumeFromPersistedState() throws Exception {
        if (!this.hasPersistedState()) {
            return;
        }
        Configuration config = ((Configuration.Builder)((Configuration.Builder)this.getBufferImplementationConfig().with(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, false)).with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3752")).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        AbstractProcessorTest.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO dbz3752 (id,name) values (1, 'Mickey Mouse')"});
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.allRecordsInOrder()).hasSize(1);
        List tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ3752");
        Assertions.assertThat((List)tableRecords).hasSize(1);
        Struct after = ((Struct)((SourceRecord)tableRecords.get(0)).value()).getStruct("after");
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("NAME")).isEqualTo((Object)"Mickey Mouse");
        this.stopConnector();
        this.connection.execute(new String[]{"INSERT INTO dbz3752 (id,name) values (2, 'Donald Duck')"});
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        AbstractProcessorTest.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO dbz3752 (id,name) values (3, 'Roger Rabbit')"});
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.allRecordsInOrder()).hasSize(2);
        tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ3752");
        Assertions.assertThat((List)tableRecords).hasSize(2);
        after = ((Struct)((SourceRecord)tableRecords.get(0)).value()).getStruct("after");
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)2);
        Assertions.assertThat((Object)after.get("NAME")).isEqualTo((Object)"Donald Duck");
        after = ((Struct)((SourceRecord)tableRecords.get(1)).value()).getStruct("after");
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)3);
        Assertions.assertThat((Object)after.get("NAME")).isEqualTo((Object)"Roger Rabbit");
    }

    @Test
    @FixFor(value={"DBZ-3752"})
    public void shouldResumeLongRunningTransactionFromPersistedState() throws Exception {
        if (!this.hasPersistedState()) {
            return;
        }
        Configuration config = ((Configuration.Builder)((Configuration.Builder)this.getBufferImplementationConfig().with(OracleConnectorConfig.LOG_MINING_BUFFER_DROP_ON_STOP, false)).with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ3752")).build();
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        AbstractProcessorTest.waitForStreamingRunning((String)"oracle", (String)"server1");
        try (OracleConnection secondary = TestHelper.testConnection();){
            this.connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3752 (id,name) values (1, 'Mickey Mouse')"});
            secondary.execute(new String[]{"INSERT INTO dbz3752 (id,name) values (2, 'Donald Duck')"});
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.allRecordsInOrder()).hasSize(1);
        List tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ3752");
        Assertions.assertThat((List)tableRecords).hasSize(1);
        Struct after = ((Struct)((SourceRecord)tableRecords.get(0)).value()).getStruct("after");
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)2);
        Assertions.assertThat((Object)after.get("NAME")).isEqualTo((Object)"Donald Duck");
        this.assertNoRecordsToConsume();
        this.stopConnector();
        this.connection.executeWithoutCommitting(new String[]{"INSERT INTO dbz3752 (id,name) values (3, 'Minnie Mouse')"});
        this.start(OracleConnector.class, config);
        this.assertConnectorIsRunning();
        AbstractProcessorTest.waitForStreamingRunning((String)"oracle", (String)"server1");
        this.connection.execute(new String[]{"INSERT INTO dbz3752 (id,name) values (4, 'Roger Rabbit')"});
        records = this.consumeRecordsByTopic(3);
        Assertions.assertThat((List)records.allRecordsInOrder()).hasSize(3);
        tableRecords = records.recordsForTopic("server1.DEBEZIUM.DBZ3752");
        Assertions.assertThat((List)tableRecords).hasSize(3);
        after = ((Struct)((SourceRecord)tableRecords.get(0)).value()).getStruct("after");
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)1);
        Assertions.assertThat((Object)after.get("NAME")).isEqualTo((Object)"Mickey Mouse");
        after = ((Struct)((SourceRecord)tableRecords.get(1)).value()).getStruct("after");
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)3);
        Assertions.assertThat((Object)after.get("NAME")).isEqualTo((Object)"Minnie Mouse");
        after = ((Struct)((SourceRecord)tableRecords.get(2)).value()).getStruct("after");
        Assertions.assertThat((Object)after.get("ID")).isEqualTo((Object)4);
        Assertions.assertThat((Object)after.get("NAME")).isEqualTo((Object)"Roger Rabbit");
    }
}

