package org.eclipse.hono.service.amqp;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.json.JsonObject;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
import io.vertx.proton.ProtonQoS;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.ProtonSender;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.auth.HonoUser;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ServiceConfigProperties;
import org.eclipse.hono.service.auth.AuthorizationService;
import org.eclipse.hono.service.auth.ClaimsBasedAuthorizationService;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.HonoProtonHelper;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.RequestResponseApiConstants;
import org.eclipse.hono.util.ResourceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/eclipse/hono/service/amqp/AbstractRequestResponseEndpoint.class */
public abstract class AbstractRequestResponseEndpoint<T extends ServiceConfigProperties> extends AbstractAmqpEndpoint<T> {
    // Logger created for the concrete (runtime) class in the constructor.
    // NOTE(review): the methods of this class log via `this.logger`, which is not declared
    // here (presumably inherited) - confirm whether this field is still needed.
    protected final Logger log;
    // Sender links used for responses, keyed by the string form of the reply-to address.
    private final Map<String, ProtonSender> replyToSenderMap;
    // Service consulted by isAuthorized(); the constructor installs a ClaimsBasedAuthorizationService.
    private AuthorizationService authorizationService;

    /**
     * Lookup table indexed by {@link ReplyFailure} ordinal.
     * <p>
     * The entry for {@link ReplyFailure#TIMEOUT} holds {@code 1}, every other entry keeps
     * the array default of {@code 0}.
     * <p>
     * NOTE(review): name and shape suggest a compiler generated switch table that was
     * materialized by a decompiler - confirm before editing by hand.
     */
    public static class AnonymousClass1 {
        static final int[] $SwitchMap$io$vertx$core$eventbus$ReplyFailure;

        static {
            final int[] caseLabels = new int[ReplyFailure.values().length];
            try {
                caseLabels[ReplyFailure.TIMEOUT.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // The constant is missing from the enum version present at runtime:
                // leave its slot at 0 so that lookups end up in the default branch.
            }
            $SwitchMap$io$vertx$core$eventbus$ReplyFailure = caseLabels;
        }
    }

    /**
     * Creates an endpoint for a Vert.x instance.
     * <p>
     * Sets up the logger for the concrete class, an empty registry of reply-to sender
     * links and a {@link ClaimsBasedAuthorizationService} as the default authorization
     * service.
     *
     * @param vertx The Vert.x instance that is passed on to the super class.
     * @throws NullPointerException if vertx is {@code null}.
     */
    public AbstractRequestResponseEndpoint(Vertx vertx) {
        super(Objects.requireNonNull(vertx));
        this.log = LoggerFactory.getLogger(getClass());
        // diamond instead of the raw type so that the assignment stays type-checked
        this.replyToSenderMap = new HashMap<>();
        this.authorizationService = new ClaimsBasedAuthorizationService();
    }

    /**
     * Checks whether a request message may be processed at all.
     * <p>
     * This check is performed for every received request before anything else happens.
     * A request for which this method returns {@code false} is rejected with a
     * <em>malformed request message</em> error condition and is not processed any further.
     *
     * @param resourceIdentifier The resource identifier of the link that the message has been received on.
     * @param message The request message to check.
     * @return {@code true} if the message should be accepted and processed.
     */
    protected abstract boolean passesFormalVerification(ResourceIdentifier resourceIdentifier, Message message);

    /**
     * Processes a request message that has passed formal verification and authorization.
     * <p>
     * The message contained in a succeeded result is handed to {@code filterResponse} and
     * then sent to the client. A failed result is turned into an error response message.
     *
     * @param message The request message.
     * @param resourceIdentifier The resource identifier of the link that the message has been received on.
     * @param spanContext The context of the tracing span that covers the processing of the request.
     * @return A future indicating the outcome of the operation, containing the response message on success.
     */
    protected abstract Future<Message> handleRequestMessage(Message message, ResourceIdentifier resourceIdentifier, SpanContext spanContext);

    /**
     * Gets the service that is used for authorizing requests.
     *
     * @return The currently configured authorization service.
     */
    public final AuthorizationService getAuthorizationService() {
        return authorizationService;
    }

    /**
     * Sets the service to use for authorizing requests.
     * <p>
     * The value is stored as is, no check for {@code null} is performed.
     *
     * @param authorizationService The service replacing the currently configured one.
     */
    @Autowired(required = false)
    public final void setAuthorizationService(final AuthorizationService authorizationService) {
        this.authorizationService = authorizationService;
    }

