/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.pulsar;

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.server.pulsar.PulsarTestConfigSource;
import io.debezium.server.pulsar.PulsarTestResourceLifecycleManager;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.enterprise.event.Observes;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.UUID;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@QuarkusTest
@QuarkusTestResource.List(value={@QuarkusTestResource(value=PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(value=PulsarTestResourceLifecycleManager.class)})
public class PulsarIT {
    private static final int MESSAGE_COUNT = 4;
    private static final String TOPIC_NAME = "testc.inventory.customers";
    private static final String NOKEY_TOPIC_NAME = "testc.inventory.nokey";
    @ConfigProperty(name="debezium.source.database.hostname")
    String dbHostname;
    @ConfigProperty(name="debezium.source.database.port")
    String dbPort;
    @ConfigProperty(name="debezium.source.database.user")
    String dbUser;
    @ConfigProperty(name="debezium.source.database.password")
    String dbPassword;
    @ConfigProperty(name="debezium.source.database.dbname")
    String dbName;
    protected static PulsarClient pulsarClient;

    public PulsarIT() {
        Testing.Files.delete((Path)TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile((Path)PulsarTestConfigSource.OFFSET_STORE_PATH);
    }

    void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException {
        Testing.Print.enable();
        pulsarClient = PulsarClient.builder().serviceUrl(PulsarTestResourceLifecycleManager.getPulsarServiceUrl()).build();
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
        if (!event.isSuccess()) {
            throw new RuntimeException((Throwable)event.getError().get());
        }
    }

    @Test
    public void testPulsar() throws Exception {
        Awaitility.await().atMost(Duration.ofSeconds(PulsarTestConfigSource.waitForSeconds())).until(() -> pulsarClient != null);
        Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(new String[]{TOPIC_NAME}).subscriptionName("test-" + UUID.randomUUID()).subscribe();
        ArrayList records = new ArrayList();
        Awaitility.await().atMost(Duration.ofSeconds(PulsarTestConfigSource.waitForSeconds())).until(() -> {
            Message message = consumer.receive();
            Assertions.assertTrue((boolean)message.getProperties().containsKey("headerKey"));
            records.add(message);
            return records.size() >= 4;
        });
        JdbcConfiguration config = (JdbcConfiguration)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)JdbcConfiguration.create().with("hostname", this.dbHostname)).with("port", this.dbPort)).with("user", this.dbUser)).with("password", this.dbPassword)).with("dbname", this.dbName)).build();
        try (PostgresConnection connection = new PostgresConnection(config, "Debezium Pulsar Test");){
            connection.execute(new String[]{"CREATE TABLE inventory.nokey (val INT);", "INSERT INTO inventory.nokey VALUES (1)", "INSERT INTO inventory.nokey VALUES (2)", "INSERT INTO inventory.nokey VALUES (3)", "INSERT INTO inventory.nokey VALUES (4)"});
        }
        Consumer nokeyConsumer = pulsarClient.newConsumer(Schema.STRING).topic(new String[]{NOKEY_TOPIC_NAME}).subscriptionName("test-" + UUID.randomUUID()).subscribe();
        ArrayList nokeyRecords = new ArrayList();
        Awaitility.await().atMost(Duration.ofSeconds(PulsarTestConfigSource.waitForSeconds())).until(() -> {
            nokeyRecords.add(nokeyConsumer.receive());
            return nokeyRecords.size() >= 4;
        });
    }
}

