package org.eclipse.hono.service.amqp;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.opentracing.noop.NoopSpanContext;
import io.opentracing.noop.NoopTracerFactory;
import io.opentracing.propagation.Format;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.http.CaseInsensitiveHeaders;
import io.vertx.core.json.DecodeException;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.auth.AuthorizationService;
import org.eclipse.hono.service.auth.ClaimsBasedAuthorizationService;
import org.eclipse.hono.tracing.MultiMapInjectAdapter;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.AmqpErrorException;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.EventBusMessage;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/hono/service/amqp/RequestResponseEndpoint.class */
public abstract class RequestResponseEndpoint<T extends ServiceConfigProperties> extends AbstractAmqpEndpoint<T> {
    protected Tracer tracer;
    private final Map<String, ProtonReceiver> replyToReceiverMap;
    private final Multimap<ProtonConnection, MessageConsumer<?>> replyConsumerMap;
    private final Multimap<ProtonConnection, String> replyAddressMap;
    private final Set<String> replyAddresses;
    private AuthorizationService authorizationService;

    /* JADX INFO: Access modifiers changed from: protected */
    public RequestResponseEndpoint(Vertx vertx) {
        super((Vertx) Objects.requireNonNull(vertx));
        this.tracer = NoopTracerFactory.create();
        this.replyToReceiverMap = new HashMap();
        this.replyConsumerMap = HashMultimap.create();
        this.replyAddressMap = HashMultimap.create();
        this.replyAddresses = new HashSet();
        this.authorizationService = new ClaimsBasedAuthorizationService();
    }

    public abstract void processRequest(Message message, ResourceIdentifier resourceIdentifier, HonoUser honoUser);

    protected abstract Message getAmqpReply(EventBusMessage eventBusMessage);

    public final AuthorizationService getAuthorizationService() {
        return this.authorizationService;
    }

    @Autowired(required = false)
    public final void setAuthorizationService(AuthorizationService authorizationService) {
        this.authorizationService = authorizationService;
    }

    @Autowired(required = false)
    public final void setTracer(Tracer tracer) {
        this.logger.info("using OpenTracing Tracer implementation [{}]", tracer.getClass().getName());
        this.tracer = (Tracer) Objects.requireNonNull(tracer);
    }

    @Override // org.eclipse.hono.service.amqp.AbstractAmqpEndpoint, org.eclipse.hono.service.amqp.AmqpEndpoint
    public final void onLinkAttach(ProtonConnection protonConnection, ProtonReceiver protonReceiver, ResourceIdentifier resourceIdentifier) {
        if (ProtonQoS.AT_MOST_ONCE.equals(protonReceiver.getRemoteQoS())) {
            this.logger.debug("client wants to use unsupported AT MOST ONCE delivery mode for endpoint [{}], closing link ...", getName());
            protonReceiver.setCondition(ProtonHelper.condition(AmqpError.PRECONDITION_FAILED.toString(), "endpoint requires AT_LEAST_ONCE QoS"));
            protonReceiver.close();
            return;
        }
        this.logger.debug("establishing link for receiving messages from client [{}]", protonReceiver.getName());
        protonReceiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
        protonReceiver.setAutoAccept(true);
        protonReceiver.setPrefetch(0);
        protonReceiver.handler((protonDelivery, message) -> {
            handleMessage(protonConnection, protonReceiver, resourceIdentifier, protonDelivery, message);
        });
        HonoProtonHelper.setCloseHandler(protonReceiver, asyncResult -> {
            onLinkDetach(protonReceiver);
        });
        protonReceiver.open();
        protonReceiver.flow(((ServiceConfigProperties) this.config).getReceiverLinkCredit());
        this.logger.debug("Flowing {} credits to the sender", Integer.valueOf(((ServiceConfigProperties) this.config).getReceiverLinkCredit()));
    }

