package org.eclipse.ditto.services.gateway.endpoints;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.actor.Status;
import akka.event.DiagnosticLoggingAdapter;
import akka.http.javadsl.model.ContentType;
import akka.http.javadsl.model.ContentTypes;
import akka.http.javadsl.model.HttpEntities;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.RequestEntity;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.model.Uri;
import akka.http.javadsl.model.headers.Location;
import akka.http.scaladsl.model.ContentType$;
import akka.japi.Creator;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.AskTimeoutException;
import akka.util.ByteString;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.eclipse.ditto.json.JsonRuntimeException;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.exceptions.DittoJsonException;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.messages.Message;
import org.eclipse.ditto.model.messages.MessageTimeoutException;
import org.eclipse.ditto.services.gateway.starter.service.util.FireAndForgetMessageUtil;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.signals.base.WithOptionalEntity;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
import org.eclipse.ditto.signals.commands.base.WithEntity;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.messages.MessageCommandResponse;
import org.eclipse.ditto.signals.commands.messages.SendMessageAcceptedResponse;
import org.jboss.netty.handler.codec.http.multipart.HttpPostBodyUtil;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/eclipse/ditto/services/gateway/endpoints/HttpRequestActor.class */
public final class HttpRequestActor extends AbstractActor {
    /** Message that, while a command response is awaited, is only logged (as the stream's message) and otherwise ignored. */
    public static final String COMPLETE_MESSAGE = "complete";
    /** Content type used for JSON response entities. */
    private static final ContentType CONTENT_TYPE_JSON = ContentTypes.APPLICATION_JSON;
    /** Content type used for plain-text response entities. */
    private static final ContentType CONTENT_TYPE_TEXT = ContentTypes.TEXT_PLAIN_UTF8;
    /** Configuration path of the HTTP server request timeout. */
    private static final String AKKA_HTTP_SERVER_REQUEST_TIMEOUT = "akka.http.server.request-timeout";
    /** Logger of this actor; its MDC is cleared when the actor stops itself. */
    private final DiagnosticLoggingAdapter logger;
    /** Actor to which received commands are forwarded. */
    private final ActorRef proxyActor;
    /** Future that is completed with the HTTP response produced by this actor. */
    private final CompletableFuture<HttpResponse> httpResponseFuture;
    /** Handle of the scheduled server-request-timeout self message; cancelled in {@code postStop()}. */
    private final Cancellable serverRequestTimeoutCancellable;
    /** Server request timeout read from the actor system's configuration. */
    private final Duration serverRequestTimeout;
    /** Behavior the actor switches to after forwarding a command, i.e. while it waits for the response. */
    private final AbstractActor.Receive commandResponseAwaiting;
    /** Timeout of the forwarded message command; {@code null} if the message declared none. */
    private Duration messageTimeout;

    /* loaded from: input_file:org/eclipse/ditto/services/gateway/endpoints/HttpRequestActor$ServerRequestTimeoutMessage.class */
    /**
     * Marker message the enclosing actor schedules to itself; receiving it triggers the
     * server-request-timeout handling. Only the shared {@link #INSTANCE} is ever used.
     */
    private static final class ServerRequestTimeoutMessage {

        private ServerRequestTimeoutMessage() {
            // not instantiable from outside, use INSTANCE
        }

        /** The one and only instance of this marker message. */
        private static final ServerRequestTimeoutMessage INSTANCE = new ServerRequestTimeoutMessage();
    }

