/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.oracle.logminer;

import io.debezium.connector.oracle.OracleDatabaseSchema;
import io.debezium.connector.oracle.OracleOffsetContext;
import io.debezium.connector.oracle.jsqlparser.SimpleDmlParser;
import io.debezium.connector.oracle.logminer.HistoryRecorder;
import io.debezium.connector.oracle.logminer.LogMinerChangeRecordEmitter;
import io.debezium.connector.oracle.logminer.LogMinerHelper;
import io.debezium.connector.oracle.logminer.LogMinerMetrics;
import io.debezium.connector.oracle.logminer.RowMapper;
import io.debezium.connector.oracle.logminer.Scn;
import io.debezium.connector.oracle.logminer.TransactionalBuffer;
import io.debezium.connector.oracle.logminer.TransactionalBufferMetrics;
import io.debezium.connector.oracle.logminer.valueholder.LogMinerDmlEntry;
import io.debezium.data.Envelope;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LogMinerQueryResultProcessor {
    private final ChangeEventSource.ChangeEventSourceContext context;
    private final LogMinerMetrics metrics;
    private final TransactionalBuffer transactionalBuffer;
    private final SimpleDmlParser dmlParser;
    private final OracleOffsetContext offsetContext;
    private final OracleDatabaseSchema schema;
    private final EventDispatcher<TableId> dispatcher;
    private final TransactionalBufferMetrics transactionalBufferMetrics;
    private final String catalogName;
    private final Clock clock;
    private final Logger LOGGER = LoggerFactory.getLogger(LogMinerQueryResultProcessor.class);
    private long currentOffsetScn = 0L;
    private long currentOffsetCommitScn = 0L;
    private long stuckScnCounter = 0L;
    private HistoryRecorder historyRecorder;

    LogMinerQueryResultProcessor(ChangeEventSource.ChangeEventSourceContext context, LogMinerMetrics metrics, TransactionalBuffer transactionalBuffer, SimpleDmlParser dmlParser, OracleOffsetContext offsetContext, OracleDatabaseSchema schema, EventDispatcher<TableId> dispatcher, TransactionalBufferMetrics transactionalBufferMetrics, String catalogName, Clock clock, HistoryRecorder historyRecorder) {
        this.context = context;
        this.metrics = metrics;
        this.transactionalBuffer = transactionalBuffer;
        this.dmlParser = dmlParser;
        this.offsetContext = offsetContext;
        this.schema = schema;
        this.dispatcher = dispatcher;
        this.transactionalBufferMetrics = transactionalBufferMetrics;
        this.catalogName = catalogName;
        this.clock = clock;
        this.historyRecorder = historyRecorder;
    }

    int processResult(ResultSet resultSet) {
        int dmlCounter = 0;
        int insertCounter = 0;
        int updateCounter = 0;
        int deleteCounter = 0;
        int commitCounter = 0;
        int rollbackCounter = 0;
        Instant startTime = Instant.now();
        while (true) {
            try {
                if (!resultSet.next()) {
                    break;
                }
            }
            catch (SQLException e) {
                LogMinerHelper.logError(this.transactionalBufferMetrics, "Closed resultSet", new Object[0]);
                return 0;
            }
            Scn scn = RowMapper.getScn(this.transactionalBufferMetrics, resultSet);
            String tableName = RowMapper.getTableName(this.transactionalBufferMetrics, resultSet);
            String segOwner = RowMapper.getSegOwner(this.transactionalBufferMetrics, resultSet);
            int operationCode = RowMapper.getOperationCode(this.transactionalBufferMetrics, resultSet);
            Timestamp changeTime = RowMapper.getChangeTime(this.transactionalBufferMetrics, resultSet);
            String txId = RowMapper.getTransactionId(this.transactionalBufferMetrics, resultSet);
            String operation = RowMapper.getOperation(this.transactionalBufferMetrics, resultSet);
            String userName = RowMapper.getUsername(this.transactionalBufferMetrics, resultSet);
            boolean isDml = false;
            if (operationCode == 1 || operationCode == 3 || operationCode == 2) {
                isDml = true;
            }
            String redoSql = RowMapper.getSqlRedo(this.transactionalBufferMetrics, resultSet, isDml, this.historyRecorder, scn, tableName, segOwner, operationCode, changeTime, txId);
            this.LOGGER.trace("scn={}, operationCode={}, operation={}, table={}, segOwner={}, userName={}", new Object[]{scn, operationCode, operation, tableName, segOwner, userName});
            String logMessage = String.format("transactionId=%s, SCN=%s, table_name=%s, segOwner=%s, operationCode=%s, offsetSCN=%s,  commitOffsetSCN=%s", txId, scn, tableName, segOwner, operationCode, this.offsetContext.getScn(), this.offsetContext.getCommitScn());
            if (scn == null) {
                LogMinerHelper.logWarn(this.transactionalBufferMetrics, "Scn is null for {}", logMessage);
                return 0;
            }
            if (operationCode == 7) {
                if (this.transactionalBuffer.isTransactionRegistered(txId)) {
                    this.historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, txId, 0, redoSql);
                }
                if (!this.transactionalBuffer.commit(txId, scn, this.offsetContext, changeTime, this.context, logMessage)) continue;
                this.LOGGER.trace("COMMIT, {}", (Object)logMessage);
                ++commitCounter;
                continue;
            }
            if (operationCode == 36) {
                if (this.transactionalBuffer.isTransactionRegistered(txId)) {
                    this.historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, txId, 0, redoSql);
                }
                if (!this.transactionalBuffer.rollback(txId, logMessage)) continue;
                this.LOGGER.trace("ROLLBACK, {}", (Object)logMessage);
                ++rollbackCounter;
                continue;
            }
            if (operationCode == 5) {
                this.historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, txId, 0, redoSql);
                this.LOGGER.info("DDL: {}, REDO_SQL: {}", (Object)logMessage, (Object)redoSql);
                continue;
            }
            if (operationCode == 34) {
                this.historyRecorder.record(scn, tableName, segOwner, operationCode, changeTime, txId, 0, redoSql);
                LogMinerHelper.logWarn(this.transactionalBufferMetrics, "Missing SCN,  {}", logMessage);
                continue;
            }
            if (operationCode != 1 && operationCode != 2 && operationCode != 3) continue;
            this.LOGGER.trace("DML,  {}, sql {}", (Object)logMessage, (Object)redoSql);
            ++dmlCounter;
            switch (operationCode) {
                case 1: {
                    ++insertCounter;
                    break;
                }
                case 3: {
                    ++updateCounter;
                    break;
                }
                case 2: {
                    ++deleteCounter;
                }
            }
            LogMinerDmlEntry dmlEntry = this.dmlParser.parse(redoSql, this.schema.getTables(), txId);
            if (dmlEntry == null || redoSql == null) {
                this.LOGGER.trace("Following statement was not parsed: {}, details: {}", (Object)redoSql, (Object)logMessage);
                continue;
            }
            if (dmlEntry.getCommandType().equals((Object)Envelope.Operation.UPDATE) && dmlEntry.getOldValues().size() == dmlEntry.getNewValues().size() && dmlEntry.getNewValues().containsAll(dmlEntry.getOldValues())) {
                this.LOGGER.trace("Following DML was skipped, most likely because of ignored excluded column change: {}, details: {}", (Object)redoSql, (Object)logMessage);
                continue;
            }
            dmlEntry.setObjectOwner(segOwner);
            dmlEntry.setSourceTime(changeTime);
            dmlEntry.setTransactionId(txId);
            dmlEntry.setObjectName(tableName);
            dmlEntry.setScn(scn);
            try {
                TableId tableId = RowMapper.getTableId(this.catalogName, resultSet);
                this.transactionalBuffer.registerCommitCallback(txId, scn, changeTime.toInstant(), (timestamp, smallestScn, commitScn, counter) -> {
                    if (smallestScn == null || scn.compareTo(smallestScn) < 0) {
                        this.offsetContext.setScn(scn.longValue());
                        this.transactionalBufferMetrics.setOldestScn(scn.longValue());
                    }
                    this.offsetContext.setTransactionId(txId);
                    this.offsetContext.setSourceTime(timestamp.toInstant());
                    this.offsetContext.setTableId(tableId);
                    if (counter == 0) {
                        this.offsetContext.setCommitScn(commitScn.longValue());
                    }
                    Table table = this.schema.tableFor(tableId);
                    this.LOGGER.trace("Processing DML event {} scn {}", (Object)dmlEntry.toString(), (Object)scn);
                    this.dispatcher.dispatchDataChangeEvent((DataCollectionId)tableId, (ChangeRecordEmitter)new LogMinerChangeRecordEmitter(this.offsetContext, dmlEntry, table, this.clock));
                });
            }
            catch (Exception e) {
                LogMinerHelper.logError(this.transactionalBufferMetrics, "Following dmlEntry: {} cannot be dispatched due to the : {}", dmlEntry, e);
            }
        }
        if (dmlCounter > 0 || commitCounter > 0 || rollbackCounter > 0) {
            Duration totalTime = Duration.between(startTime, Instant.now());
            this.metrics.setLastCapturedDmlCount(dmlCounter);
            this.metrics.setLastDurationOfBatchProcessing(totalTime);
            this.warnStuckScn();
            this.currentOffsetScn = this.offsetContext.getScn();
            if (this.offsetContext.getCommitScn() != null) {
                this.currentOffsetCommitScn = this.offsetContext.getCommitScn();
            }
            this.LOGGER.debug("{} DMLs, {} Commits, {} Rollbacks, {} Inserts, {} Updates, {} Deletes. Processed in {} millis. Lag:{}. Offset scn:{}. Offset commit scn:{}. Active transactions:{}. Sleep time:{}", new Object[]{dmlCounter, commitCounter, rollbackCounter, insertCounter, updateCounter, deleteCounter, totalTime.toMillis(), this.transactionalBufferMetrics.getLagFromSource(), this.offsetContext.getScn(), this.offsetContext.getCommitScn(), this.transactionalBufferMetrics.getNumberOfActiveTransactions(), this.metrics.getMillisecondToSleepBetweenMiningQuery()});
        }
        this.historyRecorder.flush();
        return dmlCounter;
    }

    private void warnStuckScn() {
        if (this.offsetContext != null && this.offsetContext.getCommitScn() != null) {
            if (this.currentOffsetScn == this.offsetContext.getScn() && this.currentOffsetCommitScn != this.offsetContext.getCommitScn()) {
                ++this.stuckScnCounter;
                if (this.stuckScnCounter == 25L) {
                    LogMinerHelper.logWarn(this.transactionalBufferMetrics, "Offset SCN {} is not changing. It indicates long transaction(s). Offset commit SCN: {}", this.currentOffsetScn, this.offsetContext.getCommitScn());
                    this.transactionalBufferMetrics.incrementScnFreezeCounter();
                }
            } else {
                this.stuckScnCounter = 0L;
            }
        }
    }
}

