/*
 * Decompiled with CFR 0.152.
 */
package org.acme.travel;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.cloudevents.jackson.JsonFormat;
import java.net.URI;
import java.time.OffsetDateTime;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.acme.travel.Traveller;
import org.junit.After;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.kie.kogito.kafka.KafkaClient;
import org.kie.kogito.testcontainers.springboot.KafkaSpringBootTestResource;
import org.kie.kogito.tests.KogitoKafkaQuickstartSpringbootApplication;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;

@SpringBootTest(classes={KogitoKafkaQuickstartSpringbootApplication.class})
@ContextConfiguration(initializers={KafkaSpringBootTestResource.class})
public class MessagingIT {
    public static final String TOPIC_PRODUCER = "travellers";
    public static final String TOPIC_CONSUMER = "processedtravellers";
    private static Logger LOGGER = LoggerFactory.getLogger(MessagingIT.class);
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private KafkaClient kafkaClient;

    @Test
    public void testProcess() throws InterruptedException {
        this.objectMapper.configure(JsonParser.Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
        this.objectMapper.registerModule((Module)JsonFormat.getCloudEventJacksonModule());
        int count = 3;
        CountDownLatch countDownLatch = new CountDownLatch(3);
        this.kafkaClient.consume(TOPIC_CONSUMER, s -> {
            LOGGER.info("Received from kafka: {}", s);
            try {
                JsonNode event = (JsonNode)this.objectMapper.readValue(s, JsonNode.class);
                Traveller traveller = (Traveller)this.objectMapper.readValue(event.get("data").toString(), Traveller.class);
                Assertions.assertTrue((boolean)traveller.isProcessed());
                Assertions.assertTrue((boolean)traveller.getFirstName().matches("Name[0-9]+"));
                Assertions.assertTrue((boolean)traveller.getLastName().matches("LastName[0-9]+"));
                Assertions.assertTrue((boolean)traveller.getEmail().matches("email[0-9]+"));
                Assertions.assertTrue((boolean)traveller.getNationality().matches("Nationality[0-9]+"));
                countDownLatch.countDown();
            }
            catch (JsonProcessingException e) {
                LOGGER.error("Error parsing {}", s, (Object)e);
                Assertions.fail((Throwable)e);
            }
        });
        IntStream.range(0, 3).mapToObj(i -> new Traveller("Name" + i, "LastName" + i, "email" + i, "Nationality" + i)).forEach(traveller -> this.kafkaClient.produce(this.generateCloudEvent((Traveller)traveller), TOPIC_PRODUCER));
        countDownLatch.await(5L, TimeUnit.SECONDS);
        Assertions.assertEquals((long)0L, (long)countDownLatch.getCount());
    }

    private String generateCloudEvent(Traveller traveller) {
        Assertions.assertFalse((boolean)traveller.isProcessed());
        try {
            return this.objectMapper.writeValueAsString((Object)((io.cloudevents.core.v1.CloudEventBuilder)CloudEventBuilder.v1().withId(UUID.randomUUID().toString()).withSource(URI.create("")).withType("TravelersMessageDataEvent_3").withTime(OffsetDateTime.now()).withData(this.objectMapper.writeValueAsString((Object)traveller).getBytes())).build());
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @After
    public void stop() {
        Optional.ofNullable(this.kafkaClient).ifPresent(KafkaClient::shutdown);
    }
}

