/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.eventhubs;

import com.azure.core.util.IterableStream;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubConsumerClient;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import io.debezium.server.DebeziumServer;
import io.debezium.server.eventhubs.EventHubsTestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Integration test that reads events from partition {@code "0"} of the configured
 * Event Hub and verifies that the first {@code MESSAGE_COUNT} payloads contain the
 * ids 1001, 1002, 1003 and 1004, in that order.
 *
 * <p>The Event Hubs clients are held in static fields so that {@link #stop()} can
 * close them once all tests have run.
 */
@QuarkusTest
@QuarkusTestResource(PostgresTestResourceLifecycleManager.class)
public class EventHubsIT {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsIT.class);

    /** Number of events the test waits for and then inspects. */
    private static final int MESSAGE_COUNT = 4;
    private static final String CONSUMER_GROUP = "$Default";

    protected static EventHubProducerClient producer = null;
    protected static EventHubConsumerClient consumer = null;

    @Inject
    DebeziumServer server;

    /**
     * Resets the offset store: deletes the file at
     * {@code EventHubsTestConfigSource.OFFSET_STORE_PATH} and creates a fresh testing
     * file in its place.
     */
    public EventHubsIT() {
        Testing.Files.delete((Path) EventHubsTestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile((Path) EventHubsTestConfigSource.OFFSET_STORE_PATH);
    }

    /** Closes the producer and consumer clients, if they were ever created. */
    @AfterAll
    static void stop() {
        if (producer != null) {
            producer.close();
        }
        if (consumer != null) {
            consumer.close();
        }
    }

    /**
     * Builds the shared producer client when the connector reports that it has started.
     *
     * @param event the connector-started notification (not inspected)
     */
    void setupDependencies(@Observes ConnectorStartedEvent event) {
        producer = new EventHubClientBuilder()
                .connectionString(entityConnectionString())
                .buildProducerClient();
    }

    /**
     * Rethrows the error carried by an unsuccessful connector-completed notification.
     *
     * @param event the connector-completed notification
     * @throws Exception the error reported by {@code event} when it is not a success
     */
    void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
        if (!event.isSuccess()) {
            // NOTE(review): the cast assumes the reported error is always an Exception
            // (never an Error) and that an error is always present on failure - confirm.
            throw (Exception) event.getError().get();
        }
    }

    /**
     * Polls partition {@code "0"} until at least {@code MESSAGE_COUNT} events have been
     * collected, then asserts that the i-th event body contains {@code "id":100<i+1>}.
     *
     * @throws Exception if waiting for the events or reading them fails
     */
    @Test
    public void testEventHubs() throws Exception {
        Testing.Print.enable();

        consumer = new EventHubClientBuilder()
                .connectionString(entityConnectionString())
                .consumerGroup(CONSUMER_GROUP)
                .buildConsumerClient();

        // Events accumulate across polling attempts; the wait ends once enough were seen.
        final List<PartitionEvent> received = new ArrayList<>();
        Awaitility.await()
                .atMost(Duration.ofSeconds(EventHubsTestConfigSource.waitForSeconds()))
                .until(() -> {
                    IterableStream<PartitionEvent> events =
                            consumer.receiveFromPartition("0", MESSAGE_COUNT, EventPosition.latest());
                    events.forEach(received::add);
                    return received.size() >= MESSAGE_COUNT;
                });

        // Expected ids are 1001..1004. The positional check assumes the events arrive
        // on the partition in id order - TODO confirm against the publishing side.
        final String idPart = "\"id\":100";
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String eventBody = received.get(i).getData().getBodyAsString();
            String expectedId = idPart + (i + 1);
            Assertions.assertTrue(eventBody.contains(expectedId), expectedId + " not found in payload");
        }
    }

    /** Returns the namespace connection string with the Event Hub name appended as {@code EntityPath}. */
    private static String entityConnectionString() {
        return String.format("%s;EntityPath=%s",
                EventHubsTestConfigSource.getEventHubsConnectionString(),
                EventHubsTestConfigSource.getEventHubsName());
    }
}