    protected final void handleMessage(ProtonConnection protonConnection, ProtonReceiver protonReceiver, ResourceIdentifier resourceIdentifier, ProtonDelivery protonDelivery, Message message) {
        Future future = Future.future();
        if (passesFormalVerification(resourceIdentifier, message)) {
            future.complete();
        } else {
            future.fail(new AmqpErrorException(AmqpError.DECODE_ERROR, "malformed payload"));
        }
        HonoUser clientPrincipal = Constants.getClientPrincipal(protonConnection);
        String replyTo = message.getReplyTo();
        future.compose(r8 -> {
            if (!this.replyAddresses.contains(replyTo)) {
                return Future.failedFuture(new AmqpErrorException(AmqpError.ILLEGAL_STATE, "unsubscribed reply-to address"));
            }
            allocateReceiverForReplyTo(replyTo, protonReceiver);
            return Future.succeededFuture();
        }).compose(obj -> {
            return isAuthorized(clientPrincipal, resourceIdentifier, message);
        }).compose(bool -> {
            Logger logger = this.logger;
            Object[] objArr = new Object[4];
            objArr[0] = clientPrincipal.getName();
            objArr[1] = bool.booleanValue() ? "" : "not ";
            objArr[2] = resourceIdentifier;
            objArr[3] = message.getSubject();
            logger.debug("client [{}] is {}authorized to {}:{}", objArr);
            if (!bool.booleanValue()) {
                return Future.failedFuture(new AmqpErrorException(AmqpError.UNAUTHORIZED_ACCESS, "unauthorized"));
            }
            try {
                processRequest(message, resourceIdentifier, clientPrincipal);
                ProtonHelper.accepted(protonDelivery, true);
                return Future.succeededFuture();
            } catch (DecodeException e) {
                return Future.failedFuture(new AmqpErrorException(AmqpError.DECODE_ERROR, "malformed payload"));
            }
        }).otherwise(th -> {
            flowCreditToRequestor(replyTo);
            if (th instanceof AmqpErrorException) {
                MessageHelper.rejected(protonDelivery, ((AmqpErrorException) th).asErrorCondition());
                return null;
            }
            this.logger.debug("error processing request [resource: {}, op: {}]: {}", new Object[]{resourceIdentifier, message.getSubject(), th.getMessage()});
            MessageHelper.rejected(protonDelivery, ProtonHelper.condition(AmqpError.INTERNAL_ERROR, "internal error"));
            return null;
        });
    }

    protected Future<Boolean> isAuthorized(HonoUser honoUser, ResourceIdentifier resourceIdentifier, Message message) {
        Objects.requireNonNull(message);
        return getAuthorizationService().isAuthorized(honoUser, resourceIdentifier, message.getSubject());
    }

    protected Future<EventBusMessage> filterResponse(HonoUser honoUser, EventBusMessage eventBusMessage) {
        return Future.succeededFuture((EventBusMessage) Objects.requireNonNull(eventBusMessage));
    }

    @Override // org.eclipse.hono.service.amqp.AbstractAmqpEndpoint, org.eclipse.hono.service.amqp.AmqpEndpoint
    public final void onLinkAttach(ProtonConnection protonConnection, ProtonSender protonSender, ResourceIdentifier resourceIdentifier) {
        if (!isValidReplyToAddress(resourceIdentifier)) {
            this.logger.debug("client [{}] provided invalid reply-to address", protonSender.getName());
            protonSender.setCondition(ProtonHelper.condition(AmqpError.INVALID_FIELD, String.format("reply-to address must have the following format %s/<tenant>/<reply-address>", getName())));
            protonSender.close();
            return;
        }
        String resourceIdentifier2 = resourceIdentifier.toString();
        if (this.replyAddresses.contains(resourceIdentifier2)) {
            this.logger.debug("client [{}] wanted to subscribe to already subscribed reply-to address [{}]", protonSender.getName(), resourceIdentifier2);
            protonSender.setCondition(ProtonHelper.condition(AmqpError.ILLEGAL_STATE, String.format("reply-to address [%s] is already subscribed", resourceIdentifier2)));
            protonSender.close();
        } else {
            this.logger.debug("establishing response sender link with client [{}]", protonSender.getName());
            MessageConsumer<?> consumer = this.vertx.eventBus().consumer(resourceIdentifier2, message -> {
                if (this.logger.isTraceEnabled()) {
                    this.logger.trace("forwarding reply to client [{}]: {}", protonSender.getName(), ((JsonObject) message.body()).encodePrettily());
                }
                EventBusMessage fromJson = EventBusMessage.fromJson((JsonObject) message.body());
                filterResponse(Constants.getClientPrincipal(protonConnection), fromJson).recover(th -> {
                    return Future.succeededFuture(fromJson.getResponse(ServiceInvocationException.extractStatusCode(th)));
                }).map(eventBusMessage -> {
                    try {
                        protonSender.send(getAmqpReply(eventBusMessage));
                        flowCreditToRequestor(resourceIdentifier2);
                        return null;
                    } catch (Throwable th2) {
                        flowCreditToRequestor(resourceIdentifier2);
                        throw th2;
                    }
                });
            });
            registerConsumerForConnection(protonConnection, resourceIdentifier2, consumer);
            protonSender.setQoS(ProtonQoS.AT_LEAST_ONCE);
            HonoProtonHelper.setCloseHandler(protonSender, asyncResult -> {
                this.logger.debug("client [{}] closed sender link, removing associated event bus consumer [{}]", protonSender.getName(), consumer.address());
                deallocateReceiverForReplyTo(resourceIdentifier2);
                unregisterConsumerForConnection(protonConnection, resourceIdentifier2, consumer);
                protonSender.close();
            });
            protonSender.open();
        }
    }

