package org.eclipse.ditto.gateway.service.endpoints.routes.websocket;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.event.Logging;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.model.ws.WebSocketUpgrade;
import akka.http.javadsl.server.Directives;
import akka.http.javadsl.server.RequestContext;
import akka.http.javadsl.server.Route;
import akka.japi.Pair;
import akka.japi.pf.PFBuilder;
import akka.pattern.Patterns;
import akka.stream.Attributes;
import akka.stream.FanOutShape2;
import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.KillSwitches;
import akka.stream.Materializer;
import akka.stream.SharedKillSwitch;
import akka.stream.SinkShape;
import akka.stream.UniformFanInShape;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Merge;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoJsonException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.SignalEnrichmentFailedException;
import org.eclipse.ditto.base.model.exceptions.TooManyRequestsException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.headers.translator.HeaderTranslator;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.json.Jsonifiable;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.gateway.api.GatewayInternalErrorException;
import org.eclipse.ditto.gateway.api.GatewayWebsocketSessionAbortedException;
import org.eclipse.ditto.gateway.api.GatewayWebsocketSessionClosedException;
import org.eclipse.ditto.gateway.api.GatewayWebsocketSessionExpiredException;
import org.eclipse.ditto.gateway.service.endpoints.routes.AbstractRoute;
import org.eclipse.ditto.gateway.service.endpoints.utils.GatewaySignalEnrichmentProvider;
import org.eclipse.ditto.gateway.service.security.HttpHeader;
import org.eclipse.ditto.gateway.service.streaming.StreamingAuthorizationEnforcer;
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;
import org.eclipse.ditto.gateway.service.streaming.actors.StreamingActor;
import org.eclipse.ditto.gateway.service.streaming.actors.SupervisedStream;
import org.eclipse.ditto.gateway.service.streaming.signals.Connect;
import org.eclipse.ditto.gateway.service.streaming.signals.IncomingSignal;
import org.eclipse.ditto.gateway.service.streaming.signals.StreamControlMessage;
import org.eclipse.ditto.gateway.service.streaming.signals.StreamingAck;
import org.eclipse.ditto.gateway.service.util.config.streaming.StreamingConfig;
import org.eclipse.ditto.gateway.service.util.config.streaming.WebsocketConfig;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.akka.controlflow.Filter;
import org.eclipse.ditto.internal.utils.akka.controlflow.LimitRateByRejection;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;
import org.eclipse.ditto.internal.utils.tracing.span.SpanTagKey;
import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.jwt.model.ImmutableJsonWebToken;
import org.eclipse.ditto.jwt.model.JsonWebToken;
import org.eclipse.ditto.policies.model.PolicyException;
import org.eclipse.ditto.policies.model.signals.commands.PolicyErrorResponse;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.JsonifiableAdaptable;
import org.eclipse.ditto.protocol.ProtocolFactory;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.protocol.mappingstrategies.IllegalAdaptableException;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.ThingErrorResponse;
import org.eclipse.ditto.thingsearch.model.ThingSearchException;
import org.eclipse.ditto.thingsearch.model.signals.commands.SearchErrorResponse;
import org.slf4j.Logger;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

@NotThreadSafe
/* loaded from: input_file:org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute.class */
public final class WebSocketRoute implements WebSocketRouteBuilder {
    // NOTE(review): PROTOCOL_CMD_ACK_SUFFIX and BEARER are not referenced in the part of the class
    // reviewed here; their users (if any) live further down the file.
    private static final String PROTOCOL_CMD_ACK_SUFFIX = ":ACK";
    // Connection type handed to the Connect message built in createOutgoing.
    private static final String STREAMING_TYPE_WS = "WS";
    private static final String BEARER = "Bearer";
    // MDC key under which createWebSocket stores the connection correlation ID on its logger.
    private static final String MDC_CONNECTION_CORRELATION_ID = "connection-correlation-id";
    // One kill switch per route instance: its flow is inserted into every WebSocket's message flow
    // (createWebSocket) and the switch itself is passed along in each Connect message (createOutgoing).
    private final SharedKillSwitch wsKillSwitch = KillSwitches.shared(WebSocketRoute.class.getSimpleName());
    // Actor that is asked for the WebsocketConfig and that receives the Connect message.
    private final ActorRef streamingActor;
    // Supplies the parallelism used for outgoing post-processing (createOutgoing).
    private final StreamingConfig streamingConfig;
    // Used to pre-materialize the outgoing source in createOutgoing.
    private final Materializer materializer;
    // The following collaborators are initialised in the constructor and can be replaced
    // afterwards through the corresponding with...() builder methods.
    private IncomingWebSocketEventSniffer incomingMessageSniffer;
    private OutgoingWebSocketEventSniffer outgoingMessageSniffer;
    private StreamingAuthorizationEnforcer authorizationEnforcer;
    private WebSocketSupervisor webSocketSupervisor;

    // null means "no signal enrichment": createWebSocket then continues with a null facade.
    @Nullable
    private GatewaySignalEnrichmentProvider signalEnrichmentProvider;
    private HeaderTranslator headerTranslator;
    private WebSocketConfigProvider webSocketConfigProvider;
    private static final ThreadSafeDittoLogger LOGGER = DittoLoggerFactory.getThreadSafeLogger(WebSocketRoute.class);
    // Timeout for the asks sent to streamingActor (config retrieval and Connect).
    private static final Duration LOCAL_ASK_TIMEOUT = Duration.ofSeconds(5);
    // Metric name and tag keys/values shared by the three counters below.
    private static final String STREAMING_MESSAGES = "streaming_messages";
    private static final String TYPE = "type";
    private static final String WS = "ws";
    private static final String DIRECTION = "direction";
    // Incremented for every incoming WebSocket message (getStrictifyFlow).
    private static final Counter IN_COUNTER = DittoMetrics.counter(STREAMING_MESSAGES).tag(TYPE, WS).tag(DIRECTION, "in");
    // Incremented for every outgoing WebSocket message (createOutgoing).
    private static final Counter OUT_COUNTER = DittoMetrics.counter(STREAMING_MESSAGES).tag(TYPE, WS).tag(DIRECTION, "out");
    // Incremented for every message rejected by the rate limiter (createIncoming).
    private static final Counter DROPPED_COUNTER = DittoMetrics.counter(STREAMING_MESSAGES).tag(TYPE, WS).tag(DIRECTION, "dropped");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.eclipse.ditto.gateway.service.endpoints.routes.websocket.WebSocketRoute$1, reason: invalid class name */
    /* loaded from: input_file:org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute$1.class */
    /*
     * Synthetic lookup table keyed by StreamingType ordinal. The array is sized to the number of
     * StreamingType constants and maps EVENTS, MESSAGES, LIVE_COMMANDS, LIVE_EVENTS and
     * POLICY_ANNOUNCEMENTS to the values 1..5; any other ordinal keeps the default 0.
     * NOTE(review): this looks like the table javac emits for a switch over StreamingType; the
     * switch that reads it is not part of the code reviewed here - confirm before touching it.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$eclipse$ditto$internal$utils$pubsub$StreamingType = new int[StreamingType.values().length];

        static {
            // Each constant is registered in its own try block; a NoSuchFieldError for one
            // constant is deliberately ignored so the remaining entries are still populated.
            try {
                $SwitchMap$org$eclipse$ditto$internal$utils$pubsub$StreamingType[StreamingType.EVENTS.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$eclipse$ditto$internal$utils$pubsub$StreamingType[StreamingType.MESSAGES.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$eclipse$ditto$internal$utils$pubsub$StreamingType[StreamingType.LIVE_COMMANDS.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$eclipse$ditto$internal$utils$pubsub$StreamingType[StreamingType.LIVE_EVENTS.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$eclipse$ditto$internal$utils$pubsub$StreamingType[StreamingType.POLICY_ANNOUNCEMENTS.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /**
     * Creates a route whose collaborators are loaded from the actor system's configuration.
     * Signal enrichment is disabled and the header translator is empty until overridden
     * through the builder methods.
     *
     * @param actorSystem the actor system whose settings provide the configuration.
     * @param actorRef the streaming actor; must not be {@code null}.
     * @param streamingConfig the streaming configuration.
     * @param materializer the materializer used for pre-materializing streams.
     */
    private WebSocketRoute(ActorSystem actorSystem, ActorRef actorRef, StreamingConfig streamingConfig, Materializer materializer) {
        this.streamingActor = ConditionChecker.checkNotNull(actorRef, "streamingActor");
        this.streamingConfig = streamingConfig;
        this.materializer = materializer;

        final Config rootConfig = actorSystem.settings().config();
        final Config extensionConfig = ScopedConfig.dittoExtension(rootConfig);

        this.incomingMessageSniffer = IncomingWebSocketEventSniffer.get(actorSystem, extensionConfig);
        this.outgoingMessageSniffer = OutgoingWebSocketEventSniffer.get(actorSystem, extensionConfig);
        this.authorizationEnforcer = StreamingAuthorizationEnforcer.get(actorSystem,
                ScopedConfig.getOrEmpty(rootConfig, "ditto.gateway.streaming.websocket"));
        this.webSocketSupervisor = WebSocketSupervisor.get(actorSystem, extensionConfig);
        this.webSocketConfigProvider = WebSocketConfigProvider.get(actorSystem, extensionConfig);

        this.signalEnrichmentProvider = null;
        this.headerTranslator = HeaderTranslator.empty();
    }

