/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.testing.testcontainers;

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import io.debezium.testing.testcontainers.ConnectorConfiguration;
import io.debezium.testing.testcontainers.DebeziumContainer;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.fest.assertions.Assertions;
import org.junit.BeforeClass;
import org.junit.Test;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.images.builder.ImageFromDockerfile;
import org.testcontainers.images.builder.dockerfile.DockerfileBuilder;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;

public class ApicurioRegistryTest {
    private static final String DEBEZIUM_VERSION = "1.2.3.Final";
    private static final String APICURIO_VERSION = "1.3.0.Final";
    private static final Logger LOGGER = LoggerFactory.getLogger(ApicurioRegistryTest.class);
    private static Network network = Network.newNetwork();
    private static GenericContainer<?> apicurioContainer = new GenericContainer("apicurio/apicurio-registry-mem:1.3.0.Final").withNetwork(network).withExposedPorts(new Integer[]{8080}).waitingFor((WaitStrategy)new LogMessageWaitStrategy().withRegEx(".*apicurio-registry-app.*started in.*"));
    private static KafkaContainer kafkaContainer = new KafkaContainer().withNetwork(network);
    public static PostgreSQLContainer<?> postgresContainer = (PostgreSQLContainer)((PostgreSQLContainer)new PostgreSQLContainer("debezium/postgres:11").withNetwork(network)).withNetworkAliases(new String[]{"postgres"});
    public static ImageFromDockerfile apicurioDebeziumImage = (ImageFromDockerfile)new ImageFromDockerfile().withDockerfileFromBuilder(builder -> ((DockerfileBuilder)((DockerfileBuilder)((DockerfileBuilder)((DockerfileBuilder)builder.from("debezium/connect:1.2.3.Final")).env("KAFKA_CONNECT_DEBEZIUM_DIR", "$KAFKA_CONNECT_PLUGINS_DIR/debezium-connector-postgres")).env("APICURIO_VERSION", APICURIO_VERSION)).run("cd $KAFKA_CONNECT_DEBEZIUM_DIR && curl https://repo1.maven.org/maven2/io/apicurio/apicurio-registry-distro-connect-converter/$APICURIO_VERSION/apicurio-registry-distro-connect-converter-$APICURIO_VERSION-converter.tar.gz | tar xzv")).build());
    public static DebeziumContainer debeziumContainer = (DebeziumContainer)((DebeziumContainer)((DebeziumContainer)new DebeziumContainer((Future)apicurioDebeziumImage).withNetwork(network)).withKafka(kafkaContainer).withLogConsumer((Consumer)new Slf4jLogConsumer(LOGGER))).dependsOn(new Startable[]{kafkaContainer});

    @BeforeClass
    public static void startContainers() {
        Startables.deepStart(Stream.of(apicurioContainer, kafkaContainer, postgresContainer, debeziumContainer)).join();
    }