    /**
     * Handles a client's attempt to open a link for sending request messages.
     * <p>
     * Links requesting <em>AT_MOST_ONCE</em> delivery are closed with a
     * <em>precondition failed</em> error condition. For all other links the message,
     * close and detach handlers are registered, the link is opened and the configured
     * number of credits is flowed to the client.
     *
     * @param protonConnection The connection that the link belongs to.
     * @param protonReceiver The link over which the client wants to send requests.
     * @param resourceIdentifier The address passed on to the request handler for every message.
     */
    @Override
    public final void onLinkAttach(ProtonConnection protonConnection, ProtonReceiver protonReceiver, ResourceIdentifier resourceIdentifier) {
        if (ProtonQoS.AT_MOST_ONCE.equals(protonReceiver.getRemoteQoS())) {
            logger.debug("client wants to use unsupported AT MOST ONCE delivery mode for endpoint [{}], closing link ...", getName());
            protonReceiver.setCondition(
                    ProtonHelper.condition(AmqpError.PRECONDITION_FAILED.toString(), "endpoint requires AT_LEAST_ONCE QoS"));
            protonReceiver.close();
        } else {
            logger.debug("establishing link for receiving request messages from client [{}]", protonReceiver.getName());

            protonReceiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
            protonReceiver.setAutoAccept(true);
            // mirror the addresses proposed by the client
            protonReceiver.setTarget(protonReceiver.getRemoteTarget());
            protonReceiver.setSource(protonReceiver.getRemoteSource());
            // no automatic prefetch: credit is granted explicitly below and after each request
            protonReceiver.setPrefetch(0);

            protonReceiver.handler((delivery, request) -> {
                try {
                    handleRequestMessage(protonConnection, protonReceiver, resourceIdentifier, delivery, request);
                } catch (Exception e) {
                    logger.warn("error handling message", e);
                    ProtonHelper.released(delivery, true);
                }
            });
            HonoProtonHelper.setCloseHandler(protonReceiver, remoteClose -> onLinkDetach(protonReceiver));
            HonoProtonHelper.setDetachHandler(protonReceiver, remoteDetach -> onLinkDetach(protonReceiver));

            protonReceiver.open();

            // initial credits are sent only after the link has been opened
            final ServiceConfigProperties endpointConfig = (ServiceConfigProperties) this.config;
            logger.debug("flowing {} credits to client", endpointConfig.getReceiverLinkCredit());
            protonReceiver.flow(endpointConfig.getReceiverLinkCredit());
        }
    }