    /**
     * Returns a new {@code WebSocketRoute} for the given arguments.
     *
     * @param actorSystem the actor system whose settings provide the configuration.
     * @param actorRef the streaming actor; must not be {@code null}.
     * @param streamingConfig the streaming configuration.
     * @param materializer the materializer used for pre-materializing streams.
     * @return the new route instance.
     */
    public static WebSocketRoute getInstance(ActorSystem actorSystem, ActorRef actorRef, StreamingConfig streamingConfig, Materializer materializer) {
        final WebSocketRoute route = new WebSocketRoute(actorSystem, actorRef, streamingConfig, materializer);
        return route;
    }

    /**
     * Replaces the sniffer applied to incoming WebSocket messages.
     *
     * @param incomingWebSocketEventSniffer the sniffer; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public WebSocketRouteBuilder withIncomingEventSniffer(IncomingWebSocketEventSniffer incomingWebSocketEventSniffer) {
        ConditionChecker.checkNotNull(incomingWebSocketEventSniffer, "eventSniffer");
        this.incomingMessageSniffer = incomingWebSocketEventSniffer;
        return this;
    }

    /**
     * Replaces the sniffer applied to outgoing WebSocket messages.
     *
     * @param outgoingWebSocketEventSniffer the sniffer; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public WebSocketRouteBuilder withOutgoingEventSniffer(OutgoingWebSocketEventSniffer outgoingWebSocketEventSniffer) {
        ConditionChecker.checkNotNull(outgoingWebSocketEventSniffer, "eventSniffer");
        this.outgoingMessageSniffer = outgoingWebSocketEventSniffer;
        return this;
    }

    /**
     * Replaces the enforcer that authorizes a WebSocket upgrade request.
     *
     * @param streamingAuthorizationEnforcer the enforcer; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public WebSocketRouteBuilder withAuthorizationEnforcer(StreamingAuthorizationEnforcer streamingAuthorizationEnforcer) {
        ConditionChecker.checkNotNull(streamingAuthorizationEnforcer, "enforcer");
        this.authorizationEnforcer = streamingAuthorizationEnforcer;
        return this;
    }

    /**
     * Replaces the supervisor that is handed every newly created outgoing stream.
     *
     * @param webSocketSupervisor the supervisor; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public WebSocketRouteBuilder withWebSocketSupervisor(WebSocketSupervisor webSocketSupervisor) {
        ConditionChecker.checkNotNull(webSocketSupervisor, "webSocketSupervisor");
        this.webSocketSupervisor = webSocketSupervisor;
        return this;
    }

    /**
     * Sets the provider of signal enrichment facades.
     *
     * @param gatewaySignalEnrichmentProvider the provider, or {@code null} to create WebSockets
     * without an enrichment facade.
     * @return this builder.
     */
    @Override
    public WebSocketRouteBuilder withSignalEnrichmentProvider(
            @Nullable GatewaySignalEnrichmentProvider gatewaySignalEnrichmentProvider) {

        // No null check on purpose: null is a valid value here.
        this.signalEnrichmentProvider = gatewaySignalEnrichmentProvider;
        return this;
    }

    /**
     * Replaces the translator that selects which additional headers are copied onto incoming signals.
     *
     * @param headerTranslator the translator; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public WebSocketRouteBuilder withHeaderTranslator(HeaderTranslator headerTranslator) {
        ConditionChecker.checkNotNull(headerTranslator, "headerTranslator");
        this.headerTranslator = headerTranslator;
        return this;
    }

    /**
     * Replaces the provider that may overwrite the retrieved WebSocket configuration per connection.
     *
     * @param webSocketConfigProvider the provider; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public WebSocketRouteBuilder withWebSocketConfigProvider(WebSocketConfigProvider webSocketConfigProvider) {
        ConditionChecker.checkNotNull(webSocketConfigProvider, "webSocketConfigProvider");
        this.webSocketConfigProvider = webSocketConfigProvider;
        return this;
    }

    /**
     * Builds the route that upgrades a request to a WebSocket once the authorization enforcer
     * has approved it.
     *
     * @param jsonSchemaVersion schema version applied to signals of this connection.
     * @param charSequence the connection correlation ID.
     * @param dittoHeaders headers of the upgrade request; checked by the authorization enforcer.
     * @param protocolAdapter adapter converting between protocol messages and signals.
     * @param requestContext the request context passed to the authorization enforcer.
     * @return the WebSocket route.
     */
    @Override
    public Route build(JsonSchemaVersion jsonSchemaVersion, CharSequence charSequence, DittoHeaders dittoHeaders, ProtocolAdapter protocolAdapter, RequestContext requestContext) {
        return Directives.extractWebSocketUpgrade(upgrade ->
                Directives.extractRequest(request -> {
                    // The WebSocket is created with the headers returned by the enforcer.
                    final CompletionStage<HttpResponse> upgradeResponse = this.authorizationEnforcer
                            .checkAuthorization(requestContext, dittoHeaders)
                            .thenCompose(authorizedHeaders -> createWebSocket(upgrade, jsonSchemaVersion,
                                    charSequence.toString(), authorizedHeaders, protocolAdapter, request));
                    return Directives.completeWithFuture(upgradeResponse);
                }));
    }

    /**
     * Asks the streaming actor for the current WebSocket configuration.
     *
     * @return a stage holding the reply cast to {@code WebsocketConfig}; the stage fails if the
     * reply is of another type or the ask does not complete within {@code LOCAL_ASK_TIMEOUT}.
     */
    private CompletionStage<WebsocketConfig> retrieveWebsocketConfig() {
        return Patterns.ask(this.streamingActor, StreamingActor.Control.RETRIEVE_WEBSOCKET_CONFIG, LOCAL_ASK_TIMEOUT)
                .thenApply(WebsocketConfig.class::cast);
    }

    /**
     * Creates the WebSocket handler flow and answers the upgrade request with it.
     * The outgoing side is created first because the incoming side needs its {@code Connect} message.
     *
     * @param webSocketUpgrade the upgrade extracted from the request.
     * @param jsonSchemaVersion schema version applied to signals of this connection.
     * @param charSequence the connection correlation ID.
     * @param dittoHeaders the headers returned by the authorization check.
     * @param protocolAdapter adapter converting between protocol messages and signals.
     * @param httpRequest the upgrade request.
     * @return the stage of the HTTP response performing the upgrade.
     */
    private CompletionStage<HttpResponse> createWebSocket(WebSocketUpgrade webSocketUpgrade, JsonSchemaVersion jsonSchemaVersion, CharSequence charSequence, DittoHeaders dittoHeaders, ProtocolAdapter protocolAdapter, HttpRequest httpRequest) {
        final CompletionStage<SignalEnrichmentFacade> facadeStage;
        if (this.signalEnrichmentProvider == null) {
            // No provider configured: continue with a null facade.
            facadeStage = CompletableFuture.completedStage(null);
        } else {
            facadeStage = this.signalEnrichmentProvider.getFacade(httpRequest);
        }

        final AuthorizationContext authContext = dittoHeaders.getAuthorizationContext();
        final ThreadSafeDittoLogger connectionLogger = LOGGER.withMdcEntry(MDC_CONNECTION_CORRELATION_ID, charSequence);
        connectionLogger.info("Creating WebSocket for connection authContext: <{}>", authContext);

        return facadeStage.thenCompose(facade -> retrieveWebsocketConfig()
                .thenApply(overwriteWebSocketConfig(dittoHeaders))
                .thenApply(effectiveConfig -> {
                    final Pair<Connect, Flow<DittoRuntimeException, Message, NotUsed>> outgoing =
                            createOutgoing(jsonSchemaVersion, charSequence, authContext, dittoHeaders, protocolAdapter,
                                    httpRequest, effectiveConfig, facade, connectionLogger);
                    final Flow<Message, DittoRuntimeException, NotUsed> incoming =
                            createIncoming(jsonSchemaVersion, charSequence, authContext, dittoHeaders, protocolAdapter,
                                    httpRequest, effectiveConfig, outgoing.first(), connectionLogger);
                    return webSocketUpgrade.handleMessagesWith(
                            incoming.via(this.wsKillSwitch.flow()).via(outgoing.second()));
                }));
    }

    /**
     * Returns a function that passes a WebSocket configuration through the configured
     * {@code WebSocketConfigProvider} together with the given headers.
     *
     * @param dittoHeaders the headers handed to the provider.
     * @return the overwriting function.
     */
    private Function<WebsocketConfig, WebsocketConfig> overwriteWebSocketConfig(DittoHeaders dittoHeaders) {
        // The provider field is read when the function is applied, not when it is created.
        return retrievedConfig -> this.webSocketConfigProvider.apply(dittoHeaders, retrievedConfig);
    }

