package edu.stanford.protege.webprotege.ipc.pulsar;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.stanford.protege.webprotege.authorization.AuthorizationStatus;
import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusRequest;
import edu.stanford.protege.webprotege.authorization.GetAuthorizationStatusResponse;
import edu.stanford.protege.webprotege.authorization.Resource;
import edu.stanford.protege.webprotege.authorization.Subject;
import edu.stanford.protege.webprotege.common.Request;
import edu.stanford.protege.webprotege.common.Response;
import edu.stanford.protege.webprotege.common.UserId;
import edu.stanford.protege.webprotege.ipc.AuthorizedCommandHandler;
import edu.stanford.protege.webprotege.ipc.CommandExecutionException;
import edu.stanford.protege.webprotege.ipc.CommandExecutor;
import edu.stanford.protege.webprotege.ipc.CommandHandler;
import edu.stanford.protege.webprotege.ipc.ExecutionContext;
import edu.stanford.protege.webprotege.ipc.Headers;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.SerializedLambda;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpStatus;

/* loaded from: input_file:BOOT-INF/lib/webprotege-ipc-0.10.2.jar:edu/stanford/protege/webprotege/ipc/pulsar/PulsarCommandHandlerWrapper.class */
/**
 * Connects a {@link CommandHandler} to Pulsar.
 *
 * <p>After {@link #subscribe()} is called, messages arriving on the handler's request topic
 * ({@code <tenant>/command-requests/<channelName>}) are deserialized to the handler's request class,
 * optionally checked for authorization (when the handler is an {@link AuthorizedCommandHandler}),
 * passed to the handler, and the outcome is published to the reply topic
 * ({@code <tenant>/command-responses/<replyChannel>}). Failures are reported to the caller as a
 * serialized {@link CommandExecutionException} carried in the {@link Headers#ERROR} message property.
 *
 * @param <Q> the request type handled by the wrapped handler
 * @param <R> the response type produced by the wrapped handler
 */
public class PulsarCommandHandlerWrapper<Q extends Request<R>, R extends Response> {

    private static final Logger logger = LoggerFactory.getLogger(PulsarCommandHandlerWrapper.class);

    /** Error payload used when a {@link CommandExecutionException} itself cannot be serialized. */
    private static final String FALLBACK_ERROR_JSON = "{\n    \"statusCode\" : 500\n}";

    private final String applicationName;
    private final String tenant;
    private final PulsarClient pulsarClient;
    private final CommandHandler<Q, R> handler;
    private final ObjectMapper objectMapper;
    private final PulsarProducersManager producersManager;
    private final CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> authorizationStatusExecutor;

    /** The active consumer; {@code null} until {@link #subscribe()} has completed successfully. */
    private Consumer<byte[]> consumer;

    /**
     * Creates a wrapper around the given handler. No subscription is made until {@link #subscribe()} is called.
     *
     * @param applicationName             used as the prefix of the Pulsar subscription name
     * @param tenant                      prefix of the request and reply topic URLs
     * @param pulsarClient                client used to create the consumer
     * @param handler                     the command handler that requests are dispatched to
     * @param objectMapper                used to deserialize requests and serialize responses and errors
     * @param producersManager            supplies producers for reply topics
     * @param authorizationStatusExecutor executor used to query the authorization status for
     *                                    {@link AuthorizedCommandHandler}s
     */
    // NOTE(review): the @Value expression below has no ${...} placeholder, so if Spring ever resolves
    // it the literal text "webprotege.pulsar.tenant" would be injected. How this class is instantiated
    // is not visible here, so the annotation is left untouched - confirm against the creating factory.
    public PulsarCommandHandlerWrapper(String applicationName,
                                       @Value("webprotege.pulsar.tenant") String tenant,
                                       PulsarClient pulsarClient,
                                       CommandHandler<Q, R> handler,
                                       ObjectMapper objectMapper,
                                       PulsarProducersManager producersManager,
                                       CommandExecutor<GetAuthorizationStatusRequest, GetAuthorizationStatusResponse> authorizationStatusExecutor) {
        this.applicationName = applicationName;
        this.tenant = tenant;
        this.pulsarClient = pulsarClient;
        this.handler = handler;
        this.objectMapper = objectMapper;
        this.producersManager = producersManager;
        this.authorizationStatusExecutor = authorizationStatusExecutor;
    }