    /**
     * Creates the actor, schedules the server-request-timeout message to itself and builds the
     * behavior that is active while a response to a forwarded command is awaited.
     *
     * <p>The cases of that behavior are evaluated in declaration order, so the more specific
     * matches have to stay in front of the more general ones.</p>
     *
     * @param actorRef the actor to which received commands are forwarded.
     * @param httpRequest the HTTP request being answered; used when building responses for
     * {@code WithOptionalEntity} command responses.
     * @param completableFuture the future that is completed with the resulting HTTP response.
     */
    private HttpRequestActor(ActorRef actorRef, HttpRequest httpRequest, CompletableFuture<HttpResponse> completableFuture) {
        this.logger = LogUtil.obtain(this);
        this.proxyActor = actorRef;
        this.httpResponseFuture = completableFuture;
        this.serverRequestTimeout = getContext().system().settings().config().getDuration(AKKA_HTTP_SERVER_REQUEST_TIMEOUT);
        this.serverRequestTimeoutCancellable = getContext().system().scheduler().scheduleOnce(
                FiniteDuration.apply(this.serverRequestTimeout.toNanos(), TimeUnit.NANOSECONDS),
                getSelf(),
                ServerRequestTimeoutMessage.INSTANCE,
                getContext().dispatcher(),
                ActorRef.noSender());
        this.commandResponseAwaiting = ReceiveBuilder.create()
                .matchEquals(COMPLETE_MESSAGE, completeMessage ->
                        this.logger.debug("Got stream's '{}' message", COMPLETE_MESSAGE))
                .match(HttpResponse.class, this::completeWithResult)
                .match(SendMessageAcceptedResponse.class, acceptedResponse ->
                        completeWithResult(HttpResponse.create().withStatus(HttpStatusCode.ACCEPTED.toInt())))
                .match(MessageCommandResponse.class, messageResponse ->
                        completeWithResult(handleMessageResponseMessage(messageResponse)))
                .match(CommandResponse.class, response -> response instanceof WithEntity, response -> {
                    LogUtil.enhanceLogWithCorrelationId(this.logger, response);
                    this.logger.debug("Got 'CommandResponse' 'WithEntity' message");
                    completeWithResult(addEntityAccordingToContentType(
                            HttpResponse.create().withStatus(response.getStatusCode().toInt()),
                            ((WithEntity) response).getEntity(response.getImplementedSchemaVersion()),
                            response.getDittoHeaders()));
                })
                .match(CommandResponse.class, response -> response instanceof WithOptionalEntity, response -> {
                    LogUtil.enhanceLogWithCorrelationId(this.logger, response);
                    this.logger.debug("Got 'CommandResponse' 'WithOptionalEntity' message");
                    completeWithResult(createCommandResponse(httpRequest, response, (WithOptionalEntity) response));
                })
                .match(ErrorResponse.class, errorResponse -> {
                    LogUtil.enhanceLogWithCorrelationId(this.logger, errorResponse);
                    this.logger.info("Got 'ErrorResponse': {}", errorResponse);
                    final DittoRuntimeException cause = errorResponse.getDittoRuntimeException();
                    completeWithResult(HttpResponse.create()
                            .withStatus(cause.getStatusCode().toInt())
                            .withEntity(CONTENT_TYPE_JSON, ByteString.fromString(cause.toJsonString())));
                })
                .match(CommandResponse.class, response -> {
                    LogUtil.enhanceLogWithCorrelationId(this.logger, response);
                    this.logger.warning("Got 'CommandResponse' message which did not implement the required interfaces 'WithEntity' / 'WithOptionalEntity': {}", response);
                    completeWithResult(HttpResponse.create().withStatus(HttpStatusCode.INTERNAL_SERVER_ERROR.toInt()));
                })
                // This is the only AskTimeoutException case: a second, identical case further down
                // could never be reached because this one always matches first.
                .match(Status.Failure.class, failure -> failure.cause() instanceof AskTimeoutException, failure -> {
                    this.logger.warning("Got AskTimeoutException when a command response was expected: '{}'", failure.cause().getMessage());
                    completeWithResult(HttpResponse.create().withStatus(HttpStatusCode.INTERNAL_SERVER_ERROR.toInt()));
                })
                .match(JsonRuntimeException.class, jsonRuntimeException -> {
                    final DittoJsonException dittoJsonException = new DittoJsonException(jsonRuntimeException);
                    logDittoRuntimeException(dittoJsonException);
                    completeWithResult(HttpResponse.create()
                            .withStatus(dittoJsonException.getStatusCode().toInt())
                            .withEntity(CONTENT_TYPE_JSON, ByteString.fromString(dittoJsonException.toJsonString())));
                })
                .match(DittoRuntimeException.class, dittoRuntimeException -> {
                    logDittoRuntimeException(dittoRuntimeException);
                    completeWithResult(HttpResponse.create()
                            .withStatus(dittoRuntimeException.getStatusCode().toInt())
                            .withEntity(CONTENT_TYPE_JSON, ByteString.fromString(dittoRuntimeException.toJsonString())));
                })
                .match(ReceiveTimeout.class, receiveTimeout -> {
                    this.logger.info("Got ReceiveTimeout when a response was expected: '{}'", receiveTimeout);
                    // Report the timeout the message asked for; 0 if none was recorded.
                    final long timeoutSeconds = this.messageTimeout != null ? this.messageTimeout.getSeconds() : 0L;
                    final MessageTimeoutException timeoutException = new MessageTimeoutException(Long.valueOf(timeoutSeconds));
                    completeWithResult(HttpResponse.create()
                            .withStatus(timeoutException.getStatusCode().toInt())
                            .withEntity(CONTENT_TYPE_JSON, ByteString.fromString(timeoutException.toJsonString())));
                })
                .match(Status.Failure.class, failure -> failure.cause() instanceof DittoRuntimeException, failure -> {
                    final DittoRuntimeException cause = (DittoRuntimeException) failure.cause();
                    logDittoRuntimeException(cause);
                    completeWithResult(HttpResponse.create()
                            .withStatus(cause.getStatusCode().toInt())
                            .withEntity(CONTENT_TYPE_JSON, ByteString.fromString(cause.toJsonString())));
                })
                .match(Status.Failure.class, failure -> {
                    // Log the cause as it is: calling fillInStackTrace() on it would overwrite the
                    // original stack trace with the one of this handler.
                    this.logger.error(failure.cause(), "Got Status.Failure when a command response was expected: '{}'", failure.cause().getMessage());
                    completeWithResult(HttpResponse.create().withStatus(HttpStatusCode.INTERNAL_SERVER_ERROR.toInt()));
                })
                .matchEquals(ServerRequestTimeoutMessage.INSTANCE, timeoutMessage -> handleServerRequestTimeout())
                .matchAny(unknown -> {
                    this.logger.warning("Got unknown message, expected a command response: {}", unknown);
                    completeWithResult(HttpResponse.create().withStatus(HttpStatusCode.INTERNAL_SERVER_ERROR.toInt()));
                })
                .build();
    }

