/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import com.mongodb.DBRef;
import com.mongodb.client.ClientSession;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.InsertOneOptions;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.mongodb.ConnectionContext;
import io.debezium.connector.mongodb.JsonSerialization;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.ReplicaSet;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.util.Collect;
import io.debezium.util.IoUtil;
import io.debezium.util.Testing;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class MongoDbConnectorIT
extends AbstractConnectorTest {
    private Configuration config;
    private MongoDbTaskContext context;

    @Before
    public void beforeEach() {
        Testing.Debug.disable();
        Testing.Print.disable();
        this.stopConnector();
        this.initializeConnectorTestFramework();
    }

    @After
    public void afterEach() {
        try {
            this.stopConnector();
        }
        finally {
            if (this.context != null) {
                this.context.getConnectionContext().shutdown();
            }
        }
    }

    @Test
    public void shouldNotStartWithInvalidConfiguration() {
        this.config = ((Configuration.Builder)Configuration.create().with(MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS, "true")).build();
        this.logger.info("Attempting to start the connector with an INVALID configuration, so MULTIPLE error messages & one exceptions will appear in the log");
        this.start(MongoDbConnector.class, this.config, (success, msg, error) -> {
            Assertions.assertThat((boolean)success).isFalse();
            Assertions.assertThat((Throwable)error).isNotNull();
        });
        this.assertConnectorNotRunning();
    }

    @Test
    public void shouldFailToValidateInvalidConfiguration() {
        Configuration config = Configuration.create().build();
        MongoDbConnector connector = new MongoDbConnector();
        Config result = connector.validate(config.asMap());
        this.assertConfigurationErrors(result, MongoDbConnectorConfig.HOSTS, 1);
        this.assertConfigurationErrors(result, MongoDbConnectorConfig.LOGICAL_NAME, 1);
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.USER});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.PASSWORD});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.DATABASE_WHITELIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.DATABASE_INCLUDE_LIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.DATABASE_BLACKLIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.COLLECTION_WHITELIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.COLLECTION_BLACKLIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.SNAPSHOT_MAX_THREADS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.MAX_QUEUE_SIZE});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.MAX_BATCH_SIZE});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.POLL_INTERVAL_MS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.SSL_ENABLED});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES});
        this.assertNoConfigurationErrors(result, new Field[]{CommonConnectorConfig.TOMBSTONES_ON_DELETE});
    }

    @Test
    public void shouldValidateAcceptableConfiguration() {
        this.config = TestHelper.getConfiguration();
        this.context = new MongoDbTaskContext(this.config);
        this.storeDocuments("dbval", "validationColl1", "simple_objects.json");
        this.storeDocuments("dbval2", "validationColl2", "restaurants1.json");
        MongoDbConnector connector = new MongoDbConnector();
        Config result = connector.validate(this.config.asMap());
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.HOSTS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.LOGICAL_NAME});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.USER});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.PASSWORD});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.AUTO_DISCOVER_MEMBERS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.DATABASE_WHITELIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.DATABASE_INCLUDE_LIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.DATABASE_BLACKLIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.DATABASE_EXCLUDE_LIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.COLLECTION_WHITELIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.COLLECTION_BLACKLIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.COLLECTION_EXCLUDE_LIST});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.SNAPSHOT_MAX_THREADS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.MAX_QUEUE_SIZE});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.MAX_BATCH_SIZE});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.POLL_INTERVAL_MS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.CONNECT_BACKOFF_INITIAL_DELAY_MS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.CONNECT_BACKOFF_MAX_DELAY_MS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.SSL_ENABLED});
        this.assertNoConfigurationErrors(result, new Field[]{MongoDbConnectorConfig.SSL_ALLOW_INVALID_HOSTNAMES});
        this.assertNoConfigurationErrors(result, new Field[]{CommonConnectorConfig.TOMBSTONES_ON_DELETE});
    }

    @Test
    public void shouldConsumeAllEventsFromDatabase() throws InterruptedException, IOException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.storeDocuments("dbit", "simpletons", "simple_objects.json");
        this.storeDocuments("dbit", "restaurants", "restaurants1.json");
        this.start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(12);
        records.topics().forEach(System.out::println);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(6);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(6);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(2);
        AtomicBoolean foundLast = new AtomicBoolean(false);
        records.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyFromInitialSync((SourceRecord)record, foundLast);
            this.verifyReadOperation((SourceRecord)record);
        });
        Assertions.assertThat((boolean)foundLast.get()).isTrue();
        this.storeDocuments("dbit", "restaurants", "restaurants2.json");
        AbstractConnectorTest.SourceRecords records2 = this.consumeRecordsByTopic(4);
        Assertions.assertThat((int)records2.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(4);
        Assertions.assertThat((int)records2.topics().size()).isEqualTo(1);
        records2.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyNotFromInitialSync((SourceRecord)record);
            this.verifyCreateOperation((SourceRecord)record);
            this.verifyNotFromTransaction((SourceRecord)record);
        });
        this.stopConnector();
        this.storeDocuments("dbit", "restaurants", "restaurants3.json");
        this.start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records3 = this.consumeRecordsByTopic(5);
        Assertions.assertThat((int)records3.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(5);
        Assertions.assertThat((int)records3.topics().size()).isEqualTo(1);
        records3.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyNotFromInitialSync((SourceRecord)record);
            this.verifyCreateOperation((SourceRecord)record);
        });
        this.storeDocuments("dbit", "restaurants", "restaurants4.json");
        AbstractConnectorTest.SourceRecords records4 = this.consumeRecordsByTopic(8);
        Assertions.assertThat((int)records4.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(8);
        Assertions.assertThat((int)records4.topics().size()).isEqualTo(1);
        records4.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyNotFromInitialSync((SourceRecord)record);
            this.verifyCreateOperation((SourceRecord)record);
        });
        AtomicReference id = new AtomicReference();
        this.primary().execute("create", mongo -> {
            MongoDatabase db1 = mongo.getDatabase("dbit");
            MongoCollection coll = db1.getCollection("arbitrary");
            coll.drop();
            Document doc = Document.parse((String)"{\"a\": 1, \"b\": 2}");
            InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
            coll.insertOne((Object)doc, insertOptions);
            doc = (Document)coll.find().first();
            Testing.debug((Object)("Document: " + doc));
            id.set(doc.getObjectId((Object)"_id").toString());
            Testing.debug((Object)("Document ID: " + (String)id.get()));
        });
        this.primary().execute("update", mongo -> {
            MongoDatabase db1 = mongo.getDatabase("dbit");
            MongoCollection coll = db1.getCollection("arbitrary");
            Document doc = (Document)coll.find().first();
            Testing.debug((Object)("Document: " + doc));
            Document filter = Document.parse((String)"{\"a\": 1}");
            Document operation = Document.parse((String)"{ \"$set\": { \"b\": 10 } }");
            coll.updateOne((Bson)filter, (Bson)operation);
            doc = (Document)coll.find().first();
            Testing.debug((Object)("Document: " + doc));
        });
        AbstractConnectorTest.SourceRecords insertAndUpdate = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)insertAndUpdate.recordsForTopic("mongo.dbit.arbitrary").size()).isEqualTo(2);
        Assertions.assertThat((int)insertAndUpdate.topics().size()).isEqualTo(1);
        records4.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyNotFromInitialSync((SourceRecord)record);
            this.verifyCreateOperation((SourceRecord)record);
        });
        SourceRecord insertRecord = (SourceRecord)insertAndUpdate.allRecordsInOrder().get(0);
        SourceRecord updateRecord = (SourceRecord)insertAndUpdate.allRecordsInOrder().get(1);
        Testing.debug((Object)("Insert event: " + insertRecord));
        Testing.debug((Object)("Update event: " + updateRecord));
        Struct insertKey = (Struct)insertRecord.key();
        Struct updateKey = (Struct)updateRecord.key();
        String insertId = this.toObjectId(insertKey.getString("id")).toString();
        String updateId = this.toObjectId(updateKey.getString("id")).toString();
        Assertions.assertThat((String)insertId).isEqualTo(id.get());
        Assertions.assertThat((String)updateId).isEqualTo(id.get());
        this.primary().execute("delete", mongo -> {
            MongoDatabase db1 = mongo.getDatabase("dbit");
            MongoCollection coll = db1.getCollection("arbitrary");
            Document filter = Document.parse((String)"{\"a\": 1}");
            coll.deleteOne((Bson)filter);
        });
        AbstractConnectorTest.SourceRecords delete = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)delete.recordsForTopic("mongo.dbit.arbitrary").size()).isEqualTo(2);
        Assertions.assertThat((int)delete.topics().size()).isEqualTo(1);
        SourceRecord deleteRecord = (SourceRecord)delete.allRecordsInOrder().get(0);
        this.validate(deleteRecord);
        this.verifyNotFromInitialSync(deleteRecord);
        this.verifyDeleteOperation(deleteRecord);
        SourceRecord tombStoneRecord = (SourceRecord)delete.allRecordsInOrder().get(1);
        this.validate(tombStoneRecord);
        Testing.debug((Object)("Delete event: " + deleteRecord));
        Testing.debug((Object)("Tombstone event: " + tombStoneRecord));
        Struct deleteKey = (Struct)deleteRecord.key();
        String deleteId = this.toObjectId(deleteKey.getString("id")).toString();
        Assertions.assertThat((String)deleteId).isEqualTo(id.get());
    }

    @Test
    @FixFor(value={"DBZ-1831"})
    public void shouldConsumeAllEventsFromDatabaseWithSkippedOperations() throws InterruptedException, IOException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).with(MongoDbConnectorConfig.SKIPPED_OPERATIONS, "u")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        AtomicReference id = new AtomicReference();
        this.primary().execute("create", mongo -> {
            MongoDatabase db1 = mongo.getDatabase("dbit");
            MongoCollection coll = db1.getCollection("arbitrary");
            coll.drop();
            Document doc = Document.parse((String)"{\"a\": 1, \"b\": 2}");
            InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
            coll.insertOne((Object)doc, insertOptions);
            doc = (Document)coll.find().first();
            Testing.debug((Object)("Document: " + doc));
            id.set(doc.getObjectId((Object)"_id").toString());
            Testing.debug((Object)("Document ID: " + (String)id.get()));
        });
        AbstractConnectorTest.SourceRecords insert = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)insert.recordsForTopic("mongo.dbit.arbitrary")).hasSize(1);
        this.primary().execute("update", mongo -> {
            MongoDatabase db1 = mongo.getDatabase("dbit");
            MongoCollection coll = db1.getCollection("arbitrary");
            Document doc = (Document)coll.find().first();
            Testing.debug((Object)("Document: " + doc));
            Document filter = Document.parse((String)"{\"a\": 1}");
            Document operation = Document.parse((String)"{ \"$set\": { \"b\": 10 } }");
            coll.updateOne((Bson)filter, (Bson)operation);
            doc = (Document)coll.find().first();
            Testing.debug((Object)("Document: " + doc));
        });
        this.primary().execute("delete", mongo -> {
            MongoDatabase db1 = mongo.getDatabase("dbit");
            MongoCollection coll = db1.getCollection("arbitrary");
            Document doc = (Document)coll.find().first();
            Testing.debug((Object)("Document: " + doc));
            Document filter = Document.parse((String)"{\"a\": 1}");
            coll.deleteOne((Bson)filter);
            doc = (Document)coll.find().first();
            Testing.debug((Object)("Document: " + doc));
        });
        AbstractConnectorTest.SourceRecords delete = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)delete.recordsForTopic("mongo.dbit.arbitrary")).hasSize(1);
        SourceRecord deleteRecord = (SourceRecord)delete.allRecordsInOrder().get(0);
        this.validate(deleteRecord);
        this.verifyDeleteOperation(deleteRecord);
    }

    @Test
    @FixFor(value={"DBZ-1168"})
    public void shouldConsumeAllEventsFromDatabaseWithCustomAuthSource() throws InterruptedException, IOException {
        String authDbName = "authdb";
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.primary().execute("Create auth database", client -> {
            MongoDatabase db = client.getDatabase("authdb");
            try {
                db.runCommand((Bson)BsonDocument.parse((String)"{dropUser: \"dbz\"}"));
            }
            catch (Exception e) {
                this.logger.info("Expected error while dropping user", (Throwable)e);
            }
            db.runCommand((Bson)BsonDocument.parse((String)"{createUser: \"dbz\", pwd: \"pass\", roles: [{role: \"readAnyDatabase\", db: \"admin\"}]}"));
        });
        this.storeDocuments("dbit", "simpletons", "simple_objects.json");
        this.storeDocuments("dbit", "restaurants", "restaurants1.json");
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.USER, "dbz")).with(MongoDbConnectorConfig.PASSWORD, "pass")).with(MongoDbConnectorConfig.AUTH_SOURCE, "authdb")).with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        this.start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(12);
        records.topics().forEach(System.out::println);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(6);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(6);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(2);
        AtomicBoolean foundLast = new AtomicBoolean(false);
        records.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyFromInitialSync((SourceRecord)record, foundLast);
            this.verifyReadOperation((SourceRecord)record);
        });
        Assertions.assertThat((boolean)foundLast.get()).isTrue();
        this.storeDocuments("dbit", "restaurants", "restaurants2.json");
        AbstractConnectorTest.SourceRecords records2 = this.consumeRecordsByTopic(4);
        Assertions.assertThat((int)records2.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(4);
        Assertions.assertThat((int)records2.topics().size()).isEqualTo(1);
        records2.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyNotFromInitialSync((SourceRecord)record);
            this.verifyCreateOperation((SourceRecord)record);
            this.verifyNotFromTransaction((SourceRecord)record);
        });
        this.stopConnector();
    }

    @Test
    @FixFor(value={"DBZ-1767"})
    public void shouldSupportDbRef() throws InterruptedException, IOException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.storeDocuments("dbit", "spec", "spec_objects.json");
        this.context = new MongoDbTaskContext(this.config);
        this.start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.spec").size()).isEqualTo(1);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        AtomicBoolean foundLast = new AtomicBoolean(false);
        records.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyFromInitialSync((SourceRecord)record, foundLast);
            this.verifyReadOperation((SourceRecord)record);
        });
        Assertions.assertThat((boolean)foundLast.get()).isTrue();
        this.primary().execute("insert", client -> client.getDatabase("dbit").getCollection("spec").insertOne((Object)Document.parse((String)"{ '_id' : 2, 'data' : { '$ref' : 'a2', '$id' : 4, '$db' : 'b2' } }")));
        AbstractConnectorTest.SourceRecords records2 = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records2.recordsForTopic("mongo.dbit.spec").size()).isEqualTo(1);
        Assertions.assertThat((int)records2.topics().size()).isEqualTo(1);
        records2.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyNotFromInitialSync((SourceRecord)record);
            this.verifyCreateOperation((SourceRecord)record);
            this.verifyNotFromTransaction((SourceRecord)record);
        });
        this.stopConnector();
    }

    @Test
    @FixFor(value={"DBZ-865 and DBZ-1242"})
    public void shouldConsumeEventsFromCollectionWithReplacedTopicName() throws InterruptedException, IOException {
        LogInterceptor logInterceptor = new LogInterceptor();
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.dbz865.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.primary().execute("create", mongo -> {
            MongoDatabase db1 = mongo.getDatabase("dbit");
            MongoCollection coll = db1.getCollection("dbz865_my@collection");
            coll.drop();
            Document doc = Document.parse((String)"{\"a\": 1, \"b\": 2}");
            InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
            coll.insertOne((Object)doc, insertOptions);
        });
        this.start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(12);
        records.topics().forEach(System.out::println);
        Assertions.assertThat((List)records.recordsForTopic("mongo.dbit.dbz865_my_collection")).hasSize(1);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        AtomicBoolean foundLast = new AtomicBoolean(false);
        records.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyFromInitialSync((SourceRecord)record, foundLast);
            this.verifyReadOperation((SourceRecord)record);
        });
        Assertions.assertThat((boolean)foundLast.get()).isTrue();
        this.stopConnector(value -> Assertions.assertThat((boolean)logInterceptor.containsWarnMessage("After applying the include/exclude list filters, no changes will be captured. Please check your configuration!")).isFalse());
    }

    @Test
    @FixFor(value={"DBZ-1242"})
    public void testEmptySchemaWarningAfterApplyingCollectionFilters() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.dbz865.my_products")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.primary().execute("create", mongo -> {
            MongoDatabase db1 = mongo.getDatabase("dbit");
            MongoCollection coll = db1.getCollection("dbz865_my@collection");
            coll.drop();
            Document doc = Document.parse((String)"{\"a\": 1, \"b\": 2}");
            InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
            coll.insertOne((Object)doc, insertOptions);
        });
        this.start(MongoDbConnector.class, this.config);
        this.consumeRecordsByTopic(12);
        this.stopConnector(value -> Assertions.assertThat((boolean)logInterceptor.containsWarnMessage("After applying the include/exclude list filters, no changes will be captured. Please check your configuration!")).isTrue());
    }

    protected void verifyFromInitialSync(SourceRecord record, AtomicBoolean foundLast) {
        if (record.sourceOffset().containsKey("initsync")) {
            Assertions.assertThat((boolean)record.sourceOffset().containsKey("initsync")).isTrue();
            Struct value = (Struct)record.value();
            Assertions.assertThat((String)value.getStruct("source").getString("snapshot")).isEqualTo((Object)"true");
        } else {
            Assertions.assertThat((boolean)foundLast.getAndSet(true)).isFalse();
            Struct value = (Struct)record.value();
            Assertions.assertThat((String)value.getStruct("source").getString("snapshot")).isEqualTo((Object)"last");
        }
        this.verifyNotFromTransaction(record);
    }

    @Test
    @FixFor(value={"DBZ-1215"})
    public void shouldConsumeTransaction() throws InterruptedException, IOException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        if (!TestHelper.transactionsSupported(this.primary(), "dbit")) {
            this.logger.info("Test not executed, transactions not supported in the server");
            return;
        }
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.storeDocuments("dbit", "simpletons", "simple_objects.json");
        this.storeDocuments("dbit", "restaurants", "restaurants1.json");
        this.start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(12);
        records.topics().forEach(System.out::println);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(6);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(6);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(2);
        AtomicBoolean foundLast = new AtomicBoolean(false);
        records.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyFromInitialSync((SourceRecord)record, foundLast);
            this.verifyReadOperation((SourceRecord)record);
        });
        Assertions.assertThat((boolean)foundLast.get()).isTrue();
        this.storeDocumentsInTx("dbit", "restaurants", "restaurants2.json");
        AbstractConnectorTest.SourceRecords records2 = this.consumeRecordsByTopic(4);
        Assertions.assertThat((int)records2.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(4);
        Assertions.assertThat((int)records2.topics().size()).isEqualTo(1);
        AtomicLong txOrder = new AtomicLong(0L);
        records2.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyNotFromInitialSync((SourceRecord)record);
            this.verifyCreateOperation((SourceRecord)record);
            this.verifyFromTransaction((SourceRecord)record, txOrder.incrementAndGet());
        });
        this.stopConnector();
        this.storeDocumentsInTx("dbit", "restaurants", "restaurants3.json");
        this.start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records3 = this.consumeRecordsByTopic(5);
        Assertions.assertThat((int)records3.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(5);
        Assertions.assertThat((int)records3.topics().size()).isEqualTo(1);
        txOrder.set(0L);
        records3.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyNotFromInitialSync((SourceRecord)record);
            this.verifyCreateOperation((SourceRecord)record);
            this.verifyFromTransaction((SourceRecord)record, txOrder.incrementAndGet());
        });
    }

    @Test
    @FixFor(value={"DBZ-1215"})
    public void shouldResumeTransactionInMiddle() throws InterruptedException, IOException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        if (!TestHelper.transactionsSupported(this.primary(), "dbit")) {
            this.logger.info("Test not executed, transactions not supported in the server");
            return;
        }
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.storeDocuments("dbit", "simpletons", "simple_objects.json");
        this.storeDocuments("dbit", "restaurants", "restaurants1.json");
        this.start(MongoDbConnector.class, this.config, record -> {
            Struct struct = (Struct)record.value();
            Long txOrder = struct.getStruct("source").getInt64("tord");
            return txOrder != null && txOrder.equals(3L);
        });
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(12);
        records.topics().forEach(System.out::println);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(6);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(6);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(2);
        AtomicBoolean foundLast = new AtomicBoolean(false);
        records.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyFromInitialSync((SourceRecord)record, foundLast);
            this.verifyReadOperation((SourceRecord)record);
        });
        Assertions.assertThat((boolean)foundLast.get()).isTrue();
        this.storeDocumentsInTx("dbit", "restaurants", "restaurants2.json");
        AbstractConnectorTest.SourceRecords records2 = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)records2.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(2);
        Assertions.assertThat((int)records2.topics().size()).isEqualTo(1);
        AtomicLong txOrder = new AtomicLong(0L);
        records2.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyNotFromInitialSync((SourceRecord)record);
            this.verifyCreateOperation((SourceRecord)record);
            this.verifyFromTransaction((SourceRecord)record, txOrder.incrementAndGet());
        });
        this.stopConnector();
        this.storeDocumentsInTx("dbit", "restaurants", "restaurants3.json");
        this.start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records3 = this.consumeRecordsByTopic(7);
        Assertions.assertThat((int)records3.recordsForTopic("mongo.dbit.restaurants").size()).isEqualTo(7);
        Assertions.assertThat((int)records3.topics().size()).isEqualTo(1);
        List expectedTxOrd = Collect.arrayListOf((Object)3L, (Object[])new Long[]{4L, 1L, 2L, 3L, 4L, 5L});
        records3.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyNotFromInitialSync((SourceRecord)record);
            this.verifyCreateOperation((SourceRecord)record);
            this.verifyFromTransaction((SourceRecord)record, (Long)expectedTxOrd.remove(0));
        });
    }

    @Test
    @FixFor(value={"DBZ-2116"})
    public void shouldSnapshotDocumentContainingFieldNamedOp() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.storeDocuments("dbit", "fieldnamedop", "fieldnamedop.json");
        this.start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.fieldnamedop").size()).isEqualTo(2);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        AtomicBoolean foundLast = new AtomicBoolean(false);
        records.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyFromInitialSync((SourceRecord)record, foundLast);
            this.verifyReadOperation((SourceRecord)record);
        });
        Assertions.assertThat((boolean)foundLast.get()).isTrue();
        SourceRecord record2 = (SourceRecord)records.recordsForTopic("mongo.dbit.fieldnamedop").get(0);
        Assertions.assertThat((Object)((Struct)record2.value()).get("op")).isEqualTo((Object)"r");
        Document after = Document.parse((String)((String)((Struct)record2.value()).get("after")));
        Assertions.assertThat((Object)after.get((Object)"op")).isEqualTo((Object)"foo");
        record2 = (SourceRecord)records.recordsForTopic("mongo.dbit.fieldnamedop").get(1);
        Assertions.assertThat((Object)((Struct)record2.value()).get("op")).isEqualTo((Object)"r");
        after = Document.parse((String)((String)((Struct)record2.value()).get("after")));
        Assertions.assertThat((Object)after.get((Object)"op")).isEqualTo((Object)"bar");
    }

    @Test
    @FixFor(value={"DBZ-2496"})
    public void shouldFilterItemsInCollectionWhileTakingSnapshot() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION, "dbit.simpletons,dbit.restaurants1,dbit.restaurants4")).with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION + ".dbit.simpletons", "{ \"_id\": { \"$gt\": 4 } }").with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION + ".dbit.restaurants1", "{ $or: [ { cuisine: \"American \"}, { \"grades.grade\": \"Z\" } ] }").with(MongoDbConnectorConfig.SNAPSHOT_FILTER_QUERY_BY_COLLECTION + ".dbit.restaurants4", "{ cuisine: \"American \" , borough: \"Manhattan\"  }").build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.storeDocuments("dbit", "simpletons", "simple_objects.json");
        this.storeDocuments("dbit", "restaurants1", "restaurants1.json");
        this.storeDocuments("dbit", "restaurants2", "restaurants2.json");
        this.storeDocuments("dbit", "restaurants4", "restaurants4.json");
        this.start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(15);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(4);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.simpletons").size()).isEqualTo(4);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.restaurants1").size()).isEqualTo(3);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.restaurants2").size()).isEqualTo(4);
        Assertions.assertThat((int)records.recordsForTopic("mongo.dbit.restaurants4").size()).isEqualTo(4);
        this.assertNoRecordsToConsume();
        this.stopConnector();
    }

    @Test
    @FixFor(value={"DBZ-2456"})
    public void shouldSelectivelySnapshot() throws InterruptedException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MongoDbConnectorConfig.SnapshotMode.INITIAL)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(CommonConnectorConfig.SNAPSHOT_MODE_TABLES, "[A-z].*dbit.restaurants1")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.storeDocuments("dbit", "restaurants1", "restaurants1.json");
        this.storeDocuments("dbit", "restaurants2", "restaurants2.json");
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(6);
        List restaurant1 = records.recordsForTopic("mongo.dbit.restaurants1");
        List restaurant2 = records.recordsForTopic("mongo.dbit.restaurants2");
        Assertions.assertThat((int)restaurant1.size()).isEqualTo(6);
        Assertions.assertThat((List)restaurant2).isNull();
        Instant timestamp = Instant.now();
        ObjectId objId = new ObjectId();
        Document obj = Document.parse((String)"{\"name\": \"Brunos On The Boulevard\", \"restaurant_id\": \"40356151\"}");
        this.insertDocuments("dbit", "restaurants2", obj);
        records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(1);
        this.assertNoRecordsToConsume();
        this.stopConnector();
    }

    protected void verifyNotFromInitialSync(SourceRecord record) {
        Assertions.assertThat((boolean)record.sourceOffset().containsKey("initsync")).isFalse();
        Struct value = (Struct)record.value();
        Assertions.assertThat((String)value.getStruct("source").getString("snapshot")).isNull();
    }

    protected void verifyFromTransaction(SourceRecord record, long order) {
        Assertions.assertThat((boolean)record.sourceOffset().containsKey("tord")).isTrue();
        Struct value = (Struct)record.value();
        Assertions.assertThat((Long)value.getStruct("source").getInt64("tord")).isEqualTo(order);
    }

    protected void verifyNotFromTransaction(SourceRecord record) {
        Assertions.assertThat((boolean)record.sourceOffset().containsKey("tord")).isFalse();
    }

    protected void verifyCreateOperation(SourceRecord record) {
        this.verifyOperation(record, Envelope.Operation.CREATE);
    }

    protected void verifyReadOperation(SourceRecord record) {
        this.verifyOperation(record, Envelope.Operation.READ);
    }

    protected void verifyUpdateOperation(SourceRecord record) {
        this.verifyOperation(record, Envelope.Operation.UPDATE);
    }

    protected void verifyDeleteOperation(SourceRecord record) {
        this.verifyOperation(record, Envelope.Operation.DELETE);
    }

    protected void verifyOperation(SourceRecord record, Envelope.Operation expected) {
        Struct value = (Struct)record.value();
        Assertions.assertThat((String)value.getString("op")).isEqualTo((Object)expected.code());
    }

    protected ConnectionContext.MongoPrimary primary() {
        ReplicaSet replicaSet = ReplicaSet.parse((String)this.context.getConnectionContext().hosts());
        return this.context.getConnectionContext().primaryFor(replicaSet, this.context.filters(), this.connectionErrorHandler(3));
    }

    protected void storeDocuments(String dbName, String collectionName, String pathOnClasspath) {
        this.primary().execute("storing documents", mongo -> {
            Testing.debug((Object)("Storing in '" + dbName + "." + collectionName + "' documents loaded from from '" + pathOnClasspath + "'"));
            MongoDatabase db1 = mongo.getDatabase(dbName);
            MongoCollection coll = db1.getCollection(collectionName);
            coll.drop();
            this.storeDocuments((MongoCollection<Document>)coll, pathOnClasspath);
        });
    }

    protected void storeDocuments(MongoCollection<Document> collection, String pathOnClasspath) {
        InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
        this.loadTestDocuments(pathOnClasspath).forEach(doc -> {
            Assertions.assertThat((Map)doc).isNotNull();
            Assertions.assertThat((int)doc.size()).isGreaterThan(0);
            collection.insertOne(doc, insertOptions);
        });
    }

    protected void storeDocumentsInTx(String dbName, String collectionName, String pathOnClasspath) {
        this.primary().execute("storing documents", mongo -> {
            Testing.debug((Object)("Storing in '" + dbName + "." + collectionName + "' documents loaded from from '" + pathOnClasspath + "'"));
            MongoDatabase db1 = mongo.getDatabase(dbName);
            MongoCollection coll = db1.getCollection(collectionName);
            coll.drop();
            db1.createCollection(collectionName);
            ClientSession session = mongo.startSession();
            MongoDatabase admin = mongo.getDatabase("admin");
            if (admin != null) {
                int timeout = Integer.parseInt(System.getProperty("mongo.transaction.lock.request.timeout.ms", "1000"));
                Testing.debug((Object)("Setting MongoDB transaction lock request timeout as '" + timeout + "ms'"));
                admin.runCommand(session, (Bson)new Document().append("setParameter", (Object)1).append("maxTransactionLockRequestTimeoutMillis", (Object)timeout));
            }
            session.startTransaction();
            this.storeDocuments(session, (MongoCollection<Document>)coll, pathOnClasspath);
            session.commitTransaction();
        });
    }

    protected void storeDocuments(ClientSession session, MongoCollection<Document> collection, String pathOnClasspath) {
        InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
        this.loadTestDocuments(pathOnClasspath).forEach(doc -> {
            Assertions.assertThat((Map)doc).isNotNull();
            Assertions.assertThat((int)doc.size()).isGreaterThan(0);
            if (session == null) {
                collection.insertOne(doc, insertOptions);
            } else {
                collection.insertOne(session, doc, insertOptions);
            }
        });
    }

    protected List<Document> loadTestDocuments(String pathOnClasspath) {
        ArrayList<Document> results = new ArrayList<Document>();
        try (InputStream stream = Testing.Files.readResourceAsStream((String)pathOnClasspath);){
            Assertions.assertThat((Object)stream).isNotNull();
            IoUtil.readLines((InputStream)stream, line -> {
                Document doc = Document.parse((String)line);
                Assertions.assertThat((int)doc.size()).isGreaterThan(0);
                results.add(doc);
            });
        }
        catch (IOException e) {
            Assert.fail((String)("Unable to find or read file '" + pathOnClasspath + "': " + e.getMessage()));
        }
        return results;
    }

    protected BiConsumer<String, Throwable> connectionErrorHandler(int numErrorsBeforeFailing) {
        AtomicInteger attempts = new AtomicInteger();
        return (desc, error) -> {
            if (attempts.incrementAndGet() > numErrorsBeforeFailing) {
                Assert.fail((String)("Unable to connect to primary after " + numErrorsBeforeFailing + " errors trying to " + desc + ": " + error));
            }
            this.logger.error("Error while attempting to {}: {}", new Object[]{desc, error.getMessage(), error});
        };
    }

    @Test(expected=ConnectException.class)
    public void shouldUseSSL() throws InterruptedException, IOException {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).with(MongoDbConnectorConfig.MAX_FAILED_CONNECTIONS, 0)).with(MongoDbConnectorConfig.SSL_ENABLED, true)).with(MongoDbConnectorConfig.SERVER_SELECTION_TIMEOUT_MS, 2000)).build();
        this.context = new MongoDbTaskContext(this.config);
        ConnectionContext.MongoPrimary primary = this.primary();
        primary.executeBlocking("Try SSL connection", mongo -> {
            primary.stop();
            mongo.getDatabase("dbit").listCollectionNames().first();
        });
    }

    @Test
    @FixFor(value={"DBZ-1198"})
    public void shouldEmitHeartbeatMessages() throws InterruptedException, IOException {
        Testing.Print.enable();
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.mhb")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).with(Heartbeat.HEARTBEAT_INTERVAL, "1")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.primary().execute("create", mongo -> {
            MongoDatabase db1 = mongo.getDatabase("dbit");
            MongoCollection coll1 = db1.getCollection("mhb");
            coll1.drop();
            Document doc = Document.parse((String)"{\"a\": 1, \"b\": 2}");
            InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
            coll1.insertOne((Object)doc, insertOptions);
            MongoCollection coll2 = db1.getCollection("nmhb");
            coll2.drop();
        });
        this.start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((List)records.allRecordsInOrder()).hasSize(1);
        Assertions.assertThat((List)records.recordsForTopic("mongo.dbit.mhb")).hasSize(1);
        this.primary().execute("insert-monitored", mongo -> {
            MongoDatabase db1 = mongo.getDatabase("dbit");
            MongoCollection coll = db1.getCollection("mhb");
            Document doc = Document.parse((String)"{\"a\": 2, \"b\": 2}");
            InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
            coll.insertOne((Object)doc, insertOptions);
        });
        records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((List)records.recordsForTopic("mongo.dbit.mhb")).hasSize(1);
        Map monitoredOffset = ((SourceRecord)records.recordsForTopic("mongo.dbit.mhb").get(0)).sourceOffset();
        Integer monitoredTs = (Integer)monitoredOffset.get("sec");
        Integer monitoredOrd = (Integer)monitoredOffset.get("ord");
        Assertions.assertThat((List)records.recordsForTopic("__debezium-heartbeat.mongo")).hasSize(1);
        Map hbAfterMonitoredOffset = ((SourceRecord)records.recordsForTopic("__debezium-heartbeat.mongo").get(0)).sourceOffset();
        Assertions.assertThat((Integer)monitoredTs).isEqualTo((Object)((Integer)hbAfterMonitoredOffset.get("sec")));
        Assertions.assertThat((Integer)monitoredOrd).isEqualTo((Object)((Integer)hbAfterMonitoredOffset.get("ord")));
        this.primary().execute("insert-nonmonitored", mongo -> {
            MongoDatabase db1 = mongo.getDatabase("dbit");
            MongoCollection coll = db1.getCollection("nmhb");
            Document doc = Document.parse((String)"{\"a\": 3, \"b\": 2}");
            InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
            coll.insertOne((Object)doc, insertOptions);
        });
        records = this.consumeRecordsByTopic(2);
        List heartbeatRecords = records.recordsForTopic("__debezium-heartbeat.mongo");
        Assertions.assertThat((int)heartbeatRecords.size()).isGreaterThanOrEqualTo(1);
        heartbeatRecords.forEach(record -> {
            Map offset = record.sourceOffset();
            Integer ts = (Integer)offset.get("sec");
            Integer ord = (Integer)offset.get("ord");
            Assertions.assertThat((ts > monitoredTs || ts == monitoredTs && ord > monitoredOrd ? 1 : 0) != 0);
        });
        this.stopConnector();
    }

    @Test
    @FixFor(value={"DBZ-1292"})
    public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.storeDocuments("dbit", "restaurants", "restaurants1.json");
        this.start(MongoDbConnector.class, this.config);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(12);
        List topicRecords = records.recordsForTopic("mongo.dbit.restaurants");
        for (SourceRecord record : topicRecords) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson((SourceRecord)record, (boolean)false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro((SourceRecord)record, (boolean)false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro((SourceRecord)record, (String)"mongodb", (String)"mongo", (boolean)false);
        }
        this.storeDocuments("dbit", "restaurants", "restaurants2.json");
        AbstractConnectorTest.SourceRecords records2 = this.consumeRecordsByTopic(4);
        List topicRecords2 = records2.recordsForTopic("mongo.dbit.restaurants");
        for (SourceRecord record : topicRecords2) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson((SourceRecord)record, (boolean)false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro((SourceRecord)record, (boolean)false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro((SourceRecord)record, (String)"mongodb", (String)"mongo", (boolean)false);
        }
        this.stopConnector();
    }

    @Test
    public void shouldGenerateRecordForInsertEvent() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        Instant timestamp = Instant.now();
        ObjectId objId = new ObjectId();
        Document obj = new Document("_id", (Object)objId);
        this.insertDocuments("dbit", "c1", obj);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(1);
        this.assertNoRecordsToConsume();
        SourceRecord deleteRecord = (SourceRecord)records.allRecordsInOrder().get(0);
        Struct key = (Struct)deleteRecord.key();
        Struct value = (Struct)deleteRecord.value();
        Assertions.assertThat((Object)key.schema()).isSameAs((Object)deleteRecord.keySchema());
        Assertions.assertThat((Object)key.get("id")).isEqualTo((Object)this.formatObjectId(objId));
        Assertions.assertThat((Object)value.schema()).isSameAs((Object)deleteRecord.valueSchema());
        Assertions.assertThat((String)value.getString("after")).isEqualTo((Object)obj.toJson(JsonSerialization.COMPACT_JSON_SETTINGS));
        Assertions.assertThat((String)value.getString("op")).isEqualTo((Object)Envelope.Operation.CREATE.code());
        Assertions.assertThat((Long)value.getInt64("ts_ms")).isGreaterThanOrEqualTo(timestamp.toEpochMilli());
    }

    @Test
    public void shouldGenerateRecordForUpdateEvent() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        ObjectId objId = new ObjectId();
        Document obj = new Document("_id", (Object)objId);
        this.insertDocuments("dbit", "c1", obj);
        this.consumeRecordsByTopic(1);
        this.assertNoRecordsToConsume();
        Document updateObj = new Document().append("$set", (Object)new Document().append("name", (Object)"Sally"));
        Instant timestamp = Instant.now();
        Document filter = Document.parse((String)("{\"_id\": {\"$oid\": \"" + objId + "\"}}"));
        this.updateDocuments("dbit", "c1", filter, updateObj);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(1);
        this.assertNoRecordsToConsume();
        SourceRecord deleteRecord = (SourceRecord)records.allRecordsInOrder().get(0);
        Struct key = (Struct)deleteRecord.key();
        Struct value = (Struct)deleteRecord.value();
        Assertions.assertThat((Object)key.schema()).isSameAs((Object)deleteRecord.keySchema());
        Assertions.assertThat((Object)key.get("id")).isEqualTo((Object)this.formatObjectId(objId));
        Document patchObj = Document.parse((String)value.getString("patch"));
        patchObj.remove((Object)"$v");
        Assertions.assertThat((Object)value.schema()).isSameAs((Object)deleteRecord.valueSchema());
        Assertions.assertThat((String)value.getString("after")).isNull();
        Assertions.assertThat((String)patchObj.toJson(JsonSerialization.COMPACT_JSON_SETTINGS)).isEqualTo((Object)updateObj.toJson(JsonSerialization.COMPACT_JSON_SETTINGS));
        Assertions.assertThat((String)value.getString("op")).isEqualTo((Object)Envelope.Operation.UPDATE.code());
        Assertions.assertThat((Long)value.getInt64("ts_ms")).isGreaterThanOrEqualTo(timestamp.toEpochMilli());
    }

    @Test
    public void shouldGeneratorRecordForDeleteEvent() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        ObjectId objId = new ObjectId();
        Document obj = new Document("_id", (Object)objId);
        this.insertDocuments("dbit", "c1", obj);
        this.consumeRecordsByTopic(1);
        this.assertNoRecordsToConsume();
        Instant timestamp = Instant.now();
        this.deleteDocument("dbit", "c1", objId);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(2);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(2);
        this.assertNoRecordsToConsume();
        SourceRecord deleteRecord = (SourceRecord)records.allRecordsInOrder().get(0);
        Struct key = (Struct)deleteRecord.key();
        Struct value = (Struct)deleteRecord.value();
        Assertions.assertThat((Object)key.schema()).isSameAs((Object)deleteRecord.keySchema());
        Assertions.assertThat((Object)key.get("id")).isEqualTo((Object)this.formatObjectId(objId));
        Assertions.assertThat((Object)value.schema()).isSameAs((Object)deleteRecord.valueSchema());
        Assertions.assertThat((String)value.getString("after")).isNull();
        Assertions.assertThat((String)value.getString("patch")).isNull();
        Assertions.assertThat((String)value.getString("op")).isEqualTo((Object)Envelope.Operation.DELETE.code());
        Assertions.assertThat((Long)value.getInt64("ts_ms")).isGreaterThanOrEqualTo(timestamp.toEpochMilli());
        SourceRecord tombstoneRecord = (SourceRecord)records.allRecordsInOrder().get(1);
        Struct tombstoneKey = (Struct)tombstoneRecord.key();
        Assertions.assertThat((Object)tombstoneKey.schema()).isSameAs((Object)tombstoneRecord.keySchema());
        Assertions.assertThat((Object)tombstoneKey.get("id")).isEqualTo((Object)this.formatObjectId(objId));
        Assertions.assertThat((Object)tombstoneRecord.value()).isNull();
        Assertions.assertThat((Object)tombstoneRecord.valueSchema()).isNull();
    }

    @Test
    @FixFor(value={"DBZ-582"})
    public void shouldGenerateRecordForDeleteEventWithoutTombstone() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).with(MongoDbConnectorConfig.TOMBSTONES_ON_DELETE, false)).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        ObjectId objId = new ObjectId();
        Document obj = new Document("_id", (Object)objId);
        this.insertDocuments("dbit", "c1", obj);
        this.consumeRecordsByTopic(1);
        this.assertNoRecordsToConsume();
        Instant timestamp = Instant.now();
        this.deleteDocument("dbit", "c1", objId);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(1);
        this.assertNoRecordsToConsume();
        SourceRecord record = (SourceRecord)records.allRecordsInOrder().get(0);
        Struct key = (Struct)record.key();
        Struct value = (Struct)record.value();
        Assertions.assertThat((Object)key.schema()).isSameAs((Object)record.keySchema());
        Assertions.assertThat((Object)key.get("id")).isEqualTo((Object)this.formatObjectId(objId));
        Assertions.assertThat((Object)value.schema()).isSameAs((Object)record.valueSchema());
        Assertions.assertThat((String)value.getString("after")).isNull();
        Assertions.assertThat((String)value.getString("patch")).isNull();
        Assertions.assertThat((String)value.getString("op")).isEqualTo((Object)Envelope.Operation.DELETE.code());
        Assertions.assertThat((Long)value.getInt64("ts_ms")).isGreaterThanOrEqualTo(timestamp.toEpochMilli());
    }

    @Test
    public void shouldGenerateRecordsWithCorrectlySerializedId() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        Document obj0 = new Document().append("_id", (Object)(Long.valueOf(Integer.MAX_VALUE) + 10L)).append("name", (Object)"Sally");
        this.insertDocuments("dbit", "c1", obj0);
        Document obj1 = new Document().append("_id", (Object)"123").append("name", (Object)"Sally");
        this.insertDocuments("dbit", "c1", obj1);
        Document obj2 = new Document().append("_id", (Object)new Document().append("company", (Object)32).append("dept", (Object)"home improvement")).append("name", (Object)"Sally");
        this.insertDocuments("dbit", "c1", obj2);
        Calendar cal = Calendar.getInstance();
        cal.set(2017, 9, 19);
        Document obj3 = new Document().append("_id", (Object)cal.getTime()).append("name", (Object)"Sally");
        this.insertDocuments("dbit", "c1", obj3);
        boolean decimal128Supported = TestHelper.decimal128Supported(this.primary(), "mongo");
        if (decimal128Supported) {
            Document obj4 = new Document().append("_id", (Object)new Decimal128(new BigDecimal("123.45678"))).append("name", (Object)"Sally");
            this.insertDocuments("dbit", "c1", obj4);
        }
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(decimal128Supported ? 5 : 4);
        List sourceRecords = records.allRecordsInOrder();
        MongoDbConnectorIT.assertSourceRecordKeyFieldIsEqualTo((SourceRecord)sourceRecords.get(0), "id", "2147483657");
        MongoDbConnectorIT.assertSourceRecordKeyFieldIsEqualTo((SourceRecord)sourceRecords.get(1), "id", "\"123\"");
        MongoDbConnectorIT.assertSourceRecordKeyFieldIsEqualTo((SourceRecord)sourceRecords.get(2), "id", "{\"company\": 32,\"dept\": \"home improvement\"}");
        MongoDbConnectorIT.assertSourceRecordKeyFieldIsEqualTo((SourceRecord)sourceRecords.get(3), "id", "{\"$date\": \"" + ZonedDateTime.ofInstant(Instant.ofEpochMilli(cal.getTimeInMillis()), ZoneId.of("Z")).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME) + "\"}");
        if (decimal128Supported) {
            MongoDbConnectorIT.assertSourceRecordKeyFieldIsEqualTo((SourceRecord)sourceRecords.get(4), "id", "{\"$numberDecimal\": \"123.45678\"}");
        }
    }

    private static void assertSourceRecordKeyFieldIsEqualTo(SourceRecord record, String fieldName, String expected) {
        Struct struct = (Struct)record.key();
        Assertions.assertThat((Object)struct.get(fieldName)).isEqualTo((Object)expected);
    }

    @Test
    public void shouldSupportDbRef2() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        ObjectId objId = new ObjectId();
        Document obj = new Document().append("_id", (Object)objId).append("name", (Object)"Sally").append("ref", (Object)new DBRef("othercollection", (Object)15));
        Instant timestamp = Instant.now();
        this.insertDocuments("dbit", "c1", obj);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(1);
        SourceRecord record = (SourceRecord)records.allRecordsInOrder().get(0);
        Struct key = (Struct)record.key();
        Struct value = (Struct)record.value();
        Assertions.assertThat((Object)key.schema()).isSameAs((Object)record.keySchema());
        Assertions.assertThat((Object)key.get("id")).isEqualTo((Object)this.formatObjectId(objId));
        Assertions.assertThat((Object)value.schema()).isSameAs((Object)record.valueSchema());
        String expected = "{\"_id\": {\"$oid\": \"" + objId + "\"},\"name\": \"Sally\",\"ref\": {\"$ref\": \"othercollection\",\"$id\": 15}}";
        Assertions.assertThat((String)value.getString("after")).isEqualTo((Object)expected);
        Assertions.assertThat((String)value.getString("op")).isEqualTo((Object)Envelope.Operation.CREATE.code());
        Assertions.assertThat((Long)value.getInt64("ts_ms")).isGreaterThanOrEqualTo(timestamp.toEpochMilli());
    }

    @Test
    public void shouldReplicateContent() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.contacts")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).with(MongoDbConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MongoDbConnectorConfig.SnapshotMode.INITIAL)).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbA");
        this.primary().execute("shouldCreateContactsDatabase", mongo -> {
            MongoDatabase db = mongo.getDatabase("dbA");
            MongoCollection contacts = db.getCollection("contacts");
            InsertOneOptions options = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
            contacts.insertOne((Object)new Document().append("name", (Object)"Jon Snow"), options);
            Assertions.assertThat((long)db.getCollection("contacts").countDocuments()).isEqualTo(1L);
            Bson filter = Filters.eq((String)"name", (Object)"Jon Snow");
            FindIterable results = db.getCollection("contacts").find(filter);
            try (MongoCursor cursor = results.iterator();){
                Assertions.assertThat((String)((Document)cursor.tryNext()).getString((Object)"name")).isEqualTo((Object)"Jon Snow");
                Assertions.assertThat((Map)((Map)cursor.tryNext())).isNull();
            }
        });
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        Object[] expectedNames = new Object[]{"Jon Snow", "Sally Hamm"};
        this.primary().execute("shouldAddMoreRecordsToContacts", mongo -> {
            MongoDatabase db = mongo.getDatabase("dbA");
            MongoCollection contacts = db.getCollection("contacts");
            InsertOneOptions options = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
            contacts.insertOne((Object)new Document().append("name", (Object)"Sally Hamm"), options);
            Assertions.assertThat((long)db.getCollection("contacts").countDocuments()).isEqualTo(2L);
            FindIterable results = db.getCollection("contacts").find();
            HashSet<String> foundNames = new HashSet<String>();
            try (MongoCursor cursor = results.iterator();){
                while (cursor.hasNext()) {
                    String name = ((Document)cursor.next()).getString((Object)"name");
                    foundNames.add(name);
                }
            }
            Assertions.assertThat(foundNames).containsOnly(expectedNames);
        });
        List records = this.consumeRecordsByTopic(2).allRecordsInOrder();
        HashSet foundNames = new HashSet();
        records.forEach(record -> {
            VerifyRecord.isValid((SourceRecord)record);
            Struct value = (Struct)record.value();
            String after = value.getString("after");
            Document document = Document.parse((String)after);
            foundNames.add(document.getString((Object)"name"));
            Envelope.Operation operation = Envelope.Operation.forCode((String)value.getString("op"));
            Assertions.assertThat((operation == Envelope.Operation.READ || operation == Envelope.Operation.CREATE ? 1 : 0) != 0).isTrue();
        });
        this.assertNoRecordsToConsume();
        Assertions.assertThat(foundNames).containsOnly(expectedNames);
        this.stopConnector();
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        this.assertNoRecordsToConsume();
        AtomicReference jonSnowId = new AtomicReference();
        this.primary().execute("removeJohnSnow", mongo -> {
            MongoDatabase db = mongo.getDatabase("dbA");
            MongoCollection contacts = db.getCollection("contacts");
            Bson filter = Filters.eq((String)"name", (Object)"Jon Snow");
            FindIterable results = db.getCollection("contacts").find(filter);
            try (MongoCursor cursor = results.iterator();){
                Document document = (Document)cursor.tryNext();
                Assertions.assertThat((String)document.getString((Object)"name")).isEqualTo((Object)"Jon Snow");
                Assertions.assertThat((Map)((Map)cursor.tryNext())).isNull();
                jonSnowId.set(document.getObjectId((Object)"_id"));
                Assertions.assertThat(jonSnowId.get()).isNotNull();
            }
            contacts.deleteOne(filter);
        });
        records = this.consumeRecordsByTopic(2).allRecordsInOrder();
        HashSet foundIds = new HashSet();
        records.forEach(record -> {
            VerifyRecord.isValid((SourceRecord)record);
            Struct key = (Struct)record.key();
            ObjectId id = this.toObjectId(key.getString("id"));
            foundIds.add(id);
            if (record.value() != null) {
                Struct value = (Struct)record.value();
                Envelope.Operation operation = Envelope.Operation.forCode((String)value.getString("op"));
                Assertions.assertThat((Object)operation).isEqualTo((Object)Envelope.Operation.DELETE);
            }
        });
        this.stopConnector();
        this.initializeConnectorTestFramework();
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForSnapshotToBeCompleted((String)"mongodb", (String)"mongo");
        records = this.consumeRecordsByTopic(1).allRecordsInOrder();
        foundNames.clear();
        records.forEach(record -> {
            VerifyRecord.isValid((SourceRecord)record);
            Struct value = (Struct)record.value();
            String after = value.getString("after");
            Document document = Document.parse((String)after);
            foundNames.add(document.getString((Object)"name"));
            Envelope.Operation operation = Envelope.Operation.forCode((String)value.getString("op"));
            Assertions.assertThat((Object)operation).isEqualTo((Object)Envelope.Operation.READ);
        });
        Object[] allExpectedNames = new Object[]{"Sally Hamm"};
        Assertions.assertThat(foundNames).containsOnly(allExpectedNames);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        this.assertNoRecordsToConsume();
    }

    @Test
    public void shouldNotReplicateSnapshot() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbA.contacts")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).with(MongoDbConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MongoDbConnectorConfig.SnapshotMode.NEVER)).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbA");
        this.primary().execute("shouldCreateContactsDatabase", mongo -> {
            MongoDatabase db = mongo.getDatabase("dbA");
            MongoCollection contacts = db.getCollection("contacts");
            InsertOneOptions options = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
            contacts.insertOne((Object)new Document().append("name", (Object)"Jon Snow"), options);
            Assertions.assertThat((long)db.getCollection("contacts").countDocuments()).isEqualTo(1L);
        });
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        this.primary().execute("shouldAddMoreRecordsToContacts", mongo -> {
            MongoDatabase db = mongo.getDatabase("dbA");
            MongoCollection contacts = db.getCollection("contacts");
            InsertOneOptions options = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
            contacts.insertOne((Object)new Document().append("name", (Object)"Ygritte"), options);
            Assertions.assertThat((long)db.getCollection("contacts").countDocuments()).isEqualTo(2L);
        });
        List records = this.consumeRecordsByTopic(1).allRecordsInOrder();
        HashSet foundNames = new HashSet();
        records.forEach(record -> {
            VerifyRecord.isValid((SourceRecord)record);
            Struct value = (Struct)record.value();
            String after = value.getString("after");
            Document document = Document.parse((String)after);
            foundNames.add(document.getString((Object)"name"));
            Envelope.Operation operation = Envelope.Operation.forCode((String)value.getString("op"));
            Assertions.assertThat((Object)operation).isEqualTo((Object)Envelope.Operation.CREATE);
        });
        this.assertNoRecordsToConsume();
        Assertions.assertThat(foundNames).containsOnly(new Object[]{"Ygritte"});
    }

    @Test
    @FixFor(value={"DBZ-1880"})
    public void shouldGenerateRecordForUpdateEventUsingLegacyV1SourceInfo() throws Exception {
        this.config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration().edit().with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, "dbit.*")).with(CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, "v1")).with(MongoDbConnectorConfig.LOGICAL_NAME, "mongo")).build();
        this.context = new MongoDbTaskContext(this.config);
        TestHelper.cleanDatabase(this.primary(), "dbit");
        this.start(MongoDbConnector.class, this.config);
        MongoDbConnectorIT.waitForStreamingRunning((String)"mongodb", (String)"mongo");
        ObjectId objId = new ObjectId();
        Document obj = new Document("_id", (Object)objId).append("name", (Object)"John");
        this.insertDocuments("dbit", "c1", obj);
        this.consumeRecordsByTopic(1);
        this.assertNoRecordsToConsume();
        Document updateObj = new Document().append("$set", (Object)new Document().append("name", (Object)"Sally"));
        Instant timestamp = Instant.now();
        Document filter = Document.parse((String)("{\"_id\": {\"$oid\": \"" + objId + "\"}}"));
        this.updateDocuments("dbit", "c1", filter, updateObj);
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        Assertions.assertThat((int)records.allRecordsInOrder().size()).isEqualTo(1);
        this.assertNoRecordsToConsume();
        SourceRecord deleteRecord = (SourceRecord)records.allRecordsInOrder().get(0);
        Struct key = (Struct)deleteRecord.key();
        Struct value = (Struct)deleteRecord.value();
        Assertions.assertThat((Object)key.schema()).isSameAs((Object)deleteRecord.keySchema());
        Assertions.assertThat((Object)key.get("id")).isEqualTo((Object)this.formatObjectId(objId));
        Document patchObj = Document.parse((String)value.getString("patch"));
        patchObj.remove((Object)"$v");
        Assertions.assertThat((Object)value.schema()).isSameAs((Object)deleteRecord.valueSchema());
        Assertions.assertThat((String)value.getString("after")).isNull();
        Assertions.assertThat((String)patchObj.toJson(JsonSerialization.COMPACT_JSON_SETTINGS)).isEqualTo((Object)updateObj.toJson(JsonSerialization.COMPACT_JSON_SETTINGS));
        Assertions.assertThat((String)value.getString("op")).isEqualTo((Object)Envelope.Operation.UPDATE.code());
        Assertions.assertThat((Long)value.getInt64("ts_ms")).isGreaterThanOrEqualTo(timestamp.toEpochMilli());
    }

    private String formatObjectId(ObjectId objId) {
        return "{\"$oid\": \"" + objId + "\"}";
    }

    private void insertDocuments(String dbName, String collectionName, Document ... documents) {
        this.primary().execute("store documents", mongo -> {
            Testing.debug((Object)("Storing in '" + dbName + "." + collectionName + "' document"));
            MongoDatabase db = mongo.getDatabase(dbName);
            MongoCollection coll = db.getCollection(collectionName);
            for (Document document : documents) {
                InsertOneOptions insertOptions = new InsertOneOptions().bypassDocumentValidation(Boolean.valueOf(true));
                Assertions.assertThat((Map)document).isNotNull();
                Assertions.assertThat((int)document.size()).isGreaterThan(0);
                coll.insertOne((Object)document, insertOptions);
            }
        });
    }

    private void updateDocuments(String dbName, String collectionName, Document filter, Document document) {
        this.primary().execute("update", mongo -> {
            MongoDatabase db = mongo.getDatabase(dbName);
            MongoCollection coll = db.getCollection(collectionName);
            coll.updateOne((Bson)filter, (Bson)document);
        });
    }

    private void deleteDocument(String dbName, String collectionName, ObjectId objectId) {
        this.primary().execute("delete", mongo -> {
            MongoDatabase db = mongo.getDatabase(dbName);
            MongoCollection coll = db.getCollection(collectionName);
            Document filter = Document.parse((String)("{\"_id\": {\"$oid\": \"" + objectId + "\"}}"));
            coll.deleteOne((Bson)filter);
        });
    }

    private ObjectId toObjectId(String oid) {
        return new ObjectId(oid.substring(10, oid.length() - 2));
    }
}

