package org.eclipse.hono.service.amqp;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.EventBus;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonMessageHandler;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import io.vertx.proton.ProtonSession;
import java.net.HttpURLConnection;
import java.util.UUID;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.assertj.core.api.Assertions;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.auth.AuthorizationService;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.EventBusMessage;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/eclipse/hono/service/amqp/RequestResponseEndpointTest.class */
/**
 * Unit tests verifying the behavior of {@link RequestResponseEndpoint}.
 * <p>
 * All Vert.x and Proton collaborators are Mockito mocks, the endpoint under test is a
 * minimal anonymous subclass created by {@link #getEndpoint(boolean)}.
 */
public class RequestResponseEndpointTest {

    private static final String EVENT_BUS_ADDRESS = "requests";
    /** The address that request messages are sent to. */
    private static final ResourceIdentifier RESOURCE = ResourceIdentifier.from("endpoint", "DEFAULT_TENANT", (String) null);
    /** The address that clients receive response messages from. */
    private static final ResourceIdentifier REPLY_RESOURCE = ResourceIdentifier.from("endpoint", "DEFAULT_TENANT", "reply-to");

    private ProtonConnection connection;
    private Vertx vertx;
    private EventBus eventBus;
    private ProtonReceiver receiver;
    private ProtonSender sender;

    /**
     * Creates fresh mocks for the connection, the Vert.x instance with its event bus,
     * a receiver link with fluent setters and an open sender link.
     */
    @BeforeEach
    public void setUp() {
        connection = Mockito.mock(ProtonConnection.class);
        vertx = Mockito.mock(Vertx.class);
        eventBus = Mockito.mock(EventBus.class);
        Mockito.when(vertx.eventBus()).thenReturn(eventBus);

        // the receiver's configuration methods are fluent, so they need to return the mock itself
        receiver = Mockito.mock(ProtonReceiver.class);
        Mockito.when(receiver.handler(ArgumentMatchers.any())).thenReturn(receiver);
        Mockito.when(receiver.closeHandler(ArgumentMatchers.any())).thenReturn(receiver);
        Mockito.when(receiver.setAutoAccept(ArgumentMatchers.anyBoolean())).thenReturn(receiver);
        Mockito.when(receiver.setPrefetch(ArgumentMatchers.anyInt())).thenReturn(receiver);
        Mockito.when(receiver.setQoS(ArgumentMatchers.any(ProtonQoS.class))).thenReturn(receiver);

        final ProtonSession session = Mockito.mock(ProtonSession.class);
        Mockito.when(session.getConnection()).thenReturn(connection);
        sender = Mockito.mock(ProtonSender.class);
        Mockito.when(sender.getName()).thenReturn("mocked sender");
        Mockito.when(sender.isOpen()).thenReturn(Boolean.TRUE);
        Mockito.when(sender.getSession()).thenReturn(session);
    }

    /**
     * Verifies that the endpoint closes a receiver link whose remote QoS is
     * <em>at most once</em>.
     */
    @Test
    public void testOnLinkAttachClosesReceiverUsingAtMostOnceQoS() {
        final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true);
        Mockito.when(receiver.getRemoteQoS()).thenReturn(ProtonQoS.AT_MOST_ONCE);

        endpoint.onLinkAttach(connection, receiver, RESOURCE);

