/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer;

import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.Scn;
import io.debezium.connector.oracle.junit.SkipTestDependingOnAdapterNameRule;
import io.debezium.connector.oracle.junit.SkipWhenAdapterNameIsNot;
import io.debezium.connector.oracle.logminer.LogFile;
import io.debezium.connector.oracle.logminer.LogFileCollector;
import io.debezium.connector.oracle.util.TestHelper;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
import java.math.BigInteger;
import java.nio.file.Path;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/**
 * Integration tests for {@link LogFileCollector}, run against a live Oracle database.
 *
 * <p>The tests are skipped unless the LogMiner adapter is in use. A single shared
 * {@link OracleConnection} is opened once for the whole class and closed afterwards.
 */
@SkipWhenAdapterNameIsNot(value = SkipWhenAdapterNameIsNot.AdapterName.LOGMINER, reason = "LogMiner specific")
public class LogFileCollectorIT extends AbstractConnectorTest {

    @Rule
    public final TestRule skipAdapterRule = new SkipTestDependingOnAdapterNameRule();

    /** Connection shared by every test in this class; opened in {@link #beforeSuperClass()}. */
    private static OracleConnection connection;

    /**
     * Clears any log files registered with the LogMiner session (using an admin connection),
     * opens the shared default connection and forces the redo logs to be flushed to archive logs.
     *
     * @throws SQLException if any of the database operations fail
     */
    @BeforeClass
    public static void beforeSuperClass() throws SQLException {
        try (OracleConnection adminConnection = TestHelper.adminConnection(true)) {
            adminConnection.removeAllLogFilesFromLogMinerSession();
        }
        connection = TestHelper.defaultConnection(true);
        TestHelper.forceFlushOfRedoLogsToArchiveLogs();
    }

    /**
     * Closes the shared connection if it was opened and is still connected.
     *
     * @throws SQLException if closing the connection fails
     */
    @AfterClass
    public static void closeConnection() throws SQLException {
        if (connection != null && connection.isConnected()) {
            connection.close();
        }
    }

    /**
     * Per-test setup: configures the consumer poll timeout, initializes the connector test
     * framework and deletes the schema history file.
     *
     * @throws SQLException declared for parity with the other lifecycle methods
     */
    @Before
    public void before() throws SQLException {
        setConsumeTimeout(TestHelper.defaultMessageConsumerPollTimeout(), TimeUnit.SECONDS);
        initializeConnectorTestFramework();
        Testing.Files.delete(TestHelper.SCHEMA_HISTORY_PATH);
    }

    /**
     * Verifies the logs returned for three starting points: the current SCN, the oldest
     * archived next-SCN from the last day, and one SCN before that.
     */
    @Test
    @FixFor(value = { "DBZ-3256" })
    public void shouldAddCorrectLogFiles() throws Exception {
        final int instances = getNumberOfInstances(connection);

        // Case 1: starting at the current SCN, exactly one log per instance is expected.
        final Scn currentScn = connection.getCurrentScn();
        final List<LogFile> redoFiles = getLogFileCollector(Duration.ofHours(0L), false, null).getLogs(currentScn);
        Assertions.assertThat(redoFiles).hasSize(instances);

        // Case 2: starting at the oldest archived SCN of the last day, the logs must be gap-free.
        final List<Scn> oneDayArchivedNextScn = getOneDayArchivedLogNextScn(connection);
        final Scn oldestArchivedScn = getOldestArchivedScn(oneDayArchivedNextScn);
        List<LogFile> files = getLogFileCollector(Duration.ofHours(0L), false, null).getLogs(oldestArchivedScn);
        assertLogFilesHaveNoGaps(instances, files, oneDayArchivedNextScn);

        // Case 3: same expectation when starting one SCN earlier.
        files = getLogFileCollector(Duration.ofHours(0L), false, null).getLogs(oldestArchivedScn.subtract(Scn.ONE));
        assertLogFilesHaveNoGaps(instances, files, oneDayArchivedNextScn);
    }

    /**
     * Verifies that, with archive-log-only mode enabled, every returned log is of type
     * {@link LogFile.Type#ARCHIVE}.
     */
    @Test
    @FixFor(value = { "DBZ-3561" })
    public void shouldOnlyReturnArchiveLogs() throws Exception {
        final List<LogFile> files = getLogFileCollector(Duration.ofHours(0L), true, null).getLogs(Scn.valueOf(0));
        files.forEach(file -> Assertions.assertThat(file.getType()).isEqualTo(LogFile.Type.ARCHIVE));
    }

    /**
     * Verifies that archive logs are found when an archive destination name is configured,
     * and that every returned log is of type {@link LogFile.Type#ARCHIVE}.
     */
    @Test
    @FixFor(value = { "DBZ-3661" })
    public void shouldGetArchiveLogsWithDestinationSpecified() throws Exception {
        try (OracleConnection admin = TestHelper.adminConnection(true)) {
            admin.execute("ALTER SYSTEM SWITCH ALL LOGFILE");
            // NOTE(review): presumably waits for the switched logs to be archived - confirm.
            Thread.sleep(5000L);
        }
        final List<LogFile> files = getLogFileCollector(Duration.ofHours(1L), true, "LOG_ARCHIVE_DEST_1")
                .getLogs(Scn.valueOf(0));
        Assertions.assertThat(files).isNotEmpty();
        files.forEach(file -> Assertions.assertThat(file.getType()).isEqualTo(LogFile.Type.ARCHIVE));
    }

