/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import io.debezium.connector.mongodb.CollectionId;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbIncrementalSnapshotContext;
import io.debezium.connector.mongodb.MongoDbOffsetContext;
import io.debezium.connector.mongodb.MongoDbPartition;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.MongoUtil;
import io.debezium.connector.mongodb.ReplicaSetOffsetContext;
import io.debezium.connector.mongodb.ReplicaSetPartition;
import io.debezium.connector.mongodb.ReplicaSets;
import io.debezium.connector.mongodb.SourceInfo;
import io.debezium.connector.mongodb.connection.ConnectionContext;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.connector.mongodb.connection.ReplicaSet;
import io.debezium.connector.mongodb.events.BufferingChangeStreamCursor;
import io.debezium.connector.mongodb.events.SplitEventHandler;
import io.debezium.connector.mongodb.metrics.MongoDbStreamingChangeEventSourceMetrics;
import io.debezium.connector.mongodb.recordemitter.MongoDbChangeRecordEmitter;
import io.debezium.function.BlockingConsumer;
import io.debezium.function.BlockingRunnable;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Threads;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.bson.BsonDocument;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbStreamingChangeEventSource
implements StreamingChangeEventSource<MongoDbPartition, MongoDbOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbStreamingChangeEventSource.class);
    private final MongoDbConnectorConfig connectorConfig;
    private final EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final ConnectionContext connectionContext;
    private final ReplicaSets replicaSets;
    private final MongoDbTaskContext taskContext;
    private final MongoDbConnection.ChangeEventSourceConnectionFactory connections;
    private final MongoDbStreamingChangeEventSourceMetrics streamingMetrics;
    private MongoDbOffsetContext effectiveOffset;

    public MongoDbStreamingChangeEventSource(MongoDbConnectorConfig connectorConfig, MongoDbTaskContext taskContext, MongoDbConnection.ChangeEventSourceConnectionFactory connections, ReplicaSets replicaSets, EventDispatcher<MongoDbPartition, CollectionId> dispatcher, ErrorHandler errorHandler, Clock clock, MongoDbStreamingChangeEventSourceMetrics streamingMetrics) {
        this.connectorConfig = connectorConfig;
        this.connectionContext = taskContext.getConnectionContext();
        this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.replicaSets = replicaSets;
        this.taskContext = taskContext;
        this.connections = connections;
        this.streamingMetrics = streamingMetrics;
    }

    public void init(MongoDbOffsetContext offsetContext) {
        this.effectiveOffset = offsetContext == null ? this.emptyOffsets(this.connectorConfig) : offsetContext;
    }

    public void execute(ChangeEventSource.ChangeEventSourceContext context, MongoDbPartition partition, MongoDbOffsetContext offsetContext) throws InterruptedException {
        List<ReplicaSet> validReplicaSets = this.replicaSets.all();
        if (validReplicaSets.size() == 1) {
            this.streamChangesForReplicaSet(context, partition, validReplicaSets.get(0));
        } else if (validReplicaSets.size() > 1) {
            this.streamChangesForReplicaSets(context, partition, validReplicaSets);
        }
    }

    public MongoDbOffsetContext getOffsetContext() {
        return this.effectiveOffset;
    }

    private void streamChangesForReplicaSet(ChangeEventSource.ChangeEventSourceContext context, MongoDbPartition partition, ReplicaSet replicaSet) {
        try (MongoDbConnection mongo = this.connections.get(replicaSet, partition);){
            mongo.execute("read from change stream on '" + replicaSet + "'", (BlockingConsumer<MongoClient>)((BlockingConsumer)client -> this.readChangeStream((MongoClient)client, replicaSet, context)));
        }
        catch (Throwable t) {
            LOGGER.error("Streaming for replica set {} failed", (Object)replicaSet.replicaSetName(), (Object)t);
            this.errorHandler.setProducerThrowable(t);
        }
    }

    private void streamChangesForReplicaSets(ChangeEventSource.ChangeEventSourceContext context, MongoDbPartition partition, List<ReplicaSet> replicaSets) {
        int threads = replicaSets.size();
        ExecutorService executor = Threads.newFixedThreadPool(MongoDbConnector.class, (String)this.taskContext.serverName(), (String)"replicator-streaming", (int)threads);
        CountDownLatch latch = new CountDownLatch(threads);
        LOGGER.info("Starting {} thread(s) to stream changes for replica sets: {}", (Object)threads, replicaSets);
        replicaSets.forEach(replicaSet -> executor.submit(() -> {
            try {
                this.streamChangesForReplicaSet(context, partition, (ReplicaSet)replicaSet);
            }
            finally {
                latch.countDown();
            }
        }));
        try {
            latch.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdown();
    }

    private void readChangeStream(MongoClient client, ReplicaSet replicaSet, ChangeEventSource.ChangeEventSourceContext context) {
        LOGGER.info("Reading change stream for '{}'", (Object)replicaSet);
        ReplicaSetPartition rsPartition = this.effectiveOffset.getReplicaSetPartition(replicaSet);
        ReplicaSetOffsetContext rsOffsetContext = this.effectiveOffset.getReplicaSetOffsetContext(replicaSet);
        SplitEventHandler splitHandler = new SplitEventHandler();
        ChangeStreamIterable<BsonDocument> rsChangeStream = this.initChangeStream(client, rsOffsetContext);
        try (BufferingChangeStreamCursor<BsonDocument> cursor = BufferingChangeStreamCursor.fromIterable(rsChangeStream, this.taskContext, this.streamingMetrics, this.clock).start();){
            while (context.isRunning()) {
                this.waitWhenStreamingPaused(context);
                Object resumableEvent = cursor.tryNext();
                if (resumableEvent == null) continue;
                StreamStatus result = ((BufferingChangeStreamCursor.ResumableChangeStreamEvent)resumableEvent).document.map(doc -> this.processChangeStreamDocument((ChangeStreamDocument<BsonDocument>)doc, splitHandler, replicaSet, rsPartition, rsOffsetContext)).orElseGet(() -> this.lambda$readChangeStream$5((BufferingChangeStreamCursor.ResumableChangeStreamEvent)resumableEvent, rsPartition, rsOffsetContext));
                if (result != StreamStatus.ERROR) continue;
                return;
            }
        }
    }

    private void waitWhenStreamingPaused(ChangeEventSource.ChangeEventSourceContext context) {
        if (context.isPaused()) {
            this.errorHandled(() -> {
                LOGGER.info("Streaming will now pause");
                context.streamingPaused();
                context.waitSnapshotCompletion();
                LOGGER.info("Streaming resumed");
            });
        }
    }

    private StreamStatus processChangeStreamDocument(ChangeStreamDocument<BsonDocument> document, SplitEventHandler<BsonDocument> splitHandler, ReplicaSet replicaSet, ReplicaSetPartition rsPartition, ReplicaSetOffsetContext rsOffsetContext) {
        LOGGER.trace("Arrived Change Stream event: {}", document);
        return splitHandler.handle(document).map(event -> this.errorHandled(() -> this.dispatchChangeEvent((ChangeStreamDocument<BsonDocument>)event, replicaSet, rsPartition, rsOffsetContext))).orElse(StreamStatus.NEXT);
    }

    private void dispatchChangeEvent(ChangeStreamDocument<BsonDocument> event, ReplicaSet replicaSet, ReplicaSetPartition rsPartition, ReplicaSetOffsetContext rsOffsetContext) throws InterruptedException {
        CollectionId collectionId = new CollectionId(replicaSet.replicaSetName(), event.getNamespace().getDatabaseName(), event.getNamespace().getCollectionName());
        MongoDbChangeRecordEmitter emitter = new MongoDbChangeRecordEmitter(rsPartition, (OffsetContext)rsOffsetContext, this.clock, event, this.connectorConfig);
        rsOffsetContext.changeStreamEvent(event);
        this.dispatcher.dispatchDataChangeEvent((Partition)rsPartition, (DataCollectionId)collectionId, (ChangeRecordEmitter)emitter);
    }

    private void dispatchHeartbeatEvent(BufferingChangeStreamCursor.ResumableChangeStreamEvent<BsonDocument> event, ReplicaSetPartition rsPartition, ReplicaSetOffsetContext rsOffsetContext) throws InterruptedException {
        LOGGER.trace("No Change Stream event arrived");
        rsOffsetContext.noEvent(event);
        this.dispatcher.dispatchHeartbeatEvent((Partition)rsPartition, (OffsetContext)rsOffsetContext);
    }

    private StreamStatus errorHandled(BlockingRunnable action) {
        try {
            action.run();
            return StreamStatus.DISPATCHED;
        }
        catch (InterruptedException e) {
            LOGGER.info("Replicator thread is interrupted");
            Thread.currentThread().interrupt();
            return StreamStatus.ERROR;
        }
        catch (Exception e) {
            this.errorHandler.setProducerThrowable((Throwable)e);
            return StreamStatus.ERROR;
        }
    }

    protected ChangeStreamIterable<BsonDocument> initChangeStream(MongoClient client, ReplicaSetOffsetContext offsetContext) {
        ChangeStreamIterable<BsonDocument> stream = MongoUtil.openChangeStream(client, this.taskContext);
        if (this.taskContext.getCaptureMode().isFullUpdate()) {
            stream.fullDocument(FullDocument.UPDATE_LOOKUP);
        }
        if (this.taskContext.getCaptureMode().isIncludePreImage()) {
            stream.fullDocumentBeforeChange(FullDocumentBeforeChange.WHEN_AVAILABLE);
        }
        if (offsetContext.lastResumeToken() != null) {
            LOGGER.info("Resuming streaming from token '{}'", (Object)offsetContext.lastResumeToken());
            BsonDocument doc = new BsonDocument();
            doc.put("_data", (BsonValue)new BsonString(offsetContext.lastResumeToken()));
            stream.resumeAfter(doc);
        } else if (offsetContext.lastTimestamp() != null) {
            LOGGER.info("Resuming streaming from operation time '{}'", (Object)offsetContext.lastTimestamp());
            stream.startAtOperationTime(offsetContext.lastTimestamp());
        }
        if (this.connectorConfig.getCursorMaxAwaitTime() > 0) {
            stream.maxAwaitTime((long)this.connectorConfig.getCursorMaxAwaitTime(), TimeUnit.MILLISECONDS);
        }
        return stream;
    }

    protected MongoDbOffsetContext emptyOffsets(MongoDbConnectorConfig connectorConfig) {
        LOGGER.info("Initializing empty Offset context");
        return new MongoDbOffsetContext(new SourceInfo(connectorConfig), new TransactionContext(), new MongoDbIncrementalSnapshotContext<CollectionId>(false));
    }

    private /* synthetic */ StreamStatus lambda$readChangeStream$5(BufferingChangeStreamCursor.ResumableChangeStreamEvent resumableEvent, ReplicaSetPartition rsPartition, ReplicaSetOffsetContext rsOffsetContext) {
        return this.errorHandled(() -> this.dispatchHeartbeatEvent(resumableEvent, rsPartition, rsOffsetContext));
    }

    protected static enum StreamStatus {
        DISPATCHED,
        NEXT,
        ERROR;

    }
}

