/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.pubsub;

import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectId;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.pubsub.v1.PubsubMessage;
import io.debezium.server.DebeziumServer;
import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.server.pubsub.PubSubLiteTestConfigSource;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import javax.enterprise.event.Observes;
import javax.inject.Inject;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;

@QuarkusTest
@QuarkusTestResource(value=PostgresTestResourceLifecycleManager.class)
@EnabledIfSystemProperty(named="debezium.sink.type", matches="pubsublite")
public class PubSubLiteIT {
    private static final int MESSAGE_COUNT = 4;
    private static final String STREAM_NAME = "testc.inventory.customers";
    private static final String SUBSCRIPTION_NAME = "testsubs";
    private static final String cloudRegion = "us-central1";
    private static final char zoneId = 'b';
    protected static Subscriber subscriber;
    private static final String projectId;
    private static final SubscriptionPath subscriptionPath;
    @Inject
    DebeziumServer server;
    private static final List<PubsubMessage> messages;

    public PubSubLiteIT() {
        Testing.Files.delete((Path)TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile((Path)PubSubLiteTestConfigSource.OFFSET_STORE_PATH);
    }

    @AfterAll
    static void stop() throws IOException {
        if (subscriber != null) {
            subscriber.stopAsync();
            subscriber.awaitTerminated();
            AdminClientSettings adminClientSettings = AdminClientSettings.newBuilder().setRegion(CloudRegion.of((String)cloudRegion)).build();
            try (AdminClient adminClient = AdminClient.create((AdminClientSettings)adminClientSettings);){
                adminClient.deleteSubscription(subscriptionPath).get();
            }
            catch (InterruptedException | ExecutionException e) {
                Testing.printError((Throwable)e);
            }
        }
    }

    void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException {
        Testing.Print.enable();
        TopicPath topicPath = ((TopicPath.Builder)((TopicPath.Builder)TopicPath.newBuilder().setProject(ProjectId.of((String)projectId))).setLocation(CloudZone.of((CloudRegion)CloudRegion.of((String)cloudRegion), (char)'b'))).setName(TopicName.of((String)STREAM_NAME)).build();
        Subscription subscription = Subscription.newBuilder().setDeliveryConfig(Subscription.DeliveryConfig.newBuilder().setDeliveryRequirement(Subscription.DeliveryConfig.DeliveryRequirement.DELIVER_IMMEDIATELY)).setName(subscriptionPath.toString()).setTopic(topicPath.toString()).build();
        AdminClientSettings adminClientSettings = AdminClientSettings.newBuilder().setRegion(CloudRegion.of((String)cloudRegion)).build();
        FlowControlSettings flowControlSettings = FlowControlSettings.builder().setBytesOutstanding(0xA00000L).setMessagesOutstanding(1000L).build();
        SubscriberSettings subscriberSettings = SubscriberSettings.newBuilder().setSubscriptionPath(subscriptionPath).setReceiver((MessageReceiver)new TestMessageReceiver()).setPerPartitionFlowControlSettings(flowControlSettings).build();
        try (AdminClient adminClient = AdminClient.create((AdminClientSettings)adminClientSettings);){
            adminClient.createSubscription(subscription).get();
        }
        catch (InterruptedException | ExecutionException e) {
            Testing.printError((Throwable)e);
        }
        Subscriber subscriber = Subscriber.create((SubscriberSettings)subscriberSettings);
        subscriber.startAsync().awaitRunning();
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
        if (!event.isSuccess()) {
            throw (Exception)event.getError().get();
        }
    }

    @Test
    public void testPubSubLite() {
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> messages.size() >= 4);
        Assertions.assertThat((int)messages.size()).isGreaterThanOrEqualTo(4);
    }

    static {
        projectId = ServiceOptions.getDefaultProjectId();
        subscriptionPath = ((SubscriptionPath.Builder)((SubscriptionPath.Builder)SubscriptionPath.newBuilder().setLocation(CloudZone.of((CloudRegion)CloudRegion.of((String)cloudRegion), (char)'b'))).setProject(ProjectId.of((String)projectId))).setName(SubscriptionName.of((String)SUBSCRIPTION_NAME)).build();
        messages = Collections.synchronizedList(new ArrayList());
    }

    class TestMessageReceiver
    implements MessageReceiver {
        TestMessageReceiver() {
        }

        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            Testing.print((Object)("Message arrived: " + message));
            messages.add(message);
            consumer.ack();
        }
    }
}

