/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.server.rocketmq;

import io.debezium.server.TestConfigSource;
import io.debezium.server.events.ConnectorCompletedEvent;
import io.debezium.server.rocketmq.RocketMqTestConfigSource;
import io.debezium.server.rocketmq.RocketMqTestResourceLifecycleManager;
import io.debezium.testing.testcontainers.PostgresTestResourceLifecycleManager;
import io.debezium.util.Testing;
import io.quarkus.test.common.QuarkusTestResource;
import io.quarkus.test.junit.QuarkusTest;
import jakarta.enterprise.event.Observes;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;

@QuarkusTest
@QuarkusTestResource.List(value={@QuarkusTestResource(value=PostgresTestResourceLifecycleManager.class), @QuarkusTestResource(value=RocketMqTestResourceLifecycleManager.class)})
public class RocketMqIT {
    private static final int MESSAGE_COUNT = 4;
    private static DefaultLitePullConsumer consumer = null;

    public RocketMqIT() {
        Testing.Files.delete((Path)TestConfigSource.OFFSET_STORE_PATH);
        Testing.Files.createTestingFile((Path)RocketMqTestConfigSource.OFFSET_STORE_PATH);
    }

    @AfterAll
    static void stop() {
        if (consumer != null) {
            consumer.shutdown();
        }
    }

    void connectorCompleted(@Observes ConnectorCompletedEvent event) throws Exception {
        if (!event.isSuccess()) {
            throw new RuntimeException((Throwable)event.getError().get());
        }
    }

    @Test
    public void testRocketMQ() throws Exception {
        consumer = new DefaultLitePullConsumer("consumer-group");
        consumer.setNamesrvAddr(RocketMqTestResourceLifecycleManager.getNameSrvAddr());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("testc-inventory-customers", "*");
        consumer.start();
        ArrayList records = new ArrayList();
        Awaitility.await().atMost(Duration.ofSeconds(RocketMqTestConfigSource.waitForSeconds())).until(() -> {
            records.addAll(consumer.poll(5000L));
            for (MessageExt record : records) {
                Assertions.assertThat((String)record.getUserProperty("headerKey")).isNotNull();
            }
            return records.size() >= 4;
        });
        Assertions.assertThat((int)records.size()).isGreaterThanOrEqualTo(4);
    }
}