        Mockito.verify(receiver).close();
    }

    /**
     * Verifies that the endpoint registers a message handler on and opens a receiver link
     * whose remote QoS is <em>at least once</em>.
     */
    @Test
    public void testOnLinkAttachOpensReceiver() {
        final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true);
        Mockito.when(receiver.getRemoteQoS()).thenReturn(ProtonQoS.AT_LEAST_ONCE);

        endpoint.onLinkAttach(connection, receiver, RESOURCE);

        Mockito.verify(receiver).handler(ArgumentMatchers.any(ProtonMessageHandler.class));
        Mockito.verify(receiver).open();
        Mockito.verify(receiver, Mockito.never()).close();
    }

    /**
     * Verifies that the endpoint sets an error condition on and closes a sender link
     * that is attached using an address without a resource ID.
     */
    @Test
    public void testOnLinkAttachClosesSenderWithoutAppropriateReplyAddress() {
        getEndpoint(true).onLinkAttach(connection, sender, RESOURCE);

        Mockito.verify(sender).setCondition(ArgumentMatchers.any());
        Mockito.verify(sender).close();
    }

    /**
     * Verifies that a request which does not pass formal verification is settled with the
     * <em>rejected</em> outcome, that the receiver link stays open and that one credit is
     * flowed to the client.
     */
    @Test
    public void testHandleMessageRejectsMalformedMessage() {
        final Message request = ProtonHelper.message();
        final ProtonConnection con = Mockito.mock(ProtonConnection.class);
        final ProtonDelivery delivery = Mockito.mock(ProtonDelivery.class);

        // the endpoint is configured to fail formal verification of every message
        getEndpoint(false).handleRequestMessage(con, receiver, RESOURCE, delivery, request);

        assertSettledWith(delivery, Rejected.class);
        Mockito.verify(receiver, Mockito.never()).close();
        Mockito.verify(receiver).flow(1);
    }

    /**
     * Verifies that a request the client is not authorized for is accepted but answered
     * with a 403 response without the request being sent via the event bus.
     */
    @Test
    @SuppressWarnings("unchecked")
    public void testHandleMessageSendsResponseForUnauthorizedRequests() {
        final Message request = ProtonHelper.message();
        request.setSubject("unauthorized");
        request.setReplyTo(REPLY_RESOURCE.toString());
        request.setCorrelationId(UUID.randomUUID().toString());
        final ProtonDelivery delivery = Mockito.mock(ProtonDelivery.class);
        final AuthorizationService authService = newAuthorizationService(false);
        final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true);
        endpoint.setAuthorizationService(authService);
        endpoint.onLinkAttach(connection, sender, REPLY_RESOURCE);

        endpoint.handleRequestMessage(connection, receiver, RESOURCE, delivery, request);

        assertSettledWith(delivery, Accepted.class);
        Mockito.verify(receiver, Mockito.never()).close();
        Mockito.verify(authService).isAuthorized(Constants.PRINCIPAL_ANONYMOUS, RESOURCE, "unauthorized");
        Mockito.verify(eventBus, Mockito.never()).request(
                ArgumentMatchers.anyString(),
                ArgumentMatchers.any(),
                ArgumentMatchers.any(DeliveryOptions.class),
                ArgumentMatchers.any(Handler.class));
        Mockito.verify(sender).send(ArgumentMatchers.argThat(
                response -> hasStatusCode(response, HttpURLConnection.HTTP_FORBIDDEN)));
    }

    /**
     * Verifies that a request timing out on the event bus is answered with a 503 response.
     */
    @Test
    public void testHandleMessageSendsResponseForTimedOutRequests() {
        assertFailedRequestYieldsStatusCode(
                new ReplyException(ReplyFailure.TIMEOUT), HttpURLConnection.HTTP_UNAVAILABLE);
    }

    /**
     * Verifies that a request failing with an arbitrary exception is answered with a 500 response.
     */
    @Test
    public void testHandleMessageSendsResponseForFailedRequests() {
        assertFailedRequestYieldsStatusCode(new RuntimeException(), HttpURLConnection.HTTP_INTERNAL_ERROR);
    }

    /**
     * Sends an authorized request, fails the captured event bus reply handler with the given
     * error and verifies that a response with the expected status code is sent to the client.
     *
     * @param error The error to fail the event bus request with.
     * @param expectedStatus The status code that the response is expected to contain.
     */
    private void assertFailedRequestYieldsStatusCode(final Throwable error, final int expectedStatus) {
        final Message request = ProtonHelper.message();
        request.setSubject("get");
        request.setReplyTo(REPLY_RESOURCE.toString());
        request.setCorrelationId(UUID.randomUUID().toString());
        final ProtonDelivery delivery = Mockito.mock(ProtonDelivery.class);
        final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true);
        endpoint.setAuthorizationService(newAuthorizationService(true));
        endpoint.onLinkAttach(connection, sender, REPLY_RESOURCE);

        endpoint.handleRequestMessage(connection, receiver, RESOURCE, delivery, request);

        assertSettledWith(delivery, Accepted.class);
        @SuppressWarnings("unchecked")
        final ArgumentCaptor<Handler<AsyncResult<io.vertx.core.eventbus.Message<Object>>>> replyHandler =
                ArgumentCaptor.forClass(Handler.class);
        Mockito.verify(eventBus).request(
                ArgumentMatchers.eq(EVENT_BUS_ADDRESS),
                ArgumentMatchers.any(JsonObject.class),
                ArgumentMatchers.any(DeliveryOptions.class),
                replyHandler.capture());

        // simulate the failure of the service invocation
        replyHandler.getValue().handle(Future.failedFuture(error));

        Mockito.verify(sender).send(ArgumentMatchers.argThat(
                response -> hasStatusCode(response, expectedStatus)));
        Mockito.verify(receiver).flow(1);
    }

    /**
     * Verifies that a request whose payload cannot be decoded is accepted but answered
     * with a 400 response without the request being sent via the event bus.
     */
    @Test
    @SuppressWarnings("unchecked")
    public void testHandleMessageSendsResponseForMalformedPayload() {
        final Message request = ProtonHelper.message();
        request.setSubject("get");
        request.setReplyTo(REPLY_RESOURCE.toString());
        request.setCorrelationId(UUID.randomUUID().toString());
        // three arbitrary bytes that do not represent a JSON document
        request.setBody(new Data(new Binary(new byte[] {1, 2, 3})));
        final ProtonDelivery delivery = Mockito.mock(ProtonDelivery.class);
        final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true);
        endpoint.onLinkAttach(connection, sender, REPLY_RESOURCE);

        endpoint.handleRequestMessage(connection, receiver, RESOURCE, delivery, request);

        assertSettledWith(delivery, Accepted.class);
        Mockito.verify(eventBus, Mockito.never()).request(
                ArgumentMatchers.eq(EVENT_BUS_ADDRESS),
                ArgumentMatchers.any(JsonObject.class),
                ArgumentMatchers.any(DeliveryOptions.class),
                ArgumentMatchers.any(Handler.class));
        Mockito.verify(sender).send(ArgumentMatchers.argThat(
                response -> hasStatusCode(response, HttpURLConnection.HTTP_BAD_REQUEST)));
        Mockito.verify(receiver).flow(1);
    }

    /**
     * Verifies that an authorized request is accepted, forwarded via the event bus and that
     * the reply received via the event bus results in a response being sent to the client.
     */
    @Test
    public void testHandleMessageProcessesAuthorizedRequests() {
        final Message request = ProtonHelper.message();
        request.setSubject("get");
        request.setReplyTo(REPLY_RESOURCE.toString());
        final ProtonDelivery delivery = Mockito.mock(ProtonDelivery.class);
        final AuthorizationService authService = newAuthorizationService(true);
        final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true);
        endpoint.setAuthorizationService(authService);
        endpoint.onLinkAttach(connection, sender, REPLY_RESOURCE);

        endpoint.handleRequestMessage(connection, receiver, RESOURCE, delivery, request);

        assertSettledWith(delivery, Accepted.class);
        Mockito.verify(receiver, Mockito.never()).close();
        Mockito.verify(authService).isAuthorized(Constants.PRINCIPAL_ANONYMOUS, RESOURCE, "get");
        @SuppressWarnings("unchecked")
        final ArgumentCaptor<Handler<AsyncResult<io.vertx.core.eventbus.Message<Object>>>> replyHandler =
                ArgumentCaptor.forClass(Handler.class);
        Mockito.verify(eventBus).request(
                ArgumentMatchers.eq(EVENT_BUS_ADDRESS),
                ArgumentMatchers.any(JsonObject.class),
                ArgumentMatchers.any(DeliveryOptions.class),
                replyHandler.capture());

        // simulate the successful reply of the service invocation
        @SuppressWarnings("unchecked")
        final io.vertx.core.eventbus.Message<Object> reply = Mockito.mock(io.vertx.core.eventbus.Message.class);
        Mockito.when(reply.body())
                .thenReturn(EventBusMessage.forStatusCode(HttpURLConnection.HTTP_ACCEPTED).toJson());
        replyHandler.getValue().handle(Future.succeededFuture(reply));

        Mockito.verify(sender).send(ArgumentMatchers.any(Message.class));
        Mockito.verify(receiver).flow(1);
    }

    /**
     * Verifies that a second sender link attached for a reply address already in use
     * is closed while the first one gets opened.
     */
    @Test
    public void testDuplicateSubscription() {
        final ProtonConnection firstConnection = Mockito.mock(ProtonConnection.class);
        final ProtonConnection secondConnection = Mockito.mock(ProtonConnection.class);
        final ProtonSender firstSender = Mockito.mock(ProtonSender.class);
        final ProtonSender secondSender = Mockito.mock(ProtonSender.class);
        final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true);

        endpoint.onLinkAttach(firstConnection, firstSender, REPLY_RESOURCE);
        Mockito.verify(firstSender).open();

        endpoint.onLinkAttach(secondConnection, secondSender, REPLY_RESOURCE);
        Mockito.verify(secondSender).close();
    }

    /**
     * Verifies that a reply address can be used by a new sender link once the connection
     * of the link that originally used it has been closed.
     */
    @Test
    public void testFreeSubscription() {
        final ProtonConnection firstConnection = Mockito.mock(ProtonConnection.class);
        final ProtonSession firstSession = Mockito.mock(ProtonSession.class);
        Mockito.when(firstSession.getConnection()).thenReturn(firstConnection);
        final ProtonSender firstSender = Mockito.mock(ProtonSender.class);
        Mockito.when(firstSender.getSession()).thenReturn(firstSession);
        final ProtonConnection secondConnection = Mockito.mock(ProtonConnection.class);
        final ProtonSender secondSender = Mockito.mock(ProtonSender.class);
        final RequestResponseEndpoint<ServiceConfigProperties> endpoint = getEndpoint(true);

        endpoint.onLinkAttach(firstConnection, firstSender, REPLY_RESOURCE);
        Mockito.verify(firstSender).open();

        endpoint.onConnectionClosed(firstConnection);

        endpoint.onLinkAttach(secondConnection, secondSender, REPLY_RESOURCE);
        Mockito.verify(secondSender).open();
    }

    /**
     * Creates a mocked authorization service that answers every authorization
     * request with the given decision.
     *
     * @param granted The decision to return.
     * @return The mock.
     */
    private static AuthorizationService newAuthorizationService(final boolean granted) {
        final AuthorizationService authService = Mockito.mock(AuthorizationService.class);
        Mockito.when(authService.isAuthorized(
                ArgumentMatchers.any(HonoUser.class),
                ArgumentMatchers.any(ResourceIdentifier.class),
                ArgumentMatchers.anyString()))
            .thenReturn(Future.succeededFuture(Boolean.valueOf(granted)));
        return authService;
    }

    /**
     * Verifies that the given delivery has been settled exactly once with an
     * outcome of the given type.
     *
     * @param delivery The (mocked) delivery to check.
     * @param expectedOutcome The expected type of the delivery state.
     */
    private static void assertSettledWith(
            final ProtonDelivery delivery,
            final Class<? extends DeliveryState> expectedOutcome) {

        final ArgumentCaptor<DeliveryState> outcome = ArgumentCaptor.forClass(DeliveryState.class);
        Mockito.verify(delivery).disposition(outcome.capture(), ArgumentMatchers.eq(true));
        Assertions.assertThat(outcome.getValue()).isInstanceOf(expectedOutcome);
    }

    /**
     * Checks if a message contains a given status code.
     *
     * @param message The message to check.
     * @param expectedStatus The expected status code.
     * @return {@code true} if the message's status equals the expected one,
     *         {@code false} otherwise, in particular if the message contains no status at all.
     */
    private static boolean hasStatusCode(final Message message, final int expectedStatus) {
        // compare via equals() so that a missing status results in a mismatch instead of an NPE
        return Integer.valueOf(expectedStatus).equals(MessageHelper.getStatus(message));
    }

    /**
     * Creates the endpoint under test.
     * <p>
     * The endpoint forwards requests to {@link #EVENT_BUS_ADDRESS}, copies the status of an
     * event bus response to the <em>status</em> property of the AMQP reply and fails request
     * creation with a 400 error if the request payload cannot be decoded.
     *
     * @param passesFormalVerification The outcome that the endpoint's formal verification
     *                                 of messages always yields.
     * @return The endpoint, configured with default {@link ServiceConfigProperties}.
     */
    private RequestResponseEndpoint<ServiceConfigProperties> getEndpoint(final boolean passesFormalVerification) {

        final RequestResponseEndpoint<ServiceConfigProperties> endpoint =
                new RequestResponseEndpoint<ServiceConfigProperties>(vertx) {

            @Override
            public String getName() {
                return "test";
            }

            @Override
            protected String getEventBusServiceAddress() {
                return EVENT_BUS_ADDRESS;
            }

            @Override
            protected Future<EventBusMessage> createEventBusRequestMessage(
                    final Message requestMessage,
                    final ResourceIdentifier targetAddress,
                    final HonoUser clientPrincipal) {
                try {
                    return Future.succeededFuture(
                            EventBusMessage.forOperation(requestMessage).setJsonPayload(requestMessage));
                } catch (final DecodeException e) {
                    // payload is not valid JSON, report it as a client error
                    return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST));
                }
            }

            @Override
            protected Message getAmqpReply(final EventBusMessage response) {
                final Message reply = ProtonHelper.message();
                MessageHelper.addProperty(reply, "status", response.getStatus());
                reply.setAddress(response.getReplyToAddress());
                return reply;
            }

            @Override
            protected boolean passesFormalVerification(final ResourceIdentifier targetAddress, final Message message) {
                return passesFormalVerification;
            }
        };
        endpoint.setConfiguration(new ServiceConfigProperties());
        return endpoint;
    }
}
