/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.pubsub;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.TopicName;
import io.debezium.server.DebeziumServer;
import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.server.pubsub.PubSubTestConfigSource;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.awaitility.Awaitility;
import org.fest.assertions.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

@QuarkusTest
@QuarkusTestResource(value=PostgresTestResourceLifecycleManager.class)
public class PubSubIT {
    private static final int MESSAGE_COUNT = 4;
    private static final String STREAM_NAME = "testc.inventory.customers";
    private static final String SUBSCRIPTION_NAME = "testsubs";
    protected static Subscriber subscriber;
    private static ProjectSubscriptionName subscriptionName;
    @Inject
    DebeziumServer server;
    private static final List<PubsubMessage> messages;

    public PubSubIT() {
        Testing.Files.delete((Path)TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile((Path)PubSubTestConfigSource.OFFSET_STORE_PATH);
    }

    @AfterAll
    static void stop() throws IOException {
        if (subscriber != null) {
            subscriber.stopAsync();
            subscriber.awaitTerminated();
            try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create();){
                subscriptionAdminClient.deleteSubscription(subscriptionName);
            }
        }
    }

    void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException {
        Testing.Print.enable();
        try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create();){
            TopicName topic = TopicName.ofProjectTopicName((String)ServiceOptions.getDefaultProjectId(), (String)STREAM_NAME);
            int ackDeadlineSeconds = 0;
            subscriptionAdminClient.createSubscription(subscriptionName, topic, PushConfig.newBuilder().build(), ackDeadlineSeconds);
        }
        subscriber = Subscriber.newBuilder((ProjectSubscriptionName)subscriptionName, (MessageReceiver)new TestMessageReceiver()).build();
        subscriber.startAsync().awaitRunning();
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
        if (!event.isSuccess()) {
            throw (Exception)event.getError().get();
        }
    }

    @Test
    public void testPubSub() throws Exception {
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> messages.size() >= 4);
        Assertions.assertThat((messages.size() >= 4 ? 1 : 0) != 0);
    }

    static {
        subscriptionName = ProjectSubscriptionName.of((String)ServiceOptions.getDefaultProjectId(), (String)SUBSCRIPTION_NAME);
        messages = Collections.synchronizedList(new ArrayList());
    }

    class TestMessageReceiver
    implements MessageReceiver {
        TestMessageReceiver() {
        }

        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            Testing.print((Object)("Message arrived: " + message));
            messages.add(message);
            consumer.ack();
        }
    }
}