    /**
     * Unsubscribes the consumer created by {@link #subscribe()}. Does nothing if there is no
     * consumer. A {@link PulsarClientException} raised while unsubscribing is logged, not rethrown.
     */
    public void unsubscribe() {
        final Consumer<byte[]> activeConsumer = this.consumer;
        if (activeConsumer == null) {
            // subscribe() was never called, or it failed before a consumer was assigned.
            return;
        }
        try {
            activeConsumer.unsubscribe();
        } catch (PulsarClientException e) {
            logger.warn("An exception was thrown when unsubscribing", e);
        }
    }

    /**
     * Subscribes to the wrapped handler's request topic and starts dispatching incoming messages.
     *
     * @throws UncheckedIOException if the Pulsar client fails to create the subscription
     */
    public void subscribe() {
        try {
            this.consumer = pulsarClient.newConsumer()
                    .topic(getRequestsTopicUrl(handler))
                    .subscriptionName(getSubscriptionName(handler))
                    .messageListener(this::handleCommandMessage)
                    .subscribe();
        } catch (PulsarClientException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Message listener entry point. Validates the reply-channel, correlation-id and user-id
     * headers before handing the message on for parsing.
     *
     * <p>Without a reply channel or correlation id no reply can be addressed, so the message is
     * acknowledged and dropped. A missing user id is answered with {@link HttpStatus#FORBIDDEN}.
     */
    private void handleCommandMessage(Consumer<byte[]> consumer, Message<byte[]> message) {
        final String replyChannel = message.getProperty(Headers.REPLY_CHANNEL);
        if (replyChannel == null) {
            logger.error("webprotege_replyChannel header is missing.  Cannot reply to message.");
            consumer.acknowledgeAsync(message);
            return;
        }
        final String correlationId = message.getProperty(Headers.CORRELATION_ID);
        if (correlationId == null) {
            logger.error("webprotege_correlationId header is missing.  Cannot process message.");
            consumer.acknowledgeAsync(message);
            return;
        }
        final String userId = message.getProperty(Headers.USER_ID);
        if (userId == null) {
            logger.error("webprotege_userId header is missing.  Cannot process message.  Returning Forbidden Error Code.  Message reply topic: {}", replyChannel);
            replyWithErrorResponse(replyChannel, correlationId, "", HttpStatus.FORBIDDEN);
            consumer.acknowledgeAsync(message);
            return;
        }
        parseAndHandleRequest(consumer, message, replyChannel, correlationId, userId);
    }

    /**
     * Deserializes the message payload and dispatches the request, going through the authorization
     * check first when the wrapped handler is an {@link AuthorizedCommandHandler}.
     *
     * <p>The message is acknowledged whether or not it can be parsed. An unparseable payload is
     * answered with {@link HttpStatus#BAD_REQUEST}; it is deliberately not negatively acknowledged,
     * because Pulsar redelivers negatively acknowledged messages and the same payload would fail
     * to parse (and produce another error reply) on every redelivery.
     */
    private void parseAndHandleRequest(Consumer<byte[]> consumer,
                                       Message<byte[]> message,
                                       String replyChannel,
                                       String correlationId,
                                       String userId) {
        final Q request;
        try {
            request = parseRequest(message);
        } catch (IOException e) {
            logger.error("Could not parse request", e);
            consumer.acknowledgeAsync(message);
            replyWithErrorResponse(replyChannel, correlationId, userId, HttpStatus.BAD_REQUEST);
            return;
        }
        consumer.acknowledgeAsync(message);
        if (handler instanceof AuthorizedCommandHandler) {
            // Narrowing the already-parameterized handler; suppression only guards against the
            // cast being reported as unchecked.
            @SuppressWarnings("unchecked")
            final AuthorizedCommandHandler<Q, R> authorizedHandler = (AuthorizedCommandHandler<Q, R>) handler;
            authorizeAndReplyToRequest(replyChannel, correlationId, userId, request, authorizedHandler);
        } else {
            handleAndReplyToRequest(replyChannel, correlationId, userId, request);
        }
    }

    /**
     * Reads the message payload as an instance of the handler's request class.
     *
     * @throws IOException if the payload cannot be deserialized
     */
    @SuppressWarnings("unchecked")
    private Q parseRequest(Message<byte[]> message) throws IOException {
        // The value is read using the handler's own request class, so it is a Q; the explicit
        // cast keeps this compiling regardless of how getRequestClass() is parameterized.
        return (Q) objectMapper.readValue(message.getData(), handler.getRequestClass());
    }

    /**
     * Requests the authorization status of the user for the handler's target resource and, once
     * the result arrives, either handles the request or replies with an error:
     * {@link HttpStatus#INTERNAL_SERVER_ERROR} if the authorization call failed,
     * {@link HttpStatus#FORBIDDEN} if the status is anything other than
     * {@link AuthorizationStatus#AUTHORIZED}.
     *
     * <p>Only the first of the handler's required capabilities is sent in the authorization
     * request ({@code null} when there are none).
     */
    private void authorizeAndReplyToRequest(String replyChannel,
                                            String correlationId,
                                            String userId,
                                            Q request,
                                            AuthorizedCommandHandler<Q, R> authorizedCommandHandler) {
        final Resource targetResource = authorizedCommandHandler.getTargetResource(request);
        final GetAuthorizationStatusRequest statusRequest = new GetAuthorizationStatusRequest(
                targetResource,
                Subject.forUser(userId),
                authorizedCommandHandler.getRequiredCapabilities().stream().findFirst().orElse(null));
        final ExecutionContext executionContext = new ExecutionContext(new UserId(userId), "");
        authorizationStatusExecutor.execute(statusRequest, executionContext).whenComplete((statusResponse, error) -> {
            if (error != null) {
                logger.warn("An error occurred when requesting the authorization status for {} on {}. Error: {}", userId, targetResource, error.getMessage());
                replyWithErrorResponse(replyChannel, correlationId, userId, HttpStatus.INTERNAL_SERVER_ERROR);
            } else if (statusResponse.authorizationStatus() == AuthorizationStatus.AUTHORIZED) {
                handleAndReplyToRequest(replyChannel, correlationId, userId, request);
            } else {
                logger.info("Permission denied when attempting to execute a request.  User: {}, Request: {}", userId, request);
                replyWithErrorResponse(replyChannel, correlationId, userId, HttpStatus.FORBIDDEN);
            }
        });
    }

    /**
     * Invokes the wrapped handler and replies with its response. A {@link CommandExecutionException}
     * signalled by the handler is relayed with its own status; any other failure, including one
     * thrown synchronously by the handler, is reported as {@link HttpStatus#INTERNAL_SERVER_ERROR}.
     */
    private void handleAndReplyToRequest(String replyChannel, String correlationId, String userId, Q request) {
        try {
            final ExecutionContext executionContext = new ExecutionContext(new UserId(userId), "");
            handler.handleRequest(request, executionContext).subscribe(response -> {
                replyWithSuccessResponse(replyChannel, correlationId, userId, response);
                logger.info("Sent reply to {}", replyChannel);
            }, error -> {
                if (error instanceof CommandExecutionException) {
                    final CommandExecutionException executionException = (CommandExecutionException) error;
                    logger.info("The command handler threw a CommandExecutionException exception while handling a request.  Sending an error as the reply to {}.  Code: {}, Message: {},  Request: {}", replyChannel, executionException.getStatusCode(), error.getMessage(), request);
                    replyWithErrorResponse(replyChannel, correlationId, userId, executionException.getStatus());
                } else {
                    replyWithInternalServerError(replyChannel, correlationId, userId, request, error);
                }
            });
        } catch (Throwable t) {
            // Boundary catch: whatever the handler throws, the caller still gets a reply.
            logger.error("Uncaught exception when handling request", t);
            replyWithInternalServerError(replyChannel, correlationId, userId, request, t);
        }
    }

    /** Logs the failure and replies with {@link HttpStatus#INTERNAL_SERVER_ERROR}. */
    private void replyWithInternalServerError(String replyChannel, String correlationId, String userId, Q request, Throwable error) {
        logger.info("The command handler threw an exception while handling a request.  Sending an error as the reply to {}.  Exception class: {}, Message: {},  Request: {}", replyChannel, error.getClass().getName(), error.getMessage(), request);
        replyWithErrorResponse(replyChannel, correlationId, userId, HttpStatus.INTERNAL_SERVER_ERROR);
    }

    /**
     * Publishes an error reply. The message carries no value; the error is conveyed as a serialized
     * {@link CommandExecutionException} in the {@link Headers#ERROR} property.
     */
    private void replyWithErrorResponse(String replyChannel, String correlationId, String userId, HttpStatus status) {
        final Producer<byte[]> producer = producersManager.getProducer(getReplyTopicUrl(replyChannel));
        final String serializedError = serializeCommandExecutionException(new CommandExecutionException(status));
        producer.newMessage()
                .property(Headers.CORRELATION_ID, correlationId)
                .property(Headers.USER_ID, userId)
                .property(Headers.ERROR, serializedError)
                .sendAsync();
    }

    /**
     * Publishes the serialized response as the reply. If the response cannot be serialized the
     * failure is logged and an {@link HttpStatus#INTERNAL_SERVER_ERROR} reply is sent instead.
     */
    private void replyWithSuccessResponse(String replyChannel, String correlationId, String userId, R response) {
        try {
            final Producer<byte[]> producer = producersManager.getProducer(getReplyTopicUrl(replyChannel));
            final byte[] payload = objectMapper.writeValueAsBytes(response);
            producer.newMessage()
                    .property(Headers.CORRELATION_ID, correlationId)
                    .property(Headers.USER_ID, userId)
                    .value(payload)
                    .sendAsync();
        } catch (JsonProcessingException e) {
            logger.error("Could not serialize response for reply to {}", replyChannel, e);
            replyWithErrorResponse(replyChannel, correlationId, userId, HttpStatus.INTERNAL_SERVER_ERROR);
        }
    }

    /** Returns the topic URL that replies for the given reply channel are published to. */
    private String getReplyTopicUrl(String replyChannel) {
        return tenant + "/command-responses/" + replyChannel;
    }

    /** Returns the Pulsar subscription name used for the given handler's request topic. */
    private String getSubscriptionName(CommandHandler<?, ?> commandHandler) {
        return applicationName + "--" + commandHandler.getChannelName() + "--handler";
    }

    /** Returns the topic URL on which requests for the given handler arrive. */
    private String getRequestsTopicUrl(CommandHandler<?, ?> commandHandler) {
        return tenant + "/command-requests/" + commandHandler.getChannelName();
    }

    /**
     * Serializes the exception to JSON for the {@link Headers#ERROR} property, falling back to a
     * fixed status-500 document if serialization fails.
     */
    private String serializeCommandExecutionException(CommandExecutionException exception) {
        try {
            return objectMapper.writeValueAsString(exception);
        } catch (JsonProcessingException e) {
            logger.error("Error while serializing CommandExecutionException", e);
            return FALLBACK_ERROR_JSON;
        }
    }
}
