/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.logminer.LogFile;
import io.debezium.connector.oracle.logminer.LogMinerHelper;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Testing;
import java.math.BigInteger;
import java.nio.file.Path;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/**
 * Integration tests for the log-file selection routines of {@link LogMinerHelper}.
 *
 * <p>The tests run against a live database through a shared {@link OracleConnection} and are
 * skipped unless the configured adapter is LogMiner (see the class-level annotation).
 */
@SkipWhenAdapterNameIsNot(value=SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason="LogMiner specific tests")
public class LogMinerHelperIT
extends AbstractConnectorTest {
    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();

    /** Connection shared by every test; opened in {@link #beforeSuperClass()}, closed in {@link #closeConnection()}. */
    private static OracleConnection conn;

    /**
     * Clears any log files still registered for mining (using a short-lived admin connection),
     * opens the shared test connection and forces the redo logs to be flushed to archive logs.
     *
     * @throws SQLException if any of the database operations fails
     */
    @BeforeClass
    public static void beforeSuperClass() throws SQLException {
        try (OracleConnection adminConnection = TestHelper.adminConnection(true)) {
            LogMinerHelper.removeLogFilesFromMining(adminConnection);
        }
        conn = TestHelper.defaultConnection(true);
        TestHelper.forceFlushOfRedoLogsToArchiveLogs();
    }

    /**
     * Closes the shared connection if it was opened and is still connected.
     *
     * @throws SQLException if closing the connection fails
     */
    @AfterClass
    public static void closeConnection() throws SQLException {
        if (conn != null && conn.isConnected()) {
            conn.close();
        }
    }

    /**
     * Per-test setup: configures the consume timeout, initializes the connector test framework
     * and deletes the schema history file.
     *
     * @throws SQLException declared for parity with the other fixtures
     */
    @Before
    public void before() throws SQLException {
        this.setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        this.initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    /**
     * Verifies the log files returned for the current SCN and for the oldest archived SCN
     * (and the SCN immediately before it).
     */
    @Test
    @FixFor(value={"DBZ-3256"})
    public void shouldAddCorrectLogFiles() throws Exception {
        final int instances = LogMinerHelperIT.getNumberOfInstances(conn);

        // Starting from the current SCN, exactly one log file per instance is expected.
        final Scn currentScn = conn.getCurrentScn();
        final List<LogFile> redoFiles = LogMinerHelper.getLogFilesForOffsetScn(conn, currentScn, Duration.ofHours(0L), false, null);
        Assertions.assertThat(redoFiles).hasSize(instances);

        final List<Scn> oneDayArchivedNextScn = this.getOneDayArchivedLogNextScn(conn);
        final Scn oldestArchivedScn = this.getOldestArchivedScn(oneDayArchivedNextScn);

        // Starting from the oldest archived SCN, the returned logs must be gap-free per thread.
        List<LogFile> files = LogMinerHelper.getLogFilesForOffsetScn(conn, oldestArchivedScn, Duration.ofHours(0L), false, null);
        this.assertLogFilesHaveNoGaps(instances, files, oneDayArchivedNextScn);

        // The same must hold when starting one SCN earlier.
        files = LogMinerHelper.getLogFilesForOffsetScn(conn, oldestArchivedScn.subtract(Scn.valueOf(1L)), Duration.ofHours(0L), false, null);
        this.assertLogFilesHaveNoGaps(instances, files, oneDayArchivedNextScn);
    }

    /**
     * Asserts that the given log files form a contiguous run of sequences for every redo thread.
     *
     * <p>Checks performed: the number of distinct threads equals {@code instances}; for each
     * thread the number of logs equals {@code max - min + 1} of its sequences and every sequence
     * in that range occurs exactly once; the per-thread counts add up to the total; and every SCN
     * in {@code scnList} falls in the range of at least one log file.
     *
     * @param instances expected number of distinct threads
     * @param logFiles the log files to check
     * @param scnList SCNs that must each be covered by some log file
     */
    private void assertLogFilesHaveNoGaps(int instances, List<LogFile> logFiles, List<Scn> scnList) {
        final Set<Integer> threads = logFiles.stream().map(LogFile::getThread).collect(Collectors.toSet());
        Assertions.assertThat(threads).hasSize(instances);

        int totalThreadLogs = 0;
        for (Integer thread : threads) {
            final List<LogFile> threadLogs = logFiles.stream()
                    .filter(l -> l.getThread() == thread.intValue())
                    .collect(Collectors.toList());
            final BigInteger min = threadLogs.stream().map(LogFile::getSequence).min(BigInteger::compareTo).orElse(BigInteger.ZERO);
            final BigInteger max = threadLogs.stream().map(LogFile::getSequence).max(BigInteger::compareTo).orElse(BigInteger.ZERO);

            // A contiguous run from min to max contains exactly (max - min + 1) entries.
            Assertions.assertThat(threadLogs).hasSize(max.subtract(min).intValue() + 1);
            totalThreadLogs += threadLogs.size();

            for (int i = min.intValue(); i <= max.intValue(); i++) {
                // Lambdas can only capture effectively final locals, hence the copy.
                final int sequence = i;
                final long hits = threadLogs.stream().filter(l -> l.getSequence().intValue() == sequence).count();
                Assertions.assertThat(hits).isEqualTo(1L);
            }
        }
        Assertions.assertThat(totalThreadLogs).isEqualTo(logFiles.size());

        for (Scn scn : scnList) {
            Assertions.assertThat(logFiles.stream().anyMatch(l -> l.isScnInLogFileRange(scn))).isTrue();
        }
    }

    /**
     * Verifies that, after registering log files for mining from the oldest archived SCN, the
     * number of rows in {@code V$LOGMNR_LOGS} equals the number of log files computed for that SCN.
     */
    @Test
    @FixFor(value={"DBZ-3256"})
    public void shouldSetCorrectLogFiles() throws Exception {
        final List<Scn> oneDayArchivedNextScn = this.getOneDayArchivedLogNextScn(conn);
        final Scn oldestArchivedScn = this.getOldestArchivedScn(oneDayArchivedNextScn);
        LogMinerHelper.setLogFilesForMining(conn, oldestArchivedScn, Duration.ofHours(0L), false, null, 5, Duration.ofSeconds(1L), Duration.ofSeconds(60L));

        final List<LogFile> files = LogMinerHelper.getLogFilesForOffsetScn(conn, oldestArchivedScn, Duration.ofHours(0L), false, null);
        Assertions.assertThat(files.size()).isEqualTo(LogMinerHelperIT.getNumberOfAddedLogFiles(conn));
    }

    /**
     * Verifies that every log file returned when the archive-log-only flag is {@code true}
     * has type {@link LogFile.Type#ARCHIVE}.
     */
    @Test
    @FixFor(value={"DBZ-3561"})
    public void shouldOnlyReturnArchiveLogs() throws Exception {
        final List<LogFile> files = LogMinerHelper.getLogFilesForOffsetScn(conn, Scn.valueOf(0), Duration.ofHours(0L), true, null);
        files.forEach(file -> Assertions.assertThat(file.getType()).isEqualTo(LogFile.Type.ARCHIVE));
    }

    /**
     * Verifies that, with the destination name {@code LOG_ARCHIVE_DEST_1} supplied, at least one
     * log file is returned and all returned files have type {@link LogFile.Type#ARCHIVE}.
     */
    @Test
    @FixFor(value={"DBZ-3661"})
    public void shouldGetArchiveLogsWithDestinationSpecified() throws Exception {
        try (OracleConnection admin = TestHelper.adminConnection(true)) {
            admin.execute(new String[]{"ALTER SYSTEM SWITCH ALL LOGFILE"});
            // Fixed pause after the log switch before querying for archive logs.
            Thread.sleep(5000L);
        }

        final List<LogFile> files = LogMinerHelper.getLogFilesForOffsetScn(conn, Scn.valueOf(0), Duration.ofHours(1L), true, "LOG_ARCHIVE_DEST_1");
        Assertions.assertThat(files.size()).isGreaterThan(0);
        files.forEach(file -> Assertions.assertThat(file.getType()).isEqualTo(LogFile.Type.ARCHIVE));
    }

    /**
     * Returns the smallest SCN in the given list.
     *
     * @param oneDayArchivedNextScn SCNs to search
     * @return the minimum SCN, or {@link Scn#NULL} when the list is empty
     */
    private Scn getOldestArchivedScn(List<Scn> oneDayArchivedNextScn) {
        return oneDayArchivedNextScn.stream().min(Scn::compareTo).orElse(Scn.NULL);
    }

    /**
     * Counts the rows currently present in {@code V$LOGMNR_LOGS}.
     *
     * @param conn connection used to run the query
     * @return the number of rows returned
     * @throws SQLException if the query fails
     */
    private static int getNumberOfAddedLogFiles(OracleConnection conn) throws SQLException {
        int counter = 0;
        try (PreparedStatement ps = conn.connection(false).prepareStatement("select * from V$LOGMNR_LOGS");
             ResultSet result = ps.executeQuery()) {
            while (result.next()) {
                ++counter;
            }
        }
        return counter;
    }

    /**
     * Reads the {@code NEXT_CHANGE#} value of each row in {@code V$ARCHIVED_LOG} that has a
     * non-null name, a {@code FIRST_TIME} no older than {@code SYSDATE - 1}, {@code ARCHIVED = 'YES'}
     * and {@code STATUS = 'A'}, ordered by that value.
     *
     * @param conn connection used to run the query
     * @return the SCNs in query order; empty when no row matches
     * @throws SQLException if the query fails
     */
    private List<Scn> getOneDayArchivedLogNextScn(OracleConnection conn) throws SQLException {
        final List<Scn> allArchivedNextScn = new ArrayList<>();
        try (PreparedStatement st = conn.connection(false).prepareStatement("SELECT NAME AS FILE_NAME, NEXT_CHANGE# AS NEXT_CHANGE FROM V$ARCHIVED_LOG  WHERE NAME IS NOT NULL AND FIRST_TIME >= SYSDATE - 1 AND ARCHIVED = 'YES'  AND STATUS = 'A' ORDER BY 2");
             ResultSet rs = st.executeQuery()) {
            while (rs.next()) {
                // Column 2 is NEXT_CHANGE#; it is read as a string and parsed into an Scn.
                allArchivedNextScn.add(Scn.valueOf(rs.getString(2)));
            }
        }
        return allArchivedNextScn;
    }

    /**
     * Returns the row count of {@code GV$INSTANCE}.
     *
     * @param connection connection used to run the query
     * @return the count, or {@code 0} when the query yields no row
     * @throws SQLException if the query fails
     */
    private static int getNumberOfInstances(OracleConnection connection) throws SQLException {
        return (Integer)connection.queryAndMap("SELECT COUNT(1) FROM GV$INSTANCE", rs -> {
            if (rs.next()) {
                return rs.getInt(1);
            }
            return 0;
        });
    }
}