    @Override // org.eclipse.hono.service.amqp.AmqpEndpoint
    public void onConnectionClosed(ProtonConnection protonConnection) {
        Objects.requireNonNull(protonConnection);
        unregisterAllConsumersForConnection(protonConnection);
        deallocateAllReceiversForConnection(protonConnection);
    }

    private void allocateReceiverForReplyTo(String str, ProtonReceiver protonReceiver) {
        ProtonReceiver put = this.replyToReceiverMap.put(str, protonReceiver);
        if (put == null || put == protonReceiver) {
            this.logger.debug("Allocated receiver [{}] for replies to [{}]", protonReceiver, str);
        } else {
            this.logger.info("Allocated receiver [{}] for replies to [{}] - Had existing receiver: [{}]", new Object[]{protonReceiver, str, put});
        }
    }

    private void flowCreditToRequestor(String str) {
        ProtonReceiver protonReceiver = this.replyToReceiverMap.get(str);
        if (protonReceiver == null) {
            this.logger.warn("No receiver found for reply-to address [{}]", str);
            return;
        }
        protonReceiver.flow(1);
        if (this.logger.isTraceEnabled()) {
            this.logger.trace("Flowing credit back to sender - replyTo: [{}], currentCredits: {}", str, Integer.valueOf(protonReceiver.getCredit()));
        }
    }

    private void deallocateReceiverForReplyTo(String str) {
        ProtonReceiver remove = this.replyToReceiverMap.remove(str);
        if (remove == null) {
            this.logger.warn("Receiver was not allocated to replyTo address [{}]", str);
        } else {
            this.logger.debug("Deallocated receiver [{}] for replies to [{}]", remove, str);
        }
    }

    private void deallocateAllReceiversForConnection(ProtonConnection protonConnection) {
        this.replyToReceiverMap.entrySet().removeIf(entry -> {
            return ((ProtonReceiver) entry.getValue()).getSession().getConnection() == protonConnection;
        });
    }

    private void registerConsumerForConnection(ProtonConnection protonConnection, String str, MessageConsumer<?> messageConsumer) {
        this.replyConsumerMap.put(protonConnection, messageConsumer);
        this.replyAddressMap.put(protonConnection, str);
        this.replyAddresses.add(str);
    }

    private void unregisterConsumerForConnection(ProtonConnection protonConnection, String str, MessageConsumer<?> messageConsumer) {
        messageConsumer.unregister();
        this.replyConsumerMap.remove(protonConnection, messageConsumer);
        this.replyAddressMap.remove(protonConnection, str);
        this.replyAddresses.remove(str);
    }

    private void unregisterAllConsumersForConnection(ProtonConnection protonConnection) {
        this.replyConsumerMap.removeAll(protonConnection).forEach((v0) -> {
            v0.unregister();
        });
        this.replyAddresses.removeAll(this.replyAddressMap.removeAll(protonConnection));
    }

    protected boolean isValidReplyToAddress(ResourceIdentifier resourceIdentifier) {
        return resourceIdentifier != null && resourceIdentifier.getResourcePath().length >= 3;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final DeliveryOptions createEventBusMessageDeliveryOptions(SpanContext spanContext) {
        DeliveryOptions deliveryOptions = new DeliveryOptions();
        if (spanContext != null && !(spanContext instanceof NoopSpanContext)) {
            CaseInsensitiveHeaders caseInsensitiveHeaders = new CaseInsensitiveHeaders();
            this.tracer.inject(spanContext, Format.Builtin.TEXT_MAP, new MultiMapInjectAdapter(caseInsensitiveHeaders));
            deliveryOptions.setHeaders(caseInsensitiveHeaders);
        }
        return deliveryOptions;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final SpanContext extractSpanContext(Message message) {
        return TracingHelper.extractSpanContext(this.tracer, message);
    }
}
