/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.mongodb.CollectionId;
import io.debezium.connector.mongodb.ConnectionContext;
import io.debezium.connector.mongodb.MongoDbChangeRecordEmitter;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbOffsetContext;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.ReplicaSet;
import io.debezium.connector.mongodb.ReplicaSetOffsetContext;
import io.debezium.connector.mongodb.ReplicaSets;
import io.debezium.connector.mongodb.SourceInfo;
import io.debezium.function.BlockingConsumer;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
import io.debezium.util.Threads;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.connect.errors.ConnectException;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbSnapshotChangeEventSource
extends AbstractSnapshotChangeEventSource {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbSnapshotChangeEventSource.class);
    private static final String AUTHORIZATION_FAILURE_MESSAGE = "Command failed with error 13";
    private final MongoDbConnectorConfig connectorConfig;
    private final MongoDbTaskContext taskContext;
    private final MongoDbOffsetContext previousOffset;
    private final ConnectionContext connectionContext;
    private final ReplicaSets replicaSets;
    private final EventDispatcher<CollectionId> dispatcher;
    protected final Clock clock;
    private final SnapshotProgressListener snapshotProgressListener;
    private final ErrorHandler errorHandler;
    private AtomicBoolean aborted = new AtomicBoolean(false);

    public MongoDbSnapshotChangeEventSource(MongoDbConnectorConfig connectorConfig, MongoDbTaskContext taskContext, ReplicaSets replicaSets, MongoDbOffsetContext previousOffset, EventDispatcher<CollectionId> dispatcher, Clock clock, SnapshotProgressListener snapshotProgressListener, ErrorHandler errorHandler) {
        super((CommonConnectorConfig)connectorConfig, (OffsetContext)previousOffset, snapshotProgressListener);
        this.connectorConfig = connectorConfig;
        this.taskContext = taskContext;
        this.connectionContext = taskContext.getConnectionContext();
        this.previousOffset = previousOffset;
        this.replicaSets = replicaSets;
        this.dispatcher = dispatcher;
        this.clock = clock;
        this.snapshotProgressListener = snapshotProgressListener;
        this.errorHandler = errorHandler;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected SnapshotResult doExecute(ChangeEventSource.ChangeEventSourceContext context, AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext, AbstractSnapshotChangeEventSource.SnapshottingTask snapshottingTask) throws Exception {
        MongoDbSnapshottingTask mongoDbSnapshottingTask = (MongoDbSnapshottingTask)snapshottingTask;
        MongoDbSnapshotContext mongoDbSnapshotContext = (MongoDbSnapshotContext)snapshotContext;
        LOGGER.info("Snapshot step 1 - Preparing");
        this.snapshotProgressListener.snapshotStarted();
        if (this.previousOffset != null && this.previousOffset.isSnapshotRunning()) {
            LOGGER.info("Previous snapshot was cancelled before completion; a new snapshot will be taken.");
        }
        LOGGER.info("Snapshot step 2 - Determining snapshot offsets");
        this.determineSnapshotOffsets(mongoDbSnapshotContext, this.replicaSets);
        List<ReplicaSet> replicaSetsToSnapshot = mongoDbSnapshottingTask.getReplicaSetsToSnapshot();
        int threads = replicaSetsToSnapshot.size();
        ExecutorService executor = Threads.newFixedThreadPool(MongoDbConnector.class, (String)this.taskContext.serverName(), (String)"replicator-snapshot", (int)threads);
        CountDownLatch latch = new CountDownLatch(threads);
        LOGGER.info("Ignoring unnamed replica sets: {}", this.replicaSets.unnamedReplicaSets());
        LOGGER.info("Starting {} thread(s) to snapshot replica sets: {}", (Object)threads, replicaSetsToSnapshot);
        LOGGER.info("Snapshot step 3 - Snapshotting data");
        replicaSetsToSnapshot.forEach(replicaSet -> executor.submit(() -> {
            try {
                this.taskContext.configureLoggingContext(replicaSet.replicaSetName());
                try {
                    this.snapshotReplicaSet(context, mongoDbSnapshotContext, (ReplicaSet)replicaSet);
                }
                finally {
                    MongoDbOffsetContext mongoDbOffsetContext = (MongoDbOffsetContext)snapshotContext.offset;
                }
            }
            catch (Throwable t) {
                LOGGER.error("Snapshot for replica set {} failed", (Object)replicaSet.replicaSetName(), (Object)t);
                this.errorHandler.setProducerThrowable(t);
            }
            finally {
                latch.countDown();
            }
        }));
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.aborted.set(true);
        }
        try {
            executor.shutdown();
        }
        finally {
            LOGGER.info("Stopping mongodb connections");
            this.taskContext.getConnectionContext().shutdown();
        }
        if (this.aborted.get()) {
            return SnapshotResult.aborted();
        }
        this.snapshotProgressListener.snapshotCompleted();
        return SnapshotResult.completed((OffsetContext)snapshotContext.offset);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected AbstractSnapshotChangeEventSource.SnapshottingTask getSnapshottingTask(OffsetContext previousOffset) {
        if (previousOffset == null) {
            LOGGER.info("No previous offset has been found");
            if (this.connectorConfig.getSnapshotMode().equals((Object)MongoDbConnectorConfig.SnapshotMode.NEVER)) {
                LOGGER.info("According to the connector configuration, no snapshot will occur.");
                return new MongoDbSnapshottingTask(Collections.emptyList());
            }
            return new MongoDbSnapshottingTask(this.replicaSets.all());
        }
        if (this.connectorConfig.getSnapshotMode().equals((Object)MongoDbConnectorConfig.SnapshotMode.NEVER)) {
            LOGGER.info("According to the connector configuration, no snapshot will occur.");
            return new MongoDbSnapshottingTask(Collections.emptyList());
        }
        ArrayList<ReplicaSet> replicaSetSnapshots = new ArrayList<ReplicaSet>();
        MongoDbOffsetContext offsetContext = (MongoDbOffsetContext)previousOffset;
        try {
            this.replicaSets.onEachReplicaSet(replicaSet -> {
                ConnectionContext.MongoPrimary primary = null;
                try {
                    primary = this.establishConnectionToPrimary((ReplicaSet)replicaSet);
                    ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext((ReplicaSet)replicaSet);
                    if (primary != null && this.isSnapshotExpected(primary, rsOffsetContext)) {
                        replicaSetSnapshots.add((ReplicaSet)replicaSet);
                    }
                }
                finally {
                    if (primary != null) {
                        primary.stop();
                    }
                }
            });
        }
        finally {
            this.taskContext.getConnectionContext().shutdown();
        }
        return new MongoDbSnapshottingTask(replicaSetSnapshots);
    }

    protected AbstractSnapshotChangeEventSource.SnapshotContext prepare(ChangeEventSource.ChangeEventSourceContext sourceContext) throws Exception {
        return new MongoDbSnapshotContext();
    }

    protected void complete(AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext) {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void snapshotReplicaSet(ChangeEventSource.ChangeEventSourceContext sourceContext, MongoDbSnapshotContext ctx, ReplicaSet replicaSet) throws InterruptedException {
        ConnectionContext.MongoPrimary primaryClient = null;
        try {
            primaryClient = this.establishConnectionToPrimary(replicaSet);
            if (primaryClient != null) {
                this.createDataEvents(sourceContext, ctx, replicaSet, primaryClient);
            }
        }
        finally {
            if (primaryClient != null) {
                primaryClient.stop();
            }
        }
    }

    private ConnectionContext.MongoPrimary establishConnectionToPrimary(ReplicaSet replicaSet) {
        return this.connectionContext.primaryFor(replicaSet, this.taskContext.filters(), (desc, error) -> {
            if (error.getMessage() != null && error.getMessage().startsWith(AUTHORIZATION_FAILURE_MESSAGE)) {
                throw new ConnectException("Error while attempting to " + desc, error);
            }
            LOGGER.error("Error while attempting to {}: ", new Object[]{desc, error.getMessage(), error});
            throw new ConnectException("Error while attempting to " + desc, error);
        });
    }

    private boolean isSnapshotExpected(ConnectionContext.MongoPrimary primaryClient, ReplicaSetOffsetContext offsetContext) {
        boolean performSnapshot = true;
        if (offsetContext.hasOffset()) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Found existing offset for replica set '{}' at {}", (Object)offsetContext.getReplicaSetName(), offsetContext.getOffset());
            }
            performSnapshot = false;
            if (offsetContext.isSnapshotOngoing()) {
                LOGGER.info("The previous snapshot was incomplete for '{}', so restarting the snapshot", (Object)offsetContext.getReplicaSetName());
                performSnapshot = true;
            } else {
                BsonTimestamp lastRecordedTs = offsetContext.lastOffsetTimestamp();
                BsonTimestamp firstAvailableTs = primaryClient.execute("get oplog position", primary -> {
                    MongoCollection oplog = primary.getDatabase("local").getCollection("oplog.rs");
                    Document firstEvent = (Document)oplog.find().sort((Bson)new Document("$natural", (Object)1)).limit(1).first();
                    return SourceInfo.extractEventTimestamp(firstEvent);
                });
                if (firstAvailableTs == null) {
                    LOGGER.info("The oplog contains no entries, so performing snapshot of replica set '{}'", (Object)offsetContext.getReplicaSetName());
                    performSnapshot = true;
                } else if (lastRecordedTs.compareTo(firstAvailableTs) < 0) {
                    LOGGER.info("Snapshot is required since the oplog for replica set '{}' starts at {}, which is later than the timestamp of the last offset {}", new Object[]{offsetContext.getReplicaSetName(), firstAvailableTs, lastRecordedTs});
                    performSnapshot = true;
                } else {
                    LOGGER.info("The oplog contains the last entry previously read for '{}', so no snapshot will be performed", (Object)offsetContext.getReplicaSetName());
                }
            }
        } else {
            LOGGER.info("No existing offset found for replica set '{}', starting snapshot", (Object)offsetContext.getReplicaSetName());
            performSnapshot = true;
        }
        return performSnapshot;
    }

    protected void determineSnapshotOffsets(MongoDbSnapshotContext ctx, ReplicaSets replicaSets) {
        LinkedHashMap<ReplicaSet, Document> positions = new LinkedHashMap<ReplicaSet, Document>();
        replicaSets.onEachReplicaSet(replicaSet -> {
            LOGGER.info("Determine Snapshot Offset for replica-set {}", (Object)replicaSet.replicaSetName());
            ConnectionContext.MongoPrimary primaryClient = this.establishConnectionToPrimary((ReplicaSet)replicaSet);
            if (primaryClient != null) {
                try {
                    primaryClient.execute("get oplog position", primary -> {
                        MongoCollection oplog = primary.getDatabase("local").getCollection("oplog.rs");
                        Document last = (Document)oplog.find().sort((Bson)new Document("$natural", (Object)-1)).limit(1).first();
                        positions.put((ReplicaSet)replicaSet, last);
                    });
                }
                finally {
                    LOGGER.info("Stopping primary client");
                    primaryClient.stop();
                }
            }
        });
        ctx.offset = new MongoDbOffsetContext(new SourceInfo(this.connectorConfig), new TransactionContext(), positions);
    }

    private void createDataEvents(ChangeEventSource.ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, ReplicaSet replicaSet, ConnectionContext.MongoPrimary primaryClient) throws InterruptedException {
        EventDispatcher.SnapshotReceiver snapshotReceiver = this.dispatcher.getSnapshotChangeEventReceiver();
        snapshotContext.offset.preSnapshotStart();
        this.createDataEventsForReplicaSet(sourceContext, snapshotContext, snapshotReceiver, replicaSet, primaryClient);
        snapshotContext.offset.preSnapshotCompletion();
        snapshotReceiver.completeSnapshot();
        snapshotContext.offset.postSnapshotCompletion();
    }

    private void createDataEventsForReplicaSet(ChangeEventSource.ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver snapshotReceiver, ReplicaSet replicaSet, ConnectionContext.MongoPrimary primaryClient) throws InterruptedException {
        String rsName = replicaSet.replicaSetName();
        MongoDbOffsetContext offsetContext = (MongoDbOffsetContext)snapshotContext.offset;
        ReplicaSetOffsetContext rsOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);
        snapshotContext.lastCollection = false;
        offsetContext.startReplicaSetSnapshot(replicaSet.replicaSetName());
        LOGGER.info("Beginning snapshot of '{}' at {}", (Object)rsName, rsOffsetContext.getOffset());
        List<CollectionId> collections = primaryClient.collections();
        this.snapshotProgressListener.monitoredDataCollectionsDetermined(collections);
        if (this.connectionContext.maxNumberOfCopyThreads() > 1) {
            int numThreads = Math.min(collections.size(), this.connectionContext.maxNumberOfCopyThreads());
            ConcurrentLinkedQueue<CollectionId> collectionsToCopy = new ConcurrentLinkedQueue<CollectionId>(collections);
            String copyThreadName = "copy-" + (replicaSet.hasReplicaSetName() ? replicaSet.replicaSetName() : "main");
            ExecutorService copyThreads = Threads.newFixedThreadPool(MongoDbConnector.class, (String)this.taskContext.serverName(), (String)copyThreadName, (int)this.connectionContext.maxNumberOfCopyThreads());
            CountDownLatch latch = new CountDownLatch(numThreads);
            AtomicBoolean aborted = new AtomicBoolean(false);
            AtomicInteger threadCounter = new AtomicInteger(0);
            LOGGER.info("Preparing to use {} thread(s) to snapshot {} collection(s): {}", new Object[]{numThreads, collections.size(), Strings.join((CharSequence)", ", collections)});
            for (int i = 0; i < numThreads; ++i) {
                copyThreads.submit(() -> {
                    this.taskContext.configureLoggingContext(replicaSet.replicaSetName() + "-sync" + threadCounter.incrementAndGet());
                    try {
                        CollectionId id = null;
                        while (!aborted.get() && (id = (CollectionId)collectionsToCopy.poll()) != null) {
                            if (!sourceContext.isRunning()) {
                                throw new InterruptedException("Interrupted while snapshotting replica set " + replicaSet.replicaSetName());
                            }
                            if (collectionsToCopy.isEmpty()) {
                                snapshotContext.lastCollection = true;
                            }
                            this.createDataEventsForCollection(sourceContext, snapshotContext, snapshotReceiver, replicaSet, id, primaryClient);
                        }
                    }
                    catch (InterruptedException e) {
                        aborted.set(true);
                    }
                    finally {
                        latch.countDown();
                    }
                });
            }
            try {
                latch.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                aborted.set(true);
            }
            copyThreads.shutdown();
        } else {
            Iterator<CollectionId> it = collections.iterator();
            while (it.hasNext()) {
                CollectionId collectionId = it.next();
                if (!sourceContext.isRunning()) {
                    throw new InterruptedException("Interrupted while snapshotting replica set " + replicaSet.replicaSetName());
                }
                if (!it.hasNext()) {
                    snapshotContext.lastCollection = true;
                }
                this.createDataEventsForCollection(sourceContext, snapshotContext, snapshotReceiver, replicaSet, collectionId, primaryClient);
            }
        }
        offsetContext.stopReplicaSetSnapshot(replicaSet.replicaSetName());
    }

    private void createDataEventsForCollection(ChangeEventSource.ChangeEventSourceContext sourceContext, MongoDbSnapshotContext snapshotContext, EventDispatcher.SnapshotReceiver snapshotReceiver, ReplicaSet replicaSet, CollectionId collectionId, ConnectionContext.MongoPrimary primaryClient) throws InterruptedException {
        long exportStart = this.clock.currentTimeInMillis();
        LOGGER.info("\t Exporting data for collection '{}'", (Object)collectionId);
        primaryClient.executeBlocking("sync '" + collectionId + "'", (BlockingConsumer<MongoClient>)((BlockingConsumer)primary -> {
            MongoDatabase database = primary.getDatabase(collectionId.dbName());
            MongoCollection collection = database.getCollection(collectionId.name());
            int batchSize = this.taskContext.getConnectorConfig().getSnapshotFetchSize();
            long docs = 0L;
            try (MongoCursor cursor = collection.find().batchSize(batchSize).iterator();){
                snapshotContext.lastRecordInCollection = false;
                if (cursor.hasNext()) {
                    while (cursor.hasNext()) {
                        if (!sourceContext.isRunning()) {
                            throw new InterruptedException("Interrupted while snapshotting collection " + collectionId.name());
                        }
                        Document document = (Document)cursor.next();
                        ++docs;
                        boolean bl = snapshotContext.lastRecordInCollection = !cursor.hasNext();
                        if (snapshotContext.lastCollection && snapshotContext.lastRecordInCollection) {
                            snapshotContext.offset.markLastSnapshotRecord();
                        }
                        this.dispatcher.dispatchSnapshotEvent((DataCollectionId)collectionId, this.getChangeRecordEmitter(snapshotContext, collectionId, document, replicaSet), snapshotReceiver);
                    }
                } else if (snapshotContext.lastCollection) {
                    snapshotContext.offset.markLastSnapshotRecord();
                }
                LOGGER.info("\t Finished snapshotting {} records for collection '{}'; total duration '{}'", new Object[]{docs, collectionId, Strings.duration((long)(this.clock.currentTimeInMillis() - exportStart))});
                this.snapshotProgressListener.dataCollectionSnapshotCompleted((DataCollectionId)collectionId, docs);
            }
        }));
    }

    protected ChangeRecordEmitter getChangeRecordEmitter(AbstractSnapshotChangeEventSource.SnapshotContext snapshotContext, CollectionId collectionId, Document document, ReplicaSet replicaSet) {
        MongoDbOffsetContext offsetContext = (MongoDbOffsetContext)snapshotContext.offset;
        ReplicaSetOffsetContext replicaSetOffsetContext = offsetContext.getReplicaSetOffsetContext(replicaSet);
        replicaSetOffsetContext.readEvent(collectionId, this.getClock().currentTime());
        return new MongoDbChangeRecordEmitter(replicaSetOffsetContext, this.getClock(), document);
    }

    protected Clock getClock() {
        return this.clock;
    }

    private static class MongoDbSnapshotContext
    extends AbstractSnapshotChangeEventSource.SnapshotContext {
        public boolean lastCollection;
        public boolean lastRecordInCollection;

        private MongoDbSnapshotContext() {
        }
    }

    public static class MongoDbSnapshottingTask
    extends AbstractSnapshotChangeEventSource.SnapshottingTask {
        private final List<ReplicaSet> replicaSetsToSnapshot;

        public MongoDbSnapshottingTask(List<ReplicaSet> replicaSetsToSnapshot) {
            super(false, !replicaSetsToSnapshot.isEmpty());
            this.replicaSetsToSnapshot = replicaSetsToSnapshot;
        }

        public List<ReplicaSet> getReplicaSetsToSnapshot() {
            return Collections.unmodifiableList(this.replicaSetsToSnapshot);
        }

        public boolean shouldSkipSnapshot() {
            return !this.snapshotData();
        }

        public String toString() {
            return "SnapshottingTask [replicaSetsToSnapshot=" + this.replicaSetsToSnapshot + "]";
        }
    }
}

