package com.ververica.cdc.connectors.mongodb.source.reader.fetch;

import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.changestream.OperationType;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import com.ververica.cdc.connectors.mongodb.source.dialect.MongoDBDialect;
import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
import com.ververica.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
import com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.TableId;
import java.util.ArrayList;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.RawBsonDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBScanFetchTask.class */
/**
 * Fetch task that reads one {@link SnapshotSplit} of a MongoDB collection.
 *
 * <p>{@link #execute(FetchTask.Context)} performs four steps, all of which publish their output
 * to the {@link ChangeEventQueue} obtained from the task context:
 *
 * <ol>
 *   <li>emit a LOW watermark carrying the offset reported by the dialect before the scan;
 *   <li>scan the documents between the split's start and end bounds and emit one record each;
 *   <li>emit a HIGH watermark carrying the offset reported by the dialect after the scan;
 *   <li>if the high offset is after the low offset, run a {@link MongoDBStreamFetchTask} over
 *       that offset range; otherwise emit an END watermark directly.
 * </ol>
 */
public class MongoDBScanFetchTask implements FetchTask<SourceSplitBase> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBScanFetchTask.class);

    private final SnapshotSplit snapshotSplit;

    /** True only while {@link #execute(FetchTask.Context)} is in progress. */
    private volatile boolean taskRunning = false;

    /**
     * Creates a task bound to the given snapshot split.
     *
     * @param snapshotSplit the split whose documents this task reads
     */
    public MongoDBScanFetchTask(SnapshotSplit snapshotSplit) {
        this.snapshotSplit = snapshotSplit;
    }

    /**
     * Runs the snapshot read for the split and enqueues the resulting events.
     *
     * @param context the fetch context; it is cast to {@link MongoDBFetchTaskContext}
     * @throws Exception any failure raised while reading or enqueueing; it is logged and rethrown
     */
    public void execute(FetchTask.Context context) throws Exception {
        MongoDBFetchTaskContext taskContext = (MongoDBFetchTaskContext) context;
        MongoDBSourceConfig sourceConfig = taskContext.getSourceConfig();
        MongoDBDialect dialect = taskContext.getDialect();
        ChangeEventQueue<DataChangeEvent> queue = taskContext.getQueue();
        TableId tableId = this.snapshotSplit.getTableId();

        this.taskRunning = true;
        try {
            ChangeStreamOffset lowWatermark = dialect.displayCurrentOffset(sourceConfig);
            LOG.info("Snapshot step 1 - Determining low watermark {} for split {}", lowWatermark, this.snapshotSplit);
            enqueueWatermark(queue, tableId, this.snapshotSplit.splitId(), WatermarkKind.LOW, lowWatermark);

            LOG.info("Snapshot step 2 - Snapshotting data");
            snapshotDocuments(sourceConfig, tableId, queue);

            ChangeStreamOffset highWatermark = dialect.displayCurrentOffset(sourceConfig);
            LOG.info("Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, this.snapshotSplit);
            enqueueWatermark(queue, tableId, this.snapshotSplit.splitId(), WatermarkKind.HIGH, highWatermark);

            LOG.info("Snapshot step 4 - Back fill stream split for snapshot split {}", this.snapshotSplit);
            backfill(taskContext, queue, tableId, createBackfillStreamSplit(lowWatermark, highWatermark));
        } catch (Exception e) {
            // The trailing throwable argument is logged as the exception, not as a placeholder value.
            LOG.error("Execute snapshot read subtask for mongo split {} fail", this.snapshotSplit, e);
            throw e;
        } finally {
            // Reset on every exit path so isRunning() never reports a finished or failed task as live.
            this.taskRunning = false;
        }
    }

    /**
     * Reports whether {@link #execute(FetchTask.Context)} is currently in progress.
     *
     * @return the current value of the running flag
     */
    public boolean isRunning() {
        return this.taskRunning;
    }

    /**
     * Returns the snapshot split this task was created for.
     *
     * @return the split passed to the constructor
     */
    public SnapshotSplit getSplit() {
        return this.snapshotSplit;
    }

    /**
     * Decompiler-generated alias of {@link #getSplit()} (the original method was renamed when it
     * was merged with its bridge method). Kept so that existing callers of this name still work.
     *
     * @return the split passed to the constructor
     */
    public SnapshotSplit m20getSplit() {
        return getSplit();
    }

    /**
     * No-op.
     *
     * <p>NOTE(review): nothing in this class clears the running flag while a scan is in progress,
     * so the interruption check inside the scan loop cannot currently trigger from here. Confirm
     * whether close() is expected to cancel a running scan.
     */
    public void close() {
    }

    /** Enqueues a watermark event of the given kind and offset for the given split id. */
    private void enqueueWatermark(
            ChangeEventQueue<DataChangeEvent> queue,
            TableId tableId,
            String splitId,
            WatermarkKind kind,
            ChangeStreamOffset offset)
            throws InterruptedException {
        queue.enqueue(
                new DataChangeEvent(
                        WatermarkEvent.create(
                                MongoRecordUtils.createWatermarkPartitionMap(tableId.identifier()),
                                MongoDBEnvelope.WATERMARK_TOPIC_NAME,
                                splitId,
                                kind,
                                offset)));
    }

    /**
     * Scans the documents between the split's start and end bounds and enqueues one record per
     * document. The cursor is always closed when the scan ends, whether normally or with an error.
     */
    private void snapshotDocuments(
            MongoDBSourceConfig sourceConfig, TableId tableId, ChangeEventQueue<DataChangeEvent> queue)
            throws InterruptedException {
        Object[] splitStart = this.snapshotSplit.getSplitStart();
        Object[] splitEnd = this.snapshotSplit.getSplitEnd();

        // Element [1] of each bound is used as the min/max key; element [0] of the start bound is
        // used as the index hint.
        try (MongoCursor<RawBsonDocument> cursor =
                MongoUtils.collectionFor(MongoUtils.clientFor(sourceConfig), tableId, RawBsonDocument.class)
                        .find()
                        .min((BsonDocument) splitStart[1])
                        .max((BsonDocument) splitEnd[1])
                        .hint((BsonDocument) splitStart[0])
                        .batchSize(sourceConfig.getBatchSize())
                        .noCursorTimeout(true)
                        .cursor()) {
            while (cursor.hasNext()) {
                if (!this.taskRunning) {
                    throw new InterruptedException("Interrupted while snapshotting collection " + tableId.identifier());
                }
                BsonDocument valueDocument = normalizeSnapshotDocument(tableId, cursor.next());
                BsonDocument keyDocument =
                        new BsonDocument(MongoDBEnvelope.ID_FIELD, valueDocument.get(MongoDBEnvelope.ID_FIELD));
                queue.enqueue(
                        new DataChangeEvent(
                                MongoRecordUtils.createSourceRecord(
                                        MongoRecordUtils.createPartitionMap(
                                                sourceConfig.getScheme(),
                                                sourceConfig.getHosts(),
                                                tableId.catalog(),
                                                tableId.table()),
                                        MongoRecordUtils.createSourceOffsetMap(
                                                keyDocument.getDocument(MongoDBEnvelope.ID_FIELD), true),
                                        tableId.identifier(),
                                        keyDocument,
                                        valueDocument)));
            }
        }
    }

    /**
     * Runs the backfill stream task when the split's ending offset is after its starting offset;
     * otherwise enqueues an END watermark for the backfill split.
     */
    private void backfill(
            MongoDBFetchTaskContext taskContext,
            ChangeEventQueue<DataChangeEvent> queue,
            TableId tableId,
            StreamSplit backfillSplit)
            throws Exception {
        if (backfillSplit.getEndingOffset().isAfter(backfillSplit.getStartingOffset())) {
            new MongoDBStreamFetchTask(backfillSplit).execute(taskContext);
        } else {
            enqueueWatermark(
                    queue, tableId, backfillSplit.splitId(), WatermarkKind.END, backfillSplit.getEndingOffset());
        }
    }

    /**
     * Builds the stream split covering the offsets between the low and high watermark. It reuses
     * this snapshot split's id and table schemas, with an empty list and {@code 0} for the
     * remaining constructor arguments.
     */
    private StreamSplit createBackfillStreamSplit(ChangeStreamOffset changeStreamOffset, ChangeStreamOffset changeStreamOffset2) {
        return new StreamSplit(
                this.snapshotSplit.splitId(),
                changeStreamOffset,
                changeStreamOffset2,
                new ArrayList<>(),
                this.snapshotSplit.getTableSchemas(),
                0);
    }

    /**
     * Wraps a scanned document in an envelope document with operation type INSERT.
     *
     * @param tableId supplies the database and collection names for the namespace field
     * @param bsonDocument the document read from the collection; stored as the full document
     * @return a new envelope document; the input document is referenced, not copied
     */
    private BsonDocument normalizeSnapshotDocument(TableId tableId, BsonDocument bsonDocument) {
        BsonDocument envelope = new BsonDocument();

        // The envelope id is itself a document that nests the scanned document's id.
        BsonDocument envelopeId = new BsonDocument();
        envelopeId.put(MongoDBEnvelope.ID_FIELD, bsonDocument.get(MongoDBEnvelope.ID_FIELD));
        envelope.put(MongoDBEnvelope.ID_FIELD, envelopeId);

        envelope.put(MongoDBEnvelope.OPERATION_TYPE_FIELD, new BsonString(OperationType.INSERT.getValue()));

        BsonDocument namespace = new BsonDocument();
        namespace.put(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD, new BsonString(tableId.catalog()));
        namespace.put(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD, new BsonString(tableId.table()));
        envelope.put(MongoDBEnvelope.NAMESPACE_FIELD, namespace);

        envelope.put(
                MongoDBEnvelope.DOCUMENT_KEY_FIELD,
                new BsonDocument(MongoDBEnvelope.ID_FIELD, bsonDocument.get(MongoDBEnvelope.ID_FIELD)));
        envelope.put(MongoDBEnvelope.FULL_DOCUMENT_FIELD, bsonDocument);
        envelope.put(MongoDBEnvelope.TIMESTAMP_KEY_FIELD, new BsonInt64(System.currentTimeMillis()));

        // The source sub-document flags the record as a snapshot and carries a zero timestamp.
        BsonDocument source = new BsonDocument();
        source.put(MongoDBEnvelope.SNAPSHOT_KEY_FIELD, new BsonString("true"));
        source.put(MongoDBEnvelope.TIMESTAMP_KEY_FIELD, new BsonInt64(0L));
        envelope.put(MongoDBEnvelope.SOURCE_FIELD, source);

        return envelope;
    }
}
