/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.informix;

import com.informix.jdbc.IfmxReadableType;
import com.informix.jdbcx.IfxDataSource;
import com.informix.stream.api.IfmxStreamOperationRecord;
import com.informix.stream.api.IfmxStreamRecord;
import com.informix.stream.api.IfmxStreamRecordType;
import com.informix.stream.cdc.IfxCDCEngine;
import com.informix.stream.cdc.records.IfxCDCBeginTransactionRecord;
import com.informix.stream.cdc.records.IfxCDCCommitTransactionRecord;
import com.informix.stream.cdc.records.IfxCDCMetaDataRecord;
import com.informix.stream.cdc.records.IfxCDCTruncateRecord;
import com.informix.stream.impl.IfxStreamException;
import io.debezium.connector.informix.InformixCdcTransactionEngine;
import io.debezium.connector.informix.InformixChangeRecordEmitter;
import io.debezium.connector.informix.InformixConnection;
import io.debezium.connector.informix.InformixConnectorConfig;
import io.debezium.connector.informix.InformixDatabaseSchema;
import io.debezium.connector.informix.InformixOffsetContext;
import io.debezium.connector.informix.InformixPartition;
import io.debezium.connector.informix.InformixStreamTransactionRecord;
import io.debezium.connector.informix.Lsn;
import io.debezium.connector.informix.TxLogPosition;
import io.debezium.data.Envelope;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InformixStreamingChangeEventSource
implements StreamingChangeEventSource<InformixPartition, InformixOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(InformixStreamingChangeEventSource.class);
    private static final String RECEIVED_GENERIC_RECORD = "Received {} ElapsedT [{}ms]";
    private static final String RECEIVED_UNKNOWN_RECORD_TYPE = "Received unknown record-type {} ElapsedT [{}ms]";
    private final InformixConnectorConfig connectorConfig;
    private final InformixConnection dataConnection;
    private final InformixConnection metadataConnection;
    private final EventDispatcher<InformixPartition, TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final InformixDatabaseSchema schema;
    private InformixOffsetContext effectiveOffsetContext;

    public InformixStreamingChangeEventSource(InformixConnectorConfig connectorConfig, InformixConnection dataConnection, InformixConnection metadataConnection, EventDispatcher<InformixPartition, TableId> dispatcher, ErrorHandler errorHandler, Clock clock, InformixDatabaseSchema schema) {
        this.connectorConfig = connectorConfig;
        this.dataConnection = dataConnection;
        this.metadataConnection = metadataConnection;
        this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = schema;
    }

    public void init(InformixOffsetContext offsetContext) {
        this.effectiveOffsetContext = offsetContext == null ? new InformixOffsetContext(this.connectorConfig, TxLogPosition.current(), false, false) : offsetContext;
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext context, InformixPartition partition, InformixOffsetContext offsetContext) throws InterruptedException {
        if (!this.connectorConfig.getSnapshotMode().shouldStream()) {
            LOGGER.info("Streaming is not enabled in current configuration");
            return;
        }
        try {
            for (TableId tableId : this.schema.tableIds()) {
                Table table = this.metadataConnection.getTableSchemaFromTableId(tableId);
                this.schema.refresh(table);
            }
        }
        catch (SQLException e) {
            LOGGER.error("Caught SQLException", (Throwable)e);
            this.errorHandler.setProducerThrowable((Throwable)e);
        }
        TxLogPosition lastPosition = offsetContext.getChangePosition();
        Lsn lastCommitLsn = lastPosition.getCommitLsn();
        Lsn lastBeginLsn = lastPosition.getBeginLsn();
        Lsn beginLsn = lastBeginLsn.isAvailable() ? lastBeginLsn : lastCommitLsn;
        try (InformixCdcTransactionEngine transactionEngine = this.getTransactionEngine(context, this.schema, beginLsn);){
            transactionEngine.init();
            if (beginLsn.compareTo(lastCommitLsn) < 0) {
                LOGGER.info("Begin recover: from lastBeginLsn='{}' to lastCommitLsn='{}'", (Object)lastBeginLsn, (Object)lastCommitLsn);
                boolean recovering = true;
                block23: while (context.isRunning() && recovering) {
                    if (context.isPaused()) {
                        LOGGER.info("Streaming will now pause");
                        context.streamingPaused();
                        context.waitSnapshotCompletion();
                        LOGGER.info("Streaming resumed");
                    }
                    this.dispatcher.dispatchHeartbeatEvent((Partition)partition, (OffsetContext)offsetContext);
                    IfmxStreamRecord streamRecord = transactionEngine.getRecord();
                    switch (streamRecord.getType()) {
                        case TRANSACTION_GROUP: {
                            InformixStreamTransactionRecord transactionRecord = (InformixStreamTransactionRecord)streamRecord;
                            Lsn commitLsn = Lsn.of(transactionRecord.getEndRecord().getSequenceId());
                            if (commitLsn.compareTo(lastCommitLsn) < 0) {
                                LOGGER.info("Skipping transaction with id: '{}' since commitLsn='{}' < lastCommitLsn='{}'", new Object[]{transactionRecord.getTransactionId(), commitLsn, lastCommitLsn});
                                continue block23;
                            }
                            if (commitLsn.compareTo(lastCommitLsn) > 0) {
                                LOGGER.info("Recover finished: from lastBeginLsn='{}' to lastCommitLsn='{}', current Lsn='{}'", new Object[]{lastBeginLsn, lastCommitLsn, commitLsn});
                                recovering = false;
                            }
                            this.handleTransaction(transactionEngine, partition, offsetContext, transactionRecord, recovering);
                            continue block23;
                        }
                        case METADATA: {
                            this.handleMetadata(partition, offsetContext, transactionEngine, (IfxCDCMetaDataRecord)streamRecord);
                            continue block23;
                        }
                        case TIMEOUT: {
                            LOGGER.trace(RECEIVED_GENERIC_RECORD, (Object)streamRecord, (Object)0);
                            continue block23;
                        }
                        case ERROR: {
                            LOGGER.error(RECEIVED_GENERIC_RECORD, (Object)streamRecord, (Object)0);
                            continue block23;
                        }
                    }
                    LOGGER.warn(RECEIVED_UNKNOWN_RECORD_TYPE, (Object)streamRecord, (Object)0);
                }
            }
            block24: while (context.isRunning()) {
                if (context.isPaused()) {
                    LOGGER.info("Streaming will now pause");
                    context.streamingPaused();
                    context.waitSnapshotCompletion();
                    LOGGER.info("Streaming resumed");
                }
                this.dispatcher.dispatchHeartbeatEvent((Partition)partition, (OffsetContext)offsetContext);
                IfmxStreamRecord streamRecord = transactionEngine.getRecord();
                switch (streamRecord.getType()) {
                    case TRANSACTION_GROUP: {
                        this.handleTransaction(transactionEngine, partition, offsetContext, (InformixStreamTransactionRecord)streamRecord, false);
                        continue block24;
                    }
                    case METADATA: {
                        this.handleMetadata(partition, offsetContext, transactionEngine, (IfxCDCMetaDataRecord)streamRecord);
                        continue block24;
                    }
                    case TIMEOUT: {
                        LOGGER.trace(RECEIVED_GENERIC_RECORD, (Object)streamRecord, (Object)0);
                        continue block24;
                    }
                    case ERROR: {
                        LOGGER.error(RECEIVED_GENERIC_RECORD, (Object)streamRecord, (Object)0);
                        continue block24;
                    }
                }
                LOGGER.warn(RECEIVED_UNKNOWN_RECORD_TYPE, (Object)streamRecord, (Object)0);
            }
        }
        catch (InterruptedException e) {
            LOGGER.error("Caught InterruptedException", (Throwable)e);
            this.errorHandler.setProducerThrowable((Throwable)e);
            Thread.currentThread().interrupt();
        }
        catch (Exception e) {
            LOGGER.error("Caught Exception", (Throwable)e);
            this.errorHandler.setProducerThrowable((Throwable)e);
        }
    }

    public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
    }

    public InformixOffsetContext getOffsetContext() {
        return this.effectiveOffsetContext;
    }

    public InformixCdcTransactionEngine getTransactionEngine(ChangeEventSource.ChangeEventSourceContext context, InformixDatabaseSchema schema, Lsn startLsn) throws SQLException {
        return new InformixCdcTransactionEngine(context, this.getCDCEngine(schema, startLsn));
    }

    private IfxCDCEngine getCDCEngine(InformixDatabaseSchema schema, Lsn startLsn) throws SQLException {
        IfxCDCEngine.Builder builder = IfxCDCEngine.builder((DataSource)new IfxDataSource(this.dataConnection.connectionString())).buffer(this.connectorConfig.getCdcBuffersize()).timeout(this.connectorConfig.getCdcTimeout());
        schema.tableIds().forEach(tid -> {
            String[] colNames = (String[])schema.tableFor((TableId)tid).retrieveColumnNames().toArray(String[]::new);
            builder.watchTable(tid.identifier(), colNames);
        });
        if (startLsn.isAvailable()) {
            builder.sequenceId(startLsn.sequence());
        }
        if (LOGGER.isInfoEnabled()) {
            long sequence = builder.getSequenceId();
            LOGGER.info("Set CDCEngine's LSN to '{}' aka {}", (Object)sequence, (Object)Lsn.of(sequence).toLongString());
        }
        return builder.build();
    }

    private void handleTransaction(InformixCdcTransactionEngine engine, InformixPartition partition, InformixOffsetContext offsetContext, InformixStreamTransactionRecord transactionRecord, boolean recover) throws InterruptedException, IfxStreamException {
        long tStart = System.nanoTime();
        int transactionId = transactionRecord.getTransactionId();
        IfxCDCBeginTransactionRecord beginRecord = transactionRecord.getBeginRecord();
        IfmxStreamRecord endRecord = transactionRecord.getEndRecord();
        long start = System.nanoTime();
        long beginTs = beginRecord.getTime();
        long beginSeq = beginRecord.getSequenceId();
        long lowestBeginSeq = engine.getLowestBeginSequence().orElse(beginSeq);
        long endSeq = endRecord.getSequenceId();
        if (!recover) {
            this.updateChangePosition(offsetContext, null, beginSeq, transactionId, lowestBeginSeq);
            this.dispatcher.dispatchTransactionStartedEvent((Partition)partition, String.valueOf(transactionId), (OffsetContext)offsetContext, Instant.ofEpochSecond(beginTs));
        }
        long end = System.nanoTime();
        LOGGER.debug("Received {} Time [{}] UserId [{}] ElapsedT [{}ms]", new Object[]{beginRecord, beginTs, beginRecord.getUserId(), (double)(end - start) / 1000000.0});
        if (IfmxStreamRecordType.COMMIT.equals((Object)endRecord.getType())) {
            IfxCDCCommitTransactionRecord commitRecord = (IfxCDCCommitTransactionRecord)endRecord;
            long commitSeq = commitRecord.getSequenceId();
            long commitTs = commitRecord.getTime();
            if (!recover) {
                this.updateChangePosition(offsetContext, commitSeq, null, transactionId, null);
            }
            Map before = null;
            Map<String, TableId> label2TableId = engine.getTableIdByLabelId();
            block8: for (IfmxStreamRecord streamRecord : transactionRecord.getRecords()) {
                start = System.nanoTime();
                long changeSeq = streamRecord.getSequenceId();
                if (recover && Lsn.of(changeSeq).compareTo(offsetContext.getChangePosition().getChangeLsn()) <= 0) {
                    LOGGER.info("Skipping already processed record {}", (Object)changeSeq);
                    continue;
                }
                Optional<TableId> tableId = Optional.ofNullable(streamRecord.getLabel()).map(label2TableId::get);
                this.updateChangePosition(offsetContext, null, changeSeq, transactionId, null);
                switch (streamRecord.getType()) {
                    case INSERT: {
                        Map after = ((IfmxStreamOperationRecord)streamRecord).getData();
                        this.handleOperation(partition, offsetContext, Envelope.Operation.CREATE, null, after, tableId.orElseThrow());
                        end = System.nanoTime();
                        LOGGER.debug("Received {} ElapsedT [{}ms] Data After [{}]", new Object[]{streamRecord, (double)(end - start) / 1000000.0, after});
                        continue block8;
                    }
                    case BEFORE_UPDATE: {
                        before = ((IfmxStreamOperationRecord)streamRecord).getData();
                        end = System.nanoTime();
                        LOGGER.debug("Received {} ElapsedT [{}ms] Data Before [{}]", new Object[]{streamRecord, (double)(end - start) / 1000000.0, before});
                        continue block8;
                    }
                    case AFTER_UPDATE: {
                        Map after = ((IfmxStreamOperationRecord)streamRecord).getData();
                        this.handleOperation(partition, offsetContext, Envelope.Operation.UPDATE, before, after, tableId.orElseThrow());
                        end = System.nanoTime();
                        LOGGER.debug("Received {} ElapsedT [{}ms] Data Before [{}] Data After [{}]", new Object[]{streamRecord, (double)(end - start) / 1000000.0, before, after});
                        continue block8;
                    }
                    case DELETE: {
                        before = ((IfmxStreamOperationRecord)streamRecord).getData();
                        this.handleOperation(partition, offsetContext, Envelope.Operation.DELETE, before, null, tableId.orElseThrow());
                        end = System.nanoTime();
                        LOGGER.debug("Received {} ElapsedT [{}ms] Data Before [{}]", new Object[]{streamRecord, (double)(end - start) / 1000000.0, before});
                        continue block8;
                    }
                    case TRUNCATE: {
                        IfxCDCTruncateRecord truncateRecord = (IfxCDCTruncateRecord)streamRecord;
                        tableId = Optional.of(truncateRecord.getUserId()).map(Object::toString).map(label2TableId::get);
                        this.handleOperation(partition, offsetContext, Envelope.Operation.TRUNCATE, null, null, tableId.orElseThrow());
                        LOGGER.debug(RECEIVED_GENERIC_RECORD, (Object)streamRecord, (Object)((double)(end - start) / 1000000.0));
                        continue block8;
                    }
                    case METADATA: 
                    case TIMEOUT: 
                    case ERROR: {
                        end = System.nanoTime();
                        LOGGER.debug(RECEIVED_GENERIC_RECORD, (Object)streamRecord, (Object)((double)(end - start) / 1000000.0));
                        continue block8;
                    }
                }
                end = System.nanoTime();
                LOGGER.debug(RECEIVED_UNKNOWN_RECORD_TYPE, (Object)streamRecord, (Object)((double)(end - start) / 1000000.0));
            }
            start = System.nanoTime();
            this.updateChangePosition(offsetContext, null, commitSeq, transactionId, null);
            this.dispatcher.dispatchTransactionCommittedEvent((Partition)partition, (OffsetContext)offsetContext, Instant.ofEpochSecond(commitTs));
            end = System.nanoTime();
            LOGGER.debug("Received {} Time [{}] UserId [{}] ElapsedT [{}ms]", new Object[]{endRecord, commitTs, beginRecord.getUserId(), (double)(end - start) / 1000000.0});
            LOGGER.debug("Handle Transaction Events [{}], ElapsedT [{}ms]", (Object)transactionRecord.getRecords().size(), (Object)((double)(end - tStart) / 1000000.0));
        }
        if (IfmxStreamRecordType.ROLLBACK.equals((Object)endRecord.getType())) {
            if (!recover) {
                this.updateChangePosition(offsetContext, endSeq, endSeq, transactionId, null);
                offsetContext.getTransactionContext().endTransaction();
            }
            end = System.nanoTime();
            LOGGER.debug(RECEIVED_GENERIC_RECORD, (Object)endRecord, (Object)((double)(end - start) / 1000000.0));
        }
    }

    private void handleMetadata(InformixPartition partition, InformixOffsetContext offsetContext, InformixCdcTransactionEngine engine, IfxCDCMetaDataRecord metaDataRecord) throws InterruptedException {
        long start = System.nanoTime();
        TableId tableId = engine.getTableIdByLabelId().get(metaDataRecord.getLabel());
        offsetContext.event((DataCollectionId)tableId, Instant.now());
        this.dispatcher.dispatchSchemaChangeEvent((Partition)partition, (OffsetContext)offsetContext, null, receiver -> {
            SchemaChangeEvent event = SchemaChangeEvent.ofAlter((Partition)partition, (OffsetContext)offsetContext, (String)tableId.catalog(), (String)tableId.schema(), (String)"n/a", (Table)this.schema.tableFor(tableId));
            if (!this.schema.skipSchemaChangeEvent(event)) {
                receiver.schemaChangeEvent(event);
            }
        });
        long end = System.nanoTime();
        LOGGER.debug(RECEIVED_GENERIC_RECORD, (Object)metaDataRecord, (Object)((double)(end - start) / 1000000.0));
    }

    private void updateChangePosition(InformixOffsetContext offsetContext, Long commitSeq, Long changeSeq, Integer transactionId, Long beginSeq) {
        offsetContext.setChangePosition(TxLogPosition.cloneAndSet(offsetContext.getChangePosition(), Lsn.of(commitSeq), Lsn.of(changeSeq), transactionId, Lsn.of(beginSeq)));
    }

    private void handleOperation(InformixPartition partition, InformixOffsetContext offsetContext, Envelope.Operation operation, Map<String, IfmxReadableType> before, Map<String, IfmxReadableType> after, TableId tableId) throws InterruptedException {
        offsetContext.event((DataCollectionId)tableId, this.clock.currentTime());
        this.dispatcher.dispatchDataChangeEvent((Partition)partition, (DataCollectionId)tableId, (ChangeRecordEmitter)new InformixChangeRecordEmitter(partition, offsetContext, operation, InformixChangeRecordEmitter.convertIfxData2Array(before, this.schema.schemaFor(tableId)), InformixChangeRecordEmitter.convertIfxData2Array(after, this.schema.schemaFor(tableId)), this.clock, this.connectorConfig));
    }
}

