/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.kafka;

import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.server.kafka.KafkaTestConfigSource;
import io.debezium.server.kafka.KafkaTestResourceLifecycleManager;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import javax.enterprise.event.Observes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

/**
 * Integration test that checks change events reach the Kafka topic
 * {@value #TOPIC_NAME} and that the records carry the expected
 * {@code headerKey} header.
 *
 * <p>The Kafka consumer used by the test is created when a
 * {@link ConnectorStartedEvent} is observed, so the test method first waits
 * for that consumer to become available.
 */
@QuarkusTest
@QuarkusTestResource.List(value={@QuarkusTestResource(value=PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(value=KafkaTestResourceLifecycleManager.class)})
public class KafkaIT {
    /** Topic the test subscribes to. */
    private static final String TOPIC_NAME = "testc.inventory.customers";
    /** Minimum number of records the test waits for before asserting. */
    private static final int MESSAGE_COUNT = 4;
    /** Consumer shared by the test; assigned in {@link #setupDependencies}, closed in {@link #stop()}. */
    private static KafkaConsumer<String, String> consumer;

    /**
     * Deletes the file at {@code KafkaTestConfigSource.OFFSET_STORE_PATH} and
     * creates a fresh testing file in its place.
     */
    public KafkaIT() {
        Testing.Files.delete((Path)KafkaTestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile((Path)KafkaTestConfigSource.OFFSET_STORE_PATH);
    }

    /**
     * Observer invoked on {@link ConnectorStartedEvent}: enables test printing
     * and creates the shared String/String Kafka consumer.
     *
     * @param event the observed start event (not inspected)
     */
    void setupDependencies(@Observes ConnectorStartedEvent event) {
        Testing.Print.enable();
        ConcurrentHashMap<String, Object> configs = new ConcurrentHashMap<>();
        configs.put("bootstrap.servers", KafkaTestResourceLifecycleManager.getBootstrapServers());
        // A random group id is generated for every consumer instance.
        configs.put("group.id", "test-" + UUID.randomUUID());
        consumer = new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer());
    }

    /**
     * Observer invoked on {@link ConnectorCompletedEvent}: rethrows the
     * reported error when the connector did not complete successfully.
     *
     * @param event the observed completion event
     * @throws Exception the error carried by an unsuccessful event
     */
    void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
        if (!event.isSuccess()) {
            throw (Exception)event.getError().get();
        }
    }

    /** Unsubscribes and closes the shared consumer, if one was created. */
    @AfterAll
    static void stop() {
        if (consumer != null) {
            consumer.unsubscribe();
            consumer.close();
        }
    }

    /**
     * Waits for the consumer, subscribes to {@value #TOPIC_NAME}, polls until
     * at least {@value #MESSAGE_COUNT} records have been collected, and then
     * verifies the {@code headerKey} headers on the first record.
     */
    @Test
    public void testKafka() {
        // The consumer is assigned by the ConnectorStartedEvent observer, so wait for it first.
        Awaitility.await().atMost(Duration.ofSeconds(KafkaTestConfigSource.waitForSeconds())).until(() -> consumer != null);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        ArrayList<ConsumerRecord<String, String>> actual = new ArrayList<>();
        Awaitility.await().atMost(Duration.ofSeconds(KafkaTestConfigSource.waitForSeconds())).until(() -> {
            // Accumulate across polls; records may arrive in several batches.
            consumer.poll(Duration.ofSeconds(KafkaTestConfigSource.waitForSeconds())).iterator().forEachRemaining(actual::add);
            return actual.size() >= MESSAGE_COUNT;
        });
        Assertions.assertThat(actual.size()).isGreaterThanOrEqualTo(MESSAGE_COUNT);

        Headers headers = actual.get(0).headers();
        Assertions.assertThat(headers.headers("headerKey")).isNotEmpty();
        // The expected header value includes the surrounding double quotes.
        Assertions.assertThat(headers.headers("headerKey")).allMatch(h -> h.key().equals("headerKey") && Arrays.equals(h.value(), "\"headerValue\"".getBytes(StandardCharsets.UTF_8)));
    }
}