    /**
     * Builds a {@link LogFileCollector} on the shared connection using the default test
     * configuration plus the supplied overrides.
     *
     * @param logRetention value for the archive log hours setting (converted to whole hours)
     * @param archiveLogsOnly value for the archive-log-only mode setting
     * @param destinationName archive destination name; not set when {@code null} or blank
     * @return a collector configured as requested
     */
    private LogFileCollector getLogFileCollector(Duration logRetention, boolean archiveLogsOnly, String destinationName) {
        Configuration.Builder builder = TestHelper.defaultConfig()
                .with(OracleConnectorConfig.ARCHIVE_LOG_HOURS, logRetention.toHours())
                .with(OracleConnectorConfig.LOG_MINING_ARCHIVE_LOG_ONLY_MODE, Boolean.toString(archiveLogsOnly));
        if (!Strings.isNullOrBlank(destinationName)) {
            builder = builder.with(OracleConnectorConfig.ARCHIVE_DESTINATION_NAME, destinationName);
        }
        final OracleConnectorConfig connectorConfig = new OracleConnectorConfig(builder.build());
        return new LogFileCollector(connectorConfig, connection);
    }

    /**
     * Asserts that the given logs cover every redo thread without sequence gaps.
     *
     * <p>Checks, in order: the number of distinct threads equals {@code instances}; for each
     * thread the sequences between its minimum and maximum each occur exactly once; the
     * per-thread counts add up to the total number of logs; and every SCN in {@code scnList}
     * falls in the range of at least one log.
     *
     * @param instances expected number of distinct redo threads
     * @param logFiles logs to verify
     * @param scnList SCNs that must each be covered by at least one log
     */
    private void assertLogFilesHaveNoGaps(int instances, List<LogFile> logFiles, List<Scn> scnList) {
        final Set<Integer> threads = logFiles.stream().map(LogFile::getThread).collect(Collectors.toSet());
        Assertions.assertThat(threads).hasSize(instances);

        int totalThreadLogs = 0;
        for (Integer thread : threads) {
            final List<LogFile> threadLogs = logFiles.stream()
                    .filter(l -> l.getThread() == thread.intValue())
                    .collect(Collectors.toList());
            final BigInteger min = threadLogs.stream().map(LogFile::getSequence).min(BigInteger::compareTo).orElse(BigInteger.ZERO);
            final BigInteger max = threadLogs.stream().map(LogFile::getSequence).max(BigInteger::compareTo).orElse(BigInteger.ZERO);

            // A contiguous range [min, max] contains exactly (max - min + 1) entries.
            Assertions.assertThat(threadLogs).hasSize(max.subtract(min).intValue() + 1);
            totalThreadLogs += threadLogs.size();

            // Each sequence in the range must be present exactly once (no gaps, no duplicates).
            final int lastSequence = max.intValue();
            for (int i = min.intValue(); i <= lastSequence; i++) {
                final int sequence = i;
                final long hits = threadLogs.stream().filter(l -> l.getSequence().intValue() == sequence).count();
                Assertions.assertThat(hits).isEqualTo(1L);
            }
        }
        Assertions.assertThat(totalThreadLogs).isEqualTo(logFiles.size());

        for (Scn scn : scnList) {
            Assertions.assertThat(logFiles.stream().anyMatch(l -> l.isScnInLogFileRange(scn))).isTrue();
        }
    }

    /**
     * Returns the smallest SCN in the list.
     *
     * @param oneDayArchivedNextScn candidate SCNs
     * @return the minimum SCN, or {@link Scn#NULL} when the list is empty
     */
    private Scn getOldestArchivedScn(List<Scn> oneDayArchivedNextScn) {
        return oneDayArchivedNextScn.stream().min(Scn::compareTo).orElse(Scn.NULL);
    }

    /**
     * Reads the {@code NEXT_CHANGE#} of every named, archived, status-'A' entry in
     * {@code V$ARCHIVED_LOG} whose {@code FIRST_TIME} is within the last day, ordered ascending.
     *
     * @param conn connection to query with
     * @return the next-change SCNs in query order; empty when no rows match
     * @throws SQLException if the query fails
     */
    private List<Scn> getOneDayArchivedLogNextScn(OracleConnection conn) throws SQLException {
        final List<Scn> allArchivedNextScn = new ArrayList<>();
        try (PreparedStatement st = conn.connection(false).prepareStatement("SELECT NAME AS FILE_NAME, NEXT_CHANGE# AS NEXT_CHANGE FROM V$ARCHIVED_LOG  WHERE NAME IS NOT NULL AND FIRST_TIME >= SYSDATE - 1 AND ARCHIVED = 'YES'  AND STATUS = 'A' ORDER BY 2");
                ResultSet rs = st.executeQuery()) {
            while (rs.next()) {
                allArchivedNextScn.add(Scn.valueOf(rs.getString(2)));
            }
        }
        return allArchivedNextScn;
    }

    /**
     * Counts the rows in {@code GV$INSTANCE}.
     *
     * @param connection connection to query with
     * @return the instance count, or {@code 0} when the query yields no row
     * @throws SQLException if the query fails
     */
    private static int getNumberOfInstances(OracleConnection connection) throws SQLException {
        return connection.queryAndMap("SELECT COUNT(1) FROM GV$INSTANCE", rs -> {
            if (rs.next()) {
                return rs.getInt(1);
            }
            return 0;
        });
    }
}

