/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import io.debezium.DebeziumException;
import io.debezium.connector.mongodb.CollectionId;
import io.debezium.connector.mongodb.DisconnectEvent;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbIncrementalSnapshotChangeEventSource;
import io.debezium.connector.mongodb.MongoDbOffsetContext;
import io.debezium.connector.mongodb.MongoDbPartition;
import io.debezium.connector.mongodb.MongoDbSchema;
import io.debezium.connector.mongodb.MongoDbSnapshotChangeEventSource;
import io.debezium.connector.mongodb.MongoDbStreamingChangeEventSource;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.ReplicaSets;
import io.debezium.connector.mongodb.connection.ConnectionContext;
import io.debezium.connector.mongodb.connection.MongoDbConnection;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.pipeline.spi.Partition;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory that creates the snapshot, streaming and incremental-snapshot change event sources
 * of the MongoDB connector, wiring each of them with the shared configuration, dispatcher,
 * clock, replica set description and connection factory held by this instance.
 */
public class MongoDbChangeEventSourceFactory
implements ChangeEventSourceFactory<MongoDbPartition, MongoDbOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbChangeEventSourceFactory.class);

    /**
     * Prefix of the error message that identifies a command failure with error code 13.
     * NOTE(review): code 13 is "Unauthorized" in MongoDB's error code list - confirm against the server docs.
     */
    private static final String UNAUTHORIZED_MESSAGE_PREFIX = "Command failed with error 13";

    private final MongoDbConnectorConfig configuration;
    private final ErrorHandler errorHandler;
    private final EventDispatcher<MongoDbPartition, CollectionId> dispatcher;
    private final Clock clock;
    private final ReplicaSets replicaSets;
    private final MongoDbTaskContext taskContext;
    private final MongoDbConnection.ChangeEventSourceConnectionFactory connections;
    private final MongoDbSchema schema;

    /**
     * Creates the factory and derives the connection factory from the task's connection context.
     *
     * @param configuration the connector configuration handed to every created source
     * @param errorHandler the pipeline error handler handed to the snapshot and streaming sources
     * @param dispatcher the event dispatcher used by the sources and for disconnect notifications
     * @param clock the clock handed to every created source
     * @param replicaSets the replica sets the created sources operate on
     * @param taskContext the task context providing the connection context and the filters
     * @param schema the schema handed to the incremental snapshot source
     */
    public MongoDbChangeEventSourceFactory(MongoDbConnectorConfig configuration, ErrorHandler errorHandler, EventDispatcher<MongoDbPartition, CollectionId> dispatcher, Clock clock, ReplicaSets replicaSets, MongoDbTaskContext taskContext, MongoDbSchema schema) {
        this.configuration = configuration;
        this.errorHandler = errorHandler;
        this.dispatcher = dispatcher;
        this.clock = clock;
        this.replicaSets = replicaSets;
        this.taskContext = taskContext;
        this.schema = schema;
        // Assigned last: getMongoDbConnectionFactory is public and overridable, so every other
        // field must already be initialized when it runs.
        this.connections = this.getMongoDbConnectionFactory(taskContext.getConnectionContext());
    }

    /**
     * Creates the source that performs the initial snapshot.
     *
     * @param snapshotProgressListener listener passed through to the snapshot source
     * @param notificationService notification service passed through to the snapshot source
     * @return a new {@link MongoDbSnapshotChangeEventSource}
     */
    public SnapshotChangeEventSource<MongoDbPartition, MongoDbOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener<MongoDbPartition> snapshotProgressListener, NotificationService<MongoDbPartition, MongoDbOffsetContext> notificationService) {
        return new MongoDbSnapshotChangeEventSource(this.configuration, this.taskContext, this.connections, this.replicaSets, this.dispatcher, this.clock, snapshotProgressListener, this.errorHandler, notificationService);
    }

    /**
     * Creates the source that streams changes.
     *
     * @return a new {@link MongoDbStreamingChangeEventSource}
     */
    public StreamingChangeEventSource<MongoDbPartition, MongoDbOffsetContext> getStreamingChangeEventSource() {
        return new MongoDbStreamingChangeEventSource(this.configuration, this.taskContext, this.connections, this.replicaSets, this.dispatcher, this.errorHandler, this.clock);
    }

    /**
     * Creates the incremental snapshot source, if the deployment supports it.
     *
     * @param offsetContext the current offset context (not used by this implementation)
     * @param snapshotProgressListener listener passed through to the incremental snapshot source
     * @param dataChangeEventListener listener passed through to the incremental snapshot source
     * @param notificationService notification service passed through to the incremental snapshot source
     * @return the incremental snapshot source, or an empty {@link Optional} when more than one
     *         replica set is configured
     */
    public Optional<IncrementalSnapshotChangeEventSource<MongoDbPartition, ? extends DataCollectionId>> getIncrementalSnapshotChangeEventSource(MongoDbOffsetContext offsetContext, SnapshotProgressListener<MongoDbPartition> snapshotProgressListener, DataChangeEventListener<MongoDbPartition> dataChangeEventListener, NotificationService<MongoDbPartition, MongoDbOffsetContext> notificationService) {
        if (this.replicaSets.size() > 1) {
            LOGGER.info("Only ReplicaSet deployments and Sharded Cluster with connection.mode=sharded are supported by incremental snapshot");
            return Optional.empty();
        }
        MongoDbIncrementalSnapshotChangeEventSource incrementalSnapshotChangeEventSource = new MongoDbIncrementalSnapshotChangeEventSource(this.configuration, this.connections, this.replicaSets, this.dispatcher, this.schema, this.clock, snapshotProgressListener, dataChangeEventListener, notificationService);
        return Optional.of(incrementalSnapshotChangeEventSource);
    }

    /**
     * Builds a connection factory on top of the given connection context. Each connection it
     * opens uses the task's filters and an error handler bound to the requested partition.
     *
     * @param connectionContext the context used to open connections
     * @return a factory opening a connection for a replica set and partition
     */
    public MongoDbConnection.ChangeEventSourceConnectionFactory getMongoDbConnectionFactory(ConnectionContext connectionContext) {
        return (replicaSet, partition) -> connectionContext.connect(replicaSet, this.taskContext.filters(), this.connectionErrorHandler(partition));
    }

    /**
     * Creates the handler invoked when an operation on a connection fails. Unless the failure is
     * a command failure with error code 13, a {@link DisconnectEvent} is dispatched for the
     * partition; in every case the error is logged and rethrown as a {@link DebeziumException}.
     *
     * @param partition the partition the disconnect event is dispatched for
     * @return the error handler bound to {@code partition}
     */
    private MongoDbConnection.ErrorHandler connectionErrorHandler(MongoDbPartition partition) {
        return (desc, error) -> {
            String message = error.getMessage();
            if (!isUnauthorizedFailure(message)) {
                this.dispatcher.dispatchConnectorEvent(partition, new DisconnectEvent());
            }
            // The trailing throwable is not bound to a placeholder; SLF4J logs it with its stack trace.
            LOGGER.error("Error while attempting to {}: {}", desc, message, error);
            throw new DebeziumException("Error while attempting to " + desc, error);
        };
    }

    /**
     * Tells whether the message reports a command failure with exactly error code 13.
     * A plain prefix match would also accept every code that merely starts with "13"
     * (for example 133 or 136), so the character following the prefix must not be a digit.
     *
     * @param message the error message, possibly {@code null}
     * @return {@code true} only when the message starts with the prefix and the code is exactly 13
     */
    private static boolean isUnauthorizedFailure(String message) {
        if (message == null || !message.startsWith(UNAUTHORIZED_MESSAGE_PREFIX)) {
            return false;
        }
        int next = UNAUTHORIZED_MESSAGE_PREFIX.length();
        return message.length() == next || !Character.isDigit(message.charAt(next));
    }
}