    /**
     * Builds the incoming half of the WebSocket flow.
     * <pre>
     * Message -> strictify+throttle -> select --(control message or signal)--> rate limiter --> sink
     *                                    |                                          |
     *                                    +--(error)--> merge <--(rejected, counted as dropped)--+
     * </pre>
     * Only errors leave this flow; accepted control messages and signals end in the sink.
     *
     * @return a flow consuming WebSocket messages and emitting the errors they caused.
     */
    private Flow<Message, DittoRuntimeException, NotUsed> createIncoming(JsonSchemaVersion jsonSchemaVersion, CharSequence charSequence, AuthorizationContext authorizationContext, DittoHeaders dittoHeaders, ProtocolAdapter protocolAdapter, HttpRequest httpRequest, WebsocketConfig websocketConfig, Connect connect, ThreadSafeDittoLogger threadSafeDittoLogger) {
        return Flow.fromGraph(GraphDSL.create(builder -> {
            final FlowShape<Message, String> strictify = builder.add(
                    getStrictifyFlow(httpRequest, threadSafeDittoLogger)
                            .via(AbstractRoute.throttleByConfig(websocketConfig.getThrottlingConfig())));

            final FanOutShape2<String, Either<StreamControlMessage, Signal<?>>, DittoRuntimeException> select =
                    builder.add(selectStreamControlOrSignal(jsonSchemaVersion, charSequence, authorizationContext,
                            dittoHeaders, protocolAdapter, threadSafeDittoLogger));

            final FanOutShape2<Either<StreamControlMessage, Signal<?>>, Either<StreamControlMessage, Signal<?>>,
                    DittoRuntimeException> rateLimiter = builder.add(getRateLimiter(websocketConfig));

            final Flow<DittoRuntimeException, DittoRuntimeException, NotUsed> countDropped =
                    Flow.fromFunction(rejection -> {
                        DROPPED_COUNTER.increment();
                        return rejection;
                    });
            final FlowShape<DittoRuntimeException, DittoRuntimeException> droppedCounter = builder.add(countDropped);

            final SinkShape<Either<StreamControlMessage, Signal<?>>> sessionSink =
                    builder.add(getStreamControlOrSignalSink(connect));

            final UniformFanInShape<DittoRuntimeException, DittoRuntimeException> errorMerge =
                    builder.add(Merge.create(2, true));

            builder.from(strictify.out()).toInlet(select.in());
            builder.from(select.out0()).toInlet(rateLimiter.in());
            builder.from(select.out1()).toFanIn(errorMerge);
            builder.from(rateLimiter.out0()).to(sessionSink);
            builder.from(rateLimiter.out1()).via(droppedCounter).toFanIn(errorMerge);

            return FlowShape.of(strictify.in(), errorMerge.out());
        }));
    }

    /**
     * Creates the sink that forwards accepted incoming elements to the actor that answers the
     * {@code Connect} ask. Control messages are sent as they are, signals are wrapped via
     * {@code IncomingSignal.of}, and the {@code NoOp} instance is dropped.
     *
     * @param connect the message the streaming actor is asked with; its reply is cast to {@code ActorRef}.
     * @return the sink.
     */
    private Graph<SinkShape<Either<StreamControlMessage, Signal<?>>>, ?> getStreamControlOrSignalSink(Connect connect) {
        final Flow<Either<StreamControlMessage, Signal<?>>, Object, NotUsed> toSessionMessage =
                Flow.fromFunction(controlOrSignal -> {
                    if (controlOrSignal.isLeft()) {
                        return controlOrSignal.left().get();
                    }
                    return IncomingSignal.of(controlOrSignal.right().get());
                });

        // The ask is sent here, while the graph is being assembled; its reply is repeated
        // endlessly so that every message can be zipped with the target actor.
        final CompletionStage<Source<ActorRef, NotUsed>> repeatedSessionActor =
                Patterns.ask(this.streamingActor, connect, LOCAL_ASK_TIMEOUT)
                        .thenApply(reply -> Source.repeat((ActorRef) reply));
        final Source<ActorRef, CompletionStage<NotUsed>> sessionActorSource =
                Source.completionStageSource(repeatedSessionActor);

        final NoOp noOp = NoOp.getInstance();
        return toSessionMessage
                .zipWith(sessionActorSource, Pair::create)
                .to(Sink.foreach(messageAndActor -> {
                    final Object sessionMessage = messageAndActor.first();
                    if (!noOp.equals(sessionMessage)) {
                        messageAndActor.second().tell(sessionMessage, ActorRef.noSender());
                    }
                }));
    }

    /**
     * Creates the flow turning WebSocket messages into complete text payloads: counts every
     * message, keeps text messages only, concatenates streamed text into one string, runs the
     * incoming sniffer and logs the result on debug level.
     *
     * @param httpRequest the upgrade request handed to the incoming sniffer.
     * @param logger the logger for received payloads.
     * @return the flow.
     */
    private Flow<Message, String, NotUsed> getStrictifyFlow(HttpRequest httpRequest, Logger logger) {
        return Flow.<Message>create()
                .via(Flow.fromFunction(received -> {
                    // Counted before filtering, so non-text messages are included.
                    IN_COUNTER.increment();
                    return received;
                }))
                .filter(Message::isText)
                .map(Message::asTextMessage)
                .map(text -> {
                    if (text.isStrict()) {
                        return Source.single(text.getStrictText());
                    } else {
                        return text.getStreamedText();
                    }
                })
                .flatMapConcat(chunks -> chunks.fold("", (collected, chunk) -> collected + chunk))
                .via(this.incomingMessageSniffer.toAsyncFlow(httpRequest))
                .via(Flow.fromFunction(payload -> {
                    logger.debug("Received incoming WebSocket message: {}", payload);
                    return payload;
                }))
                .withAttributes(Attributes.createLogLevels(Logging.DebugLevel(), Logging.DebugLevel(),
                        Logging.WarningLevel()));
    }

    /**
     * Creates the fan-out that classifies every incoming text payload.
     * <ul>
     * <li>out0 receives {@code Left(streamControlMessage)} if the payload is a stream control
     * message, otherwise {@code Right(signal)} for a successfully built signal.</li>
     * <li>out1 receives the {@code DittoRuntimeException} describing why no signal could be built.</li>
     * </ul>
     * Failure handling, most specific first:
     * <ul>
     * <li>{@code IllegalAdaptableException}: logged on info; answered with the exception (origin
     * set to this connection) only if a response is required, otherwise replaced by {@code NoOp}.</li>
     * <li>any other {@code DittoRuntimeException}: logged on debug and emitted as error.</li>
     * <li>any other {@code Exception}: logged on warn and wrapped in a
     * {@code GatewayInternalErrorException}.</li>
     * </ul>
     *
     * @param jsonSchemaVersion schema version put into the initial internal headers.
     * @param charSequence the connection correlation ID, also used as origin.
     * @param authorizationContext the authorization context of the connection.
     * @param dittoHeaders additional headers; the ones known to the header translator are copied to the signal.
     * @param protocolAdapter adapter converting the payload into a signal.
     * @param threadSafeDittoLogger the connection logger.
     * @return the classifying fan-out graph.
     */
    private Graph<FanOutShape2<String, Either<StreamControlMessage, Signal<?>>, DittoRuntimeException>, NotUsed> selectStreamControlOrSignal(JsonSchemaVersion jsonSchemaVersion, CharSequence charSequence, AuthorizationContext authorizationContext, DittoHeaders dittoHeaders, ProtocolAdapter protocolAdapter, ThreadSafeDittoLogger threadSafeDittoLogger) {
        final ProtocolMessageExtractor protocolMessageExtractor =
                new ProtocolMessageExtractor(authorizationContext, charSequence);
        return Filter.multiplexByEither(str -> {
            try {
                final Optional<StreamControlMessage> streamControlMessage = protocolMessageExtractor.apply(str);
                if (streamControlMessage.isPresent()) {
                    return Right.apply(Left.apply(streamControlMessage.get()));
                }

                final DittoHeaders initialInternalHeaders =
                        getInitialInternalHeaders(jsonSchemaVersion, authorizationContext, charSequence);
                try {
                    final Signal<?> signal = buildSignal(charSequence, initialInternalHeaders,
                            getJsonifiableAdaptableOrThrow(str, initialInternalHeaders), dittoHeaders,
                            protocolAdapter, this.headerTranslator, threadSafeDittoLogger);
                    final StartedSpan span = DittoTracing
                            .newPreparedSpan(signal.getDittoHeaders(), SpanOperationName.of("gw_streaming_in_signal"))
                            .tag(SpanTagKey.SIGNAL_TYPE.getTagForValue(signal.getType()))
                            .start();
                    try {
                        return Right.apply(Right.apply(signal.setDittoHeaders(
                                DittoHeaders.of(span.propagateContext(signal.getDittoHeaders())))));
                    } finally {
                        // Finish the span even if propagating the trace context fails.
                        span.finish();
                    }
                } catch (final IllegalAdaptableException e) {
                    // Must precede the DittoRuntimeException handler: it is the more specific type.
                    logSignalBuildingFailure(threadSafeDittoLogger.withCorrelationId(e)::info, e, str);
                    if (isResponseRequired(e)) {
                        return Left.apply(traceSignalBuildingFailure(e.setDittoHeaders(
                                DittoHeaders.newBuilder(e.getDittoHeaders()).origin(charSequence).build())));
                    }
                    // No response wanted: emit NoOp, which the session sink drops silently.
                    return Right.apply(Left.apply(NoOp.getInstance()));
                } catch (final DittoRuntimeException e) {
                    logSignalBuildingFailure(threadSafeDittoLogger.withCorrelationId(e)::debug, e, str);
                    return Left.apply(traceSignalBuildingFailure(e));
                } catch (final Exception e) {
                    // Catch-all for unexpected failures; has to be the last handler.
                    logSignalBuildingFailure(threadSafeDittoLogger::warn, e, str);
                    return Left.apply(traceSignalBuildingFailure(
                            GatewayInternalErrorException.newBuilder().message(e.getMessage()).cause(e).build()));
                }
            } catch (final DittoRuntimeException e) {
                // Raised outside of signal building, e.g. by the protocol message extractor.
                return Left.apply(e);
            }
        });
    }

