/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Updates;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mongodb.AbstractShardedMongoConnectorIT;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.data.Envelope;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.testing.testcontainers.MongoDbDeployment;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.junit.Test;

public class ShardedMongoDbConnectorIT
extends AbstractShardedMongoConnectorIT {
    public static final String TOPIC_PREFIX = "mongo";
    private static final int INIT_DOCUMENT_COUNT = 1000;
    private static final int NEW_DOCUMENT_COUNT = 4;
    private static final int STOPPED_NEW_DOCUMENT_COUNT = 5;

    protected static void populateCollection(String dbName, String colName, int count) {
        ShardedMongoDbConnectorIT.populateCollection(dbName, colName, 0, count);
    }

    protected static void populateCollection(String dbName, String colName, int startId, int count) {
        try (MongoClient client = ShardedMongoDbConnectorIT.connect();){
            MongoDatabase db = client.getDatabase(dbName);
            MongoCollection collection = db.getCollection(colName);
            List items = IntStream.range(startId, startId + count).mapToObj(i -> new Document("_id", (Object)i).append("name", (Object)("name_" + i))).collect(Collectors.toList());
            collection.insertMany(items);
        }
    }

    @Test
    public void shouldConsumeAllEventsFromDatabaseWithShardedConnectionMode() throws InterruptedException {
        this.shouldConsumeAllEventsFromDatabase(MongoDbConnectorConfig.ConnectionMode.SHARDED);
    }

    @Test
    public void shouldConsumeAllEventsFromDatabaseWithReplicaSetConnectionMode() throws InterruptedException {
        this.shouldConsumeAllEventsFromDatabase(MongoDbConnectorConfig.ConnectionMode.REPLICA_SET);
    }

    public void shouldConsumeAllEventsFromDatabase(MongoDbConnectorConfig.ConnectionMode connectionMode) throws InterruptedException {
        int documentCount = 0;
        String topic = String.format("%s.%s.%s", TOPIC_PREFIX, this.shardedDatabase(), this.shardedCollection());
        ShardedMongoDbConnectorIT.populateCollection(this.shardedDatabase(), this.shardedCollection(), 1000);
        Configuration config = ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration((MongoDbDeployment)mongo).edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(MongoDbConnectorConfig.CONNECTION_MODE, (EnumeratedValue)connectionMode)).with(CommonConnectorConfig.TOPIC_PREFIX, TOPIC_PREFIX)).build();
        this.start(MongoDbConnector.class, config);
        this.consumeAndVerifyFromInitialSync(connectionMode, topic, 1000);
        ShardedMongoDbConnectorIT.populateCollection(this.shardedDatabase(), this.shardedCollection(), documentCount += 1000, 4);
        this.consumeAndVerifyNotFromInitialSync(topic, 4);
        this.stopConnector();
        ShardedMongoDbConnectorIT.populateCollection(this.shardedDatabase(), this.shardedCollection(), documentCount += 4, 5);
        documentCount += 5;
        this.start(MongoDbConnector.class, config);
        this.consumeAndVerifyNotFromInitialSync(topic, 5);
        try (MongoClient client = ShardedMongoDbConnectorIT.connect();){
            MongoDatabase db = client.getDatabase(this.shardedDatabase());
            MongoCollection collection = db.getCollection(this.shardedCollection());
            collection.updateOne((Bson)new Document("_id", (Object)0), Updates.set((String)"name", (Object)"Tom"));
        }
        this.consumeAndVerifyNotFromInitialSync(topic, 1, Envelope.Operation.UPDATE);
    }

    protected void consumeAndVerifyFromInitialSync(MongoDbConnectorConfig.ConnectionMode connectionMode, String topic, int expectedRecords) throws InterruptedException {
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(expectedRecords);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        Assertions.assertThat((int)records.recordsForTopic(topic).size()).isEqualTo(expectedRecords);
        AtomicInteger lastCount = new AtomicInteger();
        int expectedLastCount = connectionMode == MongoDbConnectorConfig.ConnectionMode.SHARDED ? 1 : mongo.size();
        records.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyFromInitialSync((SourceRecord)record, expectedLastCount, lastCount);
            this.verifyOperation((SourceRecord)record, Envelope.Operation.READ);
        });
        Assertions.assertThat((int)lastCount.get()).isEqualTo(expectedLastCount);
    }

    protected void verifyFromInitialSync(SourceRecord record, int numOfShards, AtomicInteger lastCounter) {
        if (record.sourceOffset().containsKey("initsync")) {
            Assertions.assertThat((boolean)record.sourceOffset().containsKey("initsync")).isTrue();
            Struct value = (Struct)record.value();
            Assertions.assertThat((String)value.getStruct("source").getString("snapshot")).isEqualTo((Object)"true");
        } else {
            Assertions.assertThat((int)lastCounter.getAndIncrement()).isLessThanOrEqualTo(numOfShards);
            Struct value = (Struct)record.value();
            Assertions.assertThat((String)value.getStruct("source").getString("snapshot")).isEqualTo((Object)"last");
        }
    }

    protected void consumeAndVerifyNotFromInitialSync(String topic, int expectedRecords) throws InterruptedException {
        this.consumeAndVerifyNotFromInitialSync(topic, expectedRecords, Envelope.Operation.CREATE);
    }

    protected void consumeAndVerifyNotFromInitialSync(String topic, int expectedRecords, Envelope.Operation op) throws InterruptedException {
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(expectedRecords);
        Assertions.assertThat((int)records.recordsForTopic(topic).size()).isEqualTo(expectedRecords);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        records.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyNotFromInitialSync((SourceRecord)record);
            this.verifyOperation((SourceRecord)record, op);
        });
    }

    protected void verifyNotFromInitialSync(SourceRecord record) {
        Assertions.assertThat((boolean)record.sourceOffset().containsKey("initsync")).isFalse();
        Struct value = (Struct)record.value();
        Assertions.assertThat((String)value.getStruct("source").getString("snapshot")).isNull();
    }

    protected void verifyOperation(SourceRecord record, Envelope.Operation expected) {
        Struct value = (Struct)record.value();
        Assertions.assertThat((String)value.getString("op")).isEqualTo((Object)expected.code());
    }
}