    /**
     * Tells whether the given headers declare the default plain-text content type (compared
     * case-insensitively).
     *
     * <p>NOTE(review): the header is looked up under whatever
     * {@code DittoHeaderDefinition.CONTENT_TYPE.name()} returns - confirm that this is the key
     * under which the content type is actually stored in the headers.</p>
     *
     * @param dittoHeaders the headers to inspect.
     * @return {@code true} if the content type header is present and equals the plain-text type.
     */
    private static boolean hasPlainTextContentType(DittoHeaders dittoHeaders) {
        final String contentTypeKey = DittoHeaderDefinition.CONTENT_TYPE.name();
        if (!dittoHeaders.containsKey(contentTypeKey)) {
            return false;
        }
        final String declaredContentType = dittoHeaders.get(contentTypeKey);
        return HttpPostBodyUtil.DEFAULT_TEXT_CONTENT_TYPE.equalsIgnoreCase(declaredContentType);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Adds the given JSON value as entity to the response: as plain text (the value's string
     * content) if the headers declare a plain-text content type, as JSON otherwise.
     *
     * @param httpResponse the response to extend.
     * @param jsonValue the value to put into the entity.
     * @param dittoHeaders the headers deciding between plain text and JSON.
     * @return the response with the entity set.
     */
    public static HttpResponse addEntityAccordingToContentType(HttpResponse httpResponse, JsonValue jsonValue, DittoHeaders dittoHeaders) {
        if (hasPlainTextContentType(dittoHeaders)) {
            final ByteString textBody = ByteString.fromString(jsonValue.asString());
            return httpResponse.withEntity(CONTENT_TYPE_TEXT, textBody);
        }
        final ByteString jsonBody = ByteString.fromString(jsonValue.toString());
        return httpResponse.withEntity(CONTENT_TYPE_JSON, jsonBody);
    }

    /**
     * Builds the HTTP response for a command response with an optional entity by combining the
     * Location-header mapper and the body mapper.
     *
     * @param httpRequest the request being answered.
     * @param commandResponse the command response providing status, headers and identifiers.
     * @param withOptionalEntity the same response seen as provider of an optional entity.
     * @return the assembled HTTP response.
     */
    private static HttpResponse createCommandResponse(HttpRequest httpRequest, CommandResponse commandResponse, WithOptionalEntity withOptionalEntity) {
        final Function<HttpResponse, HttpResponse> locationHeaderMapper =
                createModifiedLocationHeaderAddingResponseMapper(httpRequest, commandResponse);
        final Function<HttpResponse, HttpResponse> bodyMapper =
                createBodyAddingResponseMapper(commandResponse, withOptionalEntity);
        return createHttpResponseWithHeadersAndBody(commandResponse, locationHeaderMapper, bodyMapper);
    }

    /**
     * Creates a mapper that adds the optional entity of the command response as body of an HTTP
     * response. Responses with status "No Content" and responses without entity pass unchanged.
     *
     * @param commandResponse the command response providing schema version and headers.
     * @param withOptionalEntity the provider of the optional entity.
     * @return the body-adding mapper.
     */
    private static Function<HttpResponse, HttpResponse> createBodyAddingResponseMapper(CommandResponse commandResponse, WithOptionalEntity withOptionalEntity) {
        return httpResponse -> {
            if (StatusCodes.NO_CONTENT.equals(httpResponse.status())) {
                return httpResponse;
            }
            final Optional<JsonValue> entity = withOptionalEntity.getEntity(commandResponse.getImplementedSchemaVersion());
            return entity
                    .map(json -> addEntityAccordingToContentType(httpResponse, json, commandResponse.getDittoHeaders()))
                    .orElse(httpResponse);
        };
    }

    /**
     * Creates a mapper that adds a Location header to responses of commands that answered with
     * status "Created"; any other response passes unchanged.
     *
     * <p>For idempotent request methods the request URI itself is used. Otherwise the location is
     * built from the request URI, the ID of the command response and its resource path; a
     * trailing slash is removed.</p>
     *
     * @param httpRequest the request being answered.
     * @param commandResponse the command response providing status code, ID and resource path.
     * @return the Location-header-adding mapper.
     */
    private static Function<HttpResponse, HttpResponse> createModifiedLocationHeaderAddingResponseMapper(HttpRequest httpRequest, CommandResponse commandResponse) {
        return httpResponse -> {
            if (HttpStatusCode.CREATED != commandResponse.getStatusCode()) {
                return httpResponse;
            }
            final Uri requestUri = httpRequest.getUri();
            if (httpRequest.method().isIdempotent()) {
                return httpResponse.addHeader(Location.create(requestUri));
            }

            final String requestUriString = requestUri.toString();
            final int idIndex = requestUriString.indexOf(commandResponse.getId());
            // Cut the URI right before the ID if it already contains it, else append a separator.
            final String prefix = idIndex > 0 ? requestUriString.substring(0, idIndex) : requestUriString + "/";
            String location = prefix + commandResponse.getId() + commandResponse.getResourcePath().toString();
            if (location.endsWith("/")) {
                location = location.substring(0, location.length() - 1);
            }
            return httpResponse.addHeader(Location.create(Uri.create(location)));
        };
    }

    /**
     * Creates an HTTP response with the status code value of the command response and applies
     * the first mapper, then the second mapper to it.
     *
     * @param commandResponse the command response providing the status code value.
     * @param function the mapper applied first.
     * @param function2 the mapper applied to the result of the first one.
     * @return the mapped HTTP response.
     */
    private static HttpResponse createHttpResponseWithHeadersAndBody(CommandResponse commandResponse, Function<HttpResponse, HttpResponse> function, Function<HttpResponse, HttpResponse> function2) {
        final HttpResponse statusOnly = HttpResponse.create().withStatus(commandResponse.getStatusCodeValue());
        final HttpResponse afterFirstMapper = function.apply(statusOnly);
        return function2.apply(afterFirstMapper);
    }

    /**
     * Logs error code and message of the given exception on info level after enhancing the
     * logger with the exception's correlation ID.
     *
     * @param dittoRuntimeException the exception to log.
     */
    private void logDittoRuntimeException(DittoRuntimeException dittoRuntimeException) {
        LogUtil.enhanceLogWithCorrelationId(this.logger, dittoRuntimeException);
        this.logger.info(
                "DittoRuntimeException '{}': {}",
                dittoRuntimeException.getErrorCode(),
                dittoRuntimeException.getMessage());
    }

    /**
     * Creates the Akka configuration object for this actor.
     *
     * @param actorRef the actor to which received commands are forwarded.
     * @param httpRequest the HTTP request the actor answers.
     * @param completableFuture the future that is completed with the resulting HTTP response.
     * @return the Props to create an {@code HttpRequestActor} with.
     */
    public static Props props(final ActorRef actorRef, final HttpRequest httpRequest, final CompletableFuture<HttpResponse> completableFuture) {
        return Props.create(HttpRequestActor.class, new Creator<HttpRequestActor>() {
            private static final long serialVersionUID = 1L;

            @Override
            public HttpRequestActor create() {
                // Pass the captured parameter: "ActorRef.this" is not valid here because ActorRef
                // is not an enclosing class of this creator.
                return new HttpRequestActor(actorRef, httpRequest, completableFuture);
            }
        });
    }

    /**
     * Cancels the scheduled server-request-timeout message, if there is one, after the
     * superclass has done its own stop handling.
     */
    @Override // akka.actor.AbstractActor, akka.actor.Actor
    public void postStop() throws Exception {
        super.postStop();
        final Cancellable scheduledTimeout = this.serverRequestTimeoutCancellable;
        if (scheduledTimeout == null) {
            return;
        }
        scheduledTimeout.cancel();
    }

    /**
     * Builds the initial behavior, in which the actor expects the command to forward.
     *
     * <ul>
     * <li>A {@code MessageCommand} is forwarded to the proxy actor and the actor starts awaiting
     * the response. If the message declares a timeout and is not fire-and-forget, that timeout
     * (in whole seconds) is set as receive timeout.</li>
     * <li>Failures and {@code DittoRuntimeException}s are answered directly with an error
     * response.</li>
     * <li>Any other {@code Command} is forwarded; if no response is required the request is
     * answered with "Accepted" right away, else the actor starts awaiting the response.</li>
     * <li>Everything else is answered with "Internal Server Error".</li>
     * </ul>
     *
     * @return the initial behavior of this actor.
     */
    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(MessageCommand.class, messageCommand -> {
            LogUtil.enhanceLogWithCorrelationId(this.logger, messageCommand);
            // The wildcard type (instead of the raw type) keeps getTimeout() typed, so that its
            // value can be assigned to the Duration field below.
            final Message<?> message = messageCommand.getMessage();
            this.logger.info("Got <MessageCommand> with subject <{}>, telling the targetActor about it", message.getSubject());
            this.proxyActor.tell(messageCommand, getSelf());
            getContext().become(this.commandResponseAwaiting);
            this.messageTimeout = message.getTimeout().orElse(null);
            if (this.messageTimeout != null && !FireAndForgetMessageUtil.isFireAndForgetMessage(messageCommand)) {
                getContext().setReceiveTimeout(scala.concurrent.duration.Duration.apply(this.messageTimeout.getSeconds(), TimeUnit.SECONDS));
            }
        }).match(Status.Failure.class, failure -> {
            Throwable cause = failure.cause();
            if (cause instanceof JsonRuntimeException) {
                // JSON problems are reported like any other DittoRuntimeException.
                cause = new DittoJsonException((JsonRuntimeException) cause);
            }
            if (cause instanceof DittoRuntimeException) {
                final DittoRuntimeException dittoRuntimeException = (DittoRuntimeException) cause;
                logDittoRuntimeException(dittoRuntimeException);
                completeWithResult(HttpResponse.create()
                        .withStatus(dittoRuntimeException.getStatusCode().toInt())
                        .withEntity(CONTENT_TYPE_JSON, ByteString.fromString(dittoRuntimeException.toJsonString())));
            } else {
                this.logger.error(cause, "Got unknown Status.Failure when a 'Command' was expected");
                completeWithResult(HttpResponse.create().withStatus(HttpStatusCode.INTERNAL_SERVER_ERROR.toInt()));
            }
        }).match(DittoRuntimeException.class, dittoRuntimeException -> {
            logDittoRuntimeException(dittoRuntimeException);
            completeWithResult(HttpResponse.create()
                    .withStatus(dittoRuntimeException.getStatusCode().toInt())
                    .withEntity(CONTENT_TYPE_JSON, ByteString.fromString(dittoRuntimeException.toJsonString())));
        }).matchEquals(ServerRequestTimeoutMessage.INSTANCE, serverRequestTimeoutMessage -> {
            handleServerRequestTimeout();
        }).match(Command.class, command -> {
            this.logger.debug("Got 'Command' message, telling the targetActor about it");
            this.proxyActor.tell(command, getSelf());
            if (command.getDittoHeaders().isResponseRequired()) {
                getContext().become(this.commandResponseAwaiting);
            } else {
                completeWithResult(HttpResponse.create().withStatus(StatusCodes.ACCEPTED));
            }
        }).matchAny(obj -> {
            this.logger.warning("Got unknown message, expected a 'Command': {}", obj);
            completeWithResult(HttpResponse.create().withStatus(HttpStatusCode.INTERNAL_SERVER_ERROR.toInt()));
        }).build();
    }