    /**
     * Reports a failed signal conversion through the given logging callback.
     *
     * @param biConsumer receives the log pattern and its three arguments: the raw payload, the
     * simple class name of the exception and the exception message.
     * @param exc the exception that prevented building the signal.
     * @param str the raw payload that could not be converted.
     */
    private static void logSignalBuildingFailure(BiConsumer<String, Object[]> biConsumer, Exception exc, String str) {
        final String exceptionName = exc.getClass().getSimpleName();
        final Object[] logArguments = {str, exceptionName, exc.getMessage()};
        biConsumer.accept("Failed to build a Signal from <{}>; {}: {}", logArguments);
    }

    /**
     * Records the given failure in a trace span of its own and returns the exception with the
     * span's trace context written into its headers.
     *
     * @param dittoRuntimeException the failure to trace.
     * @return the exception carrying the propagated trace context.
     */
    private static DittoRuntimeException traceSignalBuildingFailure(DittoRuntimeException dittoRuntimeException) {
        final StartedSpan errorSpan =
                startTraceSpan(dittoRuntimeException, SpanOperationName.of("gw.streaming.in.error"));
        errorSpan.tagAsFailed(dittoRuntimeException);
        try {
            final DittoHeaders tracedHeaders =
                    DittoHeaders.of(errorSpan.propagateContext(dittoRuntimeException.getDittoHeaders()));
            return dittoRuntimeException.setDittoHeaders(tracedHeaders);
        } finally {
            // The span is finished no matter whether the headers could be rewritten.
            errorSpan.finish();
        }
    }

    /**
     * Starts a trace span for the given operation, prepared from the headers of the given object.
     *
     * @param withDittoHeaders supplies the headers the span is prepared from.
     * @param spanOperationName the operation name of the span.
     * @return the started span; the caller is responsible for finishing it.
     */
    private static StartedSpan startTraceSpan(WithDittoHeaders withDittoHeaders, SpanOperationName spanOperationName) {
        final DittoHeaders spanHeaders = withDittoHeaders.getDittoHeaders();
        return DittoTracing.newPreparedSpan(spanHeaders, spanOperationName).start();
    }

    /**
     * Builds the outgoing half of the WebSocket flow together with the {@code Connect} message
     * announcing it.
     * <p>
     * The returned flow takes the errors emitted by the incoming side, merges them with the
     * elements offered to the pre-materialized source queue, post-processes them and renders
     * them as text messages.
     *
     * @param jsonSchemaVersion schema version stored in the {@code Connect} message.
     * @param charSequence the connection correlation ID.
     * @param authorizationContext the authorization context stored in the {@code Connect} message.
     * @param dittoHeaders headers of the connection; source of the declared acknowledgement labels.
     * @param protocolAdapter adapter used during post-processing.
     * @param httpRequest the upgrade request; inspected for a JWT and handed to the outgoing sniffer.
     * @param websocketConfig supplies the buffer size of the source queue.
     * @param signalEnrichmentFacade facade used during post-processing, may be {@code null}.
     * @param threadSafeDittoLogger the connection logger.
     * @return the pair of {@code Connect} message and outgoing flow.
     */
    private Pair<Connect, Flow<DittoRuntimeException, Message, NotUsed>> createOutgoing(JsonSchemaVersion jsonSchemaVersion, CharSequence charSequence, AuthorizationContext authorizationContext, DittoHeaders dittoHeaders, ProtocolAdapter protocolAdapter, HttpRequest httpRequest, WebsocketConfig websocketConfig, @Nullable SignalEnrichmentFacade signalEnrichmentFacade, ThreadSafeDittoLogger threadSafeDittoLogger) {
        Optional<JsonWebToken> extractJwtFromRequestIfPresent = extractJwtFromRequestIfPresent(httpRequest);
        // On materialization the stream is registered with the supervisor and the materialized
        // value is replaced by the Connect message; the JWT expiration time is null without a JWT.
        Pair preMaterialize = SupervisedStream.sourceQueue(websocketConfig.getPublisherBackpressureBufferSize()).mapMaterializedValue(withQueue -> {
            this.webSocketSupervisor.supervise(withQueue.getSupervisedStream(), charSequence, dittoHeaders);
            return new Connect(withQueue.getSourceQueue(), charSequence, STREAMING_TYPE_WS, jsonSchemaVersion, (Instant) extractJwtFromRequestIfPresent.map((v0) -> {
                return v0.getExpirationTime();
            }).orElse(null), readDeclaredAcknowledgementLabels(dittoHeaders), authorizationContext, this.wsKillSwitch);
        // Session aborted/expired/closed failures are logged and replaced by an empty source;
        // any other DittoRuntimeException is replaced by a single error element.
        }).recoverWithRetries(1, new PFBuilder().match(GatewayWebsocketSessionAbortedException.class, gatewayWebsocketSessionAbortedException -> {
            threadSafeDittoLogger.info("WebSocket connection aborted because of service restart!");
            return Source.empty();
        }).match(GatewayWebsocketSessionExpiredException.class, gatewayWebsocketSessionExpiredException -> {
            threadSafeDittoLogger.info("WebSocket connection terminated because JWT expired!");
            return Source.empty();
        }).match(GatewayWebsocketSessionClosedException.class, gatewayWebsocketSessionClosedException -> {
            threadSafeDittoLogger.info("WebSocket connection terminated because authorization context changed!");
            return Source.empty();
        }).match(DittoRuntimeException.class, dittoRuntimeException -> {
            return Source.single(SessionedJsonifiable.error(dittoRuntimeException));
        // Pre-materializing yields the Connect message (first) and the source to keep consuming (second).
        }).build()).preMaterialize(this.materializer);
        // Rendering pipeline: post-process with the configured parallelism, flatten the resulting
        // collections, log, run the outgoing sniffer, wrap as text message and count.
        return Pair.create((Connect) preMaterialize.first(), joinOutgoingFlows((Source) preMaterialize.second(), Flow.fromFunction(SessionedJsonifiable::error), Flow.create().mapAsync(this.streamingConfig.getParallelism(), postprocess(protocolAdapter, signalEnrichmentFacade, threadSafeDittoLogger)).mapConcat(collection -> {
            return collection;
        }).via(Flow.fromFunction(str -> {
            threadSafeDittoLogger.debug("Sending outgoing WebSocket message: {}", str);
            return str;
        })).via(this.outgoingMessageSniffer.toAsyncFlow(httpRequest)).map(TextMessage::create).via(Flow.fromFunction(message -> {
            OUT_COUNTER.increment();
            return message;
        }))));
    }

    /**
     * Reads the acknowledgement labels declared in the headers.
     * The header value is parsed as JSON; only string elements of a JSON array are used.
     *
     * @param dittoHeaders the headers to read the declared acknowledgements from.
     * @return the declared labels; empty if the header is absent or not a JSON array.
     */
    private static Set<AcknowledgementLabel> readDeclaredAcknowledgementLabels(DittoHeaders dittoHeaders) {
        final String declaredAcksKey = DittoHeaderDefinition.DECLARED_ACKS.getKey();
        final Stream<JsonValue> declaredValues = Optional.ofNullable(dittoHeaders.get(declaredAcksKey))
                .map(JsonFactory::readFrom)
                .filter(JsonValue::isArray)
                .map(JsonValue::asArray)
                .map(declaredArray -> declaredArray.stream())
                .orElseGet(Stream::empty);
        return declaredValues
                .filter(JsonValue::isString)
                .map(JsonValue::asString)
                .map(AcknowledgementLabel::of)
                .collect(Collectors.toSet());
    }

