/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.openshift.db2;

import io.debezium.testing.openshift.ConnectorTestBase;
import io.debezium.testing.openshift.assertions.KafkaAssertions;
import io.debezium.testing.openshift.resources.ConnectorFactories;
import io.debezium.testing.openshift.tools.ConfigProperties;
import io.debezium.testing.openshift.tools.databases.SqlDatabaseClient;
import io.debezium.testing.openshift.tools.databases.db2.DB2Controller;
import io.debezium.testing.openshift.tools.databases.db2.DB2Deployer;
import io.debezium.testing.openshift.tools.kafka.ConnectorConfigBuilder;
import java.io.IOException;
import java.sql.SQLException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Tags;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

/**
 * Ordered acceptance test for the DB2 connector.
 *
 * <p>The test methods are stateful and must run in {@link Order} sequence: each step relies on
 * the number of records the previous steps left in the customers topic (4 after the snapshot,
 * then 5, 6 and finally at least 7).
 *
 * <p>A {@link DB2Controller} is only created when {@code ConfigProperties.DATABASE_DB2_HOST} is
 * not present. Tests that insert rows need that controller and fail with an
 * {@link IllegalStateException} when it is missing.
 */
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Tags({@Tag("acceptance"), @Tag("db2")})
public class DB2ConnectorIT
extends ConnectorTestBase {
    public static final String DB_DEPLOYMENT_PATH = "/database-resources/db2/deployment.yaml";
    public static final String DB_SERVICE_PATH_LB = "/database-resources/db2/service-lb.yaml";
    public static final String DB_SERVICE_PATH = "/database-resources/db2/service.yaml";
    public static final String CONNECTOR_NAME = "inventory-connector-db2";

    /** Suffix appended to the database server name to form the customers topic name. */
    private static final String CUSTOMERS_TOPIC_SUFFIX = ".DB2INST1.CUSTOMERS";

    private static final OkHttpClient httpClient = new OkHttpClient();
    private static final ConnectorFactories connectorFactories = new ConnectorFactories();

    /** Remains {@code null} when {@code DATABASE_DB2_HOST} is present (no database is deployed by the test). */
    private static DB2Controller dbController;
    private static ConnectorConfigBuilder connectorConfig;
    private static String connectorName;
    private static String dbServerName;

    /**
     * Loads the DB2 JDBC driver, deploys and initializes the database when no DB2 host is
     * configured, then builds the connector configuration and deploys the connector.
     *
     * @throws IOException propagated from the deployment calls
     * @throws InterruptedException propagated from the deployment calls
     * @throws ClassNotFoundException if the DB2 JDBC driver class is not on the classpath
     */
    @BeforeAll
    public static void setupDatabase() throws IOException, InterruptedException, ClassNotFoundException {
        Class.forName("com.ibm.db2.jcc.DB2Driver");

        if (!ConfigProperties.DATABASE_DB2_HOST.isPresent()) {
            // NOTE(review): the casts are kept from the decompiled output; they are redundant if
            // the deployer's fluent methods already return DB2Deployer/DB2Controller - confirm.
            DB2Deployer deployer = (DB2Deployer) new DB2Deployer(ocp).withProject(ConfigProperties.OCP_PROJECT_DB2);
            deployer = (DB2Deployer) deployer.withDeployment(DB_DEPLOYMENT_PATH);
            deployer = (DB2Deployer) deployer.withServices(new String[]{DB_SERVICE_PATH_LB, DB_SERVICE_PATH});
            dbController = (DB2Controller) deployer.deploy();
            dbController.initialize();
        }

        connectorName = CONNECTOR_NAME + "-" + testUtils.getUniqueId();
        // The server name is the connector name with dashes replaced by underscores.
        dbServerName = connectorName.replace('-', '_');
        connectorConfig = connectorFactories.db2(dbServerName);
        if (ConfigProperties.DEPLOY_SERVICE_REGISTRY) {
            connectorConfig.addApicurioAvroSupport(registryController.getRegistryApiAddress());
        }
        kafkaConnectController.deployConnector(connectorName, connectorConfig);
    }

    /**
     * Undeploys the connector and reloads the database.
     *
     * <p>Both steps are guarded against {@code null} because the fields they use may never have
     * been assigned: {@code dbController} is skipped when a DB2 host is configured, and
     * {@code connectorName} is unset if setup failed before reaching its assignment.
     *
     * @throws IOException propagated from the undeploy or reload calls
     * @throws InterruptedException propagated from the undeploy or reload calls
     */
    @AfterAll
    public static void tearDownDatabase() throws IOException, InterruptedException {
        try {
            if (connectorName != null) {
                kafkaConnectController.undeployConnector(connectorName);
            }
        } finally {
            // Reload the database even if undeploying the connector failed.
            if (dbController != null) {
                dbController.reload();
            }
        }
    }

    /**
     * Returns the name of the customers topic for the current database server name.
     */
    private static String customersTopic() {
        return dbServerName + CUSTOMERS_TOPIC_SUFFIX;
    }

    /**
     * Inserts one row into {@code DB2INST1.CUSTOMERS}.
     *
     * <p>The values are concatenated into the SQL text without escaping; every caller in this
     * class passes fixed literals. Do not pass values containing quotes.
     *
     * @param firstName value for the {@code first_name} column
     * @param lastName value for the {@code last_name} column
     * @param email value for the {@code email} column
     * @throws SQLException propagated from the database client
     * @throws IllegalStateException if no database controller was created during setup
     */
    private void insertCustomer(String firstName, String lastName, String email) throws SQLException {
        if (dbController == null) {
            throw new IllegalStateException(
                    "No DB2 controller available: the database was not deployed by this test (DATABASE_DB2_HOST is set)");
        }
        SqlDatabaseClient client = dbController.getDatabaseClient(ConfigProperties.DATABASE_DB2_DBZ_USERNAME, ConfigProperties.DATABASE_DB2_DBZ_PASSWORD);
        String sql = "INSERT INTO DB2INST1.CUSTOMERS(first_name,last_name,email) VALUES  ('" + firstName + "', '" + lastName + "', '" + email + "')";
        client.execute(ConfigProperties.DATABASE_DB2_DBZ_DBNAME, sql);
    }

    /**
     * Asserts that the response of the Kafka Connect {@code /connectors} endpoint contains the
     * connector name.
     */
    @Test
    @Order(1)
    public void shouldHaveRegisteredConnector() {
        Request r = new Request.Builder().url(kafkaConnectController.getApiURL().resolve("/connectors")).build();
        KafkaAssertions.awaitAssert(() -> {
            // The response is closed after each attempt so connections are not leaked.
            try (Response res = httpClient.newCall(r).execute()) {
                Assertions.assertThat(res.body().string()).contains(connectorName);
            }
        });
    }

    /**
     * Asserts that the topics for the four captured {@code DB2INST1} tables exist.
     */
    @Test
    @Order(2)
    public void shouldCreateKafkaTopics() {
        assertions.assertTopicsExist(
                customersTopic(),
                dbServerName + ".DB2INST1.ORDERS",
                dbServerName + ".DB2INST1.PRODUCTS",
                dbServerName + ".DB2INST1.PRODUCTS_ON_HAND");
    }

    /**
     * Waits for the DB2 snapshot and asserts that the customers topic holds 4 records.
     *
     * @throws IOException propagated from waiting for the snapshot
     */
    @Test
    @Order(3)
    public void shouldContainRecordsInCustomersTopic() throws IOException {
        kafkaConnectController.waitForDB2Snapshot(dbServerName);
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsCount(customersTopic(), 4));
    }

    /**
     * Inserts a customer and asserts that the topic grows to 5 records and contains the new email.
     *
     * @throws SQLException propagated from the insert
     */
    @Test
    @Order(4)
    public void shouldStreamChanges() throws SQLException {
        this.insertCustomer("Tom", "Tester", "tom@test.com");
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsCount(customersTopic(), 5));
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsContain(customersTopic(), "tom@test.com"));
    }

    /**
     * Undeploys the connector, inserts a customer and asserts that the record count is still 5.
     *
     * @throws SQLException propagated from the insert
     * @throws IOException propagated from undeploying the connector
     */
    @Test
    @Order(5)
    public void shouldBeDown() throws SQLException, IOException {
        kafkaConnectController.undeployConnector(connectorName);
        this.insertCustomer("Jerry", "Tester", "jerry@test.com");
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsCount(customersTopic(), 5));
    }

    /**
     * Redeploys the connector and asserts that the row inserted while it was undeployed arrives
     * (6 records, including the new email).
     *
     * @throws IOException propagated from deploying the connector
     * @throws InterruptedException propagated from deploying the connector
     */
    @Test
    @Order(6)
    public void shouldResumeStreamingAfterRedeployment() throws IOException, InterruptedException {
        kafkaConnectController.deployConnector(connectorName, connectorConfig);
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsCount(customersTopic(), 6));
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsContain(customersTopic(), "jerry@test.com"));
    }

    /**
     * Disables the operator, destroys Kafka Connect, inserts a customer and asserts that the
     * record count is still 6.
     *
     * @throws SQLException propagated from the insert
     */
    @Test
    @Order(7)
    public void shouldBeDownAfterCrash() throws SQLException {
        operatorController.disable();
        kafkaConnectController.destroy();
        this.insertCustomer("Nibbles", "Tester", "nibbles@test.com");
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsCount(customersTopic(), 6));
    }

    /**
     * Re-enables the operator, waits for the Connect cluster and asserts that the row inserted
     * during the outage arrives.
     *
     * <p>This step checks a minimal count (at least 7) rather than an exact one.
     * NOTE(review): presumably because records may be delivered more than once after the
     * crash - confirm.
     *
     * @throws InterruptedException propagated from waiting for the Connect cluster
     */
    @Test
    @Order(8)
    public void shouldResumeStreamingAfterCrash() throws InterruptedException {
        operatorController.enable();
        kafkaConnectController.waitForConnectCluster();
        KafkaAssertions.awaitAssert(() -> assertions.assertMinimalRecordsCount(customersTopic(), 7));
        KafkaAssertions.awaitAssert(() -> assertions.assertRecordsContain(customersTopic(), "nibbles@test.com"));
    }
}