    /**
     * Handles a request message received from a client.
     * <p>
     * A tracing span is started for the request. Messages failing
     * {@code passesFormalVerification} are rejected right away. All other messages are
     * accepted and then processed in these steps:
     * <ol>
     * <li>look up the sender link registered for the message's reply-to address,</li>
     * <li>check that the client is authorized to invoke the operation,</li>
     * <li>invoke the abstract request handler and filter its response,</li>
     * <li>convert any failure of the previous two steps into an error response,</li>
     * <li>send the response if the sender link is still open.</li>
     * </ol>
     * In every case one credit is flowed back to the client and the span is finished.
     *
     * @param protonConnection The connection that the message has been received on.
     * @param protonReceiver The link that the message has been received on.
     * @param resourceIdentifier The address that the message has been sent to.
     * @param protonDelivery The delivery of the message.
     * @param message The request message.
     */
    protected final void handleRequestMessage(ProtonConnection protonConnection, ProtonReceiver protonReceiver, ResourceIdentifier resourceIdentifier, ProtonDelivery protonDelivery, Message message) {
        final HonoUser clientPrincipal = Constants.getClientPrincipal(protonConnection);
        final String replyTo = message.getReplyTo();
        final Span currentSpan = TracingHelper
                .buildServerChildSpan(this.tracer, TracingHelper.extractSpanContext(this.tracer, message), "process request message", getName())
                .withTag(Tags.HTTP_METHOD.getKey(), message.getSubject())
                .withTag(Tags.MESSAGE_BUS_DESTINATION.getKey(), resourceIdentifier.toString())
                .start();

        if (!passesFormalVerification(resourceIdentifier, message)) {
            MessageHelper.rejected(protonDelivery, new ErrorCondition(Constants.AMQP_BAD_REQUEST, "malformed request message"));
            flowCreditToRequestor(protonReceiver, replyTo);
            TracingHelper.logError(currentSpan, "malformed request message");
            currentSpan.finish();
            return;
        }

        ProtonHelper.accepted(protonDelivery, true);
        currentSpan.log("request message accepted");

        getSenderForConnection(protonConnection, replyTo)
                .compose(sender -> isAuthorized(clientPrincipal, resourceIdentifier, message)
                        .map(authorized -> {
                            logger.debug("client [{}] is {}authorized to {}:{}",
                                    clientPrincipal.getName(),
                                    authorized ? "" : "not ",
                                    resourceIdentifier,
                                    message.getSubject());
                            if (!authorized) {
                                throw new ClientErrorException(403, "not authorized to invoke operation");
                            }
                            return authorized;
                        })
                        .compose(authorized -> handleRequestMessage(message, resourceIdentifier, currentSpan.context()))
                        .compose(response -> filterResponse(clientPrincipal, message, response))
                        .otherwise(error -> {
                            // failures of the steps above are reported to the client as an error response
                            logger.debug("error processing request [resource: {}, op: {}]: {}",
                                    resourceIdentifier, message.getSubject(), error.getMessage());
                            currentSpan.log("error processing request");
                            TracingHelper.logError(currentSpan, error);
                            final ServiceInvocationException cause = getServiceInvocationException(error);
                            Tags.HTTP_STATUS.set(currentSpan, cause.getErrorCode());
                            return RequestResponseApiConstants.getErrorMessage(cause.getErrorCode(), cause.getMessage(), message);
                        })
                        .map(response -> {
                            Tags.HTTP_STATUS.set(currentSpan, MessageHelper.getStatus(response));
                            if (sender.isOpen()) {
                                final ProtonDelivery responseDelivery = sender.send(response);
                                logger.debug("sent response message to client  [correlation-id: {}, content-type: {}]",
                                        response.getCorrelationId(), response.getContentType());
                                currentSpan.log("sent response message to client");
                                return responseDelivery;
                            } else {
                                TracingHelper.logError(currentSpan, "cannot send response, reply-to link is closed");
                                return null;
                            }
                        }))
                .onComplete(outcome -> {
                    // allow the client to send its next request
                    flowCreditToRequestor(protonReceiver, replyTo);
                    currentSpan.finish();
                });
    }

    /**
     * Applies filtering to a response before it is sent to the client.
     * <p>
     * This default implementation returns the response unaltered.
     *
     * @param honoUser The client that has sent the request (not used by this implementation).
     * @param message The request message (not used by this implementation).
     * @param message2 The response message to filter.
     * @return A succeeded future containing the given response.
     * @throws NullPointerException if the response is {@code null}.
     */
    protected Future<Message> filterResponse(HonoUser honoUser, Message message, Message message2) {
        final Message response = Objects.requireNonNull(message2);
        return Future.succeededFuture(response);
    }

    /**
     * Checks whether a client may invoke the operation named by a request's subject.
     * <p>
     * The decision is delegated to the configured authorization service.
     *
     * @param honoUser The client to check.
     * @param resourceIdentifier The resource that the operation is to be invoked on.
     * @param message The request message whose subject identifies the operation.
     * @return The future returned by the authorization service.
     * @throws NullPointerException if message is {@code null}.
     */
    protected Future<Boolean> isAuthorized(HonoUser honoUser, ResourceIdentifier resourceIdentifier, Message message) {
        Objects.requireNonNull(message);
        final AuthorizationService service = getAuthorizationService();
        return service.isAuthorized(honoUser, resourceIdentifier, message.getSubject());
    }