    @Test
    public void shouldConvertToJson() throws Exception {
        try (Connection connection = this.getConnection(postgresContainer);
             Statement statement = connection.createStatement();
             KafkaConsumer<String, String> consumer = this.getConsumerString(kafkaContainer);){
            statement.execute("drop schema if exists todo cascade");
            statement.execute("create schema todo");
            statement.execute("create table todo.Todo (id int8 not null, title varchar(255), primary key (id))");
            statement.execute("alter table todo.Todo replica identity full");
            statement.execute("insert into todo.Todo values (1, 'Be Awesome')");
            statement.execute("insert into todo.Todo values (2, 'Learn Quarkus')");
            debeziumContainer.registerConnector("my-connector-json", this.getConfiguration(1, "io.apicurio.registry.utils.converter.ExtJsonConverter", new String[0]));
            consumer.subscribe(Arrays.asList("dbserver1.todo.todo"));
            List<ConsumerRecord<String, String>> changeEvents = this.drain(consumer, 2);
            Assertions.assertThat((Integer)((Integer)JsonPath.read((String)((String)changeEvents.get(0).key()), (String)"$.payload.id", (Predicate[])new Predicate[0]))).isEqualTo(1);
            Assertions.assertThat((Integer)((Integer)JsonPath.read((String)((String)changeEvents.get(0).key()), (String)"$.schemaId", (Predicate[])new Predicate[0]))).isNotNull();
            Assertions.assertThat((String)((String)JsonPath.read((String)((String)changeEvents.get(0).value()), (String)"$.payload.op", (Predicate[])new Predicate[0]))).isEqualTo((Object)"r");
            Assertions.assertThat((String)((String)JsonPath.read((String)((String)changeEvents.get(0).value()), (String)"$.payload.after.title", (Predicate[])new Predicate[0]))).isEqualTo((Object)"Be Awesome");
            Assertions.assertThat((Integer)((Integer)JsonPath.read((String)((String)changeEvents.get(1).key()), (String)"$.payload.id", (Predicate[])new Predicate[0]))).isEqualTo(2);
            Assertions.assertThat((String)((String)JsonPath.read((String)((String)changeEvents.get(1).value()), (String)"$.payload.op", (Predicate[])new Predicate[0]))).isEqualTo((Object)"r");
            Assertions.assertThat((String)((String)JsonPath.read((String)((String)changeEvents.get(1).value()), (String)"$.payload.after.title", (Predicate[])new Predicate[0]))).isEqualTo((Object)"Learn Quarkus");
            statement.execute("update todo.Todo set title = 'Learn Java' where id = 2");
            changeEvents = this.drain(consumer, 1);
            Assertions.assertThat((Integer)((Integer)JsonPath.read((String)((String)changeEvents.get(0).key()), (String)"$.payload.id", (Predicate[])new Predicate[0]))).isEqualTo(2);
            Assertions.assertThat((String)((String)JsonPath.read((String)((String)changeEvents.get(0).value()), (String)"$.payload.op", (Predicate[])new Predicate[0]))).isEqualTo((Object)"u");
            Assertions.assertThat((String)((String)JsonPath.read((String)((String)changeEvents.get(0).value()), (String)"$.payload.before.title", (Predicate[])new Predicate[0]))).isEqualTo((Object)"Learn Quarkus");
            Assertions.assertThat((String)((String)JsonPath.read((String)((String)changeEvents.get(0).value()), (String)"$.payload.after.title", (Predicate[])new Predicate[0]))).isEqualTo((Object)"Learn Java");
            consumer.unsubscribe();
        }
    }

    @Test
    public void shouldConvertToAvro() throws Exception {
        try (Connection connection = this.getConnection(postgresContainer);
             Statement statement = connection.createStatement();
             KafkaConsumer<byte[], byte[]> consumer = this.getConsumerBytes(kafkaContainer);){
            statement.execute("drop schema if exists todo cascade");
            statement.execute("create schema todo");
            statement.execute("create table todo.Todo (id int8 not null, title varchar(255), primary key (id))");
            statement.execute("alter table todo.Todo replica identity full");
            statement.execute("insert into todo.Todo values (1, 'Be Awesome')");
            debeziumContainer.registerConnector("my-connector-avro", this.getConfiguration(2, "io.apicurio.registry.utils.converter.AvroConverter", new String[0]));
            consumer.subscribe(Arrays.asList("dbserver2.todo.todo"));
            List<ConsumerRecord<byte[], byte[]>> changeEvents = this.drain(consumer, 1);
            Assertions.assertThat((byte)((byte[])changeEvents.get(0).key())[0]).isZero();
            Assertions.assertThat((byte)((byte[])changeEvents.get(0).value())[0]).isZero();
            consumer.unsubscribe();
        }
    }

