/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.system.tests.db2;

import io.debezium.testing.system.assertions.KafkaAssertions;
import io.debezium.testing.system.tests.ConnectorTest;
import io.debezium.testing.system.tools.ConfigProperties;
import io.debezium.testing.system.tools.databases.SqlDatabaseClient;
import io.debezium.testing.system.tools.databases.SqlDatabaseController;
import io.debezium.testing.system.tools.kafka.ConnectorConfigBuilder;
import io.debezium.testing.system.tools.kafka.KafkaConnectController;
import io.debezium.testing.system.tools.kafka.KafkaController;
import java.sql.SQLException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;

public abstract class Db2Tests
extends ConnectorTest {
    public Db2Tests(KafkaController kafkaController, KafkaConnectController connectController, ConnectorConfigBuilder connectorConfig, KafkaAssertions<?, ?> assertions) {
        super(kafkaController, connectController, connectorConfig, assertions);
    }

    public void insertCustomer(SqlDatabaseController dbController, String firstName, String lastName, String email) throws SQLException {
        SqlDatabaseClient client = dbController.getDatabaseClient(ConfigProperties.DATABASE_DB2_USERNAME, ConfigProperties.DATABASE_DB2_DBZ_PASSWORD);
        String sql = "INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email) VALUES  ('" + firstName + "', '" + lastName + "', '" + email + "')";
        client.execute("inventory", sql);
    }

    public void renameCustomer(SqlDatabaseController dbController, String oldName, String newName) throws SQLException {
        SqlDatabaseClient client = dbController.getDatabaseClient(ConfigProperties.DATABASE_DB2_USERNAME, ConfigProperties.DATABASE_DB2_DBZ_PASSWORD);
        String sql = "UPDATE DB2INST1.CUSTOMERS SET first_name = '" + newName + "' WHERE first_name = '" + oldName + "'";
        client.execute("inventory", sql);
    }

    @Test
    @Order(value=10)
    public void shouldHaveRegisteredConnector() {
        Request r = new Request.Builder().url(this.connectController.getApiURL().resolve("/connectors")).build();
        KafkaAssertions.awaitAssert(() -> {
            try (Response res = new OkHttpClient().newCall(r).execute();){
                Assertions.assertThat((String)res.body().string()).contains(new CharSequence[]{this.connectorConfig.getConnectorName()});
            }
        });
    }

    @Test
    @Order(value=20)
    public void shouldCreateKafkaTopics() {
        String prefix = this.connectorConfig.getDbServerName();
        this.assertions.assertTopicsExist(prefix + ".DB2INST1.CUSTOMERS", prefix + ".DB2INST1.ORDERS", prefix + ".DB2INST1.PRODUCTS", prefix + ".DB2INST1.PRODUCTS_ON_HAND");
    }

    @Test
    @Order(value=30)
    public void shouldSnapshotChanges() {
        this.connectController.getMetricsReader().waitForDB2Snapshot(this.connectorConfig.getDbServerName());
        String topic = this.connectorConfig.getDbServerName() + ".DB2INST1.CUSTOMERS";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 4));
    }

    @Test
    @Order(value=40)
    public void shouldStreamChanges(SqlDatabaseController dbController) throws SQLException {
        this.insertCustomer(dbController, "Tom", "Tester", "tom@test.com");
        String topic = this.connectorConfig.getDbServerName() + ".DB2INST1.CUSTOMERS";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 5));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsContain(topic, "tom@test.com"));
    }

    @Test
    @Order(value=41)
    public void shouldRerouteUpdates(SqlDatabaseController dbController) throws SQLException {
        this.renameCustomer(dbController, "Tom", "Thomas");
        String prefix = this.connectorConfig.getDbServerName();
        String updatesTopic = prefix + ".u.CUSTOMERS";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(prefix + ".DB2INST1.CUSTOMERS", 5));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(updatesTopic, 1));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsContain(updatesTopic, "Thomas"));
    }

    @Test
    @Order(value=50)
    public void shouldBeDown(SqlDatabaseController dbController) throws Exception {
        this.connectController.undeployConnector(this.connectorConfig.getConnectorName());
        this.insertCustomer(dbController, "Jerry", "Tester", "jerry@test.com");
        String topic = this.connectorConfig.getDbServerName() + ".DB2INST1.CUSTOMERS";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 5));
    }

    @Test
    @Order(value=60)
    public void shouldResumeStreamingAfterRedeployment() throws Exception {
        this.connectController.deployConnector(this.connectorConfig);
        String topic = this.connectorConfig.getDbServerName() + ".DB2INST1.CUSTOMERS";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 6));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsContain(topic, "jerry@test.com"));
    }

    @Test
    @Order(value=70)
    public void shouldBeDownAfterCrash(SqlDatabaseController dbController) throws SQLException {
        this.connectController.destroy();
        this.insertCustomer(dbController, "Nibbles", "Tester", "nibbles@test.com");
        String topic = this.connectorConfig.getDbServerName() + ".DB2INST1.CUSTOMERS";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 6));
    }

    @Test
    @Order(value=80)
    public void shouldResumeStreamingAfterCrash() throws InterruptedException {
        this.connectController.restore();
        String topic = this.connectorConfig.getDbServerName() + ".DB2INST1.CUSTOMERS";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertMinimalRecordsCount(topic, 7));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsContain(topic, "nibbles@test.com"));
    }

    @Test
    @Order(value=90)
    public void shouldExtractNewRecordState(SqlDatabaseController dbController) throws Exception {
        this.connectController.undeployConnector(this.connectorConfig.getConnectorName());
        this.connectorConfig = this.connectorConfig.addUnwrapSMT();
        this.connectController.deployConnector(this.connectorConfig);
        this.insertCustomer(dbController, "Eaton", "Beaver", "ebeaver@test.com");
        String topic = this.connectorConfig.getDbServerName() + ".inventory.customers";
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordsCount(topic, 8));
        KafkaAssertions.awaitAssert(() -> this.assertions.assertRecordIsUnwrapped(topic, 1));
    }
}