    /**
     * Handles a client's attempt to open a link for receiving responses.
     * <p>
     * The link is closed with an error condition if the reply-to address is invalid or
     * if a sender is already registered for it. Otherwise the sender is registered for
     * the address, handlers removing the registration on close and detach are set up,
     * and the link is opened.
     *
     * @param protonConnection The connection that the link belongs to.
     * @param protonSender The link over which responses are to be sent to the client.
     * @param resourceIdentifier The reply-to address requested by the client.
     */
    @Override
    public final void onLinkAttach(ProtonConnection protonConnection, ProtonSender protonSender, ResourceIdentifier resourceIdentifier) {
        final String replyTo;
        if (!isValidReplyToAddress(resourceIdentifier)) {
            logger.debug("client [{}] provided invalid reply-to address", protonSender.getName());
            final String reason = String.format(
                    "reply-to address must have the following format %s/<tenant>/<reply-address>", getName());
            protonSender.setCondition(ProtonHelper.condition(AmqpError.INVALID_FIELD, reason));
            protonSender.close();
        } else if (replyToSenderMap.containsKey(replyTo = resourceIdentifier.toString())) {
            logger.debug("client [{}] wanted to subscribe to already subscribed reply-to address [{}]",
                    protonSender.getName(), replyTo);
            final String reason = String.format("reply-to address [%s] is already subscribed", replyTo);
            protonSender.setCondition(ProtonHelper.condition(AmqpError.ILLEGAL_STATE, reason));
            protonSender.close();
        } else {
            logger.debug("establishing response sender link with client [{}]", protonSender.getName());
            protonSender.setQoS(ProtonQoS.AT_LEAST_ONCE);
            // mirror the addresses proposed by the client
            protonSender.setSource(protonSender.getRemoteSource());
            protonSender.setTarget(protonSender.getRemoteTarget());
            registerSenderForReplyTo(replyTo, protonSender);

            // the registration must not outlive the link
            HonoProtonHelper.setCloseHandler(protonSender, remoteClose -> {
                logger.debug("client [{}] closed sender link", protonSender.getName());
                unregisterSenderForReplyTo(replyTo);
                protonSender.close();
            });
            HonoProtonHelper.setDetachHandler(protonSender, remoteDetach -> {
                logger.debug("client [{}] detached sender link", protonSender.getName());
                unregisterSenderForReplyTo(replyTo);
                protonSender.close();
            });

            protonSender.open();
        }
    }

    /**
     * Removes the registrations of all reply-to senders that belong to a closed connection.
     *
     * @param protonConnection The connection that has been closed.
     * @throws NullPointerException if the connection is {@code null}.
     */
    @Override
    public void onConnectionClosed(ProtonConnection protonConnection) {
        deallocateAllSendersForConnection(Objects.requireNonNull(protonConnection));
    }

    /**
     * Looks up the sender link that responses for a reply-to address are sent over.
     * <p>
     * A sender only qualifies if it is registered for the address, is open and belongs
     * to the given connection.
     *
     * @param protonConnection The connection that the request has been received on.
     * @param str The reply-to address of the request, may be {@code null}.
     * @return A future succeeded with the sender, or failed with a
     *         {@link ClientErrorException} having status 412 if no sender qualifies.
     */
    private Future<ProtonSender> getSenderForConnection(ProtonConnection protonConnection, String str) {
        final ProtonSender sender = this.replyToSenderMap.get(str);
        final boolean usable = sender != null
                && sender.isOpen()
                // identity check: the link must have been opened on this very connection
                && sender.getSession().getConnection() == protonConnection;
        if (usable) {
            return Future.succeededFuture(sender);
        }
        return Future.failedFuture(new ClientErrorException(412, "must open receiver link for reply-to address first"));
    }

    /**
     * Registers a sender link for a reply-to address, replacing any existing registration.
     *
     * @param str The reply-to address.
     * @param protonSender The link to send responses for the address over.
     */
    private void registerSenderForReplyTo(String str, ProtonSender protonSender) {
        final ProtonSender previous = replyToSenderMap.put(str, protonSender);
        final boolean replacedOtherSender = previous != null && previous != protonSender;
        if (replacedOtherSender) {
            logger.info("replaced existing sender [{}] for replies to [{}] with sender [{}]", previous, str, protonSender);
        } else {
            logger.debug("registered sender [{}] for replies to [{}]", protonSender, str);
        }
    }

    /**
     * Removes the sender registered for a reply-to address.
     * <p>
     * A warning is logged if no sender is registered for the address.
     *
     * @param str The reply-to address.
     */
    private void unregisterSenderForReplyTo(String str) {
        final ProtonSender removed = replyToSenderMap.remove(str);
        if (removed != null) {
            logger.debug("deallocated sender [{}] for replies to [{}]", removed.getName(), str);
        } else {
            logger.warn("sender was not allocated for replyTo address [{}]", str);
        }
    }

    /**
     * Drops the registrations of all senders whose session belongs to a given connection.
     *
     * @param protonConnection The connection to remove the senders for.
     */
    private void deallocateAllSendersForConnection(ProtonConnection protonConnection) {
        // removing from the values view also removes the corresponding map entries
        replyToSenderMap.values()
                .removeIf(sender -> sender.getSession().getConnection() == protonConnection);
    }

