/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.IncrementalSnapshotIT;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.assertj.core.api.Assertions;
import org.junit.Test;

public class MultiThreadIncrementalSnapshotIT
extends IncrementalSnapshotIT {
    protected static final int ROW_COUNT = 1000;
    private static final int INCREMENTAL_SNAPSHOT_THREADS = 7;

    @Override
    protected Configuration.Builder config() {
        Configuration.Builder builder = super.config();
        return (Configuration.Builder)builder.with(MongoDbConnectorConfig.SNAPSHOT_MAX_THREADS, 7);
    }

    @Test
    public void multiThreadingSnapshot() throws Exception {
        this.populateDataCollection();
        this.startConnector();
        this.sendAdHocSnapshotSignal();
        int expectedRecordCount = 1000;
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000);
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }

    @Test
    public void multiThreadSnapshotWithRestart() throws Exception {
        this.populateDataCollection();
        this.startAndConsumeTillEnd(this.connectorClass(), this.config().build());
        this.waitForConnectorToStart();
        this.waitForAvailableRecords(1L, TimeUnit.SECONDS);
        this.assertNoRecordsToConsume();
        this.sendAdHocSnapshotSignal();
        int expectedRecordCount = 1000;
        AtomicInteger recordCounter = new AtomicInteger();
        AtomicBoolean restarted = new AtomicBoolean();
        Map<Integer, Integer> dbChanges = this.consumeMixedWithIncrementalSnapshot(1000, x -> true, x -> {
            if (recordCounter.addAndGet(x.size()) > 50 && !restarted.get()) {
                this.stopConnector();
                this.assertConnectorNotRunning();
                this.start(this.connectorClass(), ((Configuration.Builder)super.config().with(MongoDbConnectorConfig.SNAPSHOT_MAX_THREADS, 9)).build());
                this.waitForConnectorToStart();
                restarted.set(true);
            }
        });
        for (int i = 0; i < 1000; ++i) {
            Assertions.assertThat(dbChanges).contains(new Map.Entry[]{Assertions.entry((Object)(i + 1), (Object)i)});
        }
    }
}

