/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import com.mongodb.CursorType;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import io.debezium.connector.mongodb.CollectionId;
import io.debezium.connector.mongodb.ConnectionContext;
import io.debezium.connector.mongodb.DisconnectEvent;
import io.debezium.connector.mongodb.MongoDbChangeRecordEmitter;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbOffsetContext;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.PrimaryElectionEvent;
import io.debezium.connector.mongodb.ReplicaSet;
import io.debezium.connector.mongodb.ReplicaSetOffsetContext;
import io.debezium.connector.mongodb.ReplicaSets;
import io.debezium.connector.mongodb.SourceInfo;
import io.debezium.data.Envelope;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Metronome;
import io.debezium.util.Threads;
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbStreamingChangeEventSource
implements StreamingChangeEventSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStreamingChangeEventSource.class);
    private static final String AUTHORIZATION_FAILURE_MESSAGE = "Command failed with error 13";
    private static final String OPERATION_FIELD = "op";
    private static final String OBJECT_FIELD = "o";
    private static final String OPERATION_CONTROL = "c";
    private static final String TX_OPS = "applyOps";
    private final EventDispatcher<CollectionId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final MongoDbOffsetContext offsetContext;
    private final ConnectionContext connectionContext;
    private final ReplicaSets replicaSets;
    private final MongoDbTaskContext taskContext;

    public MongoDbStreamingChangeEventSource(MongoDbConnectorConfig connectorConfig, MongoDbTaskContext taskContext, ReplicaSets replicaSets, MongoDbOffsetContext offsetContext, EventDispatcher<CollectionId> dispatcher, ErrorHandler errorHandler, Clock clock) {
        this.connectionContext = taskContext.getConnectionContext();
        this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.replicaSets = replicaSets;
        this.taskContext = taskContext;
        this.offsetContext = offsetContext != null ? offsetContext : this.initializeOffsets(connectorConfig, replicaSets);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute(ChangeEventSource.ChangeEventSourceContext context) throws InterruptedException {
        int threads = this.replicaSets.replicaSetCount();
        ExecutorService executor = Threads.newFixedThreadPool(MongoDbConnector.class, (String)this.taskContext.serverName(), (String)"replicator-streaming", (int)threads);
        CountDownLatch latch = new CountDownLatch(threads);
        LOGGER.info("Starting {} thread(s) to stream changes for replica sets: {}", (Object)threads, (Object)this.replicaSets);
        this.replicaSets.validReplicaSets().forEach(replicaSet -> executor.submit(() -> {
            ConnectionContext.MongoPrimary primaryClient = null;
            try {
                primaryClient = this.establishConnectionToPrimary((ReplicaSet)replicaSet);
                if (primaryClient != null) {
                    AtomicReference<ConnectionContext.MongoPrimary> primaryReference = new AtomicReference<ConnectionContext.MongoPrimary>(primaryClient);
                    primaryClient.execute("read from oplog on '" + replicaSet + "'", primary -> this.readOplog((MongoClient)primary, (ConnectionContext.MongoPrimary)primaryReference.get(), (ReplicaSet)replicaSet, context));
                }
            }
            catch (Throwable t) {
                LOGGER.error("Streaming for replica set {} failed", (Object)replicaSet.replicaSetName(), (Object)t);
                this.errorHandler.setProducerThrowable(t);
            }
            finally {
                if (primaryClient != null) {
                    primaryClient.stop();
                }
                latch.countDown();
            }
        }));
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        try {
            executor.shutdown();
        }
        finally {
            this.taskContext.getConnectionContext().shutdown();
        }
    }

    private ConnectionContext.MongoPrimary establishConnectionToPrimary(ReplicaSet replicaSet) {
        return this.connectionContext.primaryFor(replicaSet, this.taskContext.filters(), (desc, error) -> {
            if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
                throw new ConnectException("Error while attempting to " + desc, error);
            }
            this.dispatcher.dispatchConnectorEvent((ConnectorEvent)new DisconnectEvent());
            LOGGER.error("Error while attempting to {}: {}", new Object[]{desc, error.getMessage(), error});
            throw new ConnectException("Error while attempting to " + desc, error);
        });
    }

    private void readOplog(MongoClient primary, ConnectionContext.MongoPrimary primaryClient, ReplicaSet replicaSet, ChangeEventSource.ChangeEventSourceContext context) {
        ReplicaSetOffsetContext rsOffsetContext = this.offsetContext.getReplicaSetOffsetContext(replicaSet);
        BsonTimestamp oplogStart = rsOffsetContext.lastOffsetTimestamp();
        OptionalLong txOrder = rsOffsetContext.lastOffsetTxOrder();
        ServerAddress primaryAddress = primary.getAddress();
        LOGGER.info("Reading oplog for '{}' primary {} starting at {}", new Object[]{replicaSet, primaryAddress, oplogStart});
        MongoCollection oplog = primary.getDatabase("local").getCollection("oplog.rs");
        ReplicaSetOplogContext oplogContext = new ReplicaSetOplogContext(rsOffsetContext, primaryClient, replicaSet);
        Bson filter = null;
        if (!txOrder.isPresent()) {
            LOGGER.info("The last event processed was not transactional, resuming at the oplog event after '{}'", (Object)oplogStart);
            filter = Filters.and((Bson[])new Bson[]{Filters.gt((String)"ts", (Object)oplogStart), Filters.exists((String)"fromMigrate", (boolean)false)});
        } else {
            LOGGER.info("The last event processed was transactional, resuming at the oplog event '{}', expecting to skip '{}' events", (Object)oplogStart, (Object)txOrder.getAsLong());
            filter = Filters.and((Bson[])new Bson[]{Filters.gte((String)"ts", (Object)oplogStart), Filters.exists((String)"fromMigrate", (boolean)false)});
            oplogContext.setIncompleteEventTimestamp(oplogStart);
            oplogContext.setIncompleteTxOrder(txOrder.getAsLong());
        }
        Bson operationFilter = this.getSkippedOperationsFilter();
        if (operationFilter != null) {
            filter = Filters.and((Bson[])new Bson[]{filter, operationFilter});
        }
        FindIterable results = oplog.find(filter).sort((Bson)new Document("$natural", (Object)1)).oplogReplay(true).cursorType(CursorType.TailableAwait);
        try (MongoCursor cursor = results.iterator();){
            Metronome pause = Metronome.sleeper((Duration)Duration.ofMillis(500L), (Clock)this.clock);
            while (context.isRunning()) {
                Document event = (Document)cursor.tryNext();
                if (event != null) {
                    if (!this.handleOplogEvent(primaryAddress, event, event, 0L, oplogContext, context)) {
                        return;
                    }
                    try {
                        this.dispatcher.dispatchHeartbeatEvent((OffsetContext)oplogContext.getOffset());
                        continue;
                    }
                    catch (InterruptedException e) {
                        LOGGER.info("Replicator thread is interrupted");
                        Thread.currentThread().interrupt();
                        if (cursor != null) {
                            if (var15_15 != null) {
                                try {
                                    cursor.close();
                                }
                                catch (Throwable throwable) {
                                    var15_15.addSuppressed(throwable);
                                }
                            } else {
                                cursor.close();
                            }
                        }
                        return;
                    }
                }
                try {
                    pause.pause();
                }
                catch (InterruptedException e) {
                    break;
                }
            }
        }
    }

    private Bson getSkippedOperationsFilter() {
        Set skippedOperations = this.taskContext.getConnectorConfig().getSkippedOps();
        if (skippedOperations.isEmpty()) {
            return null;
        }
        Bson skippedOperationsFilter = null;
        for (Envelope.Operation operation : skippedOperations) {
            Bson skippedOperationFilter = Filters.ne((String)OPERATION_FIELD, (Object)operation.code());
            if (skippedOperationsFilter == null) {
                skippedOperationsFilter = skippedOperationFilter;
                continue;
            }
            skippedOperationsFilter = Filters.or((Bson[])new Bson[]{skippedOperationsFilter, skippedOperationFilter});
        }
        return skippedOperationsFilter;
    }

    private boolean handleOplogEvent(ServerAddress primaryAddress, Document event, Document masterEvent, long txOrder, ReplicaSetOplogContext oplogContext, ChangeEventSource.ChangeEventSourceContext context) {
        String ns = event.getString((Object)"ns");
        Document object = (Document)event.get((Object)OBJECT_FIELD, Document.class);
        if (Objects.isNull(object)) {
            if (LOGGER.isWarnEnabled()) {
                LOGGER.warn("Missing 'o' field in event, so skipping {}", (Object)event.toJson());
            }
            return true;
        }
        if (Objects.isNull(ns) || ns.isEmpty()) {
            String msg = object.getString((Object)"msg");
            if ("new primary".equals(msg)) {
                AtomicReference address = new AtomicReference();
                try {
                    oplogContext.getPrimary().executeBlocking("conn", (BlockingConsumer<MongoClient>)((BlockingConsumer)mongoClient -> {
                        ServerAddress currentPrimary = mongoClient.getAddress();
                        address.set(currentPrimary);
                    }));
                }
                catch (InterruptedException e) {
                    LOGGER.error("Get current primary executeBlocking", (Throwable)e);
                }
                ServerAddress serverAddress = (ServerAddress)address.get();
                if (Objects.nonNull(serverAddress) && !serverAddress.equals((Object)primaryAddress)) {
                    LOGGER.info("Found new primary event in oplog, so stopping use of {} to continue with new primary {}", (Object)primaryAddress, (Object)serverAddress);
                } else {
                    LOGGER.info("Found new primary event in oplog, current {} is new primary. Continue to process oplog event.", (Object)primaryAddress);
                }
                this.dispatcher.dispatchConnectorEvent((ConnectorEvent)new PrimaryElectionEvent(serverAddress));
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Skipping event with no namespace: {}", (Object)event.toJson());
            }
            return true;
        }
        List<Document> txChanges = this.transactionChanges(event);
        if (!txChanges.isEmpty()) {
            if (Objects.nonNull(oplogContext.getIncompleteEventTimestamp())) {
                if (oplogContext.getIncompleteEventTimestamp().equals((Object)SourceInfo.extractEventTimestamp(event))) {
                    for (Document change : txChanges) {
                        if (++txOrder <= oplogContext.getIncompleteTxOrder()) {
                            LOGGER.debug("Skipping record as it is expected to be already processed: {}", (Object)change);
                            continue;
                        }
                        boolean r = this.handleOplogEvent(primaryAddress, change, event, txOrder, oplogContext, context);
                        if (r) continue;
                        return false;
                    }
                }
                oplogContext.setIncompleteEventTimestamp(null);
                return true;
            }
            try {
                Long operationId = event.getLong((Object)"h");
                this.dispatcher.dispatchTransactionStartedEvent(Long.toString(operationId), (OffsetContext)oplogContext.getOffset());
                for (Document change : txChanges) {
                    boolean r = this.handleOplogEvent(primaryAddress, change, event, ++txOrder, oplogContext, context);
                    if (r) continue;
                    return false;
                }
                this.dispatcher.dispatchTransactionCommittedEvent((OffsetContext)oplogContext.getOffset());
            }
            catch (InterruptedException e) {
                LOGGER.error("Streaming transaction changes for replica set '{}' was interrupted", (Object)oplogContext.getReplicaSetName());
                throw new ConnectException("Streaming of transaction changes was interrupted for replica set " + oplogContext.getReplicaSetName(), (Throwable)e);
            }
            return true;
        }
        String operation = event.getString((Object)OPERATION_FIELD);
        if (!MongoDbChangeRecordEmitter.isValidOperation(operation)) {
            LOGGER.debug("Skipping event with \"op={}\"", (Object)operation);
            return true;
        }
        int delimIndex = ns.indexOf(46);
        if (delimIndex > 0) {
            assert (delimIndex + 1 < ns.length());
            String dbName = ns.substring(0, delimIndex);
            String collectionName = ns.substring(delimIndex + 1);
            if ("$cmd".equals(collectionName)) {
                LOGGER.debug("Skipping database command event: {}", (Object)event.toJson());
                return true;
            }
            if (!this.taskContext.filters().databaseFilter().test(dbName)) {
                LOGGER.debug("Skipping the event for database {} based on database.whitelist", (Object)dbName);
                return true;
            }
            oplogContext.getOffset().oplogEvent(event, masterEvent, txOrder);
            oplogContext.getOffset().getOffset();
            CollectionId collectionId = new CollectionId(oplogContext.getReplicaSetName(), dbName, collectionName);
            if (this.taskContext.filters().collectionFilter().test(collectionId)) {
                try {
                    return this.dispatcher.dispatchDataChangeEvent((DataCollectionId)collectionId, (ChangeRecordEmitter)new MongoDbChangeRecordEmitter(oplogContext.getOffset(), this.clock, event, false));
                }
                catch (Exception e) {
                    this.errorHandler.setProducerThrowable((Throwable)e);
                    return false;
                }
            }
        }
        return true;
    }

    private List<Document> transactionChanges(Document event) {
        String op = event.getString((Object)OPERATION_FIELD);
        Document o = (Document)event.get((Object)OBJECT_FIELD, Document.class);
        if (!(OPERATION_CONTROL.equals(op) && Objects.nonNull(o) && o.containsKey((Object)TX_OPS))) {
            return Collections.emptyList();
        }
        return (List)o.get((Object)TX_OPS, List.class);
    }

    protected MongoDbOffsetContext initializeOffsets(MongoDbConnectorConfig connectorConfig, ReplicaSets replicaSets) {
        LinkedHashMap<ReplicaSet, Document> positions = new LinkedHashMap<ReplicaSet, Document>();
        replicaSets.onEachReplicaSet(replicaSet -> {
            LOGGER.info("Determine Snapshot Offset for replica-set {}", (Object)replicaSet.replicaSetName());
            ConnectionContext.MongoPrimary primaryClient = this.establishConnectionToPrimary((ReplicaSet)replicaSet);
            if (primaryClient != null) {
                try {
                    primaryClient.execute("get oplog position", primary -> {
                        MongoCollection oplog = primary.getDatabase("local").getCollection("oplog.rs");
                        Document last = (Document)oplog.find().sort((Bson)new Document("$natural", (Object)-1)).limit(1).first();
                        positions.put((ReplicaSet)replicaSet, last);
                    });
                }
                finally {
                    LOGGER.info("Stopping primary client");
                    primaryClient.stop();
                }
            }
        });
        return new MongoDbOffsetContext(new SourceInfo(connectorConfig), new TransactionContext(), positions);
    }

    private class ReplicaSetOplogContext {
        private final ReplicaSetOffsetContext offset;
        private final ConnectionContext.MongoPrimary primary;
        private final ReplicaSet replicaSet;
        private BsonTimestamp incompleteEventTimestamp;
        private long incompleteTxOrder = 0L;

        ReplicaSetOplogContext(ReplicaSetOffsetContext offsetContext, ConnectionContext.MongoPrimary primary, ReplicaSet replicaSet) {
            this.offset = offsetContext;
            this.primary = primary;
            this.replicaSet = replicaSet;
        }

        ReplicaSetOffsetContext getOffset() {
            return this.offset;
        }

        ConnectionContext.MongoPrimary getPrimary() {
            return this.primary;
        }

        String getReplicaSetName() {
            return this.replicaSet.replicaSetName();
        }

        BsonTimestamp getIncompleteEventTimestamp() {
            return this.incompleteEventTimestamp;
        }

        public void setIncompleteEventTimestamp(BsonTimestamp incompleteEventTimestamp) {
            this.incompleteEventTimestamp = incompleteEventTimestamp;
        }

        public long getIncompleteTxOrder() {
            return this.incompleteTxOrder;
        }

        public void setIncompleteTxOrder(long incompleteTxOrder) {
            this.incompleteTxOrder = incompleteTxOrder;
        }
    }
}

