/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.pubsub;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.NotFoundException;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.ServiceOptions;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.server.DebeziumServer;
import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.server.pubsub.PubSubTestConfigSource;
import io.debezium.server.pubsub.PubSubTestResourceLifecycleManager;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

@QuarkusTest
@QuarkusTestResource.List(value={@QuarkusTestResource(value=PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(value=PubSubTestResourceLifecycleManager.class)})
public class PubSubIT {
    private static final int MESSAGE_COUNT = 4;
    private static final String STREAM_NAME = "testc.inventory.customers";
    private static final String SUBSCRIPTION_NAME = "testsubs";
    protected static Subscriber subscriber;
    private static ProjectSubscriptionName subscriptionName;
    private static TopicName topicName;
    private static ManagedChannel channel;
    private static TransportChannelProvider channelProvider;
    private static CredentialsProvider credentialsProvider;
    @Inject
    DebeziumServer server;
    @ConfigProperty(name="debezium.source.database.port")
    String postgresPort;
    private static final List<PubsubMessage> messages;

    public PubSubIT() {
        Testing.Files.delete((Path)TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile((Path)PubSubTestConfigSource.OFFSET_STORE_PATH);
    }

    @AfterAll
    static void stop() throws IOException {
        if (subscriber != null) {
            subscriber.stopAsync();
            subscriber.awaitTerminated();
            try (SubscriptionAdminClient subscriptionAdminClient = PubSubIT.createSubscriptionAdminClient();){
                subscriptionAdminClient.deleteSubscription(subscriptionName);
            }
            try (TopicAdminClient topicAdminClient = PubSubIT.createTopicAdminClient();){
                topicAdminClient.deleteTopic(topicName);
            }
        }
        if (channel != null && !channel.isShutdown()) {
            channel.shutdown();
        }
    }

    void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException {
        TopicAdminClient topicAdminClient2;
        SubscriptionAdminClient subscriptionAdminClient2;
        Testing.Print.enable();
        this.createChannel();
        try {
            subscriptionAdminClient2 = PubSubIT.createSubscriptionAdminClient();
            try {
                subscriptionAdminClient2.deleteSubscription(subscriptionName);
            }
            finally {
                if (subscriptionAdminClient2 != null) {
                    subscriptionAdminClient2.close();
                }
            }
        }
        catch (NotFoundException subscriptionAdminClient2) {
            // empty catch block
        }
        try {
            topicAdminClient2 = PubSubIT.createTopicAdminClient();
            try {
                topicAdminClient2.deleteTopic(topicName);
            }
            finally {
                if (topicAdminClient2 != null) {
                    topicAdminClient2.close();
                }
            }
        }
        catch (NotFoundException topicAdminClient2) {
            // empty catch block
        }
        topicAdminClient2 = PubSubIT.createTopicAdminClient();
        try {
            Topic topic = topicAdminClient2.createTopic(topicName);
            Testing.print((Object)("Created topic: " + topic.getName()));
        }
        finally {
            if (topicAdminClient2 != null) {
                topicAdminClient2.close();
            }
        }
        subscriptionAdminClient2 = PubSubIT.createSubscriptionAdminClient();
        try {
            int ackDeadlineSeconds = 0;
            subscriptionAdminClient2.createSubscription(subscriptionName, topicName, PushConfig.newBuilder().build(), ackDeadlineSeconds);
        }
        finally {
            if (subscriptionAdminClient2 != null) {
                subscriptionAdminClient2.close();
            }
        }
        subscriber = this.createSubscriber();
        subscriber.startAsync().awaitRunning();
    }

    void createChannel() {
        channel = ManagedChannelBuilder.forTarget((String)PubSubTestResourceLifecycleManager.getEmulatorEndpoint()).usePlaintext().build();
        channelProvider = FixedTransportChannelProvider.create((TransportChannel)GrpcTransportChannel.create((ManagedChannel)channel));
        credentialsProvider = NoCredentialsProvider.create();
        Testing.print((Object)("Executing test towards pubsub emulator running at: " + PubSubTestResourceLifecycleManager.getEmulatorEndpoint()));
    }

    Subscriber createSubscriber() {
        return Subscriber.newBuilder((ProjectSubscriptionName)subscriptionName, (MessageReceiver)new TestMessageReceiver()).setChannelProvider(channelProvider).setCredentialsProvider(credentialsProvider).build();
    }

    static SubscriptionAdminClient createSubscriptionAdminClient() throws IOException {
        return SubscriptionAdminClient.create((SubscriptionAdminSettings)((SubscriptionAdminSettings.Builder)((SubscriptionAdminSettings.Builder)SubscriptionAdminSettings.newBuilder().setTransportChannelProvider(channelProvider)).setCredentialsProvider(credentialsProvider)).build());
    }

    static TopicAdminClient createTopicAdminClient() throws IOException {
        return TopicAdminClient.create((TopicAdminSettings)((TopicAdminSettings.Builder)((TopicAdminSettings.Builder)TopicAdminSettings.newBuilder().setTransportChannelProvider(channelProvider)).setCredentialsProvider(credentialsProvider)).build());
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
        if (!event.isSuccess()) {
            throw (Exception)event.getError().get();
        }
    }

    @Test
    public void testPubSub() throws Exception {
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> messages.size() >= 4);
        Assertions.assertThat((int)messages.size()).isGreaterThanOrEqualTo(4);
        messages.clear();
        try (PostgresConnection conn = new PostgresConnection(this.defaultJdbcConfig(), "debezium-server-test");){
            conn.execute(new String[]{"INSERT INTO inventory.customers VALUES (10000, 'Test', 'PubSub', 'testpubsub@example.org')", "DELETE FROM inventory.customers WHERE id = 10000"});
        }
        Awaitility.await().atMost(Duration.ofSeconds(TestConfigSource.waitForSeconds())).until(() -> messages.size() >= 2);
        Assertions.assertThat((int)messages.size()).isEqualTo(2);
    }

    private JdbcConfiguration defaultJdbcConfig() {
        return (JdbcConfiguration)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)JdbcConfiguration.copy((Configuration)Configuration.fromSystemProperties((String)"database.")).with(CommonConnectorConfig.TOPIC_PREFIX, "dbserver1")).withDefault(JdbcConfiguration.DATABASE, "postgres")).withDefault(JdbcConfiguration.HOSTNAME, "localhost")).withDefault(JdbcConfiguration.PORT, Integer.parseInt(this.postgresPort))).withDefault(JdbcConfiguration.USER, "postgres")).withDefault(JdbcConfiguration.PASSWORD, "postgres")).build();
    }

    static {
        subscriptionName = ProjectSubscriptionName.of((String)ServiceOptions.getDefaultProjectId(), (String)SUBSCRIPTION_NAME);
        topicName = TopicName.of((String)ServiceOptions.getDefaultProjectId(), (String)STREAM_NAME);
        messages = Collections.synchronizedList(new ArrayList());
    }

    class TestMessageReceiver
    implements MessageReceiver {
        TestMessageReceiver() {
        }

        public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
            Testing.print((Object)("Message arrived: " + message));
            org.junit.jupiter.api.Assertions.assertTrue((boolean)message.getAttributesMap().containsKey("headerKey"));
            messages.add(message);
            consumer.ack();
        }
    }
}

