/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.AbstractDirectoryWatcher;
import io.debezium.connector.cassandra.AbstractProcessor;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CommitLogProcessorMetrics;
import io.debezium.connector.cassandra.CommitLogSegmentReader;
import io.debezium.connector.cassandra.CommitLogTransfer;
import io.debezium.connector.cassandra.CommitLogUtil;
import io.debezium.connector.cassandra.EOFEvent;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.pipeline.Sizeable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLogProcessor
extends AbstractProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(CommitLogProcessor.class);
    private static final String NAME = "Commit Log Processor";
    private final CassandraConnectorContext context;
    private final CommitLogSegmentReader commitLogReader;
    private final File cdcDir;
    private AbstractDirectoryWatcher watcher;
    private final List<ChangeEventQueue<Event>> queues;
    private final boolean latestOnly;
    private final CommitLogProcessorMetrics metrics;
    private boolean initial = true;
    private final boolean errorCommitLogReprocessEnabled;
    private final CommitLogTransfer commitLogTransfer;
    private final Set<String> erroneousCommitLogs;
    private final File commitLogDir;

    public CommitLogProcessor(CassandraConnectorContext context, CommitLogProcessorMetrics metrics, CommitLogSegmentReader commitLogReader, File cdcDir, File commitLogDir) {
        super(NAME, Duration.ZERO);
        this.commitLogReader = commitLogReader;
        this.queues = context.getQueues();
        this.context = context;
        this.metrics = metrics;
        this.cdcDir = cdcDir;
        this.latestOnly = this.context.getCassandraConnectorConfig().latestCommitLogOnly();
        this.errorCommitLogReprocessEnabled = this.context.getCassandraConnectorConfig().errorCommitLogReprocessEnabled();
        this.commitLogTransfer = this.context.getCassandraConnectorConfig().getCommitLogTransfer();
        this.erroneousCommitLogs = this.context.getErroneousCommitLogs();
        this.commitLogDir = commitLogDir;
    }

    @Override
    public void initialize() {
        this.metrics.registerMetrics();
    }

    @Override
    public void destroy() {
        this.metrics.unregisterMetrics();
    }

    @Override
    public void process() throws IOException, InterruptedException {
        LOGGER.debug("Processing commitLogFiles while initial is {}", (Object)this.initial);
        if (this.watcher == null) {
            this.watcher = new AbstractDirectoryWatcher(this.cdcDir.toPath(), this.context.getCassandraConnectorConfig().cdcDirPollInterval(), Collections.singleton(StandardWatchEventKinds.ENTRY_CREATE)){

                @Override
                void handleEvent(WatchEvent<?> event, Path path) {
                    if (CommitLogProcessor.this.isRunning()) {
                        CommitLogProcessor.this.processCommitLog(path.toFile());
                    }
                }
            };
        }
        if (this.latestOnly) {
            this.processLastModifiedCommitLog();
            throw new InterruptedException();
        }
        if (this.initial) {
            LOGGER.info("Reading existing commit logs in {}", (Object)this.cdcDir);
            File[] commitLogFiles = CommitLogUtil.getCommitLogs(this.cdcDir);
            Arrays.sort(commitLogFiles, CommitLogUtil::compareCommitLogs);
            for (File commitLogFile : commitLogFiles) {
                if (!this.isRunning()) continue;
                this.processCommitLog(commitLogFile);
            }
            if (this.errorCommitLogReprocessEnabled) {
                LOGGER.info("CommitLog Error Processing is enabled. Attempting to get all error commitLog files for re-processing.");
                this.commitLogTransfer.getErrorCommitLogFiles();
            }
            this.initial = false;
        }
        this.watcher.poll();
    }

    void processCommitLog(File file) {
        if (file == null) {
            LOGGER.warn("Commit log is null");
            return;
        }
        if (!file.exists()) {
            LOGGER.warn("Commit log " + file.getName() + " does not exist");
            return;
        }
        try {
            try {
                LOGGER.info("Processing commit log {}", (Object)file.getName());
                this.metrics.setCommitLogFilename(file.getName());
                this.commitLogReader.readCommitLogSegment(file, -1L, 0);
                if (!this.latestOnly) {
                    this.queues.get(Math.abs(file.getName().hashCode() % this.queues.size())).enqueue((Sizeable)new EOFEvent(file));
                }
                LOGGER.info("Successfully processed commit log {}", (Object)file.getName());
            }
            catch (Exception e) {
                if (this.commitLogTransfer.getClass().getName().equals("io.debezium.connector.cassandra.BlackHoleCommitLogTransfer")) {
                    throw new DebeziumException(String.format("Error occurred while processing commit log %s", file.getName()), (Throwable)e);
                }
                LOGGER.error("Error occurred while processing commit log " + file.getName(), (Throwable)e);
                if (!this.latestOnly) {
                    this.queues.get(Math.abs(file.getName().hashCode() % this.queues.size())).enqueue((Sizeable)new EOFEvent(file));
                    this.erroneousCommitLogs.add(file.getName());
                }
            }
        }
        catch (InterruptedException e) {
            throw new CassandraConnectorTaskException(String.format("Enqueuing has been interrupted while enqueuing EOF Event for file %s", file.getName()), e);
        }
    }

    void processLastModifiedCommitLog() {
        LOGGER.warn("CommitLogProcessor will read the last modified commit log from the COMMIT LOG DIRECTORY based on modified timestamp, NOT FROM THE CDC_RAW DIRECTORY. This method should not be used in PRODUCTION!");
        File[] files = CommitLogUtil.getCommitLogs(this.commitLogDir);
        File lastModified = null;
        for (File file : files) {
            if (lastModified != null && lastModified.lastModified() >= file.lastModified()) continue;
            lastModified = file;
        }
        if (lastModified != null) {
            this.processCommitLog(lastModified);
        } else {
            LOGGER.debug("No commit logs found in {}", (Object)this.commitLogDir);
        }
    }
}

