package io.vertx.amqp;

import io.vertx.core.Handler;
import io.vertx.ext.unit.Async;
import io.vertx.ext.unit.TestContext;
import io.vertx.proton.ProtonHelper;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.Target;
import org.junit.Test;

/* loaded from: input_file:io/vertx/amqp/SenderTest.class */
public class SenderTest extends BareTestBase {
    @Test(timeout = 20000)
    public void testProducerClose(TestContext testContext) throws Exception {
        doProducerCloseTestImpl(testContext, false);
    }

    @Test(timeout = 20000)
    public void testProducerEnd(TestContext testContext) throws Exception {
        doProducerCloseTestImpl(testContext, true);
    }

    private void doProducerCloseTestImpl(TestContext testContext, boolean z) throws Exception {
        String methodName = this.name.getMethodName();
        String str = "myMessageContent-" + methodName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertNotNull(remoteTarget, "target should not be null");
                testContext.assertEquals(methodName, remoteTarget.getAddress(), "expected given address");
                protonReceiver.setTarget(remoteTarget.copy());
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertNotNull(body, "received body was null");
                    testContext.assertTrue(body instanceof AmqpValue, "unexpected body section type: " + body.getClass());
                    testContext.assertEquals(str, body.getValue(), "Unexpected message body content");
                    async.complete();
                });
                protonReceiver.closeHandler(asyncResult2 -> {
                    protonReceiver.close();
                    async2.complete();
                });
                protonReceiver.open();
            });
        });
        AmqpClient create = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(mockServer.actualPort()));
        create.connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createSender(methodName, asyncResult -> {
                AmqpSender amqpSender = (AmqpSender) asyncResult.result();
                amqpSender.exceptionHandler(th -> {
                    atomicBoolean.set(true);
                });
                amqpSender.sendWithAck(AmqpMessage.create().withBody(str).build(), asyncResult -> {
                    testContext.assertTrue(asyncResult.succeeded());
                    if (z) {
                        amqpSender.end();
                    } else {
                        amqpSender.close((Handler) null);
                    }
                    create.close(asyncResult -> {
                        testContext.assertTrue(asyncResult.succeeded());
                        async3.complete();
                    });
                });
            });
        });
        try {
            async.awaitSuccess();
            async2.awaitSuccess();
            async3.awaitSuccess();
            testContext.assertFalse(atomicBoolean.get(), "exception handler unexpectedly called");
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testSenderFlowControlMechanisms(TestContext testContext) throws Exception {
        String methodName = this.name.getMethodName();
        String str = "myMessageContent-" + methodName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        Async async3 = testContext.async();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertNotNull(remoteTarget, "target should not be null");
                testContext.assertEquals(methodName, remoteTarget.getAddress(), "expected given address");
                protonReceiver.setTarget(remoteTarget.copy());
                protonReceiver.setAutoAccept(false);
                protonReceiver.setPrefetch(0);
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertNotNull(body, "received body was null");
                    testContext.assertTrue(body instanceof AmqpValue, "unexpected body section type: " + body.getClass());
                    testContext.assertEquals(str, body.getValue(), "Unexpected message body content");
                    this.vertx.setTimer(250L, l -> {
                        async2.awaitSuccess();
                        protonReceiver.flow(1);
                    });
                });
                protonReceiver.closeHandler(asyncResult2 -> {
                    protonReceiver.close();
                });
                protonReceiver.open();
                this.vertx.setTimer(250L, l -> {
                    async.awaitSuccess();
                    protonReceiver.flow(1);
                });
            });
        });
        AmqpClient create = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(mockServer.actualPort()));
        create.connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createSender(methodName, asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                AmqpSender amqpSender = (AmqpSender) asyncResult.result();
                testContext.assertTrue(amqpSender.writeQueueFull(), "expected write queue to be full, we have not yet granted credit");
                amqpSender.drainHandler(r14 -> {
                    testContext.assertTrue(async.isSucceeded(), "should have been called after initial credit delay");
                    testContext.assertFalse(amqpSender.writeQueueFull(), "expected write queue not to be full, we just granted credit");
                    amqpSender.send(AmqpMessage.create().withBody(str).build());
                    testContext.assertTrue(amqpSender.writeQueueFull(), "expected write queue to be full, we just used all the credit");
                    amqpSender.drainHandler(r9 -> {
                        testContext.assertTrue(async2.isSucceeded(), "should have been called after 2nd credit delay");
                        testContext.assertFalse(amqpSender.writeQueueFull(), "expected write queue not to be full, we just granted credit");
                        create.close(asyncResult -> {
                            testContext.assertTrue(asyncResult.succeeded());
                            async3.complete();
                        });
                    });
                    async2.complete();
                });
            });
            async.complete();
        });
        try {
            async3.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testSenderClosedRemotelyCallsExceptionHandler(TestContext testContext) throws Exception {
        doSenderClosedRemotelyCallsExceptionHandlerTestImpl(testContext, false);
    }

    @Test(timeout = 20000)
    public void testSenderClosedRemotelyWithErrorCallsExceptionHandler(TestContext testContext) throws Exception {
        doSenderClosedRemotelyCallsExceptionHandlerTestImpl(testContext, true);
    }

    private void doSenderClosedRemotelyCallsExceptionHandlerTestImpl(TestContext testContext, boolean z) throws Exception {
        String methodName = this.name.getMethodName();
        String str = "myMessageContent-" + methodName;
        Async async = testContext.async();
        Async async2 = testContext.async();
        MockServer mockServer = new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.closeHandler(asyncResult -> {
                    protonConnection.close();
                });
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertNotNull(remoteTarget, "target should not be null");
                testContext.assertEquals(methodName, remoteTarget.getAddress(), "expected given address");
                protonReceiver.setTarget(remoteTarget.copy());
                protonReceiver.handler((protonDelivery, message) -> {
                    AmqpValue body = message.getBody();
                    testContext.assertNotNull(body, "received body was null");
                    testContext.assertTrue(body instanceof AmqpValue, "unexpected body section type: " + body.getClass());
                    testContext.assertEquals(str, body.getValue(), "Unexpected message body content");
                    if (z) {
                        protonReceiver.setCondition(ProtonHelper.condition(AmqpError.INTERNAL_ERROR, "testing-error"));
                    }
                    protonReceiver.close();
                });
                protonReceiver.closeHandler(asyncResult2 -> {
                    protonReceiver.close();
                });
                protonReceiver.open();
            });
        });
        this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(mockServer.actualPort()));
        this.client.connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createSender(methodName, asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                AmqpSender amqpSender = (AmqpSender) asyncResult.result();
                amqpSender.exceptionHandler(th -> {
                    testContext.assertNotNull(th, "expected exception");
                    testContext.assertTrue(th instanceof Exception, "expected vertx exception");
                    if (z) {
                        testContext.assertNotNull(th.getCause(), "expected cause");
                    } else {
                        testContext.assertNull(th.getCause(), "expected no cause");
                    }
                    async2.complete();
                    this.client.close(asyncResult -> {
                        if (asyncResult.failed()) {
                            asyncResult.cause().printStackTrace();
                        }
                        testContext.assertTrue(asyncResult.succeeded());
                        async.complete();
                    });
                });
                amqpSender.send(AmqpMessage.create().withBody(str).build());
            });
        });
        try {
            async2.awaitSuccess();
            async.awaitSuccess();
            mockServer.close();
        } catch (Throwable th) {
            mockServer.close();
            throw th;
        }
    }

    @Test(timeout = 20000)
    public void testDynamicSenderWithOptions(TestContext testContext) throws ExecutionException, InterruptedException {
        String uuid = UUID.randomUUID().toString();
        String str = "myMessageContent-" + uuid;
        Async async = testContext.async();
        this.client = AmqpClient.create(this.vertx, new AmqpClientOptions().setHost("localhost").setPort(new MockServer(this.vertx, protonConnection -> {
            protonConnection.openHandler(asyncResult -> {
                protonConnection.open();
            });
            protonConnection.sessionOpenHandler((v0) -> {
                v0.open();
            });
            protonConnection.closeHandler(asyncResult2 -> {
                protonConnection.close();
            });
            protonConnection.receiverOpenHandler(protonReceiver -> {
                protonReceiver.closeHandler(asyncResult3 -> {
                    protonReceiver.close();
                });
                testContext.assertNotNull(protonReceiver.getRemoteTarget(), "source should not be null");
                org.apache.qpid.proton.amqp.messaging.Target remoteTarget = protonReceiver.getRemoteTarget();
                testContext.assertTrue(remoteTarget.getDynamic(), "expected dynamic source to be requested");
                testContext.assertNull(remoteTarget.getAddress(), "expected no source address to be set");
                org.apache.qpid.proton.amqp.messaging.Target copy = remoteTarget.copy();
                copy.setAddress(uuid);
                protonReceiver.setTarget(copy);
                protonReceiver.open();
                async.complete();
            });
        }).actualPort()));
        this.client.connect(asyncResult -> {
            testContext.assertTrue(asyncResult.succeeded());
            ((AmqpConnection) asyncResult.result()).createSender((String) null, new AmqpSenderOptions().setDynamic(true), asyncResult -> {
                testContext.assertTrue(asyncResult.succeeded());
                testContext.assertNotNull(((AmqpSender) asyncResult.result()).address());
                ((AmqpSender) asyncResult.result()).send(AmqpMessage.create().withBody(str).build());
            });
        });
        async.awaitSuccess();
    }
}