    /**
     * Combines the outgoing building blocks into one flow: converted errors and the elements of
     * {@code source} are merged and then rendered as WebSocket messages.
     *
     * @param source the elements published for this connection.
     * @param flow converts errors of the incoming side into elements.
     * @param flow2 renders elements as WebSocket messages.
     * @param <T> the element type before rendering.
     * @return the flow from incoming-side errors to outgoing WebSocket messages.
     */
    private static <T> Flow<DittoRuntimeException, Message, NotUsed> joinOutgoingFlows(Source<T, NotUsed> source, Flow<DittoRuntimeException, T, NotUsed> flow, Flow<T, Message, NotUsed> flow2) {
        return Flow.fromGraph(GraphDSL.create3(source, flow, flow2,
                (sourceMat, errorMat, renderMat) -> sourceMat,
                (builder, published, errors, renderer) -> {
                    // Eager completion: the merge completes as soon as one of its inputs completes.
                    final UniformFanInShape<T, T> merge = builder.add(Merge.create(2, true));

                    builder.from(published).toFanIn(merge);
                    builder.from(errors).toFanIn(merge);
                    builder.from(merge.out()).toInlet(renderer.in());

                    return FlowShape.of(errors.in(), renderer.out());
                }));
    }

    /**
     * Creates the fan-out deciding whether an incoming element is accepted (out0) or rejected
     * with a {@code TooManyRequestsException} (out1).
     * With throttling disabled every element is accepted. Otherwise the limit per interval is the
     * configured limit multiplied by the rejection factor, but never less than the limit itself.
     *
     * @param websocketConfig supplies the throttling settings and the rejection factor.
     * @param <T> the type of the non-signal alternative.
     * @return the rate limiting fan-out graph.
     */
    private static <T> Graph<FanOutShape2<Either<T, Signal<?>>, Either<T, Signal<?>>, DittoRuntimeException>, NotUsed> getRateLimiter(WebsocketConfig websocketConfig) {
        final Duration throttlingInterval = websocketConfig.getThrottlingConfig().getInterval();
        final int throttlingLimit = websocketConfig.getThrottlingConfig().getLimit();

        if (!websocketConfig.getThrottlingConfig().isEnabled()) {
            return Filter.multiplexByEither(accepted -> Right.apply(accepted));
        }

        final int rejectionLimit = Math.max(throttlingLimit,
                (int) (throttlingLimit * websocketConfig.getThrottlingRejectionFactor()));
        return LimitRateByRejection.of(throttlingInterval, rejectionLimit, rejected -> {
            final TooManyRequestsException.Builder errorBuilder =
                    TooManyRequestsException.newBuilder().retryAfter(throttlingInterval);
            // Only signals carry headers that can be attached to the error.
            if (rejected.isRight()) {
                errorBuilder.dittoHeaders(rejected.right().get().getDittoHeaders());
            }
            return errorBuilder.build();
        });
    }

    /**
     * Creates the headers every incoming signal of a connection starts with.
     *
     * @param jsonSchemaVersion the schema version of the connection.
     * @param authorizationContext the authorization context of the connection.
     * @param charSequence the connection correlation ID; used as correlation ID and as origin.
     * @return the initial internal headers.
     */
    private static DittoHeaders getInitialInternalHeaders(JsonSchemaVersion jsonSchemaVersion, AuthorizationContext authorizationContext, CharSequence charSequence) {
        return DittoHeaders.newBuilder()
                .schemaVersion(jsonSchemaVersion)
                .authorizationContext(authorizationContext)
                .correlationId(charSequence)
                .origin(charSequence)
                .build();
    }

    /**
     * Parses a text payload into a {@code JsonifiableAdaptable}.
     *
     * @param str the payload received via WebSocket.
     * @param dittoHeaders headers attached to the exception thrown for an empty payload; parsing
     * itself is wrapped with empty headers.
     * @return the parsed adaptable.
     * @throws DittoJsonException if the payload is empty.
     */
    private static JsonifiableAdaptable getJsonifiableAdaptableOrThrow(String str, DittoHeaders dittoHeaders) {
        if (str.isEmpty()) {
            final RuntimeException emptyPayload = new IllegalArgumentException("Empty json.");
            throw new DittoJsonException(emptyPayload, dittoHeaders);
        }
        return DittoJsonException.wrapJsonRuntimeException(str, DittoHeaders.empty(),
                (payload, ignoredHeaders) ->
                        ProtocolFactory.jsonifiableAdaptableFromJson(JsonFactory.newObject(payload)));
    }

    /**
     * Converts an adaptable into a signal and assembles its internal headers.
     * Headers are applied in this order, later ones overwriting earlier ones: the initial internal
     * headers, the additional headers known to the header translator, the headers of the signal
     * itself. A random correlation ID is set if the signal did not bring one.
     *
     * @param charSequence the connection correlation ID; set as origin on a thrown exception.
     * @param dittoHeaders the initial internal headers.
     * @param adaptable the adaptable to convert.
     * @param dittoHeaders2 the additional headers of the connection.
     * @param protocolAdapter adapter performing the conversion.
     * @param headerTranslator selects the additional headers to keep.
     * @param threadSafeDittoLogger the connection logger.
     * @return the signal with its internal headers.
     * @throws DittoRuntimeException if building the signal fails; rethrown with the origin header set.
     */
    private static Signal<?> buildSignal(CharSequence charSequence, DittoHeaders dittoHeaders, Adaptable adaptable, DittoHeaders dittoHeaders2, ProtocolAdapter protocolAdapter, HeaderTranslator headerTranslator, ThreadSafeDittoLogger threadSafeDittoLogger) {
        try {
            final Signal<?> signal = protocolAdapter.fromAdaptable(adaptable);
            final ThreadSafeDittoLogger signalLogger = threadSafeDittoLogger.withCorrelationId(signal);
            signalLogger.debug("WebSocket message has been converted to signal <{}>.", signal);
            final DittoHeaders signalHeaders = signal.getDittoHeaders();

            signalLogger.trace("Adding initialInternalHeaders: <{}>.", dittoHeaders);
            final DittoHeadersBuilder<?, ?> internalHeadersBuilder = DittoHeaders.newBuilder(dittoHeaders);

            final Map<String, String> knownAdditionalHeaders = headerTranslator.retainKnownHeaders(dittoHeaders2);
            signalLogger.trace("Adding wellKnownAdditionalHeaders: <{}>.", knownAdditionalHeaders);
            internalHeadersBuilder.putHeaders(knownAdditionalHeaders);

            signalLogger.trace("Adding signalHeaders: <{}>.", signalHeaders);
            internalHeadersBuilder.putHeaders(signalHeaders);

            // Without this the connection's correlation ID from the initial headers would remain.
            if (signalHeaders.getCorrelationId().isEmpty()) {
                final String generatedCorrelationId = UUID.randomUUID().toString();
                signalLogger.trace("Adding generated correlationId: <{}>.", generatedCorrelationId);
                internalHeadersBuilder.correlationId(generatedCorrelationId);
            }

            signalLogger.debug("Generated internalHeaders are: <{}>.", internalHeadersBuilder);
            return signal.setDittoHeaders(internalHeadersBuilder.build());
        } catch (final DittoRuntimeException failure) {
            final DittoHeaders headersWithOrigin =
                    failure.getDittoHeaders().toBuilder().origin(charSequence).build();
            throw failure.setDittoHeaders(headersWithOrigin);
        }
    }

    /**
     * Tells whether the headers of the given object state that a response is required.
     *
     * @param withDittoHeaders the object whose headers are inspected.
     * @return the value of {@code isResponseRequired()} of the object's headers.
     */
    private static boolean isResponseRequired(WithDittoHeaders withDittoHeaders) {
        final DittoHeaders headers = withDittoHeaders.getDittoHeaders();
        return headers.isResponseRequired();
    }

