/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.system.tests.mongodb;

import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.fixtures.TestRuntimeFixture;
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseClient;
import io.debezium.testing.system.tools.databases.mongodb.MongoDatabaseController;
import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.assertj.core.api.Assertions;
import org.bson.Document;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;

/**
 * Ordered system-test scenarios for a MongoDB connector, supplied as default
 * methods so that a concrete test class only needs to provide the runtime
 * fixture (database controller, Kafka Connect controller, connector config,
 * assertions and metrics).
 *
 * <p>The scenarios build on one another: the expected record counts in the
 * {@code <server>.inventory.customers} topic grow from 4 to 7 as the tests
 * insert additional customers. The {@link Order} values express that sequence;
 * NOTE(review): this presumably requires implementing classes to enable an
 * order-annotation based method orderer - confirm against the implementers.
 */
public interface MongoTestCases extends TestRuntimeFixture<MongoDatabaseController> {

    /**
     * Inserts a single customer document into the {@code customers} collection.
     *
     * <p>The client is obtained from the database controller using the
     * {@code DATABASE_MONGO_DBZ_*} username, password and login database
     * settings; {@code DATABASE_MONGO_DBZ_DBNAME} is passed as the first
     * argument of {@code execute} (presumably the target database name).
     *
     * @param firstName value stored under the {@code first_name} key
     * @param lastName value stored under the {@code last_name} key
     * @param email value stored under the {@code email} key
     */
    default void insertCustomer(String firstName, String lastName, String email) {
        MongoDatabaseClient client = ((MongoDatabaseController) this.getDbController()).getDatabaseClient(
                ConfigProperties.DATABASE_MONGO_DBZ_USERNAME,
                ConfigProperties.DATABASE_MONGO_DBZ_PASSWORD,
                ConfigProperties.DATABASE_MONGO_DBZ_LOGIN_DBNAME);
        client.execute(ConfigProperties.DATABASE_MONGO_DBZ_DBNAME, "customers", col -> {
            Document doc = new Document()
                    .append("first_name", firstName)
                    .append("last_name", lastName)
                    .append("email", email);
            col.insertOne(doc);
        });
    }

    /**
     * Requests {@code /connectors} from the Kafka Connect API and asserts that
     * the response body contains the configured connector name.
     */
    @Test
    @Order(1)
    default void shouldHaveRegisteredConnector() {
        Request request = new Request.Builder()
                .url(this.getKafkaConnectController().getApiURL().resolve("/connectors"))
                .build();
        // Built once, outside the assertion lambda: every OkHttpClient owns its own
        // connection pool and dispatcher, so constructing one per invocation of the
        // assertion needlessly allocates those resources.
        OkHttpClient httpClient = new OkHttpClient();
        KafkaAssertions.awaitAssert(() -> {
            // try-with-resources closes the response (and its body) after each call.
            try (Response res = httpClient.newCall(request).execute()) {
                Assertions.assertThat(res.body().string())
                        .contains(this.getConnectorConfig().getConnectorName());
            }
        });
    }

    /**
     * Asserts that the {@code inventory.customers}, {@code inventory.orders} and
     * {@code inventory.products} topics exist, each prefixed with the connector's
     * database server name.
     */
    @Test
    @Order(2)
    default void shouldCreateKafkaTopics() {
        String prefix = this.getConnectorConfig().getDbServerName();
        this.assertions().assertTopicsExist(
                prefix + ".inventory.customers",
                prefix + ".inventory.orders",
                prefix + ".inventory.products");
    }

    /**
     * Waits for the MongoDB snapshot reported by the connector metrics and then
     * asserts that the customers topic holds 4 records.
     *
     * @throws IOException declared for the metrics/assertion calls made here
     */
    @Test
    @Order(3)
    default void shouldContainRecordsInCustomersTopic() throws IOException {
        this.getConnectorMetrics().waitForMongoSnapshot(this.getConnectorConfig().getDbServerName());
        String topic = this.getConnectorConfig().getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> this.assertions().assertRecordsCount(topic, 4));
    }

    /**
     * Inserts customer "Tom" and asserts that the customers topic reaches
     * 5 records and contains {@code tom@test.com}.
     */
    @Test
    @Order(4)
    default void shouldStreamChanges() {
        this.insertCustomer("Tom", "Tester", "tom@test.com");
        String topic = this.getConnectorConfig().getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> this.assertions().assertRecordsCount(topic, 5));
        KafkaAssertions.awaitAssert(() -> this.assertions().assertRecordsContain(topic, "tom@test.com"));
    }

    /**
     * Undeploys the connector, inserts customer "Jerry" and asserts that the
     * customers topic still holds 5 records.
     *
     * @throws IOException declared for the connector undeployment call
     */
    @Test
    @Order(5)
    default void shouldBeDown() throws IOException {
        this.getKafkaConnectController().undeployConnector(this.getConnectorConfig().getConnectorName());
        this.insertCustomer("Jerry", "Tester", "jerry@test.com");
        String topic = this.getConnectorConfig().getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> this.assertions().assertRecordsCount(topic, 5));
    }

    /**
     * Deploys the connector again and asserts that the customers topic reaches
     * 6 records and contains {@code jerry@test.com}, the document inserted by
     * {@link #shouldBeDown()}.
     *
     * @throws IOException declared for the connector deployment call
     * @throws InterruptedException declared for the connector deployment call
     */
    @Test
    @Order(6)
    default void shouldResumeStreamingAfterRedeployment() throws IOException, InterruptedException {
        this.getKafkaConnectController().deployConnector(this.getConnectorConfig());
        String topic = this.getConnectorConfig().getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> this.assertions().assertRecordsCount(topic, 6));
        KafkaAssertions.awaitAssert(() -> this.assertions().assertRecordsContain(topic, "jerry@test.com"));
    }

    /**
     * Calls {@code destroy()} on the Kafka Connect controller, inserts customer
     * "Nibbles" and asserts that the customers topic still holds 6 records.
     */
    @Test
    @Order(7)
    default void shouldBeDownAfterCrash() {
        this.getKafkaConnectController().destroy();
        String topic = this.getConnectorConfig().getDbServerName() + ".inventory.customers";
        this.insertCustomer("Nibbles", "Tester", "nibbles@test.com");
        KafkaAssertions.awaitAssert(() -> this.assertions().assertRecordsCount(topic, 6));
    }

    /**
     * Calls {@code restore()} on the Kafka Connect controller and asserts that
     * the customers topic holds at least 7 records and contains
     * {@code nibbles@test.com}.
     *
     * @throws InterruptedException declared for the restore call
     */
    @Test
    @Order(8)
    default void shouldResumeStreamingAfterCrash() throws InterruptedException {
        this.getKafkaConnectController().restore();
        String topic = this.getConnectorConfig().getDbServerName() + ".inventory.customers";
        // NOTE(review): a minimum count rather than an exact one is asserted here,
        // presumably to tolerate records re-delivered after the crash - confirm.
        KafkaAssertions.awaitAssert(() -> this.assertions().assertMinimalRecordsCount(topic, 7));
        KafkaAssertions.awaitAssert(() -> this.assertions().assertRecordsContain(topic, "nibbles@test.com"));
    }
}