    @Test
    public void shouldConvertToCloudEventWithDataAsAvro() throws Exception {
        try (Connection connection = this.getConnection(postgresContainer);
             Statement statement = connection.createStatement();
             KafkaConsumer<String, String> consumer = this.getConsumerString(kafkaContainer);){
            statement.execute("drop schema if exists todo cascade");
            statement.execute("create schema todo");
            statement.execute("create table todo.Todo (id int8 not null, title varchar(255), primary key (id))");
            statement.execute("alter table todo.Todo replica identity full");
            statement.execute("insert into todo.Todo values (3, 'Be Awesome')");
            String host = apicurioContainer.getContainerInfo().getConfig().getHostName();
            int port = (Integer)apicurioContainer.getExposedPorts().get(0);
            String apicurioUrl = "http://" + host + ":" + port + "/api";
            String id = "3";
            ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgresContainer).with("database.server.name", "dbserver" + id).with("slot.name", "debezium_" + id).with("key.converter", "org.apache.kafka.connect.json.JsonConverter").with("value.converter", "io.debezium.converters.CloudEventsConverter").with("value.converter.data.serializer.type", "avro").with("value.converter.avro.apicurio.registry.url", apicurioUrl).with("value.converter.avro.apicurio.registry.global-id", "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy");
            debeziumContainer.registerConnector("my-connector-cloudevents-avro", config);
            consumer.subscribe(Arrays.asList("dbserver3.todo.todo"));
            List<ConsumerRecord<String, String>> changeEvents = this.drain(consumer, 1);
            Assertions.assertThat((Integer)((Integer)JsonPath.read((String)((String)changeEvents.get(0).key()), (String)"$.payload.id", (Predicate[])new Predicate[0]))).isEqualTo(3);
            Assertions.assertThat((String)((String)JsonPath.read((String)((String)changeEvents.get(0).value()), (String)"$.iodebeziumop", (Predicate[])new Predicate[0]))).isEqualTo((Object)"r");
            Assertions.assertThat((String)((String)JsonPath.read((String)((String)changeEvents.get(0).value()), (String)"$.iodebeziumname", (Predicate[])new Predicate[0]))).isEqualTo((Object)"dbserver3");
            Assertions.assertThat((String)((String)JsonPath.read((String)((String)changeEvents.get(0).value()), (String)"$.datacontenttype", (Predicate[])new Predicate[0]))).isEqualTo((Object)"application/avro");
            Assertions.assertThat((String)((String)JsonPath.read((String)((String)changeEvents.get(0).value()), (String)"$.iodebeziumtable", (Predicate[])new Predicate[0]))).isEqualTo((Object)"todo");
            byte[] decodedBytes = Base64.getDecoder().decode((String)JsonPath.read((String)((String)changeEvents.get(0).value()), (String)"$.data", (Predicate[])new Predicate[0]));
            Assertions.assertThat((byte)decodedBytes[0]).isZero();
            consumer.unsubscribe();
        }
    }

    private Connection getConnection(PostgreSQLContainer<?> postgresContainer) throws SQLException {
        return DriverManager.getConnection(postgresContainer.getJdbcUrl(), postgresContainer.getUsername(), postgresContainer.getPassword());
    }

    private KafkaConsumer<String, String> getConsumerString(KafkaContainer kafkaContainer) {
        return new KafkaConsumer((Map)ImmutableMap.of((Object)"bootstrap.servers", (Object)kafkaContainer.getBootstrapServers(), (Object)"group.id", (Object)("tc-" + UUID.randomUUID()), (Object)"auto.offset.reset", (Object)"earliest"), (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer());
    }

    private KafkaConsumer<byte[], byte[]> getConsumerBytes(KafkaContainer kafkaContainer) {
        return new KafkaConsumer((Map)ImmutableMap.of((Object)"bootstrap.servers", (Object)kafkaContainer.getBootstrapServers(), (Object)"group.id", (Object)("tc-" + UUID.randomUUID()), (Object)"auto.offset.reset", (Object)"earliest"), (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());
    }

    private <T> List<ConsumerRecord<T, T>> drain(KafkaConsumer<T, T> consumer, int expectedRecordCount) {
        ArrayList allRecords = new ArrayList();
        Unreliables.retryUntilTrue((int)10, (TimeUnit)TimeUnit.SECONDS, () -> {
            consumer.poll(Duration.ofMillis(50L).toMillis()).iterator().forEachRemaining(allRecords::add);
            return allRecords.size() == expectedRecordCount;
        });
        return allRecords;
    }

    private ConnectorConfiguration getConfiguration(int id, String converter, String ... options) {
        String host = apicurioContainer.getContainerInfo().getConfig().getHostName();
        int port = (Integer)apicurioContainer.getExposedPorts().get(0);
        String apicurioUrl = "http://" + host + ":" + port + "/api";
        ConnectorConfiguration config = ConnectorConfiguration.forJdbcContainer(postgresContainer).with("database.server.name", "dbserver" + id).with("slot.name", "debezium_" + id).with("key.converter", converter).with("key.converter.apicurio.registry.url", apicurioUrl).with("key.converter.apicurio.registry.global-id", "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy").with("value.converter.apicurio.registry.url", apicurioUrl).with("value.converter", converter).with("value.converter.apicurio.registry.global-id", "io.apicurio.registry.utils.serde.strategy.AutoRegisterIdStrategy");
        if (options != null && options.length > 0) {
            for (int i = 0; i < options.length; i += 2) {
                config.with(options[i], options[i + 1]);
            }
        }
        return config;
    }
}

