/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnector;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.junit.SkipOnReadOnly;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipTestDependingOnDatabaseOptionRule;
import io.debezium.connector.oracle.junit.SkipTestDependingOnReadOnly;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.logminer.logwriter.CommitLogWriterFlushStrategy;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/**
 * Integration test for the LogMiner commit-log-writer flush strategy.
 *
 * <p>Verifies that the flush table used by the connector holds exactly one row, both after a
 * normal start and after an additional row has been inserted into it while the connector was
 * stopped (DBZ-4118).
 *
 * <p>The test is skipped for adapters other than LogMiner and when running in read-only mode,
 * as declared by the class-level annotations.
 */
@SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="Flush strategy only applies to LogMiner implementation")
@SkipOnReadOnly(reason="Test expects flush table, not applicable during read only.")
public class FlushStrategyIT
extends AbstractConnectorTest {
    /** Name of the captured table created and dropped by the test. */
    private static final String TABLE_NAME = "dbz4118";
    /** Topic on which change events for {@link #TABLE_NAME} are expected. */
    private static final String TOPIC_NAME = "server1.DEBEZIUM.DBZ4118";

    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();
    @Rule
    public final TestRule skipOptionRule = new SkipTestDependingOnDatabaseOptionRule();
    @Rule
    public final TestRule skipReadOnly = new SkipTestDependingOnReadOnly();

    /** Connection shared by all tests in this class; opened in {@link #beforeClass()}. */
    private static OracleConnection connection;
    /** Unqualified flush table name, read from the default test configuration. */
    private static String flushTableName;

    /**
     * Opens the shared test connection and resolves the configured flush table name.
     *
     * @throws SQLException declared for database access failures
     */
    @BeforeClass
    public static void beforeClass() throws SQLException {
        connection = TestHelper.testConnection();
        flushTableName = TestHelper.defaultConfig().build().getString(OracleConnectorConfig.LOG_MINING_FLUSH_TABLE_NAME);
    }

    /**
     * Closes the shared test connection if it was opened.
     *
     * @throws SQLException if closing the connection fails
     */
    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * Prepares the connector test framework and removes the schema history file before each test.
     *
     * @throws SQLException declared for database access failures
     */
    @Before
    public void before() throws SQLException {
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    /**
     * Checks that the flush table contains exactly one row after streaming starts, and that it
     * still contains exactly one row after the connector is restarted with an extra row present.
     *
     * @throws Exception if any database or connector operation fails
     */
    @Test
    @FixFor("DBZ-4118")
    public void shouldOnlyMaintainOneRowInFlushStrategyTable() throws Exception {
        try {
            TestHelper.dropTable(connection, TABLE_NAME);
            connection.execute("CREATE TABLE " + TABLE_NAME + " (id numeric(9,0), data varchar2(50), primary key(id))");
            TestHelper.streamTable(connection, TABLE_NAME);

            final Configuration config = TestHelper.defaultConfig()
                    .with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4118")
                    .build();

            // Start from a state where the flush table does not exist.
            dropFlushTable(config);

            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            connection.execute("INSERT INTO " + TABLE_NAME + " (id,data) values (1,'Test')");
            AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.recordsForTopic(TOPIC_NAME)).hasSize(1);
            assertFlushTableHasExactlyOneRow(config);

            // Add a second row while the connector is stopped, then restart it.
            stopConnector();
            insertFlushTable(config, "12345");

            final LogInterceptor logInterceptor = new LogInterceptor(CommitLogWriterFlushStrategy.class);
            start(OracleConnector.class, config);
            assertConnectorIsRunning();
            waitForStreamingRunning("oracle", "server1");

            // The strategy is expected to warn about the multi-row flush table.
            final String expectedWarning = "DBZ-4118: The flush table, " + flushTableName + ", has multiple rows";
            Awaitility.await()
                    .atMost(10L, TimeUnit.SECONDS)
                    .until(() -> logInterceptor.containsWarnMessage(expectedWarning));

            // NOTE(review): presumably gives the connector time to reduce the table back to a
            // single row after the warning is logged - confirm.
            TestHelper.sleep(5L, TimeUnit.SECONDS);
            assertFlushTableHasExactlyOneRow(config);

            connection.execute("INSERT INTO " + TABLE_NAME + " (id,data) values (2,'Test')");
            records = consumeRecordsByTopic(1);
            Assertions.assertThat(records.recordsForTopic(TOPIC_NAME)).hasSize(1);
            assertFlushTableHasExactlyOneRow(config);
        }
        finally {
            TestHelper.dropTable(connection, TABLE_NAME);
        }
    }

    /**
     * Asserts that the fully-qualified flush table currently holds exactly one row.
     *
     * @param config connector configuration, consulted for the PDB name
     * @throws SQLException if the row count cannot be obtained
     */
    private void assertFlushTableHasExactlyOneRow(Configuration config) throws SQLException {
        try (OracleConnection conn = TestHelper.defaultConnection(true)) {
            switchToPdbIfConfigured(conn, config);
            final long rowCount = conn.getRowCount(getFlushTableName());
            Assertions.assertThat(rowCount).isEqualTo(1L);
        }
    }

    /**
     * Drops the fully-qualified flush table using an administrative connection.
     *
     * @param config connector configuration, consulted for the PDB name
     * @throws SQLException if the table cannot be dropped
     */
    private void dropFlushTable(Configuration config) throws SQLException {
        try (OracleConnection admin = TestHelper.adminConnection(true)) {
            switchToPdbIfConfigured(admin, config);
            TestHelper.dropTable(admin, getFlushTableName());
        }
    }

    /**
     * Inserts one row with the given value into the fully-qualified flush table.
     *
     * @param config connector configuration, consulted for the PDB name
     * @param scnValue value placed verbatim into the {@code INSERT} statement
     * @throws SQLException if the insert fails
     */
    private void insertFlushTable(Configuration config, String scnValue) throws SQLException {
        try (OracleConnection conn = TestHelper.defaultConnection(true)) {
            switchToPdbIfConfigured(conn, config);
            conn.execute("INSERT INTO " + getFlushTableName() + " values (" + scnValue + ")");
        }
    }

    /**
     * Switches the connection's session to the configured PDB when a PDB name is set.
     *
     * @param conn connection whose session should be switched
     * @param config connector configuration, consulted for the PDB name
     * @throws SQLException declared so callers can propagate database failures
     */
    private static void switchToPdbIfConfigured(OracleConnection conn, Configuration config) throws SQLException {
        final String databasePdbName = config.getString(OracleConnectorConfig.PDB_NAME);
        if (!Strings.isNullOrEmpty(databasePdbName)) {
            conn.setSessionToPdb(databasePdbName);
        }
    }

    /**
     * Builds the flush table name qualified with the connector user name.
     *
     * @return {@code <connector user>.<flush table name>}
     */
    private static String getFlushTableName() {
        return TestHelper.getConnectorUserName() + "." + flushTableName;
    }
}

