package io.vertx.mqtt.it;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import java.nio.charset.Charset;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/mqtt/it/MqttClientSubscribeIT.class */
public class MqttClientSubscribeIT {
    private static final Logger log = LoggerFactory.getLogger(MqttClientSubscribeIT.class);
    private static final String MQTT_TOPIC = "/my_topic";
    private static final String MQTT_MESSAGE = "Hello Vert.x MQTT Client";
    private int messageId = 0;

    @Test
    public void subscribeQos2AndReceive(TestContext testContext) throws InterruptedException {
        subscribeAndReceive(testContext, MqttQoS.EXACTLY_ONCE);
    }

    @Test
    public void subscribeQos1AndReceive(TestContext testContext) throws InterruptedException {
        subscribeAndReceive(testContext, MqttQoS.AT_LEAST_ONCE);
    }

    @Test
    public void subscribeQoS0AndReceive(TestContext testContext) throws InterruptedException {
        subscribeAndReceive(testContext, MqttQoS.AT_MOST_ONCE);
    }

    @Test
    public void subscribeQoS0(TestContext testContext) throws InterruptedException {
        subscribe(testContext, MqttQoS.AT_MOST_ONCE);
    }

    @Test
    public void subscribeQoS1(TestContext testContext) throws InterruptedException {
        subscribe(testContext, MqttQoS.AT_LEAST_ONCE);
    }

    @Test
    public void subscribeQoS2(TestContext testContext) throws InterruptedException {
        subscribe(testContext, MqttQoS.EXACTLY_ONCE);
    }

    @Test
    public void unsubscribedNoMessageReceived(TestContext testContext) throws InterruptedException {
        Async async = testContext.async(2);
        Async async2 = testContext.async();
        MqttClient create = MqttClient.create(Vertx.vertx());
        MqttClient create2 = MqttClient.create(Vertx.vertx());
        MqttClient create3 = MqttClient.create(Vertx.vertx());
        create.connect(TestUtil.BROKER_PORT, TestUtil.BROKER_ADDRESS, asyncResult -> {
            Assert.assertTrue(asyncResult.succeeded());
            create.publishHandler(mqttPublishMessage -> {
                log.error("Subscriber " + create.clientId() + " received message " + new String(mqttPublishMessage.payload().getBytes()));
                testContext.fail();
            });
            create.subscribe(MQTT_TOPIC, MqttQoS.AT_MOST_ONCE.value(), asyncResult -> {
                Assert.assertTrue(asyncResult.succeeded());
                log.info("Subscriber " + create.clientId() + " subscribed to " + MQTT_TOPIC);
                create.unsubscribe(MQTT_TOPIC, asyncResult -> {
                    Assert.assertTrue(asyncResult.succeeded());
                    log.info("Subscriber " + create.clientId() + " un-subscribed from " + MQTT_TOPIC);
                    async.countDown();
                });
            });
        });
        create2.connect(TestUtil.BROKER_PORT, TestUtil.BROKER_ADDRESS, asyncResult2 -> {
            Assert.assertTrue(asyncResult2.succeeded());
            create2.publishHandler(mqttPublishMessage -> {
                log.error("Subscriber " + create2.clientId() + " received message " + new String(mqttPublishMessage.payload().getBytes()));
                async2.complete();
            });
            create2.subscribe(MQTT_TOPIC, MqttQoS.AT_MOST_ONCE.value(), asyncResult2 -> {
                Assert.assertTrue(asyncResult2.succeeded());
                log.info("Subscriber " + create2.clientId() + " subscribed to " + MQTT_TOPIC);
                async.countDown();
            });
        });
        async.await();
        create3.connect(TestUtil.BROKER_PORT, TestUtil.BROKER_ADDRESS, asyncResult3 -> {
            create3.publish(MQTT_TOPIC, Buffer.buffer(MQTT_MESSAGE.getBytes()), MqttQoS.AT_MOST_ONCE, false, false, asyncResult3 -> {
                Assert.assertTrue(asyncResult3.succeeded());
                this.messageId = ((Integer) asyncResult3.result()).intValue();
                log.info("Publishing message id = " + this.messageId);
            });
        });
        async2.await();
    }

    private void subscribeAndReceive(TestContext testContext, MqttQoS mqttQoS) {
        Async async = testContext.async();
        MqttClient create = MqttClient.create(Vertx.vertx());
        create.publishHandler(mqttPublishMessage -> {
            Assert.assertTrue(mqttPublishMessage.qosLevel() == mqttQoS);
            log.info("Just received message on [" + mqttPublishMessage.topicName() + "] payload [" + mqttPublishMessage.payload().toString(Charset.defaultCharset()) + "] with QoS [" + mqttPublishMessage.qosLevel() + "]");
            create.disconnect();
            async.countDown();
        });
        create.connect(TestUtil.BROKER_PORT, TestUtil.BROKER_ADDRESS, asyncResult -> {
            Assert.assertTrue(asyncResult.succeeded());
            create.subscribe(MQTT_TOPIC, mqttQoS.value());
            create.publish(MQTT_TOPIC, Buffer.buffer(MQTT_MESSAGE.getBytes()), mqttQoS, false, false);
        });
        async.await();
    }

    private void subscribe(TestContext testContext, MqttQoS mqttQoS) {
        this.messageId = 0;
        Async async = testContext.async();
        MqttClient create = MqttClient.create(Vertx.vertx());
        create.subscribeCompletionHandler(mqttSubAckMessage -> {
            Assert.assertTrue(mqttSubAckMessage.messageId() == this.messageId);
            Assert.assertTrue(mqttSubAckMessage.grantedQoSLevels().contains(Integer.valueOf(mqttQoS.value())));
            log.info("subscribing complete for message id = " + mqttSubAckMessage.messageId() + " with QoS " + mqttSubAckMessage.grantedQoSLevels());
            create.disconnect();
            async.countDown();
        });
        create.connect(TestUtil.BROKER_PORT, TestUtil.BROKER_ADDRESS, asyncResult -> {
            Assert.assertTrue(asyncResult.succeeded());
            create.subscribe(MQTT_TOPIC, mqttQoS.value(), asyncResult -> {
                Assert.assertTrue(asyncResult.succeeded());
                this.messageId = ((Integer) asyncResult.result()).intValue();
                log.info("subscribing on [/my_topic] with QoS [" + mqttQoS.value() + "] message id = " + this.messageId);
            });
        });
        async.await();
    }
}
