/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.cassandra;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.cassandra.AbstractCassandra4CommitLogParser;
import io.debezium.connector.cassandra.CassandraConnectorContext;
import io.debezium.connector.cassandra.CommitLogProcessingResult;
import io.debezium.connector.cassandra.CommitLogProcessorMetrics;
import io.debezium.connector.cassandra.Event;
import io.debezium.connector.cassandra.LogicalCommitLog;
import java.util.List;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Commit log parser that processes a commit log segment incrementally while its
 * index file is still being updated, instead of waiting for the segment to be
 * marked complete.
 *
 * <p>{@link #parse()} polls the index file (via the inherited {@code parseIndexFile})
 * until {@code commitLog.completed} is set, processing the segment each time
 * {@code commitLog.offsetOfEndOfLastWrittenCDCMutation} has advanced past the last
 * offset this parser handled, and then makes one final pass over the segment.
 *
 * <p>NOTE(review): {@code parseIndexFile}, {@code processCommitLog},
 * {@code completePrematurely} and {@code pollingInterval} are not declared in this
 * file and are presumably inherited from {@link AbstractCassandra4CommitLogParser};
 * confirm their exact semantics there.
 */
public class Cassandra4CommitLogRealTimeParser
extends AbstractCassandra4CommitLogParser {
    private static final Logger LOGGER = LoggerFactory.getLogger(Cassandra4CommitLogRealTimeParser.class);

    /**
     * Value of {@code commitLog.offsetOfEndOfLastWrittenCDCMutation} observed right after
     * the most recent processing pass; {@code null} until the first pass has run.
     */
    private Integer offset = null;

    /**
     * Creates a parser for the given commit log; all arguments are passed straight
     * through to the superclass constructor.
     *
     * @param commitLog the commit log segment to parse
     * @param queues the change event queues handed to the superclass
     * @param metrics the metrics object updated with the commit log position
     * @param context the connector context handed to the superclass
     */
    public Cassandra4CommitLogRealTimeParser(LogicalCommitLog commitLog, List<ChangeEventQueue<Event>> queues, CommitLogProcessorMetrics metrics, CassandraConnectorContext context) {
        super(commitLog, queues, metrics, context);
    }

    /**
     * Processes the commit log while polling its index file for completion.
     *
     * <p>While the log is not completed, each iteration processes the segment from
     * position 0 (first pass) or from the previously recorded offset (when the index
     * file reports a larger end offset), then sleeps for {@code pollingInterval} and
     * re-reads the index file. Once the log is completed, a final pass is made from
     * the last recorded offset (or 0 if nothing was processed yet).
     *
     * @return a result for the commit log: the default result on success,
     *         {@code COMPLETED_PREMATURELY} if {@code completePrematurely} was set while
     *         polling, or {@code ERROR} (carrying the exception) if anything threw. If the
     *         failure was an {@link InterruptedException}, the calling thread's interrupt
     *         status is restored before returning.
     */
    @Override
    public CommitLogProcessingResult parse() {
        try {
            CommitLogPosition commitLogPosition;
            this.parseIndexFile(this.commitLog);
            while (!this.commitLog.completed) {
                LOGGER.debug("Polling for completeness of idx file for: {}", this.commitLog);
                if (this.completePrematurely) {
                    LOGGER.warn("{} completed prematurely", this.commitLog);
                    return new CommitLogProcessingResult(this.commitLog, CommitLogProcessingResult.Result.COMPLETED_PREMATURELY);
                }
                // Stays null when the index file shows no progress since the last pass.
                commitLogPosition = null;
                if (this.offset == null) {
                    LOGGER.debug("Start to read the partial file : {}", this.commitLog);
                    commitLogPosition = new CommitLogPosition(this.commitLog.commitLogSegmentId, 0);
                } else if (this.offset < this.commitLog.offsetOfEndOfLastWrittenCDCMutation) {
                    LOGGER.debug("Resume to read the partial file: {}", this.commitLog);
                    commitLogPosition = new CommitLogPosition(this.commitLog.commitLogSegmentId, this.offset.intValue());
                } else {
                    LOGGER.debug("No movement in offset in idx file: {}", this.commitLog);
                }
                if (commitLogPosition != null) {
                    this.processCommitLog(this.commitLog, commitLogPosition);
                    // Remember how far the index file had advanced so the next pass resumes from there.
                    this.offset = this.commitLog.offsetOfEndOfLastWrittenCDCMutation;
                    // The metric records the position this pass started from.
                    this.metrics.setCommitLogPosition((long)commitLogPosition.position);
                }
                LOGGER.debug("Sleep for idx file to be complete");
                Thread.sleep(this.pollingInterval);
                this.parseIndexFile(this.commitLog);
            }
            LOGGER.info("Completed idx file for: {}", this.commitLog);
            // Final pass over the completed segment, starting from the last recorded offset.
            commitLogPosition = this.offset != null ? new CommitLogPosition(this.commitLog.commitLogSegmentId, this.offset.intValue()) : new CommitLogPosition(this.commitLog.commitLogSegmentId, 0);
            this.metrics.setCommitLogPosition((long)commitLogPosition.position);
            this.processCommitLog(this.commitLog, commitLogPosition);
            return new CommitLogProcessingResult(this.commitLog);
        }
        catch (Exception ex) {
            if (ex instanceof InterruptedException) {
                // Catching InterruptedException clears the interrupt flag; restore it so the
                // caller can still observe that this thread was asked to stop.
                Thread.currentThread().interrupt();
            }
            LOGGER.error("Processing of {} errored out", this.commitLog, ex);
            return new CommitLogProcessingResult(this.commitLog, CommitLogProcessingResult.Result.ERROR, ex);
        }
    }
}

