/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.mongodb;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.connector.mongodb.MongoDbConnector;
import io.debezium.connector.mongodb.MongoDbConnectorConfig;
import io.debezium.connector.mongodb.TestHelper;
import io.debezium.connector.mongodb.junit.MongoDbDatabaseProvider;
import io.debezium.connector.mongodb.junit.MongoDbDatabaseVersionResolver;
import io.debezium.connector.mongodb.junit.MongoDbPlatform;
import io.debezium.data.Envelope;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.testing.testcontainers.MongoDbDeployment;
import io.debezium.testing.testcontainers.MongoDbReplicaSet;
import io.debezium.testing.testcontainers.util.DockerUtils;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.bson.Document;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbConnectorDatabaseRestrictedIT
extends AbstractConnectorTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(MongoDbConnectorDatabaseRestrictedIT.class);
    public static final String AUTH_DATABASE = "admin";
    public static final String TEST_DATABASE = "dbit";
    public static final String TEST_DATABASE2 = "dbother";
    public static final String TEST_COLLECTION = "items";
    public static final String TEST_ALLOWED_USER = "testUser";
    public static final String TEST_ALLOWED_PWD = "testSecret";
    public static final String TEST_DISALLOWED_USER = "testOtherUser";
    public static final String TEST_DISALLOWED_PWD = "testOtherSecret";
    public static final String TOPIC_PREFIX = "mongo";
    private static final int INIT_DOCUMENT_COUNT = 10;
    private static final int NEW_DOCUMENT_COUNT = 4;
    protected static MongoDbReplicaSet mongo;

    @BeforeClass
    public static void beforeAll() {
        Assume.assumeTrue((boolean)MongoDbDatabaseVersionResolver.getPlatform().equals((Object)MongoDbPlatform.MONGODB_DOCKER));
        DockerUtils.enableFakeDnsIfRequired();
        mongo = MongoDbDatabaseProvider.dockerAuthReplicaSet();
        LOGGER.info("Starting {}...", (Object)mongo);
        mongo.start();
        LOGGER.info("Setting up users");
        mongo.createUser(TEST_ALLOWED_USER, TEST_ALLOWED_PWD, AUTH_DATABASE, new String[]{"read:dbit"});
        mongo.createUser(TEST_DISALLOWED_USER, TEST_DISALLOWED_PWD, AUTH_DATABASE, new String[]{"read:dbother"});
    }

    @AfterClass
    public static void afterAll() {
        DockerUtils.disableFakeDns();
        if (mongo != null) {
            mongo.stop();
        }
    }

    @Before
    public void beforeEach() {
        this.stopConnector();
        this.initializeConnectorTestFramework();
        TestHelper.cleanDatabase((MongoDbDeployment)mongo, TEST_DATABASE);
    }

    @After
    public void afterEach() {
        this.stopConnector();
    }

    protected static MongoClient connect() {
        return MongoClients.create((String)mongo.getConnectionString());
    }

    protected static void populateCollection(String dbName, String colName, int count) {
        MongoDbConnectorDatabaseRestrictedIT.populateCollection(dbName, colName, 0, count);
    }

    protected static void populateCollection(String dbName, String colName, int startId, int count) {
        try (MongoClient client = MongoDbConnectorDatabaseRestrictedIT.connect();){
            MongoDatabase db = client.getDatabase(dbName);
            MongoCollection collection = db.getCollection(colName);
            List items = IntStream.range(startId, startId + count).mapToObj(i -> new Document("_id", (Object)i).append("name", (Object)("name_" + i))).collect(Collectors.toList());
            collection.insertMany(items);
        }
    }

    protected Configuration connectorConfiguration(String user, String password) {
        String connectionString = mongo.getAuthConnectionString(user, password, AUTH_DATABASE);
        return ((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)((Configuration.Builder)TestHelper.getConfiguration(connectionString).edit().with(MongoDbConnectorConfig.POLL_INTERVAL_MS, 10)).with(CommonConnectorConfig.TOPIC_PREFIX, TOPIC_PREFIX)).with(MongoDbConnectorConfig.CAPTURE_SCOPE, (EnumeratedValue)MongoDbConnectorConfig.CaptureScope.DATABASE)).with(MongoDbConnectorConfig.CAPTURE_TARGET, TEST_DATABASE)).with(CommonConnectorConfig.MAX_RETRIES_ON_ERROR, 2)).build();
    }

    @Test
    public void shouldConsumeAllEventsFromDatabaseWithPermissions() throws InterruptedException {
        int documentCount = 0;
        String topic = String.format("%s.%s.%s", TOPIC_PREFIX, TEST_DATABASE, TEST_COLLECTION);
        MongoDbConnectorDatabaseRestrictedIT.populateCollection(TEST_DATABASE, TEST_COLLECTION, 10);
        Configuration config = this.connectorConfiguration(TEST_ALLOWED_USER, TEST_ALLOWED_PWD);
        this.start(MongoDbConnector.class, config);
        this.consumeAndVerifyFromInitialSnapshot(topic, 10);
        MongoDbConnectorDatabaseRestrictedIT.populateCollection(TEST_DATABASE, TEST_COLLECTION, documentCount += 10, 4);
        documentCount += 4;
        this.consumeAndVerifyNotFromInitialSnapshot(topic, 4);
    }

    @Test
    public void shouldFailWithoutPermissions() {
        LogInterceptor logInterceptor = new LogInterceptor(ErrorHandler.class);
        MongoDbConnectorDatabaseRestrictedIT.populateCollection(TEST_DATABASE, TEST_COLLECTION, 10);
        Configuration config = this.connectorConfiguration(TEST_DISALLOWED_USER, TEST_DISALLOWED_PWD);
        this.start(MongoDbConnector.class, config);
        Awaitility.await().pollDelay(10L, TimeUnit.SECONDS).timeout(30L, TimeUnit.SECONDS).until(() -> !this.isEngineRunning.get());
        Assertions.assertThat((boolean)logInterceptor.containsMessage("The maximum number of 2 retries has been attempted")).isTrue();
    }

    protected void consumeAndVerifyFromInitialSnapshot(String topic, int expectedRecords) throws InterruptedException {
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(expectedRecords);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        Assertions.assertThat((int)records.recordsForTopic(topic).size()).isEqualTo(expectedRecords);
        AtomicBoolean foundLast = new AtomicBoolean(false);
        records.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyFromInitialSnapshot((SourceRecord)record, foundLast);
            this.verifyOperation((SourceRecord)record, Envelope.Operation.READ);
        });
        Assertions.assertThat((boolean)foundLast.get()).isTrue();
    }

    protected void verifyFromInitialSnapshot(SourceRecord record, AtomicBoolean foundLast) {
        if (record.sourceOffset().containsKey("initsync")) {
            Assertions.assertThat((boolean)record.sourceOffset().containsKey("initsync")).isTrue();
            Struct value = (Struct)record.value();
            Assertions.assertThat((String)value.getStruct("source").getString("snapshot")).isEqualTo((Object)"true");
        } else {
            Assertions.assertThat((boolean)foundLast.getAndSet(true)).isFalse();
            Struct value = (Struct)record.value();
            Assertions.assertThat((String)value.getStruct("source").getString("snapshot")).isEqualTo((Object)"last");
        }
    }

    protected void consumeAndVerifyNotFromInitialSnapshot(String topic, int expectedRecords) throws InterruptedException {
        this.consumeAndVerifyNotFromInitialSnapshot(topic, expectedRecords, Envelope.Operation.CREATE);
    }

    protected void consumeAndVerifyNotFromInitialSnapshot(String topic, int expectedRecords, Envelope.Operation op) throws InterruptedException {
        AbstractConnectorTest.SourceRecords records = this.consumeRecordsByTopic(expectedRecords);
        Assertions.assertThat((int)records.recordsForTopic(topic).size()).isEqualTo(expectedRecords);
        Assertions.assertThat((int)records.topics().size()).isEqualTo(1);
        records.forEach(record -> {
            this.validate((SourceRecord)record);
            this.verifyNotFromInitialSnapshot((SourceRecord)record);
            this.verifyOperation((SourceRecord)record, op);
        });
    }

    protected void verifyNotFromInitialSnapshot(SourceRecord record) {
        Assertions.assertThat((boolean)record.sourceOffset().containsKey("initsync")).isFalse();
        Struct value = (Struct)record.value();
        Assertions.assertThat((String)value.getStruct("source").getString("snapshot")).isNull();
    }

    protected void verifyOperation(SourceRecord record, Envelope.Operation expected) {
        Struct value = (Struct)record.value();
        Assertions.assertThat((String)value.getString("op")).isEqualTo((Object)expected.code());
    }
}

