/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipTestDependingOnDatabaseOptionRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.fest.assertions.Assertions;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/**
 * Integration test for the {@code LOG_MINING_FLUSH} table used by the LogMiner adapter (DBZ-4118).
 *
 * <p>The test asserts that the flush table holds exactly one row while the connector streams,
 * and that a connector restarted against a flush table containing an extra row logs a warning
 * and ends up with exactly one row again.
 *
 * <p>The class is skipped for every adapter other than LogMiner via
 * {@link SkipWhenAdapterNameIsNot}.
 */
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "Flush strategy only applies to LogMiner implementation")
public class FlushStrategyIT extends AbstractConnectorTest {

    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();

    @Rule
    public final TestRule skipOptionRule = new SkipTestDependingOnDatabaseOptionRule();

    /** Connection shared by the whole class; opened in {@link #beforeClass()}, closed in {@link #closeConnection()}. */
    private static OracleConnection connection;

    /**
     * Opens the shared test connection once for the class.
     *
     * @throws SQLException if the connection cannot be obtained
     */
    @BeforeClass
    public static void beforeClass() throws SQLException {
        connection = TestHelper.testConnection();
    }

    /**
     * Closes the shared test connection, if one was opened.
     *
     * @throws SQLException if closing the connection fails
     */
    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Per-test setup: applies the default consumer poll timeout, initializes the connector test
     * framework and deletes the database history file at {@code TestHelper.DB_HISTORY_PATH}.
     *
     * @throws SQLException declared for parity with the other fixtures
     */
    @Before
    public void before() throws SQLException {
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.DB_HISTORY_PATH);
    }

    /**
     * Verifies that the flush table never holds more than one row.
     *
     * <ol>
     * <li>Start the connector with no flush table present, stream one insert, and assert the
     * flush table has exactly one row.</li>
     * <li>Stop the connector, insert a second row into the flush table, restart, and assert that
     * the multiple-rows warning was logged and the table is back to exactly one row.</li>
     * <li>Stream another insert and assert the table still has exactly one row.</li>
     * </ol>
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor(value = { "DBZ-4118" })
    public void shouldOnlyMaintainOneRowInFlushStrategyTable() throws Exception {
        try {
            TestHelper.dropTable(connection, "dbz4118");
            connection.execute("CREATE TABLE dbz4118 (id numeric(9,0), data varchar2(50), primary key(id))");
            TestHelper.streamTable(connection, "dbz4118");

            Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4118")
                    .build();

            // Start from a clean slate so the first start exercises the path where no flush table exists.
            dropFlushTable();
            startConnectorAndAwaitStreaming(config);

            insertRowAndAssertSingleChangeEvent(1);
            assertFlushTableHasExactlyOneRow();

            // Simulate the faulty state: add a second row to the flush table while the connector is down.
            stopConnector();
            insertFlushTable("12345");

            // The interceptor must be created before the restart so it captures the startup warning.
            LogInterceptor logInterceptor = new LogInterceptor();
            startConnectorAndAwaitStreaming(config);

            Assertions.assertThat(logInterceptor.containsWarnMessage("DBZ-4118: The flush table, LOG_MINING_FLUSH, has multiple rows")).isTrue();
            assertFlushTableHasExactlyOneRow();

            insertRowAndAssertSingleChangeEvent(2);
            assertFlushTableHasExactlyOneRow();
        }
        finally {
            TestHelper.dropTable(connection, "dbz4118");
        }
    }

    /**
     * Starts the Oracle connector with the given configuration, asserts that it is running and
     * waits until streaming is reported as running for connector {@code oracle} / server
     * {@code server1}.
     *
     * @param config the connector configuration to start with
     * @throws Exception if waiting for the streaming phase fails
     */
    private void startConnectorAndAwaitStreaming(Configuration config) throws Exception {
        start(OracleConnector.class, config);
        assertConnectorIsRunning();
        waitForStreamingRunning("oracle", "server1");
    }

    /**
     * Inserts one row into {@code dbz4118} and asserts that exactly one change event arrives on
     * the table's topic.
     *
     * @param id the primary-key value of the row to insert
     * @throws Exception if the insert or the record consumption fails
     */
    private void insertRowAndAssertSingleChangeEvent(int id) throws Exception {
        connection.execute("INSERT INTO dbz4118 (id,data) values (" + id + ",'Test')");
        SourceRecords records = consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic("server1.DEBEZIUM.DBZ4118")).hasSize(1);
    }

    /**
     * Asserts that the flush table currently contains exactly one row, using a fresh default
     * connection.
     *
     * @throws SQLException if the row count cannot be queried
     */
    private void assertFlushTableHasExactlyOneRow() throws SQLException {
        try (OracleConnection conn = TestHelper.defaultConnection()) {
            // NOTE(review): presumably switches the session to the container database where the
            // flush table lives - confirm against OracleConnection.
            conn.resetSessionToCdb();
            Assertions.assertThat(conn.getRowCount(getFlushTableName())).isEqualTo(1L);
        }
    }

    /**
     * Drops the flush table using an admin connection.
     *
     * @throws SQLException if the drop fails
     */
    private void dropFlushTable() throws SQLException {
        try (OracleConnection admin = TestHelper.adminConnection()) {
            admin.resetSessionToCdb();
            TestHelper.dropTable(admin, getFlushTableName());
        }
    }

    /**
     * Inserts an additional row with the given value into the flush table.
     *
     * <p>The value is concatenated into the SQL statement verbatim, so it must be a trusted
     * literal supplied by the test itself.
     *
     * @param scnValue the value to insert, written unquoted into the statement
     * @throws SQLException if the insert fails
     */
    private void insertFlushTable(String scnValue) throws SQLException {
        try (OracleConnection conn = TestHelper.defaultConnection()) {
            conn.resetSessionToCdb();
            conn.execute("INSERT INTO " + getFlushTableName() + " values (" + scnValue + ")");
        }
    }

    /**
     * Returns the flush table name qualified with the connector user's name.
     *
     * @return {@code <connector user>.LOG_MINING_FLUSH}
     */
    private static String getFlushTableName() {
        return TestHelper.getConnectorUserName() + "." + "LOG_MINING_FLUSH";
    }
}