    /**
     * Grants a single credit on a request link and traces the link's resulting credit.
     *
     * @param protonReceiver The link to flow the credit on.
     * @param str The reply-to address of the handled request, used for logging only.
     */
    private void flowCreditToRequestor(ProtonReceiver protonReceiver, String str) {
        protonReceiver.flow(1);
        logger.trace("replenished client [reply-to: {}, current credit: {}]", str, protonReceiver.getCredit());
    }

    /**
     * Checks whether an address can be used as a reply-to address.
     * <p>
     * This default implementation requires the address to consist of at least three
     * path segments.
     *
     * @param resourceIdentifier The address to check, may be {@code null}.
     * @return {@code true} if the address is not {@code null} and has at least three segments.
     */
    protected boolean isValidReplyToAddress(ResourceIdentifier resourceIdentifier) {
        if (resourceIdentifier == null) {
            return false;
        }
        final String[] segments = resourceIdentifier.getResourcePath();
        return segments.length >= 3;
    }

    /**
     * Gets the value of a JSON object's property if it is of an expected type.
     *
     * @param cls The expected type of the value.
     * @param jsonObject The object to read the property from.
     * @param str The name of the property.
     * @param <T> The expected type of the value.
     * @return The value, or {@code null} if the property does not exist or its value
     *         is not an instance of the expected type.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    protected static final <T> T getTypesafeValueForField(Class<T> cls, JsonObject jsonObject, String str) {
        Objects.requireNonNull(cls);
        Objects.requireNonNull(jsonObject);
        Objects.requireNonNull(str);
        final Object candidate = jsonObject.getValue(str);
        return cls.isInstance(candidate) ? cls.cast(candidate) : null;
    }

    /**
     * Removes a property from a JSON object and returns its value if it is of an expected type.
     * <p>
     * The property is removed regardless of the type of its value.
     *
     * @param cls The expected type of the value.
     * @param jsonObject The object to remove the property from.
     * @param str The name of the property.
     * @param <T> The expected type of the value.
     * @return The removed value, or {@code null} if the property did not exist or its
     *         value is not an instance of the expected type.
     * @throws NullPointerException if any of the parameters is {@code null}.
     */
    protected static final <T> T removeTypesafeValueForField(Class<T> cls, JsonObject jsonObject, String str) {
        Objects.requireNonNull(cls);
        Objects.requireNonNull(jsonObject);
        Objects.requireNonNull(str);
        final Object removed = jsonObject.remove(str);
        return cls.isInstance(removed) ? cls.cast(removed) : null;
    }

    /**
     * Finishes a tracing span once a future producing a response message has completed.
     * <p>
     * On success the message's status is set as the span's HTTP status tag and the span
     * is marked as erroneous if the status lies outside of the range [100, 600).
     * On failure the status code extracted from the error is set as the HTTP status tag
     * and the error is logged to the span. The span is finished in both cases.
     *
     * @param span The span to finish.
     * @param future The future to wait for.
     * @return A future with the same outcome as the given future.
     */
    public Future<Message> finishSpanOnFutureCompletion(Span span, Future<Message> future) {
        return future.compose(message -> {
            // read the status once and use it for both the tag and the range check
            final Integer status = MessageHelper.getStatus(message);
            Tags.HTTP_STATUS.set(span, status);
            if (status != null && (status < 100 || status >= 600)) {
                Tags.ERROR.set(span, true);
            }
            span.finish();
            return Future.succeededFuture(message);
        }).recover(error -> {
            Tags.HTTP_STATUS.set(span, ServiceInvocationException.extractStatusCode(error));
            TracingHelper.logError(span, error);
            span.finish();
            // keep the original failure visible to the caller
            return Future.failedFuture(error);
        });
    }

    /**
     * Maps an arbitrary error to the exception that is reported to the client.
     * <ul>
     * <li>A {@link ServiceInvocationException} is returned as is.</li>
     * <li>A {@link ReplyException} with failure type {@link ReplyFailure#TIMEOUT} is
     * mapped to a server error with status 503.</li>
     * <li>Everything else is mapped to a server error with status 500.</li>
     * </ul>
     *
     * @param th The error to map.
     * @return The exception carrying the status code to report.
     */
    private ServiceInvocationException getServiceInvocationException(Throwable th) {
        if (th instanceof ServiceInvocationException) {
            return (ServiceInvocationException) th;
        }
        // compare the enum constant directly instead of going through an ordinal based lookup table
        if (th instanceof ReplyException && ((ReplyException) th).failureType() == ReplyFailure.TIMEOUT) {
            return new ServerErrorException(503, "request could not be processed at the moment");
        }
        return new ServerErrorException(500);
    }
}
