/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.testcontainers;

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.DebeziumContainer;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.BeforeClass;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

/**
 * Integration test for {@link DebeziumContainer}.
 *
 * <p>Starts a Kafka container, a PostgreSQL container and a Debezium Connect container on one
 * shared {@link Network}, registers connectors through the Debezium container and verifies
 * both the reported connector status and the change events published to Kafka.
 */
public class DebeziumContainerTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumContainerTest.class);

    /**
     * Single HTTP client shared by all status requests, so that the polling loop in
     * {@link #canRegisterConnector()} does not build a new client on every attempt.
     */
    private static final OkHttpClient HTTP_CLIENT = new OkHttpClient();

    private static final Network network = Network.newNetwork();

    private static final KafkaContainer kafkaContainer = new KafkaContainer().withNetwork(network);

    public static PostgreSQLContainer<?> postgresContainer = new PostgreSQLContainer<>("debezium/postgres:11")
            .withNetwork(network)
            .withNetworkAliases("postgres");

    public static DebeziumContainer debeziumContainer = new DebeziumContainer("debezium/connect:1.1.1.Final")
            .withNetwork(network)
            .withKafka(kafkaContainer)
            .withLogConsumer(new Slf4jLogConsumer(LOGGER))
            .dependsOn(kafkaContainer);

    /**
     * Starts the Kafka, PostgreSQL and Debezium containers and blocks until all of them are up.
     */
    @BeforeClass
    public static void startContainers() {
        Startables.deepStart(Stream.of(kafkaContainer, postgresContainer, debeziumContainer)).join();
    }

    /**
     * Registers a connector and polls its status endpoint (every 250 ms, for at most 30 s)
     * until the connector and its first task both report {@code RUNNING}.
     *
     * @throws Exception if registering the connector fails
     */
    @Test
    public void canRegisterConnector() throws Exception {
        debeziumContainer.registerConnector("my-connector-1", getConfiguration(1));

        Awaitility.await()
                .pollInterval(Duration.ofMillis(250L))
                .atMost(Duration.ofSeconds(30L))
                .untilAsserted(() -> {
                    String status = executeHttpRequest(debeziumContainer.getConnectorStatus("my-connector-1"));

                    Assertions.assertThat(JsonPath.<String>read(status, "$.name")).isEqualTo("my-connector-1");
                    Assertions.assertThat(JsonPath.<String>read(status, "$.connector.state")).isEqualTo("RUNNING");
                    Assertions.assertThat(JsonPath.<String>read(status, "$.tasks[0].state")).isEqualTo("RUNNING");
                });
    }

    /**
     * Creates a table with two rows, registers a connector and verifies that two events with
     * {@code op = "r"} arrive on topic {@code dbserver2.todo.todo}; then updates one row and
     * verifies that a single event with {@code op = "u"} carrying the old and new title follows.
     *
     * @throws Exception if a JDBC statement, the connector registration or the Kafka consumer fails
     */
    @Test
    public void shouldRegisterPostgreSQLConnector() throws Exception {
        try (Connection connection = getConnection(postgresContainer);
             Statement statement = connection.createStatement();
             KafkaConsumer<String, String> consumer = getConsumer(kafkaContainer)) {

            statement.execute("create schema todo");
            statement.execute("create table todo.Todo (id int8 not null, title varchar(255), primary key (id))");
            statement.execute("alter table todo.Todo replica identity full");
            statement.execute("insert into todo.Todo values (1, 'Be Awesome')");
            statement.execute("insert into todo.Todo values (2, 'Learn Quarkus')");

            debeziumContainer.registerConnector("my-connector", getConfiguration(2));

            consumer.subscribe(Arrays.asList("dbserver2.todo.todo"));

            // The two rows inserted before the connector was registered.
            List<ConsumerRecord<String, String>> changeEvents = drain(consumer, 2);

            Assertions.assertThat(JsonPath.<Integer>read(changeEvents.get(0).key(), "$.id")).isEqualTo(1);
            Assertions.assertThat(JsonPath.<String>read(changeEvents.get(0).value(), "$.op")).isEqualTo("r");
            Assertions.assertThat(JsonPath.<String>read(changeEvents.get(0).value(), "$.after.title")).isEqualTo("Be Awesome");

            Assertions.assertThat(JsonPath.<Integer>read(changeEvents.get(1).key(), "$.id")).isEqualTo(2);
            Assertions.assertThat(JsonPath.<String>read(changeEvents.get(1).value(), "$.op")).isEqualTo("r");
            Assertions.assertThat(JsonPath.<String>read(changeEvents.get(1).value(), "$.after.title")).isEqualTo("Learn Quarkus");

            statement.execute("update todo.Todo set title = 'Learn Java' where id = 2");

            // The update issued after the connector was registered.
            changeEvents = drain(consumer, 1);

            Assertions.assertThat(JsonPath.<Integer>read(changeEvents.get(0).key(), "$.id")).isEqualTo(2);
            Assertions.assertThat(JsonPath.<String>read(changeEvents.get(0).value(), "$.op")).isEqualTo("u");
            Assertions.assertThat(JsonPath.<String>read(changeEvents.get(0).value(), "$.before.title")).isEqualTo("Learn Quarkus");
            Assertions.assertThat(JsonPath.<String>read(changeEvents.get(0).value(), "$.after.title")).isEqualTo("Learn Java");

            consumer.unsubscribe();
        }
    }

    /**
     * Opens a JDBC connection using the URL and credentials exposed by the given container.
     *
     * @param postgresContainer the PostgreSQL container to connect to
     * @return a new connection; the caller is responsible for closing it
     * @throws SQLException if the connection cannot be established
     */
    private Connection getConnection(PostgreSQLContainer<?> postgresContainer) throws SQLException {
        return DriverManager.getConnection(
                postgresContainer.getJdbcUrl(),
                postgresContainer.getUsername(),
                postgresContainer.getPassword());
    }

    /**
     * Creates a String/String Kafka consumer pointed at the given container's bootstrap servers,
     * with a random {@code tc-} prefixed group id and {@code auto.offset.reset=earliest}.
     *
     * @param kafkaContainer the Kafka container to consume from
     * @return a new consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<String, String> getConsumer(KafkaContainer kafkaContainer) {
        return new KafkaConsumer<>(
                ImmutableMap.<String, Object>of(
                        "bootstrap.servers", kafkaContainer.getBootstrapServers(),
                        "group.id", "tc-" + UUID.randomUUID(),
                        "auto.offset.reset", "earliest"),
                new StringDeserializer(),
                new StringDeserializer());
    }

    /**
     * Polls the consumer repeatedly, for up to 10 seconds, until exactly
     * {@code expectedRecordCount} records have been collected.
     *
     * @param consumer an already subscribed consumer
     * @param expectedRecordCount the exact number of records to wait for
     * @return the collected records, in the order they were returned by the consumer
     */
    private List<ConsumerRecord<String, String>> drain(KafkaConsumer<String, String> consumer, int expectedRecordCount) {
        List<ConsumerRecord<String, String>> allRecords = new ArrayList<>();

        Unreliables.retryUntilTrue(10, TimeUnit.SECONDS, () -> {
            // Duration-based overload; poll(long) is deprecated.
            consumer.poll(Duration.ofMillis(50L)).iterator().forEachRemaining(allRecords::add);
            return allRecords.size() == expectedRecordCount;
        });

        return allRecords;
    }

    /**
     * Builds a connector configuration for the PostgreSQL container, using {@code id} as the
     * suffix of both {@code database.server.name} and {@code slot.name}.
     *
     * @param id suffix appended to {@code dbserver} and {@code debezium_}
     * @return the connector configuration
     */
    private ConnectorConfiguration getConfiguration(int id) {
        return ConnectorConfiguration.forJdbcContainer(postgresContainer)
                .with("database.server.name", "dbserver" + id)
                .with("slot.name", "debezium_" + id);
    }

    /**
     * Issues an HTTP GET against {@code url} and returns the response body as a string.
     *
     * @param url the URL to request
     * @return the response body
     * @throws IOException if the request fails or the body cannot be read
     */
    private String executeHttpRequest(String url) throws IOException {
        Request request = new Request.Builder().url(url).build();

        try (Response response = HTTP_CLIENT.newCall(request).execute()) {
            return response.body().string();
        }
    }
}

