package com.ververica.cdc.connectors.mongodb.source.reader.fetch;

import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.changestream.OperationType;
import com.mongodb.kafka.connect.source.heartbeat.HeartbeatManager;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamDescriptor;
import com.ververica.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
import com.ververica.cdc.connectors.mongodb.source.utils.MongoRecordUtils;
import com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/mongodb/source/reader/fetch/MongoDBStreamFetchTask.class */
/**
 * {@link FetchTask} that reads a MongoDB change stream for a {@link StreamSplit} and pushes the
 * resulting records into the task context's {@link ChangeEventQueue}.
 *
 * <p>The task opens a change stream cursor positioned from the split's starting offset, polls it
 * until it is stopped, and, when the split has an ending offset, emits an END watermark and
 * finishes once that offset is reached.
 *
 * <p>{@link #isRunning()} and {@link #close()} communicate with the polling loop through a
 * {@code volatile} flag; all other state is only touched by the thread running
 * {@link #execute(FetchTask.Context)}.
 */
public class MongoDBStreamFetchTask implements FetchTask<SourceSplitBase> {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBStreamFetchTask.class);

    // MongoDB server error codes inspected when opening the change stream.
    // NOTE(review): names for 9 and 40415 follow the MongoDB error-code list; the names for 20
    // and 13 match the log messages emitted below.
    private static final int FAILED_TO_PARSE_ERROR = 9;
    private static final int UNKNOWN_FIELD_ERROR = 40415;
    private static final int ILLEGAL_OPERATION_ERROR = 20;
    private static final int UNAUTHORIZED_ERROR = 13;

    private final StreamSplit streamSplit;
    private final Time time = new SystemTime();

    /** Set at the start of {@link #execute(FetchTask.Context)}; null before that. */
    private MongoDBSourceConfig sourceConfig;

    /** True while the polling loop should keep going; cleared by the loop itself or close(). */
    private volatile boolean taskRunning = false;

    /** Cleared after the server rejects the {@code startAtOperationTime} option. */
    private boolean supportsStartAtOperationTime = true;

    /** Cleared after the server rejects the {@code startAfter} option. */
    private boolean supportsStartAfter = true;

    /**
     * Creates a task for the given split.
     *
     * @param streamSplit split whose starting/ending offsets bound the change stream read
     */
    public MongoDBStreamFetchTask(StreamSplit streamSplit) {
        this.streamSplit = streamSplit;
    }

    /**
     * Polls the change stream and enqueues data change, heartbeat and END watermark events.
     *
     * <p>The loop runs until {@link #close()} is called, an exception is thrown, or (for a
     * bounded split) the current offset is at or after the split's ending offset. The cursor is
     * always closed and the running flag cleared before this method returns.
     *
     * @param context must be a {@link MongoDBFetchTaskContext}
     * @throws Exception any failure raised while polling or enqueueing; it is logged and rethrown
     */
    public void execute(FetchTask.Context context) throws Exception {
        MongoDBFetchTaskContext taskContext = (MongoDBFetchTaskContext) context;
        this.sourceConfig = taskContext.getSourceConfig();
        ChangeStreamDescriptor descriptor = taskContext.getChangeStreamDescriptor();
        ChangeEventQueue<DataChangeEvent> queue = taskContext.getQueue();
        MongoClient mongoClient = MongoUtils.clientFor(this.sourceConfig);
        MongoChangeStreamCursor<BsonDocument> cursor = openChangeStreamCursor(descriptor);

        try {
            HeartbeatManager heartbeatManager = openHeartbeatManagerIfNeeded(cursor);
            long nextUpdate = this.time.milliseconds() + this.sourceConfig.getPollAwaitTimeMillis();
            this.taskRunning = true;

            while (this.taskRunning) {
                BsonDocument next = cursor.tryNext();
                SourceRecord changeRecord = null;

                if (next != null) {
                    changeRecord = toSourceRecord(next);
                } else {
                    long untilNext = nextUpdate - this.time.milliseconds();
                    if (untilNext > 0) {
                        LOG.debug("Waiting {} ms to poll change records", untilNext);
                        this.time.sleep(untilNext);
                    } else {
                        // The poll window elapsed without data: emit a heartbeat if enabled
                        // and start a new window.
                        if (heartbeatManager != null) {
                            changeRecord =
                                    heartbeatManager
                                            .heartbeat()
                                            .map(this::normalizeHeartbeatRecord)
                                            .orElse(null);
                        }
                        nextUpdate =
                                this.time.milliseconds()
                                        + this.sourceConfig.getPollAwaitTimeMillis();
                    }
                }

                if (changeRecord != null) {
                    queue.enqueue(new DataChangeEvent(changeRecord));
                }

                if (isBoundedRead()) {
                    // Without a record to take a resume token from, compare against the
                    // cluster's current time instead.
                    ChangeStreamOffset currentOffset =
                            changeRecord != null
                                    ? new ChangeStreamOffset(
                                            MongoRecordUtils.getResumeToken(changeRecord))
                                    : new ChangeStreamOffset(
                                            MongoUtils.getCurrentClusterTime(mongoClient));

                    if (currentOffset.isAtOrAfter(this.streamSplit.getEndingOffset())) {
                        SourceRecord watermark =
                                WatermarkEvent.create(
                                        MongoRecordUtils.createWatermarkPartitionMap(
                                                descriptor.toString()),
                                        MongoDBEnvelope.WATERMARK_TOPIC_NAME,
                                        this.streamSplit.splitId(),
                                        WatermarkKind.END,
                                        currentOffset);
                        queue.enqueue(new DataChangeEvent(watermark));
                        // The ending offset is reached: the bounded read is complete.
                        break;
                    }
                }
            }
        } catch (Exception e) {
            LOG.error("Poll change stream records failed ", e);
            throw e;
        } finally {
            this.taskRunning = false;
            cursor.close();
        }
    }

    /** Returns whether the polling loop of {@link #execute(FetchTask.Context)} is active. */
    public boolean isRunning() {
        return this.taskRunning;
    }

    /** Returns the split this task reads. */
    public StreamSplit getSplit() {
        return this.streamSplit;
    }

    /**
     * Alias of {@link #getSplit()} under the name produced by decompilation.
     *
     * @deprecated use {@link #getSplit()}; kept only so existing references keep compiling
     */
    @Deprecated
    public StreamSplit m22getSplit() {
        return getSplit();
    }

    /**
     * Asks the polling loop to stop. The loop observes the flag on its next iteration; the
     * cursor itself is closed by {@link #execute(FetchTask.Context)}.
     */
    public void close() {
        this.taskRunning = false;
    }

    /**
     * Converts one change stream event into a {@link SourceRecord}.
     *
     * @return the record for insert/update/replace/delete events, or {@code null} for any other
     *     operation type (those events are logged and ignored)
     */
    private SourceRecord toSourceRecord(BsonDocument changeStreamDocument) {
        OperationType operationType = getOperationType(changeStreamDocument);
        switch (operationType) {
            case INSERT:
            case UPDATE:
            case REPLACE:
            case DELETE:
                MongoNamespace namespace = getMongoNamespace(changeStreamDocument);
                // Must be read BEFORE normalization: normalizing overwrites the id field
                // (which is used here as the resume token) with the record key.
                BsonDocument resumeToken =
                        changeStreamDocument.getDocument(MongoDBEnvelope.ID_FIELD);
                BsonDocument valueDocument = normalizeChangeStreamDocument(changeStreamDocument);
                LOG.trace("Adding {} to {}", valueDocument, namespace.getFullName());
                return MongoRecordUtils.createSourceRecord(
                        MongoRecordUtils.createPartitionMap(
                                this.sourceConfig.getScheme(),
                                this.sourceConfig.getHosts(),
                                namespace.getDatabaseName(),
                                namespace.getCollectionName()),
                        MongoRecordUtils.createSourceOffsetMap(resumeToken, false),
                        namespace.getFullName(),
                        // After normalization the id field holds the key document.
                        changeStreamDocument.getDocument(MongoDBEnvelope.ID_FIELD),
                        valueDocument);
            default:
                LOG.info("Ignored {} record: {}", operationType, changeStreamDocument);
                return null;
        }
    }

    /**
     * Opens a change stream cursor positioned from the split's starting offset.
     *
     * <p>A resume token is preferred ({@code startAfter}, or {@code resumeAfter} once
     * {@code startAfter} was rejected); otherwise the offset's timestamp is used via
     * {@code startAtOperationTime}, and if that was rejected too the stream is opened without a
     * start position. When the server rejects one of these options the corresponding flag is
     * cleared and the open is retried; each option is downgraded at most once.
     *
     * @throws FlinkRuntimeException if the change stream cannot be opened
     */
    private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(
            ChangeStreamDescriptor changeStreamDescriptor) {
        ChangeStreamOffset startingOffset =
                new ChangeStreamOffset(
                        (Map<String, String>) this.streamSplit.getStartingOffset().getOffset());
        ChangeStreamIterable<Document> changeStream =
                MongoUtils.getChangeStreamIterable(this.sourceConfig, changeStreamDescriptor);
        BsonDocument resumeToken = startingOffset.getResumeToken();
        BsonTimestamp timestamp = startingOffset.getTimestamp();

        if (resumeToken != null) {
            if (this.supportsStartAfter) {
                LOG.info("Open the change stream after the previous offset: {}", resumeToken);
                changeStream.startAfter(resumeToken);
            } else {
                LOG.info(
                        "Open the change stream after the previous offset using resumeAfter: {}",
                        resumeToken);
                changeStream.resumeAfter(resumeToken);
            }
        } else if (this.supportsStartAtOperationTime) {
            LOG.info("Open the change stream at the timestamp: {}", timestamp);
            changeStream.startAtOperationTime(timestamp);
        } else {
            LOG.warn("Open the change stream of the latest offset");
        }

        try {
            return changeStream.withDocumentClass(BsonDocument.class).cursor();
        } catch (MongoCommandException e) {
            int errorCode = e.getErrorCode();
            if (errorCode == FAILED_TO_PARSE_ERROR || errorCode == UNKNOWN_FIELD_ERROR) {
                String errorMessage = e.getErrorMessage();
                // Only downgrade an option that is still enabled; otherwise the same failure
                // would trigger the same retry forever.
                if (this.supportsStartAtOperationTime
                        && errorMessage.contains("startAtOperationTime")) {
                    this.supportsStartAtOperationTime = false;
                    return openChangeStreamCursor(changeStreamDescriptor);
                }
                if (this.supportsStartAfter && errorMessage.contains("startAfter")) {
                    this.supportsStartAfter = false;
                    return openChangeStreamCursor(changeStreamDescriptor);
                }
            } else if (errorCode == ILLEGAL_OPERATION_ERROR) {
                LOG.error(
                        "Illegal $changeStream operation: {} {}", e.getErrorMessage(), errorCode);
                throw new FlinkRuntimeException("Illegal $changeStream operation", e);
            } else if (errorCode == UNAUTHORIZED_ERROR) {
                LOG.error(
                        "Unauthorized $changeStream operation: {} {}",
                        e.getErrorMessage(),
                        errorCode);
                throw new FlinkRuntimeException("Unauthorized $changeStream operation", e);
            }
            LOG.error("Open change stream failed ", e);
            throw new FlinkRuntimeException("Open change stream failed", e);
        }
    }

    /**
     * Creates a heartbeat manager when the configured heartbeat interval is positive.
     *
     * @return the manager, or {@code null} when heartbeats are disabled
     */
    private HeartbeatManager openHeartbeatManagerIfNeeded(
            MongoChangeStreamCursor<BsonDocument> mongoChangeStreamCursor) {
        if (this.sourceConfig.getHeartbeatIntervalMillis() <= 0) {
            return null;
        }
        return new HeartbeatManager(
                this.time,
                mongoChangeStreamCursor,
                this.sourceConfig.getHeartbeatIntervalMillis(),
                MongoDBEnvelope.HEARTBEAT_TOPIC_NAME,
                MongoRecordUtils.createHeartbeatPartitionMap(
                        this.sourceConfig.getScheme(), this.sourceConfig.getHosts()));
    }

    /**
     * Rewrites a change stream event in place into the record value layout.
     *
     * <p>The id field is replaced by the key document, the timestamp field is set to the current
     * wall-clock time, a missing cluster time is filled in from that same wall-clock time, and a
     * source sub-document (snapshot flag "false" plus the cluster time in epoch milliseconds) is
     * attached.
     *
     * @param bsonDocument the event to normalize; it is mutated
     * @return the same, mutated, document instance
     */
    private BsonDocument normalizeChangeStreamDocument(BsonDocument bsonDocument) {
        bsonDocument.put(MongoDBEnvelope.ID_FIELD, normalizeKeyDocument(bsonDocument));

        long nowMillis = System.currentTimeMillis();
        bsonDocument.put(MongoDBEnvelope.TIMESTAMP_KEY_FIELD, new BsonInt64(nowMillis));

        if (!bsonDocument.containsKey(MongoDBEnvelope.CLUSTER_TIME_FIELD)) {
            // The event carries no cluster time: fall back to the processing time set above.
            bsonDocument.put(
                    MongoDBEnvelope.CLUSTER_TIME_FIELD,
                    MongoRecordUtils.bsonTimestampFromEpochMillis(nowMillis));
        }

        BsonDocument source = new BsonDocument();
        source.put(MongoDBEnvelope.SNAPSHOT_KEY_FIELD, new BsonString("false"));
        // BsonTimestamp#getTime() is in seconds; the source timestamp is stored in millis.
        long clusterTimeSeconds =
                bsonDocument.getTimestamp(MongoDBEnvelope.CLUSTER_TIME_FIELD).getTime();
        source.put(
                MongoDBEnvelope.TIMESTAMP_KEY_FIELD,
                new BsonInt64(Instant.ofEpochSecond(clusterTimeSeconds).toEpochMilli()));
        bsonDocument.put(MongoDBEnvelope.SOURCE_FIELD, source);
        return bsonDocument;
    }

    /**
     * Builds the record key: a document whose id field wraps another document holding the id
     * value taken from the event's document-key field.
     */
    private BsonDocument normalizeKeyDocument(BsonDocument bsonDocument) {
        BsonDocument documentKey = bsonDocument.getDocument(MongoDBEnvelope.DOCUMENT_KEY_FIELD);
        BsonDocument idOnly =
                new BsonDocument(
                        MongoDBEnvelope.ID_FIELD, documentKey.get(MongoDBEnvelope.ID_FIELD));
        return new BsonDocument(MongoDBEnvelope.ID_FIELD, idOnly);
    }

    /**
     * Copies a heartbeat record, replacing its value with a heartbeat struct stamped with the
     * current time in epoch milliseconds. Partition, offset, topic and key are kept.
     */
    private SourceRecord normalizeHeartbeatRecord(SourceRecord sourceRecord) {
        Struct heartbeatValue = new Struct(MongoDBEnvelope.HEARTBEAT_VALUE_SCHEMA);
        heartbeatValue.put(MongoDBEnvelope.TIMESTAMP_KEY_FIELD, Instant.now().toEpochMilli());
        return new SourceRecord(
                sourceRecord.sourcePartition(),
                sourceRecord.sourceOffset(),
                sourceRecord.topic(),
                sourceRecord.keySchema(),
                sourceRecord.key(),
                MongoDBEnvelope.HEARTBEAT_VALUE_SCHEMA,
                heartbeatValue);
    }

    /** Reads the database and collection names from the event's namespace sub-document. */
    private MongoNamespace getMongoNamespace(BsonDocument bsonDocument) {
        BsonDocument namespace = bsonDocument.getDocument(MongoDBEnvelope.NAMESPACE_FIELD);
        String databaseName =
                namespace.getString(MongoDBEnvelope.NAMESPACE_DATABASE_FIELD).getValue();
        String collectionName =
                namespace.getString(MongoDBEnvelope.NAMESPACE_COLLECTION_FIELD).getValue();
        return new MongoNamespace(databaseName, collectionName);
    }

    /** Parses the event's operation-type field into an {@link OperationType}. */
    private OperationType getOperationType(BsonDocument bsonDocument) {
        String operationType =
                bsonDocument.getString(MongoDBEnvelope.OPERATION_TYPE_FIELD).getValue();
        return OperationType.fromString(operationType);
    }

    /** Returns true when the split's ending offset is anything other than "no stopping offset". */
    private boolean isBoundedRead() {
        return !ChangeStreamOffset.NO_STOPPING_OFFSET.equals(this.streamSplit.getEndingOffset());
    }
}
