/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.Cassandra4CommitLogProcessor;
import io.debezium.connector.cassandra.Cassandra4CommitLogReadHandlerImpl;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CassandraSchemaFactory;
import io.debezium.connector.cassandra.CommitLogProcessingResult;
import io.debezium.connector.cassandra.CommitLogProcessorMetrics;
import io.debezium.connector.cassandra.CommitLogTransfer;
import io.debezium.connector.cassandra.EOFEvent;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.Filters;
import io.debezium.connector.cassandra.LogicalCommitLog;
import io.debezium.connector.cassandra.RecordMaker;
import io.debezium.connector.cassandra.exceptions.CassandraConnectorTaskException;
import io.debezium.pipeline.Sizeable;
import java.util.List;
import java.util.Set;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.commitlog.CommitLogReadHandler;
import org.apache.cassandra.db.commitlog.CommitLogReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractCassandra4CommitLogParser {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractCassandra4CommitLogParser.class);
    private final CommitLogReader commitLogReader = new CommitLogReader();
    protected final List<ChangeEventQueue<Event>> queues;
    protected final CommitLogProcessorMetrics metrics;
    private final Cassandra4CommitLogReadHandlerImpl commitLogReadHandler;
    private final CommitLogTransfer commitLogTransfer;
    private final Set<String> erroneousCommitLogs;
    protected boolean completePrematurely = false;
    protected LogicalCommitLog commitLog;
    protected int pollingInterval;

    public AbstractCassandra4CommitLogParser(LogicalCommitLog commitLog, List<ChangeEventQueue<Event>> queues, CommitLogProcessorMetrics metrics, CassandraConnectorContext context) {
        this.queues = queues;
        this.metrics = metrics;
        this.commitLog = commitLog;
        this.commitLogReadHandler = new Cassandra4CommitLogReadHandlerImpl(context.getSchemaHolder(), context.getQueues(), context.getOffsetWriter(), new RecordMaker(context.getCassandraConnectorConfig().tombstonesOnDelete(), new Filters(context.getCassandraConnectorConfig().fieldExcludeList()), context.getCassandraConnectorConfig()), metrics, CassandraSchemaFactory.get());
        this.commitLogTransfer = context.getCassandraConnectorConfig().getCommitLogTransfer();
        this.erroneousCommitLogs = context.getErroneousCommitLogs();
        this.pollingInterval = context.getCassandraConnectorConfig().getCommitLogMarkedCompletePollInterval();
    }

    public void complete() {
        this.completePrematurely = true;
    }

    public abstract CommitLogProcessingResult parse();

    public CommitLogProcessingResult process() {
        if (!this.commitLog.exists()) {
            LOGGER.warn("Commit log " + this.commitLog + " does not exist!");
            return new CommitLogProcessingResult(this.commitLog, CommitLogProcessingResult.Result.DOES_NOT_EXIST);
        }
        LOGGER.info("Processing commit log {}", (Object)this.commitLog.log.toString());
        this.metrics.setCommitLogFilename(this.commitLog.log.toString());
        this.metrics.setCommitLogPosition(0L);
        CommitLogProcessingResult result = this.parse();
        if (result.result == CommitLogProcessingResult.Result.OK || result.result == CommitLogProcessingResult.Result.ERROR) {
            this.enqueueEOFEvent();
        }
        Cassandra4CommitLogProcessor.removeProcessing(this);
        LOGGER.debug("Processing {} callables.", (Object)Cassandra4CommitLogProcessor.submittedProcessings.size());
        return result;
    }

    protected void enqueueEOFEvent() {
        try {
            this.queues.get(Math.abs(this.commitLog.log.getName().hashCode() % this.queues.size())).enqueue((Sizeable)new EOFEvent(this.commitLog.log));
        }
        catch (InterruptedException e) {
            throw new CassandraConnectorTaskException(String.format("Enqueuing has been interrupted while enqueuing EOF Event for file %s", this.commitLog.log.getName()), (Throwable)e);
        }
    }

    protected void processCommitLog(LogicalCommitLog logicalCommitLog, CommitLogPosition position) {
        try {
            LOGGER.debug("Starting to read commit log segments {} on position {}", (Object)logicalCommitLog, (Object)position);
            this.commitLogReader.readCommitLogSegment((CommitLogReadHandler)this.commitLogReadHandler, logicalCommitLog.log, position, false);
            LOGGER.debug("Finished reading commit log segments {} on position {}", (Object)logicalCommitLog, (Object)position);
        }
        catch (Exception e) {
            if (this.commitLogTransfer.getClass().getName().equals("io.debezium.connector.cassandra.BlackHoleCommitLogTransfer")) {
                throw new DebeziumException(String.format("Error occurred while processing commit log %s", logicalCommitLog.log), (Throwable)e);
            }
            LOGGER.error("Error occurred while processing commit log " + logicalCommitLog.log, (Throwable)e);
            this.erroneousCommitLogs.add(logicalCommitLog.log.getName());
            this.enqueueEOFEvent();
        }
    }

    protected void parseIndexFile(LogicalCommitLog commitLog) throws DebeziumException {
        try {
            commitLog.parseCommitLogIndex();
        }
        catch (DebeziumException ex) {
            this.erroneousCommitLogs.add(commitLog.log.getName());
            throw ex;
        }
    }
}

