package io.vertx.amqp;

import io.vertx.core.Future;
import io.vertx.core.Promise;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.junit.Test;

/* loaded from: input_file:io/vertx/amqp/ReceiverTest.class */
public class ReceiverTest extends ArtemisTestBase {
    @Test
    public void testReception() {
        AtomicInteger atomicInteger = new AtomicInteger();
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.client = AmqpClient.create(new AmqpClientOptions().setHost(this.host).setPort(this.port).setUsername(this.username).setPassword(this.password)).connect(asyncResult -> {
            ((AmqpConnection) asyncResult.result()).createReceiver(uuid, asyncResult -> {
                ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                    copyOnWriteArrayList.add(amqpMessage.bodyAsString());
                });
                CompletableFuture.runAsync(() -> {
                    this.usage.produceStrings(uuid, 10, null, () -> {
                        return Integer.toString(atomicInteger.getAndIncrement());
                    });
                });
            });
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 10);
        });
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"});
    }

    @Test
    public void testReceptionWithAcceptedMessages() {
        AtomicInteger atomicInteger = new AtomicInteger();
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.client = AmqpClient.create(new AmqpClientOptions().setHost(this.host).setPort(this.port).setUsername(this.username).setPassword(this.password)).connect(asyncResult -> {
            ((AmqpConnection) asyncResult.result()).createReceiver(uuid, new AmqpReceiverOptions().setAutoAcknowledgement(false), asyncResult -> {
                ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                    copyOnWriteArrayList.add(amqpMessage.bodyAsString());
                    amqpMessage.accepted();
                });
                CompletableFuture.runAsync(() -> {
                    this.usage.produceStrings(uuid, 10, null, () -> {
                        return Integer.toString(atomicInteger.getAndIncrement());
                    });
                });
            });
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 10);
        });
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"});
    }

    @Test
    public void testReceptionWithRejectedMessages() {
        AtomicInteger atomicInteger = new AtomicInteger();
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.client = AmqpClient.create(new AmqpClientOptions().setHost(this.host).setPort(this.port).setUsername(this.username).setPassword(this.password)).connect(asyncResult -> {
            ((AmqpConnection) asyncResult.result()).createReceiver(uuid, new AmqpReceiverOptions().setAutoAcknowledgement(false), asyncResult -> {
                ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                    copyOnWriteArrayList.add(amqpMessage.bodyAsString());
                    amqpMessage.rejected();
                });
                CompletableFuture.runAsync(() -> {
                    this.usage.produceStrings(uuid, 10, null, () -> {
                        return Integer.toString(atomicInteger.getAndIncrement());
                    });
                });
            });
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 10);
        });
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"});
    }

    @Test
    public void testReceptionWithModifiedMessages() {
        AtomicInteger atomicInteger = new AtomicInteger();
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.client = AmqpClient.create(new AmqpClientOptions().setHost(this.host).setPort(this.port).setUsername(this.username).setPassword(this.password)).connect(asyncResult -> {
            ((AmqpConnection) asyncResult.result()).createReceiver(uuid, new AmqpReceiverOptions().setAutoAcknowledgement(false), asyncResult -> {
                ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                    copyOnWriteArrayList.add(amqpMessage.bodyAsString());
                    amqpMessage.modified(true, true);
                });
                CompletableFuture.runAsync(() -> {
                    this.usage.produceStrings(uuid, 10, null, () -> {
                        return Integer.toString(atomicInteger.getAndIncrement());
                    });
                });
            });
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 10);
        });
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"});
    }

    @Test
    public void testReceptionWithoutConnection() {
        AtomicInteger atomicInteger = new AtomicInteger();
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.client = AmqpClient.create(new AmqpClientOptions().setHost(this.host).setPort(this.port).setUsername(this.username).setPassword(this.password)).createReceiver(uuid, asyncResult -> {
            ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                copyOnWriteArrayList.add(amqpMessage.bodyAsString());
            });
            CompletableFuture.runAsync(() -> {
                this.usage.produceStrings(uuid, 10, null, () -> {
                    return Integer.toString(atomicInteger.getAndIncrement());
                });
            });
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 10);
        });
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"});
    }

    @Test
    public void testReceptionWithoutConnectionWithoutMessageHandler() {
        AtomicInteger atomicInteger = new AtomicInteger();
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        this.client = AmqpClient.create(new AmqpClientOptions().setHost(this.host).setPort(this.port).setUsername(this.username).setPassword(this.password)).createReceiver(uuid, asyncResult -> {
            ((AmqpReceiver) asyncResult.result()).handler(amqpMessage -> {
                copyOnWriteArrayList.add(amqpMessage.bodyAsString());
            });
            CompletableFuture.runAsync(() -> {
                this.usage.produceStrings(uuid, 10, null, () -> {
                    return Integer.toString(atomicInteger.getAndIncrement());
                });
            });
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(copyOnWriteArrayList.size() == 10);
        });
        Assertions.assertThat(copyOnWriteArrayList).containsExactly(new String[]{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"});
    }

    @Test
    public void testReceptionWhenDemandChangesWhileHandlingMessages() {
        AtomicInteger atomicInteger = new AtomicInteger();
        String uuid = UUID.randomUUID().toString();
        CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList();
        Promise promise = Promise.promise();
        Future future = promise.future();
        this.client = AmqpClient.create(new AmqpClientOptions().setHost(this.host).setPort(this.port).setUsername(this.username).setPassword(this.password)).connect(asyncResult -> {
            ((AmqpConnection) asyncResult.result()).createReceiver(uuid, asyncResult -> {
                ((AmqpReceiver) asyncResult.result()).pause().handler(amqpMessage -> {
                    copyOnWriteArrayList.add(amqpMessage.bodyAsString());
                });
                promise.complete(asyncResult.result());
            });
        });
        ConditionFactory await = Awaitility.await();
        future.getClass();
        await.until(future::succeeded);
        Promise promise2 = Promise.promise();
        CompletableFuture.runAsync(() -> {
            AmqpUsage amqpUsage = this.usage;
            promise2.getClass();
            amqpUsage.produceStrings(uuid, 1000, promise2::complete, () -> {
                return Integer.toString(atomicInteger.getAndIncrement());
            });
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(promise2.future().succeeded());
        });
        AmqpReceiver amqpReceiver = (AmqpReceiver) future.result();
        amqpReceiver.fetch(400L);
        Promise promise3 = Promise.promise();
        CompletableFuture.runAsync(() -> {
            AmqpUsage amqpUsage = this.usage;
            promise3.getClass();
            amqpUsage.produceStrings(uuid, 1000, promise3::complete, () -> {
                return Integer.toString(atomicInteger.getAndIncrement());
            });
        });
        Awaitility.await().until(() -> {
            return Boolean.valueOf(promise3.future().succeeded());
        });
        amqpReceiver.fetch(1600L);
        Awaitility.await("All sent messages should be handled").atMost(20L, TimeUnit.SECONDS).untilAsserted(() -> {
            Assertions.assertThat(copyOnWriteArrayList).containsAll((Iterable) IntStream.range(0, 2000).mapToObj(String::valueOf).collect(Collectors.toList()));
        });
    }
}