    /**
     * Converts the response of a message command into an HTTP response.
     *
     * <p>The status code of the command response is only used if it is known to
     * {@code StatusCodes.lookup} and is not "Bad Gateway". If the resulting status is
     * "No Content" the response gets no body (a present raw payload is logged and ignored).
     * Otherwise the status defaults to "OK" and the payload - or, if absent, the raw payload -
     * becomes the entity, using the message's content type if it can be parsed.</p>
     *
     * @param messageCommandResponse the response of the message command.
     * @return the HTTP response to answer the request with.
     */
    private HttpResponse handleMessageResponseMessage(MessageCommandResponse<?, ?> messageCommandResponse) {
        final Message<?> message = messageCommandResponse.getMessage();
        final Optional<?> payload = message.getPayload();
        final Optional<ByteBuffer> rawPayload = message.getRawPayload();
        final Optional<HttpStatusCode> usableStatus = Optional.of(messageCommandResponse.getStatusCode())
                .filter(statusCode -> StatusCodes.lookup(statusCode.toInt()).isPresent())
                .filter(statusCode -> !HttpStatusCode.BAD_GATEWAY.equals(statusCode));

        final boolean bodyAllowed = usableStatus.map(statusCode -> statusCode != HttpStatusCode.NO_CONTENT).orElse(true);
        if (!bodyAllowed) {
            rawPayload.ifPresent(ignoredPayload ->
                    this.logger.info("Response payload was set, but response statusCode was also set to: {}. Ignoring the response payload. Command=<{}>", usableStatus, messageCommandResponse));
            return HttpResponse.create().withStatus(usableStatus.orElse(HttpStatusCode.NO_CONTENT).toInt());
        }

        // An unparsable content type is dropped; the entity then gets the default of the
        // entity factory method that is used.
        final Optional<ContentType> contentType = message.getContentType()
                .map(ContentType$.MODULE$::parse)
                .filter(parsed -> parsed.isRight())
                .<ContentType>map(parsed -> parsed.right().get());

        final HttpResponse statusOnly = HttpResponse.create().withStatus(usableStatus.orElse(HttpStatusCode.OK).toInt());
        // The casts to RequestEntity select the same withEntity overload in every branch.
        if (payload.isPresent()) {
            final String body = payload.get().toString();
            return contentType.isPresent()
                    ? statusOnly.withEntity((RequestEntity) HttpEntities.create(contentType.get(), ByteString.fromString(body)))
                    : statusOnly.withEntity((RequestEntity) HttpEntities.create(body));
        }
        if (rawPayload.isPresent()) {
            final byte[] bytes = toByteArray(rawPayload.get());
            return contentType.isPresent()
                    ? statusOnly.withEntity((RequestEntity) HttpEntities.create(contentType.get(), bytes))
                    : statusOnly.withEntity((RequestEntity) HttpEntities.create(bytes));
        }
        return statusOnly;
    }

