package io.vertx.mqtt.test.client;

import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.ext.unit.junit.VertxUnitRunner;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import java.util.LinkedList;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(VertxUnitRunner.class)
/* loaded from: input_file:io/vertx/mqtt/test/client/MqttClientOutOfOrderAcksTest.class */
public class MqttClientOutOfOrderAcksTest {
    private static final Logger log = LoggerFactory.getLogger(MqttClientOutOfOrderAcksTest.class);
    private static final String MQTT_TOPIC = "/my_topic";
    private static final String MQTT_MESSAGE = "Hello Vert.x MQTT Client";
    Vertx vertx = Vertx.vertx();
    MqttServer server;

    @Test
    public void publishQoS1OutOfOrderAcks(TestContext testContext) throws InterruptedException {
        clientSendThreePublishMessages(MqttQoS.AT_LEAST_ONCE, testContext);
    }

    @Test
    public void publishQoS2OutOfOrderAcks(TestContext testContext) throws InterruptedException {
        clientSendThreePublishMessages(MqttQoS.EXACTLY_ONCE, testContext);
    }

    private void clientSendThreePublishMessages(MqttQoS mqttQoS, TestContext testContext) {
        Async async = testContext.async(3);
        MqttClient create = MqttClient.create(this.vertx);
        LinkedList linkedList = new LinkedList();
        linkedList.add(2);
        linkedList.add(1);
        linkedList.add(3);
        create.publishCompletionHandler(num -> {
            testContext.assertEquals(Integer.valueOf(num.intValue()), linkedList.poll());
            log.info("[CLIENT] Publish completed for message with id: " + num);
            async.countDown();
        });
        create.connect(1883, "localhost", asyncResult -> {
            for (int i = 0; i < 3; i++) {
                create.publish(MQTT_TOPIC, Buffer.buffer(MQTT_MESSAGE.getBytes()), mqttQoS, false, false, asyncResult -> {
                    log.info("[CLIENT] publishing message id = " + asyncResult.result());
                });
            }
        });
        async.await();
        create.disconnect();
    }

    @Before
    public void before(TestContext testContext) {
        this.server = MqttServer.create(this.vertx);
        this.server.exceptionHandler(th -> {
            testContext.fail();
        });
        this.server.endpointHandler(MqttClientOutOfOrderAcksTest::serverLogic).listen(testContext.asyncAssertSuccess());
    }

    @After
    public void after() {
        this.server.close();
        this.vertx.close();
    }

    private static void serverLogic(MqttEndpoint mqttEndpoint) {
        log.info("[SERVER] Client connected");
        mqttEndpoint.publishHandler(mqttPublishMessage -> {
            log.info("[SERVER] Received PUBLISH with message id = " + mqttPublishMessage.messageId());
            if (mqttPublishMessage.qosLevel().equals(MqttQoS.EXACTLY_ONCE) && mqttPublishMessage.messageId() == 3) {
                mqttEndpoint.publishReceived(3);
                mqttEndpoint.publishReceived(2);
                mqttEndpoint.publishReceived(1);
            }
            if (mqttPublishMessage.qosLevel().equals(MqttQoS.AT_LEAST_ONCE) && mqttPublishMessage.messageId() == 3) {
                mqttEndpoint.publishAcknowledge(2);
                mqttEndpoint.publishAcknowledge(1);
                mqttEndpoint.publishAcknowledge(3);
            }
        });
        mqttEndpoint.publishReleaseHandler(num -> {
            log.info("[SERVER] Receive PUBREL with message id = " + num);
            if (num.intValue() == 1) {
                mqttEndpoint.publishComplete(2);
                mqttEndpoint.publishComplete(1);
                mqttEndpoint.publishComplete(3);
            }
        });
        mqttEndpoint.disconnectHandler(r3 -> {
            log.info("[SERVER] Client disconnected");
        });
        mqttEndpoint.accept(false);
    }
}
