/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.system.tests.mongodb;

import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tests.ConnectorTest;
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseClient;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
import java.sql.SQLException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.assertj.core.api.Assertions;
import org.bson.Document;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;

public abstract class MongoTests
extends ConnectorTest {
    public MongoTests(KafkaController kafkaController, KafkaConnectController connectController, ConnectorConfigBuilder connectorConfig, KafkaAssertions<?, ?> assertions) {
        super(kafkaController, connectController, connectorConfig, assertions);
    }

    public void insertCustomer(MongoDatabaseController dbController, String firstName, String lastName, String email) throws SQLException {
        MongoDatabaseClient client = dbController.getDatabaseClient(ConfigProperties.DATABASE_MONGO_DBZ_USERNAME, ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD, ConfigProperties.DATABASE_MONGO_DBZ_LOGIN_DBNAME);
        client.execute(ConfigProperties.DATABASE_MONGO_DBZ_DBNAME, "customers", col -> {
            Document doc = new Document().append("first_name", (Object)firstName).append("last_name", (Object)lastName).append("email", (Object)email);
            col.insertOne((Object)doc);
        });
    }

    @Test
    @Order(value=1)
    public void shouldHaveRegisteredConnector() {
        Request r = new Request.Builder().url(this.connectController.getApiURL().resolve("/connectors")).build();
        KafkaAssertions.awaitAssert(() -> {
            try (Response res = new OkHttpClient().newCall(r).execute();){
                Assertions.assertThat((String)res.body().string()).contains(new CharSequence[]{this.connectorConfig.getConnectorName()});
            }
        });
    }

    @Test
    @Order(value=2)
    public void shouldCreateKafkaTopics() {
        String prefix = this.connectorConfig.getDbServerName();
        this.assertions.assertTopicsExist(prefix + ".inventory.customers", prefix + ".inventory.orders", prefix + ".inventory.products");
    }

    @Test
    @Order(value=3)
    public void shouldSnapshotChanges() {
        this.connectController.getMetricsReader().waitForMongoSnapshot(this.connectorConfig.getDbServerName());
        String topic = this.connectorConfig.getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 4));
    }

    @Test
    @Order(value=4)
    public void shouldStreamChanges(MongoDatabaseController dbController) throws SQLException {
        this.insertCustomer(dbController, "Tom", "Tester", "tom@test.com");
        String topic = this.connectorConfig.getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 5));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsContain(topic, "tom@test.com"));
    }

    @Test
    @Order(value=5)
    public void shouldBeDown(MongoDatabaseController dbController) throws Exception {
        this.connectController.undeployConnector(this.connectorConfig.getConnectorName());
        this.insertCustomer(dbController, "Jerry", "Tester", "jerry@test.com");
        String topic = this.connectorConfig.getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 5));
    }

    @Test
    @Order(value=6)
    public void shouldResumeStreamingAfterRedeployment() throws Exception {
        this.connectController.deployConnector(this.connectorConfig);
        String topic = this.connectorConfig.getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 6));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsContain(topic, "jerry@test.com"));
    }

    @Test
    @Order(value=7)
    public void shouldBeDownAfterCrash(MongoDatabaseController dbController) throws SQLException {
        this.connectController.destroy();
        this.insertCustomer(dbController, "Nibbles", "Tester", "nibbles@test.com");
        String topic = this.connectorConfig.getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 6));
    }

    @Test
    @Order(value=8)
    public void shouldResumeStreamingAfterCrash() throws InterruptedException {
        this.connectController.restore();
        String topic = this.connectorConfig.getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertMinimalRecordsCount(topic, 7));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsContain(topic, "nibbles@test.com"));
    }
}

