/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.nats.jetstream;

import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.server.nats.jetstream.NatsJetStreamTestConfigSource;
import io.debezium.server.nats.jetstream.NatsJetStreamTestResourceLifecycleManager;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.JetStream;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.enterprise.event.Observes;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

/**
 * Integration test that subscribes to the {@value #SUBJECT_NAME} subject on NATS JetStream
 * and verifies that at least {@value #MESSAGE_COUNT} messages arrive within the timeout
 * supplied by {@link NatsJetStreamTestConfigSource#waitForSeconds()}.
 *
 * <p>The NATS connection and subscription are created when a {@link ConnectorStartedEvent}
 * is observed, and are released again in {@link #stop()}.
 */
@QuarkusTest
@QuarkusTestResource.List({
        @QuarkusTestResource(PostgresTestResourceLifecycleManager.class),
        @QuarkusTestResource(NatsJetStreamTestResourceLifecycleManager.class)
})
class NatsJetStreamIT {
    /** Minimum number of messages the test expects to receive on {@link #SUBJECT_NAME}. */
    private static final int MESSAGE_COUNT = 4;

    /** JetStream subject the test subscribes to. */
    private static final String SUBJECT_NAME = "testc.inventory.customers";

    protected static Connection nc;
    protected static JetStream js;
    protected static Dispatcher d;

    /** Messages received by the dispatcher; synchronized because the handler and the test poll it concurrently. */
    private static final List<Message> messages = Collections.synchronizedList(new ArrayList<>());

    static {
        // Recreate the offset store file so the run does not start from a leftover file.
        Testing.Files.delete(NatsJetStreamTestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile(NatsJetStreamTestConfigSource.OFFSET_STORE_PATH);
    }

    /**
     * Connects to the NATS container and registers a handler that collects every message
     * published on {@link #SUBJECT_NAME}.
     *
     * <p>Failures are reported through {@link Testing#print(Object)} rather than thrown; in that
     * case no messages are collected and {@link #testNatsStreaming()} times out.
     *
     * @param event the connector-started notification that triggers the setup
     */
    void setupDependencies(@Observes ConnectorStartedEvent event) {
        Testing.Print.enable();

        // Setup NATS connection
        try {
            nc = Nats.connect(NatsJetStreamTestResourceLifecycleManager.getNatsContainerUrl());
            js = nc.jetStream();
        }
        catch (Exception e) {
            restoreInterruptIfNeeded(e);
            Testing.print("Could not connect to NATS Jetstream: " + e.getMessage());
            // Without a connection / JetStream context there is nothing to subscribe with;
            // bail out instead of failing below with an unrelated NullPointerException.
            return;
        }

        // Setup message handler
        try {
            d = nc.createDispatcher();
            js.subscribe(SUBJECT_NAME, d, messages::add, true);
        }
        catch (Exception e) {
            restoreInterruptIfNeeded(e);
            Testing.print("Could not register message handler: " + e.getMessage());
        }
    }

    /**
     * Propagates a connector failure so that it is not silently ignored.
     *
     * @param event the connector-completed notification
     * @throws Exception the error reported by the event when the connector did not complete
     *         successfully; an {@link IllegalStateException} if no error was reported or the
     *         reported error is neither an {@link Exception} nor an {@link Error}
     */
    void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
        if (event.isSuccess()) {
            return;
        }
        Throwable error = event.getError().orElse(null);
        if (error == null) {
            throw new IllegalStateException("Connector completed unsuccessfully without reporting an error");
        }
        if (error instanceof Exception) {
            throw (Exception) error;
        }
        if (error instanceof Error) {
            throw (Error) error;
        }
        throw new IllegalStateException("Connector completed unsuccessfully", error);
    }

    /**
     * Removes the subscription and closes the NATS connection opened by
     * {@link #setupDependencies(ConnectorStartedEvent)}.
     *
     * @throws Exception if unsubscribing or closing the connection fails
     */
    @AfterAll
    static void stop() throws Exception {
        try {
            if (d != null) {
                d.unsubscribe(SUBJECT_NAME);
            }
        }
        finally {
            // Close the connection even when unsubscribing fails so it is not leaked.
            if (nc != null) {
                nc.close();
            }
        }
    }

    /**
     * Waits until at least {@link #MESSAGE_COUNT} messages have been collected, failing if the
     * configured timeout elapses first.
     */
    @Test
    void testNatsStreaming() throws Exception {
        Awaitility.await()
                .atMost(Duration.ofSeconds(NatsJetStreamTestConfigSource.waitForSeconds()))
                .until(() -> messages.size() >= MESSAGE_COUNT);
        Assertions.assertThat(messages.size()).isGreaterThanOrEqualTo(MESSAGE_COUNT);
    }

    /** Re-sets the thread's interrupt flag when a broad catch has swallowed an interruption. */
    private static void restoreInterruptIfNeeded(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
    }
}