    /**
     * Creates the function that turns an outgoing {@code SessionedJsonifiable} into the JSON strings to send.
     * <ul>
     * <li>A {@code StreamingAck} yields exactly one string built by {@code streamingAckToString}.</li>
     * <li>Anything else is converted to an adaptable; then the extra fields are retrieved. If the result matches
     * the filter, one JSON string (including the extra fields) is produced. Otherwise potential weak
     * acknowledgements are issued, the span is finished and nothing is produced.</li>
     * <li>If retrieving or applying the extra fields fails, the span is finished and the result of
     * {@code reportEnrichmentError} is produced.</li>
     * </ul>
     *
     * @param protocolAdapter converts jsonifiables to adaptables.
     * @param signalEnrichmentFacade passed to {@code retrieveExtraFields}; may be {@code null}.
     * @param threadSafeDittoLogger logger used for reporting enrichment errors.
     * @return the post-processing function.
     */
    private akka.japi.function.Function<SessionedJsonifiable, CompletionStage<Collection<String>>> postprocess(ProtocolAdapter protocolAdapter, @Nullable SignalEnrichmentFacade signalEnrichmentFacade, ThreadSafeDittoLogger threadSafeDittoLogger) {
        return sessionedJsonifiable -> {
            final Jsonifiable.WithPredicate<JsonObject, JsonField> payload = sessionedJsonifiable.getJsonifiable();
            if (payload instanceof StreamingAck) {
                final String ackMessage = streamingAckToString((StreamingAck) payload);
                return CompletableFuture.completedFuture(Collections.singletonList(ackMessage));
            }

            final Adaptable adaptable = jsonifiableToAdaptable(payload, protocolAdapter);
            final CompletionStage<Collection<String>> enriched =
                    sessionedJsonifiable.retrieveExtraFields(signalEnrichmentFacade).thenApply(extra -> {
                        if (!matchesFilter(sessionedJsonifiable, extra)) {
                            // filtered out: nothing is sent to this subscriber
                            issuePotentialWeakAcknowledgements(sessionedJsonifiable);
                            sessionedJsonifiable.finishSpan();
                            return Collections.emptyList();
                        }
                        return Collections.singletonList(toJsonStringWithExtra(adaptable, extra));
                    });
            return enriched.exceptionally(error -> {
                sessionedJsonifiable.finishSpan();
                return reportEnrichmentError(error, protocolAdapter, adaptable, threadSafeDittoLogger);
            });
        };
    }

