/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.system.tests.postgresql;

import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.fixtures.TestRuntimeFixture;
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.databases.SqlDatabaseClient;
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
import java.io.IOException;
import java.sql.SQLException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;

/**
 * Ordered system-test scenarios for a PostgreSQL connector, expressed as default
 * methods so that a concrete test class only has to supply the runtime fixture.
 *
 * <p>The tests are sequenced with {@link Order} and are stateful: the record counts
 * asserted on the {@code <serverName>.inventory.customers} topic (4, 5, 5, 6, 6, at
 * least 7) build on the rows inserted by the lower-ordered tests.
 */
public interface PostgreSqlTestCases
extends TestRuntimeFixture<SqlDatabaseController> {
    /**
     * Inserts one row into {@code inventory.customers} using the configured
     * PostgreSQL credentials and database name.
     *
     * <p>The three values are concatenated verbatim into the SQL text, so they must
     * not contain single quotes; callers in this interface only pass literals.
     *
     * @param firstName value for the second column of the inserted row
     * @param lastName value for the third column of the inserted row
     * @param email value for the fourth column of the inserted row
     * @throws SQLException if executing the statement fails
     */
    default void insertCustomer(String firstName, String lastName, String email) throws SQLException {
        SqlDatabaseClient client = ((SqlDatabaseController) getDbController()).getDatabaseClient(
                ConfigProperties.DATABASE_POSTGRESQL_USERNAME,
                ConfigProperties.DATABASE_POSTGRESQL_PASSWORD);
        String sql = "INSERT INTO inventory.customers VALUES  (default, '" + firstName + "', '" + lastName + "', '" + email + "')";
        client.execute(ConfigProperties.DATABASE_POSTGRESQL_DBZ_DBNAME, sql);
    }

    /**
     * Queries the {@code /connectors} endpoint of the Kafka Connect API and asserts
     * that the response body contains the configured connector name.
     */
    @Test
    @Order(1)
    default void shouldHaveRegisteredConnector() {
        Request request = new Request.Builder()
                .url(getKafkaConnectController().getApiURL().resolve("/connectors"))
                .build();
        // One client for all attempts: every OkHttpClient instance owns its own
        // connection pool and dispatcher, so creating one per attempt wastes them.
        OkHttpClient httpClient = new OkHttpClient();
        KafkaAssertions.awaitAssert(() -> {
            try (Response response = httpClient.newCall(request).execute()) {
                Assertions.assertThat(response.body().string())
                        .contains(getConnectorConfig().getConnectorName());
            }
        });
    }

    /**
     * Asserts that the four {@code inventory} table topics (customers, orders,
     * products, products_on_hand) exist under the connector's server-name prefix.
     */
    @Test
    @Order(2)
    default void shouldCreateKafkaTopics() {
        String prefix = getConnectorConfig().getDbServerName();
        assertions().assertTopicsExist(
                prefix + ".inventory.customers",
                prefix + ".inventory.orders",
                prefix + ".inventory.products",
                prefix + ".inventory.products_on_hand");
    }

    /**
     * Waits for the PostgreSQL snapshot of the configured server, then asserts that
     * the customers topic holds 4 records.
     *
     * @throws IOException declared by the original signature
     */
    @Test
    @Order(3)
    default void shouldContainRecordsInCustomersTopic() throws IOException {
        getConnectorMetrics().waitForPostgreSqlSnapshot(getConnectorConfig().getDbServerName());
        String customersTopic = getConnectorConfig().getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> assertions().assertRecordsCount(customersTopic, 4));
    }

    /**
     * Inserts customer "Tom Tester" and asserts that the customers topic reaches
     * 5 records and contains {@code tom@test.com}.
     *
     * @throws SQLException if the insert fails
     */
    @Test
    @Order(4)
    default void shouldStreamChanges() throws SQLException {
        insertCustomer("Tom", "Tester", "tom@test.com");
        String customersTopic = getConnectorConfig().getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> assertions().assertRecordsCount(customersTopic, 5));
        KafkaAssertions.awaitAssert(() -> assertions().assertRecordsContain(customersTopic, "tom@test.com"));
    }

    /**
     * Undeploys the connector, inserts customer "Jerry Tester", and asserts that the
     * customers topic still holds 5 records.
     *
     * <p>NOTE(review): if {@code awaitAssert} returns as soon as the assertion first
     * passes, this check cannot detect a record that arrives late - confirm.
     *
     * @throws SQLException if the insert fails
     * @throws IOException declared by the original signature
     */
    @Test
    @Order(5)
    default void shouldBeDown() throws SQLException, IOException {
        getKafkaConnectController().undeployConnector(getConnectorConfig().getConnectorName());
        insertCustomer("Jerry", "Tester", "jerry@test.com");
        String customersTopic = getConnectorConfig().getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> assertions().assertRecordsCount(customersTopic, 5));
    }

    /**
     * Deploys the connector again and asserts that the customers topic reaches
     * 6 records and contains {@code jerry@test.com}.
     *
     * @throws IOException declared by the original signature
     * @throws InterruptedException declared by the original signature
     */
    @Test
    @Order(6)
    default void shouldResumeStreamingAfterRedeployment() throws IOException, InterruptedException {
        getKafkaConnectController().deployConnector(getConnectorConfig());
        String customersTopic = getConnectorConfig().getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> assertions().assertRecordsCount(customersTopic, 6));
        KafkaAssertions.awaitAssert(() -> assertions().assertRecordsContain(customersTopic, "jerry@test.com"));
    }

    /**
     * Calls {@code destroy()} on the Kafka Connect controller, inserts customer
     * "Nibbles Tester", and asserts that the customers topic still holds 6 records.
     *
     * @throws SQLException if the insert fails
     */
    @Test
    @Order(7)
    default void shouldBeDownAfterCrash() throws SQLException {
        getKafkaConnectController().destroy();
        insertCustomer("Nibbles", "Tester", "nibbles@test.com");
        String customersTopic = getConnectorConfig().getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> assertions().assertRecordsCount(customersTopic, 6));
    }

    /**
     * Calls {@code restore()} on the Kafka Connect controller and asserts that the
     * customers topic holds at least 7 records and contains {@code nibbles@test.com}.
     *
     * <p>Unlike the earlier tests, this one uses a minimal-count assertion rather
     * than an exact count.
     *
     * @throws InterruptedException declared by the original signature
     */
    @Test
    @Order(8)
    default void shouldResumeStreamingAfterCrash() throws InterruptedException {
        getKafkaConnectController().restore();
        String customersTopic = getConnectorConfig().getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> assertions().assertMinimalRecordsCount(customersTopic, 7));
        KafkaAssertions.awaitAssert(() -> assertions().assertRecordsContain(customersTopic, "nibbles@test.com"));
    }
}

