/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.AbstractCassandra4CommitLogParser;
import io.debezium.connector.cassandra.AbstractDirectoryWatcher;
import io.debezium.connector.cassandra.AbstractProcessor;
import io.debezium.connector.cassandra.Cassandra4CommitLogBatchParser;
import io.debezium.connector.cassandra.Cassandra4CommitLogRealTimeParser;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CommitLogProcessingResult;
import io.debezium.connector.cassandra.CommitLogProcessorMetrics;
import io.debezium.connector.cassandra.CommitLogTransfer;
import io.debezium.connector.cassandra.CommitLogUtil;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.LogicalCommitLog;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.commons.math3.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Cassandra4CommitLogProcessor
extends AbstractProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(Cassandra4CommitLogProcessor.class);
    private static final String NAME = "Commit Log Processor";
    private final CassandraConnectorContext context;
    private final File cdcDir;
    private AbstractDirectoryWatcher watcher;
    private final List<ChangeEventQueue<Event>> queues;
    private final CommitLogProcessorMetrics metrics = new CommitLogProcessorMetrics();
    private boolean initial = true;
    private final boolean errorCommitLogReprocessEnabled;
    private final CommitLogTransfer commitLogTransfer;
    private final ExecutorService executorService;
    static final Set<Pair<AbstractCassandra4CommitLogParser, Future<CommitLogProcessingResult>>> submittedProcessings = ConcurrentHashMap.newKeySet();

    public Cassandra4CommitLogProcessor(CassandraConnectorContext context) {
        super(NAME, Duration.ZERO);
        this.context = context;
        this.queues = this.context.getQueues();
        this.commitLogTransfer = this.context.getCassandraConnectorConfig().getCommitLogTransfer();
        this.errorCommitLogReprocessEnabled = this.context.getCassandraConnectorConfig().errorCommitLogReprocessEnabled();
        this.cdcDir = new File(DatabaseDescriptor.getCDCLogLocation());
        this.executorService = Executors.newSingleThreadExecutor();
    }

    public void initialize() {
        this.metrics.registerMetrics();
    }

    public void destroy() {
        this.metrics.unregisterMetrics();
    }

    public void stop() {
        try {
            this.executorService.shutdown();
            for (Pair<AbstractCassandra4CommitLogParser, Future<CommitLogProcessingResult>> submittedProcessing : submittedProcessings) {
                try {
                    ((AbstractCassandra4CommitLogParser)submittedProcessing.getFirst()).complete();
                    ((Future)submittedProcessing.getSecond()).get();
                }
                catch (Exception ex) {
                    LOGGER.warn("Waiting for submitted task to finish has failed.");
                }
            }
        }
        catch (Exception ex) {
            throw new RuntimeException("Unable to close executor service in CommitLogProcessor in a timely manner");
        }
        super.stop();
    }

    protected static synchronized void removeProcessing(AbstractCassandra4CommitLogParser parser) {
        submittedProcessings.stream().filter(p -> p.getFirst() == parser).findFirst().map(submittedProcessings::remove);
    }

    void submit(Path index) {
        AbstractCassandra4CommitLogParser parser = this.context.getCassandraConnectorConfig().isCommitLogRealTimeProcessingEnabled() ? new Cassandra4CommitLogRealTimeParser(new LogicalCommitLog(index.toFile()), this.queues, this.metrics, this.context) : new Cassandra4CommitLogBatchParser(new LogicalCommitLog(index.toFile()), this.queues, this.metrics, this.context);
        Future<CommitLogProcessingResult> future = this.executorService.submit(parser::process);
        submittedProcessings.add((Pair<AbstractCassandra4CommitLogParser, Future<CommitLogProcessingResult>>)new Pair((Object)parser, future));
        LOGGER.debug("Processing {} callables.", (Object)submittedProcessings.size());
    }

    public boolean isRunning() {
        return super.isRunning() && !this.executorService.isShutdown() && !this.executorService.isTerminated();
    }

    public void process() throws IOException, InterruptedException {
        if (this.watcher == null) {
            this.watcher = new AbstractDirectoryWatcher(this.cdcDir.toPath(), this.context.getCassandraConnectorConfig().cdcDirPollInterval(), Collections.singleton(StandardWatchEventKinds.ENTRY_CREATE)){

                void handleEvent(WatchEvent<?> event, Path path) {
                    if (Cassandra4CommitLogProcessor.this.isRunning() && path.getFileName().toString().endsWith("_cdc.idx")) {
                        Cassandra4CommitLogProcessor.this.submit(path);
                    }
                }
            };
        }
        if (this.initial) {
            LOGGER.info("Reading existing commit logs in {}", (Object)this.cdcDir);
            File[] indexes = CommitLogUtil.getIndexes((File)this.cdcDir);
            Arrays.sort(indexes, CommitLogUtil::compareCommitLogsIndexes);
            for (File index : indexes) {
                if (!this.isRunning()) continue;
                this.submit(index.toPath());
            }
            if (this.errorCommitLogReprocessEnabled) {
                LOGGER.info("CommitLog Error Processing is enabled. Attempting to get all error commitLog files for re-processing.");
                this.commitLogTransfer.getErrorCommitLogFiles();
            }
            this.initial = false;
        }
        this.watcher.poll();
    }
}