    /**
     * Sends one weak acknowledgement per acknowledgement request found in the headers of the given
     * {@code SessionedJsonifiable} to the streaming session actor.
     * <p>
     * Nothing happens if the {@code SessionedJsonifiable} has no session or if no entity ID can be obtained from
     * its jsonifiable.
     *
     * @param sessionedJsonifiable provides the session, the headers and the jsonifiable.
     */
    private void issuePotentialWeakAcknowledgements(SessionedJsonifiable sessionedJsonifiable) {
        sessionedJsonifiable.getSession().ifPresent(session -> {
            final DittoHeaders headers = sessionedJsonifiable.getDittoHeaders();
            final Jsonifiable.WithPredicate<JsonObject, JsonField> payload = sessionedJsonifiable.getJsonifiable();
            final ActorRef sessionActor = session.getStreamingSessionActor();
            WithEntityId.getEntityIdOfType(EntityId.class, payload).ifPresent(entityId ->
                    headers.getAcknowledgementRequests().forEach(ackRequest -> {
                        final Acknowledgement weakAcknowledgement =
                                weakAck(ackRequest.getLabel(), entityId, headers);
                        sessionActor.tell(IncomingSignal.of(weakAcknowledgement), ActorRef.noSender());
                    }));
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a weak acknowledgement with a fixed explanatory text as payload.
     *
     * @param acknowledgementLabel the label of the acknowledgement.
     * @param entityId the entity ID of the acknowledgement.
     * @param dittoHeaders the headers of the acknowledgement.
     * @return the weak acknowledgement.
     */
    public static Acknowledgement weakAck(AcknowledgementLabel acknowledgementLabel, EntityId entityId, DittoHeaders dittoHeaders) {
        final JsonValue explanation = JsonValue.of("Acknowledgement was issued automatically as weak ack, because the signal is not relevant for the subscriber. Possible reasons are: the subscriber did not subscribe for the signal type, or the signal was dropped by a configured RQL filter.");
        return Acknowledgement.weak(acknowledgementLabel, entityId, dittoHeaders, explanation);
    }

    /**
     * Logs a failed signal enrichment and builds the error message to send instead of the enriched signal.
     * <p>
     * The error is taken as-is if it is a {@code DittoRuntimeException}; otherwise it becomes the cause of a
     * {@code SignalEnrichmentFailedException}. The result is always a {@code ThingErrorResponse} whose thing ID is
     * built from namespace and entity name of the adaptable's topic path.
     *
     * @param th the error that occurred.
     * @param protocolAdapter converts the error response to an adaptable.
     * @param adaptable the adaptable whose enrichment failed; provides headers and topic path.
     * @param threadSafeDittoLogger the logger to report the error with.
     * @return a single-element list holding the JSON string of the error response.
     */
    private static Collection<String> reportEnrichmentError(Throwable th, ProtocolAdapter protocolAdapter, Adaptable adaptable, ThreadSafeDittoLogger threadSafeDittoLogger) {
        final DittoRuntimeException enrichmentFailure = DittoRuntimeException.asDittoRuntimeException(th,
                cause -> SignalEnrichmentFailedException.newBuilder()
                        .dittoHeaders(adaptable.getDittoHeaders())
                        .cause(cause)
                        .build());

        threadSafeDittoLogger.withCorrelationId(adaptable.getDittoHeaders())
                .error("Signal enrichment failed due to: {}", th.getMessage(), enrichmentFailure);

        final ThingId thingId =
                ThingId.of(adaptable.getTopicPath().getNamespace(), adaptable.getTopicPath().getEntityName());
        final ThingErrorResponse errorResponse =
                ThingErrorResponse.of(thingId, enrichmentFailure, adaptable.getDittoHeaders());
        final Adaptable errorAdaptable = protocolAdapter.toAdaptable(errorResponse);
        final String errorJson = ProtocolFactory.wrapAsJsonifiableAdaptable(errorAdaptable).toJsonString();
        return Collections.singletonList(errorJson);
    }

    /**
     * Serializes the given adaptable to a JSON string, attaching the extra fields if there are any.
     *
     * @param adaptable the adaptable to serialize.
     * @param jsonObject the extra fields; if empty, the adaptable is serialized unchanged.
     * @return the JSON string of the (possibly extended) adaptable.
     */
    private static String toJsonStringWithExtra(Adaptable adaptable, JsonObject jsonObject) {
        final Adaptable toSerialize;
        if (jsonObject.isEmpty()) {
            toSerialize = adaptable;
        } else {
            toSerialize = ProtocolFactory.setExtra(adaptable, jsonObject);
        }
        return ProtocolFactory.wrapAsJsonifiableAdaptable(toSerialize).toJsonString();
    }

    /**
     * Checks whether the jsonifiable of the given {@code SessionedJsonifiable} passes the filter of its session.
     * <p>
     * The session's filter is only consulted if a session is present and the jsonifiable is a {@code Signal};
     * in every other case the result is {@code true}.
     *
     * @param sessionedJsonifiable provides the jsonifiable and the optional session.
     * @param jsonObject the extra fields that are merged with the signal before filtering.
     * @return {@code true} if no filtering applies or the session's filter matches, {@code false} otherwise.
     */
    private static boolean matchesFilter(SessionedJsonifiable sessionedJsonifiable, JsonObject jsonObject) {
        final Jsonifiable.WithPredicate<JsonObject, JsonField> payload = sessionedJsonifiable.getJsonifiable();
        return sessionedJsonifiable.getSession()
                .filter(session -> payload instanceof Signal)
                .map(session -> {
                    final Signal<?> signal = (Signal<?>) payload;
                    return session.matchesFilter(session.mergeThingWithExtra(signal, jsonObject), signal);
                })
                .orElse(true);
    }

    /**
     * Builds the acknowledgement text for a streaming (un)subscription: the string form of the matching
     * START_SEND_* (subscribed) or STOP_SEND_* (not subscribed) protocol message type followed by {@code ":ACK"}.
     *
     * @param streamingAck the acknowledgement providing streaming type and subscription state.
     * @return the acknowledgement text.
     * @throws IllegalArgumentException if the streaming type is not covered by the switch.
     */
    private static String streamingAckToString(StreamingAck streamingAck) {
        final StreamingType streamingType = streamingAck.getStreamingType();
        final boolean subscribed = streamingAck.isSubscribed();
        final ProtocolMessageType startType;
        final ProtocolMessageType stopType;
        // NOTE(review): the switch map is decompiler-generated; which StreamingType constant maps to which case
        // number is defined outside this method - confirm before converting to a plain enum switch.
        switch (AnonymousClass1.$SwitchMap$org$eclipse$ditto$internal$utils$pubsub$StreamingType[streamingType.ordinal()]) {
            case 1:
                startType = ProtocolMessageType.START_SEND_EVENTS;
                stopType = ProtocolMessageType.STOP_SEND_EVENTS;
                break;
            case 2:
                startType = ProtocolMessageType.START_SEND_MESSAGES;
                stopType = ProtocolMessageType.STOP_SEND_MESSAGES;
                break;
            case 3:
                startType = ProtocolMessageType.START_SEND_LIVE_COMMANDS;
                stopType = ProtocolMessageType.STOP_SEND_LIVE_COMMANDS;
                break;
            case 4:
                startType = ProtocolMessageType.START_SEND_LIVE_EVENTS;
                stopType = ProtocolMessageType.STOP_SEND_LIVE_EVENTS;
                break;
            case 5:
                startType = ProtocolMessageType.START_SEND_POLICY_ANNOUNCEMENTS;
                stopType = ProtocolMessageType.STOP_SEND_POLICY_ANNOUNCEMENTS;
                break;
            default:
                throw new IllegalArgumentException("Unknown streamingType: " + streamingType);
        }
        final ProtocolMessageType acknowledgedType = subscribed ? startType : stopType;
        return acknowledgedType.toString() + ":ACK";
    }

    /**
     * Converts a jsonifiable into an adaptable.
     * <p>
     * A {@code Signal} is converted directly. A {@code DittoRuntimeException} is first wrapped into an error
     * response: a policy error response if it is a {@code PolicyException}, a search error response if it is a
     * {@code ThingSearchException}, a thing error response otherwise.
     *
     * @param withPredicate the jsonifiable to convert.
     * @param protocolAdapter performs the conversion to an adaptable.
     * @return the adaptable.
     * @throws IllegalArgumentException if the jsonifiable is neither a {@code Signal} nor a
     * {@code DittoRuntimeException}.
     */
    private static Adaptable jsonifiableToAdaptable(Jsonifiable.WithPredicate<JsonObject, JsonField> withPredicate, ProtocolAdapter protocolAdapter) {
        if (withPredicate instanceof Signal) {
            return protocolAdapter.toAdaptable((Signal<?>) withPredicate);
        }
        if (!(withPredicate instanceof DittoRuntimeException)) {
            throw new IllegalArgumentException("Jsonifiable was neither Signal nor DittoRuntimeException: " + withPredicate.getClass().getSimpleName());
        }

        final DittoRuntimeException exception = (DittoRuntimeException) withPredicate;
        final Signal<?> errorResponse;
        if (withPredicate instanceof PolicyException) {
            errorResponse = buildPolicyErrorResponse(exception);
        } else if (withPredicate instanceof ThingSearchException) {
            errorResponse = buildSearchErrorResponse(exception);
        } else {
            errorResponse = buildThingErrorResponse(exception);
        }
        return protocolAdapter.toAdaptable(errorResponse);
    }

    /**
     * Wraps the given exception into a thing error response.
     *
     * @param dittoRuntimeException the exception to wrap.
     * @return the result of {@code ThingErrorResponse.of} for the exception.
     */
    private static ThingErrorResponse buildThingErrorResponse(DittoRuntimeException dittoRuntimeException) {
        final ThingErrorResponse errorResponse = ThingErrorResponse.of(dittoRuntimeException);
        return errorResponse;
    }

    /**
     * Wraps the given exception into a policy error response.
     *
     * @param dittoRuntimeException the exception to wrap.
     * @return the result of {@code PolicyErrorResponse.of} for the exception.
     */
    private static PolicyErrorResponse buildPolicyErrorResponse(DittoRuntimeException dittoRuntimeException) {
        final PolicyErrorResponse errorResponse = PolicyErrorResponse.of(dittoRuntimeException);
        return errorResponse;
    }

    /**
     * Wraps the given exception into a search error response that carries the exception's own headers.
     *
     * @param dittoRuntimeException the exception to wrap; also the source of the response headers.
     * @return the result of {@code SearchErrorResponse.of} for the exception and its headers.
     */
    private static SearchErrorResponse buildSearchErrorResponse(DittoRuntimeException dittoRuntimeException) {
        final DittoHeaders headers = dittoRuntimeException.getDittoHeaders();
        return SearchErrorResponse.of(dittoRuntimeException, headers);
    }

    /**
     * Reads a JSON Web Token from the authorization header of the given request.
     * <p>
     * A token is only produced if the header is present and its value starts with {@code BEARER}.
     *
     * @param httpRequest the request to inspect.
     * @return the token created from the header value, or an empty Optional.
     */
    private static Optional<JsonWebToken> extractJwtFromRequestIfPresent(HttpRequest httpRequest) {
        final String authorizationHeaderName = HttpHeader.AUTHORIZATION.toString();
        return httpRequest.getHeader(authorizationHeaderName)
                .map(header -> header.value())
                .filter(headerValue -> headerValue.startsWith(BEARER))
                .map(ImmutableJsonWebToken::fromAuthorization);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1654325644:
                if (implMethodName.equals("lambda$createOutgoing$4ac264b0$1")) {
                    z = 7;
                    break;
                }
                break;
            case -1654325643:
                if (implMethodName.equals("lambda$createOutgoing$4ac264b0$2")) {
                    z = 9;
                    break;
                }
                break;
            case -1352294148:
                if (implMethodName.equals("create")) {
                    z = 13;
                    break;
                }
                break;
            case -1208343013:
                if (implMethodName.equals("lambda$getStrictifyFlow$599af9b5$1")) {
                    z = true;
                    break;
                }
                break;
            case -1180098185:
                if (implMethodName.equals("isText")) {
                    z = 16;
                    break;
                }
                break;
            case -1072991476:
                if (implMethodName.equals("lambda$createIncoming$1c70e015$1")) {
                    z = 3;
                    break;
                }
                break;
            case -910844886:
                if (implMethodName.equals("lambda$postprocess$51c0305$1")) {
                    z = 19;
                    break;
                }
                break;
            case -484855097:
                if (implMethodName.equals("lambda$getStrictifyFlow$57522b5d$1")) {
                    z = 18;
                    break;
                }
                break;
            case -477553727:
                if (implMethodName.equals("lambda$getStreamControlOrSignalSink$cc1452e7$1")) {
                    z = 5;
                    break;
                }
                break;
            case 96784904:
                if (implMethodName.equals("error")) {
                    z = 6;
                    break;
                }
                break;
            case 362457667:
                if (implMethodName.equals("lambda$getStreamControlOrSignalSink$7f1ee248$1")) {
                    z = 12;
                    break;
                }
                break;
            case 518132984:
                if (implMethodName.equals("lambda$createOutgoing$3e933377$1")) {
                    z = 14;
                    break;
                }
                break;
            case 1052180104:
                if (implMethodName.equals("asTextMessage")) {
                    z = 17;
                    break;
                }
                break;
            case 1141513213:
                if (implMethodName.equals("lambda$joinOutgoingFlows$432d1707$1")) {
                    z = 11;
                    break;
                }
                break;
            case 1142169576:
                if (implMethodName.equals("lambda$joinOutgoingFlows$432d16e8$1")) {
                    z = 15;
                    break;
                }
                break;
            case 1794853674:
                if (implMethodName.equals("lambda$createOutgoing$c608e30e$1")) {
                    z = 8;
                    break;
                }
                break;
            case 1861833324:
                if (implMethodName.equals("lambda$createIncoming$51fdf62b$1")) {
                    z = 10;
                    break;
                }
                break;
            case 2070203875:
                if (implMethodName.equals("lambda$getStrictifyFlow$8704ffb9$1")) {
                    z = 4;
                    break;
                }
                break;
            case 2070203876:
                if (implMethodName.equals("lambda$getStrictifyFlow$8704ffb9$2")) {
                    z = false;
                    break;
                }
                break;
            case 2070203877:
                if (implMethodName.equals("lambda$getStrictifyFlow$8704ffb9$3")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/http/javadsl/model/ws/TextMessage;)Lakka/stream/javadsl/Source;")) {
                    return textMessage -> {
                        return textMessage.isStrict() ? Source.single(textMessage.getStrictText()) : textMessage.getStreamedText();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;")) {
                    return (str, str2) -> {
                        return str + str2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/Source;)Lakka/stream/Graph;")) {
                    return source -> {
                        return source.fold("", (str3, str22) -> {
                            return str3 + str22;
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/http/javadsl/model/HttpRequest;Lorg/eclipse/ditto/internal/utils/akka/logging/ThreadSafeDittoLogger;Lorg/eclipse/ditto/gateway/service/util/config/streaming/WebsocketConfig;Lorg/eclipse/ditto/base/model/json/JsonSchemaVersion;Ljava/lang/CharSequence;Lorg/eclipse/ditto/base/model/auth/AuthorizationContext;Lorg/eclipse/ditto/base/model/headers/DittoHeaders;Lorg/eclipse/ditto/protocol/adapter/ProtocolAdapter;Lorg/eclipse/ditto/gateway/service/streaming/signals/Connect;Lakka/stream/javadsl/GraphDSL$Builder;)Lakka/stream/FlowShape;")) {
                    WebSocketRoute webSocketRoute = (WebSocketRoute) serializedLambda.getCapturedArg(0);
                    HttpRequest httpRequest = (HttpRequest) serializedLambda.getCapturedArg(1);
                    ThreadSafeDittoLogger threadSafeDittoLogger = (ThreadSafeDittoLogger) serializedLambda.getCapturedArg(2);
                    WebsocketConfig websocketConfig = (WebsocketConfig) serializedLambda.getCapturedArg(3);
                    JsonSchemaVersion jsonSchemaVersion = (JsonSchemaVersion) serializedLambda.getCapturedArg(4);
                    CharSequence charSequence = (CharSequence) serializedLambda.getCapturedArg(5);
                    AuthorizationContext authorizationContext = (AuthorizationContext) serializedLambda.getCapturedArg(6);
                    DittoHeaders dittoHeaders = (DittoHeaders) serializedLambda.getCapturedArg(7);
                    ProtocolAdapter protocolAdapter = (ProtocolAdapter) serializedLambda.getCapturedArg(8);
                    Connect connect = (Connect) serializedLambda.getCapturedArg(9);
                    return builder -> {
                        FlowShape add = builder.add(getStrictifyFlow(httpRequest, threadSafeDittoLogger).via(AbstractRoute.throttleByConfig(websocketConfig.getThrottlingConfig())));
                        FanOutShape2 add2 = builder.add(selectStreamControlOrSignal(jsonSchemaVersion, charSequence, authorizationContext, dittoHeaders, protocolAdapter, threadSafeDittoLogger));
                        FanOutShape2 add3 = builder.add(getRateLimiter(websocketConfig));
                        FlowShape add4 = builder.add(Flow.fromFunction(dittoRuntimeException -> {
                            DROPPED_COUNTER.increment();
                            return dittoRuntimeException;
                        }));
                        SinkShape add5 = builder.add(getStreamControlOrSignalSink(connect));
                        UniformFanInShape add6 = builder.add(Merge.create(2, true));
                        builder.from(add.out()).toInlet(add2.in());
                        builder.from(add2.out0()).toInlet(add3.in());
                        builder.from(add2.out1()).toFanIn(add6);
                        builder.from(add3.out0()).to(add5);
                        builder.from(add3.out1()).via(add4).toFanIn(add6);
                        return FlowShape.of(add.in(), add6.out());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/http/javadsl/model/ws/Message;)Lakka/http/javadsl/model/ws/Message;")) {
                    return message -> {
                        IN_COUNTER.increment();
                        return message;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/gateway/service/endpoints/routes/websocket/NoOp;Lakka/japi/Pair;)V")) {
                    NoOp noOp = (NoOp) serializedLambda.getCapturedArg(0);
                    return pair -> {
                        ActorRef actorRef = (ActorRef) pair.second();
                        Object first = pair.first();
                        if (noOp.equals(first)) {
                            return;
                        }
                        actorRef.tell(first, ActorRef.noSender());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/streaming/actors/SessionedJsonifiable") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/base/model/exceptions/DittoRuntimeException;)Lorg/eclipse/ditto/gateway/service/streaming/actors/SessionedJsonifiable;")) {
                    return SessionedJsonifiable::error;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Collection;)Ljava/lang/Iterable;")) {
                    return collection -> {
                        return collection;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/CharSequence;Lorg/eclipse/ditto/base/model/headers/DittoHeaders;Lorg/eclipse/ditto/base/model/json/JsonSchemaVersion;Ljava/util/Optional;Lorg/eclipse/ditto/base/model/auth/AuthorizationContext;Lorg/eclipse/ditto/gateway/service/streaming/actors/SupervisedStream$WithQueue;)Lorg/eclipse/ditto/gateway/service/streaming/signals/Connect;")) {
                    WebSocketRoute webSocketRoute2 = (WebSocketRoute) serializedLambda.getCapturedArg(0);
                    CharSequence charSequence2 = (CharSequence) serializedLambda.getCapturedArg(1);
                    DittoHeaders dittoHeaders2 = (DittoHeaders) serializedLambda.getCapturedArg(2);
                    JsonSchemaVersion jsonSchemaVersion2 = (JsonSchemaVersion) serializedLambda.getCapturedArg(3);
                    Optional optional = (Optional) serializedLambda.getCapturedArg(4);
                    AuthorizationContext authorizationContext2 = (AuthorizationContext) serializedLambda.getCapturedArg(5);
                    return withQueue -> {
                        this.webSocketSupervisor.supervise(withQueue.getSupervisedStream(), charSequence2, dittoHeaders2);
                        return new Connect(withQueue.getSourceQueue(), charSequence2, STREAMING_TYPE_WS, jsonSchemaVersion2, (Instant) optional.map((v0) -> {
                            return v0.getExpirationTime();
                        }).orElse(null), readDeclaredAcknowledgementLabels(dittoHeaders2), authorizationContext2, this.wsKillSwitch);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/http/javadsl/model/ws/Message;)Lakka/http/javadsl/model/ws/Message;")) {
                    return message2 -> {
                        OUT_COUNTER.increment();
                        return message2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/base/model/exceptions/DittoRuntimeException;)Lorg/eclipse/ditto/base/model/exceptions/DittoRuntimeException;")) {
                    return dittoRuntimeException -> {
                        DROPPED_COUNTER.increment();
                        return dittoRuntimeException;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function4") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/GraphDSL$Builder;Lakka/stream/SourceShape;Lakka/stream/FlowShape;Lakka/stream/FlowShape;)Lakka/stream/FlowShape;")) {
                    return (builder2, sourceShape, flowShape, flowShape2) -> {
                        UniformFanInShape add = builder2.add(Merge.create(2, true));
                        builder2.from(sourceShape).toFanIn(add);
                        builder2.from(flowShape).toFanIn(add);
                        builder2.from(add.out()).toInlet(flowShape2.in());
                        return FlowShape.of(flowShape.in(), flowShape2.out());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lscala/util/Either;)Ljava/lang/Object;")) {
                    return either -> {
                        Either map = either.right().map(IncomingSignal::of);
                        Either.LeftProjection left = either.left();
                        Objects.requireNonNull(left);
                        return map.getOrElse(left::get);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/japi/Pair") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Lakka/japi/Pair;")) {
                    return (v0, v1) -> {
                        return Pair.create(v0, v1);
                    };
                }
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/http/javadsl/model/ws/TextMessage") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Lakka/http/javadsl/model/ws/TextMessage;")) {
                    return TextMessage::create;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/internal/utils/akka/logging/ThreadSafeDittoLogger;Ljava/lang/String;)Ljava/lang/String;")) {
                    ThreadSafeDittoLogger threadSafeDittoLogger2 = (ThreadSafeDittoLogger) serializedLambda.getCapturedArg(0);
                    return str3 -> {
                        threadSafeDittoLogger2.debug("Sending outgoing WebSocket message: {}", str3);
                        return str3;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function3") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/NotUsed;Lakka/NotUsed;Lakka/NotUsed;)Lakka/NotUsed;")) {
                    return (notUsed, notUsed2, notUsed3) -> {
                        return notUsed;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Predicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("akka/http/javadsl/model/ws/Message") && serializedLambda.getImplMethodSignature().equals("()Z")) {
                    return (v0) -> {
                        return v0.isText();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/http/javadsl/model/ws/Message") && serializedLambda.getImplMethodSignature().equals("()Lakka/http/javadsl/model/ws/TextMessage;")) {
                    return (v0) -> {
                        return v0.asTextMessage();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lorg/slf4j/Logger;Ljava/lang/String;)Ljava/lang/String;")) {
                    Logger logger = (Logger) serializedLambda.getCapturedArg(0);
                    return str4 -> {
                        logger.debug("Received incoming WebSocket message: {}", str4);
                        return str4;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/protocol/adapter/ProtocolAdapter;Lorg/eclipse/ditto/internal/models/signalenrichment/SignalEnrichmentFacade;Lorg/eclipse/ditto/internal/utils/akka/logging/ThreadSafeDittoLogger;Lorg/eclipse/ditto/gateway/service/streaming/actors/SessionedJsonifiable;)Ljava/util/concurrent/CompletionStage;")) {
                    WebSocketRoute webSocketRoute3 = (WebSocketRoute) serializedLambda.getCapturedArg(0);
                    ProtocolAdapter protocolAdapter2 = (ProtocolAdapter) serializedLambda.getCapturedArg(1);
                    SignalEnrichmentFacade signalEnrichmentFacade = (SignalEnrichmentFacade) serializedLambda.getCapturedArg(2);
                    ThreadSafeDittoLogger threadSafeDittoLogger3 = (ThreadSafeDittoLogger) serializedLambda.getCapturedArg(3);
                    return sessionedJsonifiable -> {
                        Jsonifiable.WithPredicate<JsonObject, JsonField> jsonifiable = sessionedJsonifiable.getJsonifiable();
                        if (jsonifiable instanceof StreamingAck) {
                            return CompletableFuture.completedFuture(Collections.singletonList(streamingAckToString((StreamingAck) jsonifiable)));
                        }
                        Adaptable jsonifiableToAdaptable = jsonifiableToAdaptable(jsonifiable, protocolAdapter2);
                        return sessionedJsonifiable.retrieveExtraFields(signalEnrichmentFacade).thenApply(jsonObject -> {
                            if (matchesFilter(sessionedJsonifiable, jsonObject)) {
                                return Collections.singletonList(toJsonStringWithExtra(jsonifiableToAdaptable, jsonObject));
                            }
                            issuePotentialWeakAcknowledgements(sessionedJsonifiable);
                            sessionedJsonifiable.finishSpan();
                            return Collections.emptyList();
                        }).exceptionally(th -> {
                            sessionedJsonifiable.finishSpan();
                            return reportEnrichmentError(th, protocolAdapter2, jsonifiableToAdaptable, threadSafeDittoLogger3);
                        });
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
