/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.events.ConnectorStartedEvent;
import io.debezium.server.rabbitmq.RabbitMqTestConfigSource;
import io.debezium.server.rabbitmq.RabbitMqTestResourceLifecycleManager;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.enterprise.event.Observes;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

@QuarkusTest
@QuarkusTestResource.List(value={@QuarkusTestResource(value=PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(value=RabbitMqTestResourceLifecycleManager.class)})
public class RabbitMqIT {
    private static final int MESSAGE_COUNT = 4;
    private static Connection connection;
    private static Channel channel;
    @ConfigProperty(name="debezium.source.database.hostname")
    String dbHostname;
    @ConfigProperty(name="debezium.source.database.port")
    String dbPort;
    @ConfigProperty(name="debezium.source.database.user")
    String dbUser;
    @ConfigProperty(name="debezium.source.database.password")
    String dbPassword;
    @ConfigProperty(name="debezium.source.database.dbname")
    String dbName;
    private static final List<String> messages;

    public RabbitMqIT() {
        Testing.Files.delete((Path)TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile((Path)RabbitMqTestConfigSource.OFFSET_STORE_PATH);
    }

    void setupDependencies(@Observes ConnectorStartedEvent event) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(RabbitMqTestResourceLifecycleManager.container.getHost());
        factory.setPort(RabbitMqTestResourceLifecycleManager.getPort());
        connection = factory.newConnection();
        channel = connection.createChannel();
        channel.exchangeDeclare("testc.inventory.customers", BuiltinExchangeType.DIRECT);
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue, "testc.inventory.customers", "");
        channel.basicConsume(queue, (Consumer)new DefaultConsumer(channel){

            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                String message = new String(body, StandardCharsets.UTF_8);
                messages.add(message);
            }
        });
    }

    @AfterAll
    static void stop() throws IOException, TimeoutException {
        if (channel != null) {
            channel.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
        if (!event.isSuccess()) {
            throw new RuntimeException((Throwable)event.getError().get());
        }
    }

    @Test
    public void testRabbitMq() throws Exception {
        Awaitility.await().atMost(Duration.ofSeconds(RabbitMqTestConfigSource.waitForSeconds())).until(() -> messages.size() >= 4);
        Assertions.assertThat((int)messages.size()).isGreaterThanOrEqualTo(4);
        messages.clear();
        JdbcConfiguration config = (JdbcConfiguration)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)((JdbcConfiguration.Builder)JdbcConfiguration.create().with("hostname", this.dbHostname)).with("port", this.dbPort)).with("user", this.dbUser)).with("password", this.dbPassword)).with("dbname", this.dbName)).build();
        try (PostgresConnection connection = new PostgresConnection(config, "Debezium Pulsar Test");){
            connection.execute(new String[]{"INSERT INTO inventory.customers VALUES (10000, 'John', 'Doe', 'jdoe@example.org')", "DELETE FROM inventory.customers WHERE id=10000"});
        }
        Awaitility.await().atMost(Duration.ofSeconds(RabbitMqTestConfigSource.waitForSeconds())).until(() -> messages.size() >= 3);
        Assertions.assertThat((int)messages.size()).isGreaterThanOrEqualTo(3);
        Assertions.assertThat((String)messages.get(2)).isEqualTo((Object)"default");
    }

    static {
        channel = null;
        messages = Collections.synchronizedList(new ArrayList());
    }
}