    /**
     * Returns the bytes of the given buffer. Buffers with an accessible backing array hand out
     * that array; for all others (e.g. direct or read-only buffers, for which
     * {@code ByteBuffer.array()} throws) the content from index 0 up to the limit is copied
     * without touching the state of the given buffer.
     */
    private static byte[] toByteArray(ByteBuffer buffer) {
        if (buffer.hasArray()) {
            return buffer.array();
        }
        final ByteBuffer view = buffer.asReadOnlyBuffer();
        view.rewind();
        final byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }

    /**
     * Logs that no response arrived within the server request timeout and stops this actor.
     */
    private void handleServerRequestTimeout() {
        final Duration elapsedLimit = this.serverRequestTimeout;
        this.logger.warning("No response within server request timeout ({}), shutting actor down.", elapsedLimit);
        stop();
    }

    /**
     * Completes the response future with the given HTTP response, logs status code and entity
     * on debug level and stops this actor.
     *
     * @param httpResponse the response to answer the HTTP request with.
     */
    private void completeWithResult(HttpResponse httpResponse) {
        this.httpResponseFuture.complete(httpResponse);

        final int statusCode = httpResponse.status().intValue();
        this.logger.debug("Responding with HttpResponse code '{}'", Integer.valueOf(statusCode));
        final boolean debugEnabled = this.logger.isDebugEnabled();
        if (debugEnabled) {
            this.logger.debug("Responding with Entity: {}", httpResponse.entity());
        }

        stop();
    }

    /**
     * Clears the logger's MDC and stops this actor.
     */
    private void stop() {
        this.logger.clearMDC();
        final AbstractActor.ActorContext context = getContext();
        context.stop(getSelf());
    }
}
