/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.testcontainers;

import com.mongodb.ConnectionString;
import com.mongodb.ReadPreference;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.connection.ServerDescription;
import com.mongodb.internal.selector.ReadPreferenceServerSelector;
import io.debezium.testing.testcontainers.MongoDbContainer;
import io.debezium.testing.testcontainers.MongoDbReplicaSet;
import io.debezium.testing.testcontainers.util.DockerUtils;
import io.debezium.testing.testcontainers.util.ParsingPortResolver;
import io.debezium.testing.testcontainers.util.PooledPortResolver;
import io.debezium.testing.testcontainers.util.PortResolver;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbReplicaSetTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbReplicaSetTest.class);
    public static final String MONGO_DOCKER_DESKTOP_PORT_PROPERTY = "mongodb.docker.desktop.ports";

    @BeforeAll
    static void setupAll() {
        DockerUtils.enableFakeDnsIfRequired();
    }

    @AfterAll
    static void tearDownAll() {
        DockerUtils.disableFakeDns();
    }

    @AfterEach
    void tearDown() {
        System.clearProperty(MONGO_DOCKER_DESKTOP_PORT_PROPERTY);
    }

    @Test
    public void testCluster() throws InterruptedException {
        this.testCluster(MongoDbReplicaSet.replicaSet());
    }

    @EnabledOnOs(value={OS.MAC, OS.WINDOWS})
    @Test
    public void testClusterWithPropertyPortList() throws InterruptedException {
        System.setProperty(MONGO_DOCKER_DESKTOP_PORT_PROPERTY, "27017,27018,27019");
        this.testCluster(MongoDbReplicaSet.replicaSet().portResolver((PortResolver)ParsingPortResolver.parseProperty((String)MONGO_DOCKER_DESKTOP_PORT_PROPERTY)));
    }

    @EnabledOnOs(value={OS.MAC, OS.WINDOWS})
    @Test
    public void testClusterWithPropertyPorRange() throws InterruptedException {
        System.setProperty(MONGO_DOCKER_DESKTOP_PORT_PROPERTY, "27017:27019");
        this.testCluster(MongoDbReplicaSet.replicaSet().portResolver((PortResolver)ParsingPortResolver.parseProperty((String)MONGO_DOCKER_DESKTOP_PORT_PROPERTY)));
    }

    @EnabledOnOs(value={OS.MAC, OS.WINDOWS})
    @Test
    public void testClusterWithInsufficientNumberOfPorts() throws InterruptedException {
        PooledPortResolver portResolver = new PooledPortResolver(Set.of(Integer.valueOf(27017), Integer.valueOf(27018)));
        Assertions.assertThatExceptionOfType(IllegalStateException.class).describedAs("Exception is thrown when two ports are available but three ports are required", new Object[0]).isThrownBy(() -> this.testCluster(MongoDbReplicaSet.replicaSet().portResolver((PortResolver)portResolver)));
    }

    public void testCluster(MongoDbReplicaSet.Builder replicaSet) throws InterruptedException {
        try (MongoDbReplicaSet cluster = replicaSet.build();){
            LOGGER.info("Starting {}...", (Object)cluster);
            cluster.start();
            ReadPreference readPreference = ReadPreference.primary();
            ConnectionString connectionString = new ConnectionString(cluster.getConnectionString() + "/?readPreference=" + readPreference.getName());
            LOGGER.info("Connecting to cluster: {}", (Object)connectionString);
            try (MongoClient client = MongoClients.create((ConnectionString)connectionString);){
                LOGGER.info("Connected to cluster: {}", (Object)client.getClusterDescription());
                MongoCollection<Document> collection = MongoDbReplicaSetTest.setup(client);
                this.run(cluster, client, collection);
            }
        }
    }

    private void run(MongoDbReplicaSet cluster, MongoClient client, MongoCollection<Document> collection) {
        try {
            MongoChangeStreamCursor cursor = collection.watch().batchSize(1).cursor();
            try {
                collection.insertOne((Object)Document.parse((String)"{username: 'user1', name: 'User 1'}"));
                collection.insertOne((Object)Document.parse((String)"{username: 'user2', name: 'User 2'}"));
                LOGGER.info("{}", cursor.next());
                LOGGER.info("Demoting primary");
                cluster.stepDown();
                Awaitility.await().atMost(30L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> cluster.tryPrimary().map(node -> !node.getNamedAddress().toString().equals(cursor.getServerAddress().toString()) && !node.getClientAddress().toString().equals(cursor.getServerAddress().toString())).orElse(false));
                if (!MongoDbContainer.IMAGE_VERSION.equals("4.0")) {
                    Assertions.assertThat((boolean)MongoDbReplicaSetTest.isSelectedReadPreference(client, collection, (MongoChangeStreamCursor<ChangeStreamDocument<Document>>)cursor)).isFalse();
                }
                throw new ResumableCursorException(cursor.getResumeToken());
            }
            catch (Throwable throwable) {
                if (cursor != null) {
                    try {
                        cursor.close();
                    }
                    catch (Throwable throwable2) {
                        throwable.addSuppressed(throwable2);
                    }
                }
                throw throwable;
            }
        }
        catch (ResumableCursorException e) {
            BsonDocument resumeToken = e.resumeToken();
            try (MongoChangeStreamCursor cursor = collection.watch().resumeAfter(resumeToken).batchSize(1).cursor();){
                Assertions.assertThat((boolean)MongoDbReplicaSetTest.isSelectedReadPreference(client, collection, (MongoChangeStreamCursor<ChangeStreamDocument<Document>>)cursor)).isTrue();
                LOGGER.info("{}", cursor.next());
            }
            return;
        }
    }

    private static boolean isSelectedReadPreference(MongoClient client, MongoCollection<Document> collection, MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor) {
        List candidates = new ReadPreferenceServerSelector(collection.getReadPreference()).select(client.getClusterDescription());
        return candidates.stream().map(ServerDescription::getAddress).anyMatch(address -> address.equals(cursor.getServerCursor() == null ? null : cursor.getServerCursor().getAddress()));
    }

    private static MongoCollection<Document> setup(MongoClient mongoClient) throws InterruptedException {
        MongoDatabase database = mongoClient.getDatabase("testChangeStreams");
        database.drop();
        Thread.sleep(1000L);
        return database.getCollection("documents");
    }

    public static class ResumableCursorException
    extends RuntimeException {
        private final BsonDocument resumeToken;

        ResumableCursorException(BsonDocument resumeToken) {
            this.resumeToken = resumeToken;
        }

        public BsonDocument resumeToken() {
            return this.resumeToken;
        }
    }
}

