/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mongodb.AbstractShardedMongoConnectorIT;
import io.debezium.connector.mongodb.ChangeStreamPipeline;
import io.debezium.connector.mongodb.ChangeStreamPipelineFactory;
import io.debezium.connector.mongodb.Filters;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.MongoDbConnectorTask;
import io.debezium.connector.mongodb.MongoDbOffsetContext;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.testing.testcontainers.MongoDbDeployment;
import io.debezium.util.Collect;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.Document;
import org.junit.Before;
import org.junit.Test;

/**
 * Integration test for starting the MongoDB connector on a two-shard deployment from offsets
 * that were stored before the connector is started.
 *
 * <p>Before each test, {@link #setupDatabase()} inserts documents through the router while
 * watching change streams on the router and on both shards, and keeps the observed events.
 * Each test then stores offsets built from those events, starts the connector and compares the
 * ids of the emitted records with the ids of the router events expected after the stored offset.
 */
public class OffsetConsolidationShardedIT
extends AbstractShardedMongoConnectorIT {
    /** Topic prefix used by the connector configuration and as the {@code server_id} of stored partitions. */
    public static final String TOPIC_PREFIX = "mongo";

    /** Orders change stream events by their cluster time. */
    private static final Comparator<ChangeStreamDocument<BsonDocument>> EVENT_COMPARATOR = Comparator.comparing(ChangeStreamDocument::getClusterTime);

    /** Connector configuration built in {@link #setupDatabase()}. */
    public Configuration config;
    private MongoDbConnectorConfig connectorConfig;

    /**
     * Events seen on the router change stream, sorted by cluster time. Being a {@link TreeSet}
     * keyed on {@link #EVENT_COMPARATOR}, two events with equal cluster time count as duplicates.
     */
    private final SortedSet<ChangeStreamDocument<BsonDocument>> allRouterEvents = new TreeSet<>(EVENT_COMPARATOR);

    /** Events seen on the change stream opened directly against shard 0, in arrival order. */
    private final List<ChangeStreamDocument<BsonDocument>> shardEvents0 = new ArrayList<>();

    /** Events seen on the change stream opened directly against shard 1, in arrival order. */
    private final List<ChangeStreamDocument<BsonDocument>> shardEvents1 = new ArrayList<>();

    /**
     * Builds the connector configuration and populates the sharded collection.
     * The test is skipped (assumption failure) unless the deployment has exactly two shards.
     */
    @Before
    public void setupDatabase() {
        Assumptions.assumeThat((int) mongo.size()).isEqualTo(2);

        this.config = TestHelper.getConfiguration((MongoDbDeployment) mongo).edit()
                .with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)
                .with(CommonConnectorConfig.TOPIC_PREFIX, TOPIC_PREFIX)
                .with(MongoDbConnectorConfig.ALLOW_OFFSET_INVALIDATION, true)
                .build();
        this.connectorConfig = new MongoDbConnectorConfig(this.config);

        ChangeStreamPipelineFactory pipelineFactory = new ChangeStreamPipelineFactory(this.connectorConfig, new Filters.FilterConfig(this.config));
        insertInitialDocuments(pipelineFactory.create());
    }

    /**
     * Inserts documents through the router until each shard's change stream has delivered at
     * least two events, recording every event observed on the router and on either shard.
     *
     * @param pipeline change stream pipeline whose stages are applied to all three streams
     */
    public void insertInitialDocuments(ChangeStreamPipeline pipeline) {
        // The element type of the stage list is not visible from this file, hence `var`.
        var stages = pipeline.getStages();
        try (MongoClient router = connect();
             MongoClient shard0 = connect((MongoDbDeployment) mongo.getShard(0));
             MongoClient shard1 = connect((MongoDbDeployment) mongo.getShard(1))) {
            ChangeStreamIterable<BsonDocument> routerStream = router.watch(stages, BsonDocument.class);
            ChangeStreamIterable<BsonDocument> shardStream0 = shard0.watch(stages, BsonDocument.class);
            ChangeStreamIterable<BsonDocument> shardStream1 = shard1.watch(stages, BsonDocument.class);

            int counter = 0;
            try (MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> routerCursor = routerStream.cursor();
                 MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> shardCursor0 = shardStream0.cursor();
                 MongoChangeStreamCursor<ChangeStreamDocument<BsonDocument>> shardCursor1 = shardStream1.cursor()) {
                while (this.shardEvents0.size() < 2 || this.shardEvents1.size() < 2) {
                    router.getDatabase(shardedDatabase())
                            .getCollection(shardedCollection())
                            .insertOne(new Document("_id", counter).append("name", "name_" + counter));
                    ++counter;

                    // next() is used for the router stream, tryNext() for the shard streams;
                    // a null from tryNext() simply records nothing for that shard this round.
                    ChangeStreamDocument<BsonDocument> routerEvent = routerCursor.next();
                    ChangeStreamDocument<BsonDocument> shardEvent0 = shardCursor0.tryNext();
                    ChangeStreamDocument<BsonDocument> shardEvent1 = shardCursor1.tryNext();

                    if (shardEvent0 != null) {
                        this.shardEvents0.add(shardEvent0);
                    }
                    if (shardEvent1 != null) {
                        this.shardEvents1.add(shardEvent1);
                    }
                    this.allRouterEvents.add(routerEvent);
                }
            }
        }
    }

    /**
     * Stores one offset per shard, each under a partition named after that shard.
     * A shard with an empty event list contributes no offset.
     *
     * @param shard0 events used to build the offset for shard 0
     * @param shard1 events used to build the offset for shard 1
     * @throws InterruptedException propagated from {@code storeOffsets}
     */
    private void storeReplicaSetModeOffsets(List<ChangeStreamDocument<BsonDocument>> shard0, List<ChangeStreamDocument<BsonDocument>> shard1) throws InterruptedException {
        Map<Map<String, ?>, Map<String, ?>> offsets = new HashMap<>();
        Stream.of(
                prepareReplicaSetOffset(mongo.getShard(0).getName(), shard0),
                prepareReplicaSetOffset(mongo.getShard(1).getName(), shard1))
                .forEach(offsets::putAll);
        storeOffsets(this.config, offsets);
    }

    /**
     * Stores a single offset under the partition named {@code "cluster"}.
     *
     * @param events events used to build the offset
     * @throws InterruptedException propagated from {@code storeOffsets}
     */
    private void storeShardedModeOffsets(List<ChangeStreamDocument<BsonDocument>> events) throws InterruptedException {
        Map<Map<String, ?>, Map<String, ?>> offsets = prepareReplicaSetOffset("cluster", events);
        storeOffsets(this.config, offsets);
    }

    /**
     * Builds a partition-to-offset entry positioned at the next-to-last event of {@code events}.
     *
     * <p>The event is picked with {@link #nextToLastElement(List)}, the same helper used by
     * {@code getOffsetEvent}, so the stored offset and the expected offset are always derived
     * identically and a one-element list fails with that helper's explicit message.
     *
     * @param rsName value stored under the {@code "rs"} key of the partition
     * @param events events to derive the offset from
     * @return an empty map when {@code events} is empty, otherwise a single-entry map
     * @throws IndexOutOfBoundsException if {@code events} holds exactly one element
     */
    private Map<Map<String, ?>, Map<String, ?>> prepareReplicaSetOffset(String rsName, List<ChangeStreamDocument<BsonDocument>> events) {
        if (events.isEmpty()) {
            return Map.of();
        }
        ChangeStreamDocument<BsonDocument> offsetEvent = nextToLastElement(events);

        MongoDbOffsetContext offset = MongoDbOffsetContext.empty(this.connectorConfig);
        offset.changeStreamEvent(offsetEvent);

        Map<String, ?> sourceOffset = offset.getOffset();
        Map<String, String> sourcePartition = Collect.hashMapOf("server_id", TOPIC_PREFIX, "rs", rsName);
        return Map.of(sourcePartition, sourceOffset);
    }

    /**
     * Returns the element just before the last one.
     *
     * @param collection list to read from
     * @param <T> element type
     * @return the element at index {@code size() - 2}
     * @throws IndexOutOfBoundsException if the list has fewer than two elements
     */
    public static <T> T nextToLastElement(List<T> collection) {
        if (collection.size() < 2) {
            throw new IndexOutOfBoundsException("At least 2 elements required");
        }
        return collection.get(collection.size() - 2);
    }

    /**
     * Picks, from the next-to-last events of both shards, the one with the smaller cluster time;
     * on a tie the event from {@code shard1} is returned.
     *
     * @param shard0 events of shard 0
     * @param shard1 events of shard 1
     * @return the selected event
     */
    public ChangeStreamDocument<BsonDocument> getOffsetEvent(List<ChangeStreamDocument<BsonDocument>> shard0, List<ChangeStreamDocument<BsonDocument>> shard1) {
        ChangeStreamDocument<BsonDocument> event0 = nextToLastElement(shard0);
        ChangeStreamDocument<BsonDocument> event1 = nextToLastElement(shard1);
        return EVENT_COMPARATOR.compare(event0, event1) < 0 ? event0 : event1;
    }

    /**
     * Returns the next-to-last event of {@code events}.
     *
     * @param events events to pick from
     * @return the next-to-last event
     */
    public ChangeStreamDocument<BsonDocument> getOffsetEvent(List<ChangeStreamDocument<BsonDocument>> events) {
        return nextToLastElement(events);
    }

    /**
     * Returns the ids of the router events whose cluster time is greater than that of
     * {@code offsetEvent}.
     *
     * @param offsetEvent event the stored offset was derived from
     * @return ids of the router events following {@code offsetEvent}, as strings
     */
    public Set<String> getExpectedIds(ChangeStreamDocument<BsonDocument> offsetEvent) {
        // allRouterEvents is sorted by cluster time, so dropping the prefix that is not after
        // the offset event leaves exactly the later events.
        List<ChangeStreamDocument<BsonDocument>> following = this.allRouterEvents.stream()
                .dropWhile(event -> EVENT_COMPARATOR.compare(offsetEvent, event) >= 0)
                .collect(Collectors.toList());
        return getExpectedIds(following);
    }

    /**
     * Extracts the int32 {@code _id} of each event's document key and renders it as a string.
     *
     * @param events events to read document keys from
     * @return the set of ids as strings
     */
    public Set<String> getExpectedIds(Collection<ChangeStreamDocument<BsonDocument>> events) {
        return events.stream()
                .map(ChangeStreamDocument::getDocumentKey)
                .map(key -> key.getInt32("_id"))
                .map(BsonInt32::getValue)
                .map(String::valueOf)
                .collect(Collectors.toSet());
    }

    /** Reads the {@code id} field from the key struct of a source record. */
    private static String recordId(SourceRecord record) {
        return ((Struct) record.key()).getString("id");
    }

    /**
     * Stores per-shard offsets, starts the connector and expects exactly the router events that
     * follow the earlier of the two shard offsets.
     */
    @Test
    public void shouldConsolidateOffsetsFromRsMode() throws InterruptedException {
        Assumptions.assumeThat((int) mongo.size()).isEqualTo(2);
        LogInterceptor logInterceptor = new LogInterceptor(MongoDbConnectorTask.class);
        storeReplicaSetModeOffsets(this.shardEvents0, this.shardEvents1);

        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", TOPIC_PREFIX, "streaming", "0");

        ChangeStreamDocument<BsonDocument> offsetEvent = getOffsetEvent(this.shardEvents0, this.shardEvents1);
        Set<String> expected = getExpectedIds(offsetEvent);

        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(expected.size());
        Assertions.assertThat(records.allRecordsInOrder()).hasSameSizeAs(expected);
        records.forEach(record -> {
            verifyNotFromInitialSnapshot(record);
            Assertions.assertThat(recordId(record)).isIn(expected);
        });

        // NOTE(review): the return value is discarded, so this line asserts nothing. Confirm the
        // message text and wrap it in an assertion if a check is intended.
        logInterceptor.containsMessage("checking shard specific offsets");
    }

    /**
     * Stores a single offset under the {@code "cluster"} partition, starts the connector and
     * expects exactly the router events that follow that offset.
     */
    @Test
    public void shouldUseOffsetsFromShardedMode() throws InterruptedException {
        Assumptions.assumeThat((int) mongo.size()).isEqualTo(2);
        LogInterceptor logInterceptor = new LogInterceptor(MongoDbConnectorTask.class);
        ArrayList<ChangeStreamDocument<BsonDocument>> events = new ArrayList<>(this.allRouterEvents);
        storeShardedModeOffsets(events);

        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", TOPIC_PREFIX, "streaming", "0");

        ChangeStreamDocument<BsonDocument> offsetEvent = getOffsetEvent(events);
        Set<String> expected = getExpectedIds(offsetEvent);

        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(expected.size());
        Assertions.assertThat(records.allRecordsInOrder()).hasSameSizeAs(expected);
        records.forEach(record -> {
            verifyNotFromInitialSnapshot(record);
            Assertions.assertThat(recordId(record)).isIn(expected);
        });

        // NOTE(review): the return value is discarded, so this line asserts nothing. Confirm the
        // message text and wrap it in an assertion if a check is intended.
        logInterceptor.containsMessage("Found compatible offset from previous version");
    }

    /**
     * Stores per-shard offsets and starts the connector with offset invalidation disabled.
     */
    @Test
    public void shouldFailToConsolidateOffsetsFromRsModeWhenInvalidationIsNotAllowed() throws InterruptedException {
        Assumptions.assumeThat((int) mongo.size()).isEqualTo(2);
        LogInterceptor logInterceptor = new LogInterceptor(MongoDbConnectorTask.class);
        Configuration strictConfig = this.config.edit()
                .with(MongoDbConnectorConfig.ALLOW_OFFSET_INVALIDATION, false)
                .build();
        storeReplicaSetModeOffsets(this.shardEvents0, this.shardEvents1);

        start(MongoDbConnector.class, strictConfig);

        // NOTE(review): both results are discarded and the checks run right after start(), so
        // nothing is asserted here. Confirm the intended expectation (and whether a wait is
        // needed) before turning these into assertions.
        logInterceptor.containsErrorMessage("Offset invalidation is not allowed");
        logInterceptor.containsStacktraceElement("Offsets from previous version are invalid, either manually delete");
    }

    /**
     * Stores an offset for shard 0 only, starts the connector and expects one record per router
     * event, each verified with {@code verifyFromInitialSnapshot}.
     */
    @Test
    public void shouldFailToConsolidateOffsetsFromRsModeWhenOneShardOffsetIsMissing() throws InterruptedException {
        Assumptions.assumeThat((int) mongo.size()).isEqualTo(2);
        LogInterceptor logInterceptor = new LogInterceptor(MongoDbConnectorTask.class);
        storeReplicaSetModeOffsets(this.shardEvents0, List.of());

        start(MongoDbConnector.class, this.config);
        waitForStreamingRunning("mongodb", TOPIC_PREFIX, "streaming", "0");

        Set<String> expected = getExpectedIds(this.allRouterEvents);

        // NOTE(review): the return value is discarded, so this line asserts nothing. Confirm the
        // message text and its log level before turning it into an assertion.
        logInterceptor.containsErrorMessage("At least one shard is missing previously recorded offset, so empty offset will be use");

        AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(expected.size());
        Assertions.assertThat(records.allRecordsInOrder()).hasSameSizeAs(expected);

        AtomicBoolean foundLast = new AtomicBoolean(false);
        records.forEach(record -> {
            verifyFromInitialSnapshot(record, foundLast);
            Assertions.assertThat(recordId(record)).isIn(expected);
        });
    }
}

