/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mongodb.AbstractMongoConnectorIT;
import io.debezium.connector.mongodb.JsonSerialization;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbTaskContext;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.connector.mongodb.snapshot.MongoDbIncrementalSnapshotChangeEventSource;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.engine.DebeziumEngine;
import io.debezium.junit.logging.LogInterceptor;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.bson.BsonDocument;
import org.bson.Document;
import org.bson.codecs.configuration.CodecRegistry;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;

public class IncrementalSnapshotIT
extends AbstractMongoConnectorIT {
    protected static final int ROW_COUNT = 1000;
    private static final int MAXIMUM_NO_RECORDS_CONSUMES = 3;
    private static final String DATABASE_NAME = "dbA";
    private static final String COLLECTION_NAME = "c1";
    private static final String COLLECTION2_NAME = "c2";
    private static final String SIGNAL_COLLECTION_NAME = "dbA.signals";
    private static final String FULL_COLLECTION_NAME = "dbA.c1";
    private static final String FULL_COLLECTION2_NAME = "dbA.c2";
    private static final String DOCUMENT_ID = "_id";

    @Before
    public void before() {
        this.context = new MongoDbTaskContext(this.config().build());
        TestHelper.cleanDatabase(mongo, DATABASE_NAME);
    }

    @After
    public void after() {
        TestHelper.cleanDatabase(mongo, DATABASE_NAME);
    }

    protected Class<MongoDbConnector> connectorClass() {
        return MongoDbConnector.class;
    }

    protected Configuration.Builder config() {
        return (Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration(mongo).edit().with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE_NAME)).with(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST, this.fullDataCollectionName() + ",dbA.c1,dbA.c2")).with(MongoDbConnectorConfig.SIGNAL_DATA_COLLECTION, SIGNAL_COLLECTION_NAME)).with(MongoDbConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)).with(MongoDbConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)).with(MongoDbConnectorConfig.SNAPSHOT_MODE, (EnumeratedValue)MongoDbConnectorConfig.SnapshotMode.NO_DATA);
    }

    protected String dataCollectionName() {
        return COLLECTION_NAME;
    }

    protected List<String> dataCollectionNames() {
        return List.of(COLLECTION_NAME, COLLECTION2_NAME);
    }

    protected String fullDataCollectionName() {
        return FULL_COLLECTION_NAME;
    }

    protected List<String> fullDataCollectionNames() {
        return List.of(FULL_COLLECTION_NAME, FULL_COLLECTION2_NAME);
    }

    protected String topicName() {
        return "mongo1." + this.fullDataCollectionName();
    }

    protected List<String> topicNames() {
        return this.fullDataCollectionNames().stream().map(x -> "mongo1." + x).collect(Collectors.toList());
    }

    protected void populateDataCollection(String dataCollectionName) {
        Document[] documents = new Document[1000];
        for (int i = 0; i < 1000; ++i) {
            Document doc = new Document();
            doc.append(DOCUMENT_ID, (Object)(i + 1)).append("aa", (Object)i);
            documents[i] = doc;
        }
        this.insertDocumentsInTx(DATABASE_NAME, dataCollectionName, documents);
    }

    protected void populateDataCollection() {
        this.populateDataCollection(this.dataCollectionName());
    }

    protected void populateDataCollections() {
        for (String dataCollectionName : this.dataCollectionNames()) {
            this.populateDataCollection(dataCollectionName);
        }
    }

    protected void insertAdditionalData() {
        this.insertAdditionalData(COLLECTION_NAME);
    }

    protected void insertAdditionalData(String collectionName) {
        Document[] documents = new Document[1000];
        for (int i = 0; i < 1000; ++i) {
            Document doc = new Document();
            doc.append(DOCUMENT_ID, (Object)(i + 1000 + 1)).append("aa", (Object)(i + 1000));
            documents[i] = doc;
        }
        this.insertDocumentsInTx(DATABASE_NAME, collectionName, documents);
    }

    protected void updateData(int batchSize) {
        for (int i = 0; i < 1000 / batchSize; ++i) {
            Document gt = new Document();
            gt.append("$gt", (Object)(i * batchSize));
            Document lte = new Document();
            lte.append("$lte", (Object)((i + 1) * batchSize));
            Document filter = new Document();
            filter.append("$and", Arrays.asList(new Document().append(DOCUMENT_ID, (Object)gt), new Document().append(DOCUMENT_ID, (Object)lte)));
            Document update = new Document();
            update.append("$inc", (Object)new Document().append("aa", (Object)2000));
            this.updateDocumentsInTx(DATABASE_NAME, COLLECTION_NAME, filter, update);
        }
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig) {
        this.startConnector(custConfig, this.loggingCompletion());
    }

    protected void startConnector(Function<Configuration.Builder, Configuration.Builder> custConfig, DebeziumEngine.CompletionCallback callback) {
        Configuration config = custConfig.apply(this.config()).build();
        this.start(this.connectorClass(), config, callback);
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(5L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
    }

    protected void startConnector() {
        this.startConnector(Function.identity(), this.loggingCompletion());
    }

    protected void waitForConnectorToStart() {
        this.assertConnectorIsRunning();
    }

    protected void sendAdHocSnapshotSignal(String ... dataCollectionIds) throws SQLException {
        String dataCollectionIdsList = Arrays.stream(dataCollectionIds).map(x -> "\\\"" + x + "\\\"").collect(Collectors.joining(", "));
        this.insertDocuments(DATABASE_NAME, "signals", Document.parse((String)("{\"type\": \"execute-snapshot\", \"payload\": \"{\\\"data-collections\\\": [" + dataCollectionIdsList + "]}\"}")));
    }

    protected void sendAdHocSnapshotStopSignal(String ... dataCollectionIds) throws SQLException {
        Object dataCollections;
        if (dataCollectionIds.length == 0) {
            dataCollections = "";
        } else {
            String dataCollectionIdsList = Arrays.stream(dataCollectionIds).map(x -> "\\\"" + x + "\\\"").collect(Collectors.joining(", "));
            dataCollections = ", \\\"data-collections\\\": [" + dataCollectionIdsList + "]";
        }
        this.insertDocuments(DATABASE_NAME, "signals", Document.parse((String)("{\"type\": \"stop-snapshot\", \"payload\": \"{\\\"type\\\": \\\"INCREMENTAL\\\"" + (String)dataCollections + "}\"}")));
    }

    protected void sendAdHocSnapshotSignal() throws SQLException {
        this.sendAdHocSnapshotSignal(this.fullDataCollectionName());
    }

    protected void sendPauseSignal() throws SQLException {
        this.insertDocuments(DATABASE_NAME, "signals", Document.parse((String)"{\"type\": \"pause-snapshot\", \"payload\": \"{}\"}"));
    }

    protected void sendResumeSignal() throws SQLException {
        this.insertDocuments(DATABASE_NAME, "signals", Document.parse((String)"{\"type\": \"resume-snapshot\", \"payload\": \"{}\"}"));
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, this.topicName());
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount, String topicName) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, this::extractFieldValue, x -> true, null, topicName);
    }

    protected Integer extractFieldValue(SourceRecord record) {
        String after = ((Struct)record.value()).getString("after");
        Pattern p = Pattern.compile("\"" + this.valueFieldName() + "\": (\\d+)");
        Matcher m = p.matcher(after);
        m.find();
        return Integer.parseInt(m.group(1));
    }

    protected <V> Map<Integer, V> consumeMixedWithIncrementalSnapshot(int recordCount, Function<SourceRecord, V> valueConverter, Predicate<Map.Entry<Integer, V>> dataCompleted, Consumer<List<SourceRecord>> recordConsumer, String topicName) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, dataCompleted, k -> Integer.parseInt(k.getString(this.pkFieldName())), valueConverter, topicName, recordConsumer);
    }

    protected <V, K> Map<K, V> consumeMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<K, V>> dataCompleted, Function<Struct, K> idCalculator, Function<SourceRecord, V> valueConverter, String topicName, Consumer<List<SourceRecord>> recordConsumer) throws InterruptedException {
        HashMap dbChanges = new HashMap();
        int noRecords = 0;
        while (true) {
            AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
            List dataRecords = records.recordsForTopic(topicName);
            if (records.allRecordsInOrder().isEmpty()) {
                ((AbstractIntegerAssert)Assertions.assertThat((int)(++noRecords)).describedAs(String.format("Too many no data record results, %d < %d", dbChanges.size(), recordCount), new Object[0])).isLessThanOrEqualTo(3);
                continue;
            }
            noRecords = 0;
            if (dataRecords == null || dataRecords.isEmpty()) continue;
            dataRecords.forEach(record -> {
                Object id = idCalculator.apply((Struct)record.key());
                Object value = valueConverter.apply((SourceRecord)record);
                dbChanges.put(id, value);
            });
            if (recordConsumer != null) {
                recordConsumer.accept(dataRecords);
            }
            if (dbChanges.size() >= recordCount && !dbChanges.entrySet().stream().anyMatch(dataCompleted.negate())) break;
        }
        Assertions.assertThat(dbChanges).hasSize(recordCount);
        return dbChanges;
    }

    protected Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot(int recordCount) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), x -> true, null, this.topicName());
    }

    protected Map<Integer, Integer> consumeMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, Integer>> dataCompleted, Consumer<List<SourceRecord>> recordConsumer) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, this::extractFieldValue, dataCompleted, recordConsumer, this.topicName());
    }

    protected Map<Integer, SourceRecord> consumeRecordsMixedWithIncrementalSnapshot(int recordCount, Predicate<Map.Entry<Integer, SourceRecord>> dataCompleted, Consumer<List<SourceRecord>> recordConsumer) throws InterruptedException {
        return this.consumeMixedWithIncrementalSnapshot(recordCount, Function.identity(), dataCompleted, recordConsumer, this.topicName());
    }

    protected String valueFieldName() {
        return "aa";
    }

    protected String pkFieldName() {
        return "id";
    }

    private <K> void snapshotOnly(K initialId, Function<K, K> idGenerator) throws Exception {
        LinkedHashMap<K, Document> documents = new LinkedHashMap<K, Document>();
        K key = initialId;
        for (int i = 0; i < 1000; ++i) {
            Document doc = new Document();
            doc.append(DOCUMENT_ID, key).append(this.valueFieldName(), (Object)i);
            documents.put(key, doc);
            key = idGenerator.apply(key);
        }
        this.insertDocumentsInTx(DATABASE_NAME, COLLECTION_NAME, (Document[])documents.values().toArray(Document[]::new));
        this.startConnector();
        this.sendAdHocSnapshotSignal();
        Map<String, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> true, k -> k.getString(this.pkFieldName()), this::extractFieldValue, this.topicName(), null);
        JsonSerialization serialization = new JsonSerialization();
        try (MongoClient connection = this.connect();){
            CodecRegistry codecs = connection.getDatabase(DATABASE_NAME).getCollection(COLLECTION_NAME).getCodecRegistry();
            Map<String, Integer> expected = documents.values().stream().map(d -> d.toBsonDocument(BsonDocument.class, codecs)).collect(Collectors.toMap(arg_0 -> ((JsonSerialization)serialization).getDocumentId(arg_0), d -> d.getInt32((Object)this.valueFieldName()).getValue()));
            Assertions.assertThat(dbChanges).containsAllEntriesOf(expected);
        }
    }

    @Test
    public void shouldStreamWithDatabaseIncludeList() throws InterruptedException {
        this.startConnector(config -> ((Configuration.Builder)config.with(MongoDbConnectorConfig.DATABASE_INCLUDE_LIST, DATABASE_NAME)).without(MongoDbConnectorConfig.COLLECTION_INCLUDE_LIST.name()), this.loggingCompletion());
        this.assertConnectorIsRunning();
        this.assertNoRecordsToConsume();
        this.insertDocuments(DATABASE_NAME, COLLECTION_NAME, new Document("foo", (Object)"bar"));
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(1);
        records.topics().forEach(System.out::println);
        Assertions.assertThat((Iterable)records.topics()).contains((Object[])new String[]{this.topicName()});
        Assertions.assertThat((List)records.recordsForTopic(this.topicName())).hasSize(1);
    }

    @Test
    public void snapshotOnlyInt32() throws Exception {
        this.snapshotOnly(0, k -> k + 1);
    }

    @Test
    public void snapshotOnlyWithInt64() throws Exception {
        long firstKey = 0x80000000L;
        this.snapshotOnly(firstKey, k -> k + 1L);
    }

    @Test
    public void snapshotOnlyDouble() throws Exception {
        this.snapshotOnly(0.0, k -> k + 1.0);
    }

    @Test
    public void snapshotOnlyDecimal128() throws Exception {
        Assume.assumeTrue((String)"Decimal 128 not supported", (boolean)TestHelper.decimal128Supported());
        BigDecimal firstKey = BigDecimal.valueOf(Long.MAX_VALUE).add(BigDecimal.ONE);
        this.snapshotOnly(firstKey, k -> k.add(BigDecimal.ONE));
    }

    @Test
    public void snapshotOnlyObjectId() throws Exception {
        ObjectId firstKey = new ObjectId();
        this.snapshotOnly(firstKey, k -> new ObjectId());
    }

    @Test
    public void snapshotOnlyUUID() throws Exception {
        this.snapshotOnly(UUID.randomUUID(), k -> UUID.randomUUID());
    }

    @Test
    public void snapshotOnlyString() throws Exception {
        Supplier<String> keySupplier = () -> UUID.randomUUID().toString();
        this.snapshotOnly(keySupplier.get(), k -> (String)keySupplier.get());
    }

    @Test
    public void invalidTablesInTheList() throws Exception {
        this.populateDataCollection();
        this.startConnector();
        this.sendAdHocSnapshotSignal("dbA.invalid1", this.fullDataCollectionName(), "invalid2");
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void snapshotOnlyWithRestart() throws Exception {
        LogInterceptor interceptor = new LogInterceptor(MongoDbIncrementalSnapshotChangeEventSource.class);
        this.populateDataCollection();
        Configuration config = this.config().build();
        this.startAndConsumeTillEnd(this.connectorClass(), config);
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignal();
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> interceptor.containsMessage("No data returned by the query, incremental snapshotting of table '" + this.fullDataCollectionName() + "' finished"));
        int expectedRecordCount = 1000;
        AtomicInteger recordCounter = new AtomicInteger();
        AtomicBoolean restarted = new AtomicBoolean();
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> true, x -> {
            if (recordCounter.addAndGet(x.size()) > 50 && !restarted.get()) {
                this.stopConnector();
                this.assertConnectorNotRunning();
                this.start(this.connectorClass(), config);
                this.waitForConnectorToStart();
                restarted.set(true);
            }
        });
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void inserts() throws Exception {
        this.populateDataCollection();
        this.startConnector();
        this.sendAdHocSnapshotSignal();
        this.insertAdditionalData();
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000);
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void updates() throws Exception {
        this.populateDataCollection();
        this.startConnector();
        this.sendAdHocSnapshotSignal();
        this.updateData(10);
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> (Integer)x.getValue() >= 2000, null);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)(i + 2000))});
        }
    }

    @Test
    public void updatesWithRestart() throws Exception {
        this.populateDataCollection();
        Configuration config = this.config().build();
        this.startAndConsumeTillEnd(this.connectorClass(), config);
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignal();
        this.updateData(10);
        int expectedRecordCount = 1000;
        AtomicInteger recordCounter = new AtomicInteger();
        AtomicBoolean restarted = new AtomicBoolean();
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> (Integer)x.getValue() >= 2000, x -> {
            if (recordCounter.addAndGet(x.size()) > 50 && !restarted.get()) {
                this.stopConnector();
                this.assertConnectorNotRunning();
                this.start(this.connectorClass(), config);
                this.waitForConnectorToStart();
                restarted.set(true);
            }
        });
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)(i + 2000))});
        }
    }

    @Test
    public void updatesLargeChunk() throws Exception {
        this.populateDataCollection();
        this.startConnector(x -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1000));
        this.sendAdHocSnapshotSignal();
        this.updateData(1000);
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> (Integer)x.getValue() >= 2000, null);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)(i + 2000))});
        }
    }

    @Test
    @FixFor(value={"DBZ-4271"})
    public void stopCurrentIncrementalSnapshotWithoutCollectionsAndTakeNewNewIncrementalSnapshotAfterRestart() throws Exception {
        LogInterceptor interceptor = new LogInterceptor(MongoDbIncrementalSnapshotChangeEventSource.class);
        this.populateDataCollection();
        this.startConnector(x -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1));
        this.sendAdHocSnapshotSignalAndWait(new String[0]);
        this.sendAdHocSnapshotStopSignalAndWait(new String[0]);
        Assertions.assertThat((boolean)this.consumeAnyRemainingIncrementalSnapshotEventsAndCheckForStopMessage(interceptor, "Stopping incremental snapshot")).isTrue();
        this.stopConnector(r -> interceptor.clear());
        this.startConnector();
        Assertions.assertThat((boolean)interceptor.containsMessage("No incremental snapshot in progress")).isTrue();
        this.sendAdHocSnapshotSignal();
        this.insertAdditionalData();
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000);
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    @FixFor(value={"DBZ-4271"})
    public void stopCurrentIncrementalSnapshotWithAllCollectionsAndTakeNewNewIncrementalSnapshotAfterRestart() throws Exception {
        LogInterceptor interceptor = new LogInterceptor(MongoDbIncrementalSnapshotChangeEventSource.class);
        this.populateDataCollection();
        this.startConnector(x -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1));
        this.sendAdHocSnapshotSignalAndWait(new String[0]);
        this.sendAdHocSnapshotStopSignalAndWait(this.fullDataCollectionName());
        Assertions.assertThat((boolean)this.consumeAnyRemainingIncrementalSnapshotEventsAndCheckForStopMessage(interceptor, "Removing '[" + this.fullDataCollectionName() + "]' collections from incremental snapshot")).isTrue();
        this.stopConnector(r -> interceptor.clear());
        this.startConnector();
        Assertions.assertThat((boolean)interceptor.containsMessage("No incremental snapshot in progress")).isTrue();
        this.sendAdHocSnapshotSignal();
        this.insertAdditionalData();
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000);
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    @FixFor(value={"DBZ-4271"})
    public void removeNotYetCapturedCollectionFromInProgressIncrementalSnapshot() throws Exception {
        LogInterceptor interceptor = new LogInterceptor(MongoDbIncrementalSnapshotChangeEventSource.class);
        this.populateDataCollections();
        this.startConnector(x -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250));
        List<String> collectionIds = this.fullDataCollectionNames();
        Assertions.assertThat(collectionIds).hasSize(2);
        String collectionIdToRemove = collectionIds.get(1);
        this.sendAdHocSnapshotSignal(collectionIds.toArray(new String[0]));
        this.sendAdHocSnapshotStopSignal(collectionIdToRemove);
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> interceptor.containsMessage("Removing '[" + collectionIdToRemove + "]' collections from incremental snapshot"));
        this.insertAdditionalData(COLLECTION_NAME);
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000, this.topicName());
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    @FixFor(value={"DBZ-4271"})
    public void removeStartedCapturedCollectionFromInProgressIncrementalSnapshot() throws Exception {
        LogInterceptor interceptor = new LogInterceptor(MongoDbIncrementalSnapshotChangeEventSource.class);
        this.populateDataCollections();
        this.startConnector(x -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 250));
        List<String> collectionIds = this.fullDataCollectionNames();
        Assertions.assertThat(collectionIds).hasSize(2);
        List<String> topicNames = this.topicNames();
        Assertions.assertThat(topicNames).hasSize(2);
        String collectionIdToRemove = collectionIds.get(0);
        this.sendAdHocSnapshotSignal(collectionIds.toArray(new String[0]));
        this.sendAdHocSnapshotStopSignal(collectionIdToRemove);
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> interceptor.containsMessage("Removing '[" + collectionIdToRemove + "]' collections from incremental snapshot"));
        this.insertAdditionalData(COLLECTION2_NAME);
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000, topicNames.get(1));
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void pauseDuringSnapshot() throws Exception {
        this.populateDataCollection();
        this.startConnector(x -> (Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 1));
        this.waitForConnectorToStart();
        this.sendAdHocSnapshotSignal();
        ArrayList records = new ArrayList();
        String topicName = this.topicName();
        this.consumeRecords(100, record -> {
            if (topicName.equalsIgnoreCase(record.topic())) {
                records.add(record);
            }
        });
        this.sendPauseSignal();
        this.consumeAvailableRecords(record -> {
            if (topicName.equalsIgnoreCase(record.topic())) {
                records.add(record);
            }
        });
        int beforeResume = records.size();
        this.sendResumeSignal();
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000 - beforeResume);
        for (int i = beforeResume + 1; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void insertInsertWatermarkingStrategy() throws Exception {
        this.populateDataCollection();
        this.startConnector();
        this.waitForConnectorToStart();
        this.insertAdditionalData();
        this.sendAdHocSnapshotSignal();
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000);
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
        this.assertCloseEventCount(closeEventCount -> Assertions.assertThat((Long)closeEventCount).isNotZero());
    }

    @Test
    public void insertDeleteWatermarkingStrategy() throws Exception {
        this.populateDataCollection();
        this.startConnector(x -> (Configuration.Builder)((Configuration.Builder)x.with(CommonConnectorConfig.INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY, "insert_delete")).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false));
        this.sendAdHocSnapshotSignal();
        this.insertAdditionalData();
        int expectedRecordCount = 2000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(2000);
        for (int i = 0; i < 2000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
        this.assertCloseEventCount(closeEventCount -> Assertions.assertThat((Long)closeEventCount).isZero());
    }

    private void assertCloseEventCount(Consumer<Long> consumer) {
        try (MongoClient client = TestHelper.connect(mongo);){
            MongoDatabase db = client.getDatabase(DATABASE_NAME);
            MongoCollection collection = db.getCollection("signals");
            Document filter = new Document();
            filter.put("type", (Object)"snapshot-window-close");
            consumer.accept(collection.countDocuments((Bson)filter));
        }
    }

    protected void sendAdHocSnapshotSignalAndWait(String ... collectionIds) throws Exception {
        if (collectionIds.length == 0) {
            this.sendAdHocSnapshotSignal();
        } else {
            this.sendAdHocSnapshotSignal(collectionIds);
        }
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            AtomicBoolean result = new AtomicBoolean(false);
            this.consumeAvailableRecords(r -> {
                if (r.topic().endsWith(SIGNAL_COLLECTION_NAME)) {
                    result.set(true);
                }
            });
            return result.get();
        });
    }

    protected void sendAdHocSnapshotStopSignalAndWait(String ... collectionIds) throws Exception {
        this.sendAdHocSnapshotStopSignal(collectionIds);
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).until(() -> {
            AtomicBoolean stopSignal = new AtomicBoolean(false);
            this.consumeAvailableRecords(r -> {
                String type;
                String after;
                if (r.topic().endsWith(SIGNAL_COLLECTION_NAME) && (after = ((Struct)r.value()).getString("after")) != null && "stop-snapshot".equals(type = Document.parse((String)after).getString((Object)"type"))) {
                    stopSignal.set(true);
                }
            });
            return stopSignal.get();
        });
    }

    protected boolean consumeAnyRemainingIncrementalSnapshotEventsAndCheckForStopMessage(LogInterceptor interceptor, String stopMessage) throws Exception {
        AtomicBoolean stopMessageFound = new AtomicBoolean(false);
        Awaitility.await().atMost(60L, TimeUnit.SECONDS).pollDelay(5L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> {
            if (interceptor.containsMessage(stopMessage)) {
                stopMessageFound.set(true);
            }
            return this.consumeAvailableRecords(r -> {}) == 0;
        });
        return stopMessageFound.get();
    }

    protected int getMaximumEnqueuedRecordCount() {
        return 3000;
    }
}

