package org.eclipse.ditto.services.gateway.endpoints.routes.websocket;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.event.EventStream;
import akka.event.Logging;
import akka.http.javadsl.model.HttpRequest;
import akka.http.javadsl.model.HttpResponse;
import akka.http.javadsl.model.ws.Message;
import akka.http.javadsl.model.ws.TextMessage;
import akka.http.javadsl.model.ws.UpgradeToWebSocket;
import akka.http.javadsl.server.Directives;
import akka.http.javadsl.server.Route;
import akka.japi.pf.PFBuilder;
import akka.pattern.Patterns;
import akka.stream.Attributes;
import akka.stream.FanOutShape2;
import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.Outlet;
import akka.stream.SinkShape;
import akka.stream.UniformFanInShape;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.GraphDSL;
import akka.stream.javadsl.Merge;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.concurrent.NotThreadSafe;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.exceptions.DittoJsonException;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.exceptions.TooManyRequestsException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.DittoHeadersBuilder;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import org.eclipse.ditto.model.base.json.Jsonifiable;
import org.eclipse.ditto.model.jwt.ImmutableJsonWebToken;
import org.eclipse.ditto.model.jwt.JsonWebToken;
import org.eclipse.ditto.model.messages.MessageHeaderDefinition;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.JsonifiableAdaptable;
import org.eclipse.ditto.protocoladapter.ProtocolAdapter;
import org.eclipse.ditto.protocoladapter.ProtocolFactory;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.services.gateway.endpoints.utils.EventSniffer;
import org.eclipse.ditto.services.gateway.security.HttpHeader;
import org.eclipse.ditto.services.gateway.streaming.Connect;
import org.eclipse.ditto.services.gateway.streaming.ResponsePublished;
import org.eclipse.ditto.services.gateway.streaming.StreamControlMessage;
import org.eclipse.ditto.services.gateway.streaming.StreamingAck;
import org.eclipse.ditto.services.gateway.streaming.WebsocketConfig;
import org.eclipse.ditto.services.gateway.streaming.actors.CommandSubscriber;
import org.eclipse.ditto.services.gateway.streaming.actors.EventAndResponsePublisher;
import org.eclipse.ditto.services.gateway.streaming.actors.StreamingActor;
import org.eclipse.ditto.services.models.concierge.streaming.StreamingType;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.akka.controlflow.Filter;
import org.eclipse.ditto.services.utils.akka.controlflow.LimitRateByRejection;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandNotSupportedException;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayWebsocketSessionClosedException;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayWebsocketSessionExpiredException;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import org.eclipse.ditto.signals.events.base.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

@NotThreadSafe
/* loaded from: input_file:org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute.class */
public final class WebSocketRoute implements WebSocketRouteBuilder {
    /** Suffix appended to a protocol message string when rendering a {@code StreamingAck} as text. */
    private static final String PROTOCOL_CMD_ACK_SUFFIX = ":ACK";
    /** Type identifier passed to the {@code Connect} message sent to the streaming actor. */
    private static final String STREAMING_TYPE_WS = "WS";
    /** Prefix an Authorization header value must start with for a JWT to be extracted from it. */
    private static final String BEARER = "Bearer";
    /** Actor that is asked for the WebSocket config and receives {@code Connect} and stream control messages. */
    private final ActorRef streamingActor;
    /** Event stream handed to the command subscriber and used to publish {@code ResponsePublished} events. */
    private final EventStream eventStream;
    /** Sniffer applied to incoming text messages; a no-op until replaced via the builder method. */
    private EventSniffer<String> incomingMessageSniffer;
    /** Sniffer applied to outgoing text messages; a no-op until replaced via the builder method. */
    private EventSniffer<String> outgoingMessageSniffer;
    /** Invoked for every upgrade request before the WebSocket is created; a no-op until replaced. */
    private WebSocketAuthorizationEnforcer authorizationEnforcer;
    /** Invoked with the publisher actor when the outgoing source materializes; a no-op until replaced. */
    private WebSocketSupervisor webSocketSupervisor;
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) WebSocketRoute.class);
    /** Timeout for asking the streaming actor for the current {@code WebsocketConfig}. */
    private static final Duration CONFIG_ASK_TIMEOUT = Duration.ofSeconds(5);
    // Metric name and tag keys/values shared by the three counters below.
    private static final String STREAMING_MESSAGES = "streaming_messages";
    private static final String TYPE = "type";
    private static final String WS = "ws";
    private static final String DIRECTION = "direction";
    /** Incremented once per message entering the incoming flow. */
    private static final Counter IN_COUNTER = DittoMetrics.counter(STREAMING_MESSAGES).tag(TYPE, WS).tag(DIRECTION, "in");
    /** Incremented once per message leaving the outgoing flow. */
    private static final Counter OUT_COUNTER = DittoMetrics.counter(STREAMING_MESSAGES).tag(TYPE, WS).tag(DIRECTION, "out");
    /** Incremented once per rejection emitted by the rate limiter. */
    private static final Counter DROPPED_COUNTER = DittoMetrics.counter(STREAMING_MESSAGES).tag(TYPE, WS).tag(DIRECTION, "dropped");

    /**
     * {@link WebSocketAuthorizationEnforcer} whose check does nothing; every request passes.
     */
    private static final class NoOpAuthorizationEnforcer implements WebSocketAuthorizationEnforcer {

        private NoOpAuthorizationEnforcer() {
            // Nothing to initialise.
        }

        /**
         * Accepts every request without inspecting any of the arguments.
         */
        @Override
        public void checkAuthorization(final HttpRequest httpRequest,
                final AuthorizationContext authorizationContext,
                final DittoHeaders dittoHeaders) {
            // Intentionally empty.
        }
    }

    /**
     * {@link WebSocketSupervisor} whose supervision hook does nothing.
     */
    private static final class NoOpWebSocketSupervisor implements WebSocketSupervisor {

        private NoOpWebSocketSupervisor() {
            // Nothing to initialise.
        }

        /**
         * Ignores all arguments; no supervision is performed.
         */
        @Override
        public void supervise(final ActorRef actorRef,
                final CharSequence charSequence,
                final DittoHeaders dittoHeaders) {
            // Intentionally empty.
        }
    }

    /**
     * Creates a route with no-op sniffers, a no-op authorization enforcer and a no-op supervisor.
     *
     * @param actorRef the streaming actor; must not be {@code null}.
     * @param eventStream the event stream; must not be {@code null}.
     */
    private WebSocketRoute(final ActorRef actorRef, final EventStream eventStream) {
        streamingActor = ConditionChecker.checkNotNull(actorRef, "streamingActor");
        this.eventStream = ConditionChecker.checkNotNull(eventStream, "eventStream");

        // Both directions share the same no-op sniffer instance until one is replaced.
        final EventSniffer<String> noOpSniffer = EventSniffer.noOp();
        incomingMessageSniffer = noOpSniffer;
        outgoingMessageSniffer = noOpSniffer;

        authorizationEnforcer = new NoOpAuthorizationEnforcer();
        webSocketSupervisor = new NoOpWebSocketSupervisor();
    }

    /**
     * Returns a new {@code WebSocketRoute} for the given streaming actor and event stream.
     *
     * @param actorRef the streaming actor; must not be {@code null}.
     * @param eventStream the event stream; must not be {@code null}.
     * @return the new instance.
     */
    public static WebSocketRoute getInstance(final ActorRef actorRef, final EventStream eventStream) {
        final WebSocketRoute route = new WebSocketRoute(actorRef, eventStream);
        return route;
    }

    /**
     * Replaces the sniffer applied to incoming messages.
     *
     * @param eventSniffer the sniffer to use; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public WebSocketRouteBuilder withIncomingEventSniffer(final EventSniffer<String> eventSniffer) {
        incomingMessageSniffer = ConditionChecker.checkNotNull(eventSniffer, "eventSniffer");
        return this;
    }

    /**
     * Replaces the sniffer applied to outgoing messages.
     *
     * @param eventSniffer the sniffer to use; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public WebSocketRouteBuilder withOutgoingEventSniffer(final EventSniffer<String> eventSniffer) {
        outgoingMessageSniffer = ConditionChecker.checkNotNull(eventSniffer, "eventSniffer");
        return this;
    }

    /**
     * Replaces the enforcer that is consulted for every upgrade request.
     *
     * @param webSocketAuthorizationEnforcer the enforcer to use; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public WebSocketRouteBuilder withAuthorizationEnforcer(
            final WebSocketAuthorizationEnforcer webSocketAuthorizationEnforcer) {
        authorizationEnforcer = ConditionChecker.checkNotNull(webSocketAuthorizationEnforcer, "enforcer");
        return this;
    }

    /**
     * Replaces the supervisor that is handed the publisher actor of each session.
     *
     * @param webSocketSupervisor the supervisor to use; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public WebSocketRouteBuilder withWebSocketSupervisor(final WebSocketSupervisor webSocketSupervisor) {
        this.webSocketSupervisor = ConditionChecker.checkNotNull(webSocketSupervisor, "webSocketSupervisor");
        return this;
    }

    /**
     * Builds the route: extracts the WebSocket upgrade and the request, runs the authorization
     * enforcer, then completes with the upgrade response produced by {@code createWebsocket}.
     *
     * @param num the schema version number forwarded to signal building.
     * @param charSequence the correlation ID of the connection; converted to a string per request.
     * @param authorizationContext the authorization context of the connection.
     * @param dittoHeaders additional headers for the connection.
     * @param protocolAdapter the adapter used to convert between text messages and signals.
     * @return the route.
     */
    @Override
    public Route build(final Integer num,
            final CharSequence charSequence,
            final AuthorizationContext authorizationContext,
            final DittoHeaders dittoHeaders,
            final ProtocolAdapter protocolAdapter) {

        return Directives.extractUpgradeToWebSocket(upgradeToWebSocket ->
                Directives.extractRequest(httpRequest -> {
                    // The enforcer runs first, before any WebSocket is created.
                    authorizationEnforcer.checkAuthorization(httpRequest, authorizationContext, dittoHeaders);
                    final CompletionStage<HttpResponse> upgradeResponse = createWebsocket(upgradeToWebSocket, num,
                            charSequence.toString(), authorizationContext, dittoHeaders, protocolAdapter,
                            httpRequest);
                    return Directives.completeWithFuture(upgradeResponse);
                }));
    }

    /**
     * Asks the streaming actor for the WebSocket config, using {@code CONFIG_ASK_TIMEOUT}.
     *
     * @return a stage holding the reply cast to {@code WebsocketConfig}.
     */
    private CompletionStage<WebsocketConfig> retrieveWebsocketConfig() {
        final CompletionStage<Object> reply = Patterns.ask(streamingActor,
                StreamingActor.Control.RETRIEVE_WEBSOCKET_CONFIG, CONFIG_ASK_TIMEOUT);
        return reply.thenApply(config -> (WebsocketConfig) config);
    }

    /**
     * Logs the connection attempt, retrieves the WebSocket config and answers the upgrade with
     * the incoming flow connected to the outgoing flow.
     *
     * @param upgradeToWebSocket the upgrade handle of the request.
     * @param num the schema version number.
     * @param str the correlation ID of the connection.
     * @param authorizationContext the authorization context of the connection.
     * @param dittoHeaders additional headers for the connection.
     * @param protocolAdapter the adapter used to convert between text messages and signals.
     * @param httpRequest the upgrade request.
     * @return a stage holding the upgrade response.
     */
    private CompletionStage<HttpResponse> createWebsocket(final UpgradeToWebSocket upgradeToWebSocket,
            final Integer num,
            final String str,
            final AuthorizationContext authorizationContext,
            final DittoHeaders dittoHeaders,
            final ProtocolAdapter protocolAdapter,
            final HttpRequest httpRequest) {

        LogUtil.logWithCorrelationId(LOGGER, str, (Consumer<Logger>) log ->
                log.info("Creating WebSocket for connection authContext: <{}>", authorizationContext));

        return retrieveWebsocketConfig().thenApply(websocketConfig -> {
            final Flow<Message, DittoRuntimeException, NotUsed> incoming = createIncoming(num, str,
                    authorizationContext, dittoHeaders, protocolAdapter, httpRequest, websocketConfig);
            final Flow<DittoRuntimeException, Message, NotUsed> outgoing =
                    createOutgoing(str, dittoHeaders, protocolAdapter, httpRequest, websocketConfig);
            return upgradeToWebSocket.handleMessagesWith(incoming.via(outgoing));
        });
    }

    /**
     * Builds the incoming side of the WebSocket as a graph: text messages are strictified and
     * throttled, split into (stream control or signal) versus errors, passed through the rate
     * limiter and finally consumed by the sink. Only {@code DittoRuntimeException}s leave the flow.
     *
     * @param num the schema version number.
     * @param str the correlation ID of the connection.
     * @param authorizationContext the authorization context of the connection.
     * @param dittoHeaders additional headers for the connection.
     * @param protocolAdapter the adapter used to convert text messages into signals.
     * @param httpRequest the upgrade request.
     * @param websocketConfig the config supplying throttling and queue settings.
     * @return the incoming flow.
     */
    private Flow<Message, DittoRuntimeException, NotUsed> createIncoming(Integer num, String str, AuthorizationContext authorizationContext, DittoHeaders dittoHeaders, ProtocolAdapter protocolAdapter, HttpRequest httpRequest, WebsocketConfig websocketConfig) {
        return Flow.fromGraph(GraphDSL.create(graphBuilder -> {
            // Stages, added in the same order as before.
            FlowShape strictifyAndThrottle = (FlowShape) graphBuilder.add(getStrictifyFlow(httpRequest, str).via(throttle(websocketConfig)));
            FanOutShape2 signalOrError = (FanOutShape2) graphBuilder.add(selectStreamControlOrSignal(num, str, authorizationContext, dittoHeaders, protocolAdapter));
            FanOutShape2 rateLimiter = (FanOutShape2) graphBuilder.add(getRateLimiter(websocketConfig));
            FlowShape countDropped = (FlowShape) graphBuilder.add(Flow.fromFunction(rejection -> {
                DROPPED_COUNTER.increment();
                return rejection;
            }));
            SinkShape acceptedSink = (SinkShape) graphBuilder.add(getStreamControlOrSignalSink(websocketConfig));
            UniformFanInShape errorMerge = (UniformFanInShape) graphBuilder.add(Merge.create(2, true));

            // Wiring.
            graphBuilder.from(strictifyAndThrottle.out()).toInlet(signalOrError.in());
            graphBuilder.from(signalOrError.out0()).toInlet(rateLimiter.in());
            graphBuilder.from(signalOrError.out1()).toFanIn(errorMerge);
            graphBuilder.from(rateLimiter.out0()).to(acceptedSink);
            graphBuilder.from(rateLimiter.out1()).via(countDropped).toFanIn(errorMerge);

            return FlowShape.of(strictifyAndThrottle.in(), errorMerge.out());
        }));
    }

    /**
     * Builds a sink that splits its input with {@code Filter.multiplexByEither}: the first outlet
     * feeds the command subscriber sink, the second outlet feeds the streaming actor sink.
     *
     * @param websocketConfig the config used to create the command subscriber sink.
     * @return the combined sink graph.
     */
    private Graph<SinkShape<Either<StreamControlMessage, Signal>>, NotUsed> getStreamControlOrSignalSink(WebsocketConfig websocketConfig) {
        return GraphDSL.create(graphBuilder -> {
            FanOutShape2 splitter = (FanOutShape2) graphBuilder.add(Filter.multiplexByEither(Function.identity()));
            SinkShape commandSubscriber = (SinkShape) graphBuilder.add(getCommandSubscriberSink(websocketConfig));
            SinkShape streamingActorSink = (SinkShape) graphBuilder.add(getStreamingActorSink());

            graphBuilder.from(splitter.out0()).to(commandSubscriber);
            graphBuilder.from(splitter.out1()).to(streamingActorSink);

            return SinkShape.of(splitter.in());
        });
    }

    /**
     * Creates an actor-subscriber sink backed by {@code CommandSubscriber}.
     *
     * @param websocketConfig supplies the subscriber backpressure queue size.
     * @return the sink; its materialized value is the subscriber actor reference.
     */
    private Sink<Signal, ActorRef> getCommandSubscriberSink(final WebsocketConfig websocketConfig) {
        return Sink.actorSubscriber(
                CommandSubscriber.props(streamingActor,
                        websocketConfig.getSubscriberBackpressureQueueSize(),
                        eventStream));
    }

    /**
     * Builds the flow that turns WebSocket messages into complete strings.
     * <p>
     * Every message increments {@code IN_COUNTER}. Non-text messages are dropped. Streamed text
     * messages are concatenated into one string. The result passes through the incoming message
     * sniffer and is logged at debug level.
     *
     * @param httpRequest the upgrade request, handed to the incoming message sniffer.
     * @param str the correlation ID of the connection, used for logging.
     * @return the strictifying flow.
     */
    private Flow<Message, String, NotUsed> getStrictifyFlow(HttpRequest httpRequest, String str) {
        // The explicit <Message> witness is required so the element type is known downstream.
        return Flow.<Message>create()
                .via(Flow.fromFunction(message -> {
                    IN_COUNTER.increment();
                    return message;
                }))
                .filter(Message::isText)
                .map(Message::asTextMessage)
                .map(textMessage -> textMessage.isStrict()
                        ? Source.single(textMessage.getStrictText())
                        : textMessage.getStreamedText())
                // Collapse the (possibly multi-fragment) text source into a single string.
                .flatMapConcat(source -> source.fold("", (joined, fragment) -> joined + fragment))
                .via(this.incomingMessageSniffer.toAsyncFlow(httpRequest))
                .via(Flow.fromFunction(text -> {
                    LogUtil.logWithCorrelationId(LOGGER, str, (Consumer<Logger>) logger ->
                            logger.debug("Received incoming WebSocket message: {}", text));
                    return text;
                }))
                .withAttributes(Attributes.createLogLevels(Logging.DebugLevel(), Logging.DebugLevel(),
                        Logging.WarningLevel()));
    }

    /**
     * Creates a sink that sends every stream control message to the streaming actor without a sender.
     *
     * @return the sink.
     */
    private Sink<StreamControlMessage, ?> getStreamingActorSink() {
        return Sink.foreach(controlMessage -> streamingActor.tell(controlMessage, ActorRef.noSender()));
    }

    /**
     * Builds the fan-out that classifies each incoming text message.
     * <p>
     * A message recognised by the {@code ProtocolMessageExtractor} becomes a stream control message.
     * Anything else is converted into a signal via {@code buildSignal}. Failures are emitted as
     * {@code DittoRuntimeException}s: a {@code DittoRuntimeException} is passed through as is, any
     * other exception is wrapped in a {@code GatewayInternalErrorException} with the original as cause.
     *
     * @param num the schema version number.
     * @param str the correlation ID of the connection.
     * @param authorizationContext the authorization context of the connection.
     * @param dittoHeaders additional headers for the connection.
     * @param protocolAdapter the adapter used to convert text messages into signals.
     * @return the fan-out graph.
     */
    private Graph<FanOutShape2<String, Either<StreamControlMessage, Signal>, DittoRuntimeException>, NotUsed> selectStreamControlOrSignal(Integer num, String str, AuthorizationContext authorizationContext, DittoHeaders dittoHeaders, ProtocolAdapter protocolAdapter) {
        ProtocolMessageExtractor protocolMessageExtractor = new ProtocolMessageExtractor(authorizationContext, str);
        return Filter.multiplexByEither(text -> {
            Optional<StreamControlMessage> streamControlMessage = protocolMessageExtractor.apply(text);
            if (streamControlMessage.isPresent()) {
                return Right.apply(Left.apply(streamControlMessage.get()));
            }
            try {
                return Right.apply(Right.apply(buildSignal(text, num, str, authorizationContext, dittoHeaders, protocolAdapter)));
            } catch (DittoRuntimeException e) {
                LOGGER.debug("DittoRuntimeException building signal from <{}>: <{}>", text, e);
                return Left.apply(e);
            } catch (Exception e) {
                LOGGER.warn("Error building signal from <{}>: {}: <{}>", text, e.getClass().getSimpleName(), e.getMessage());
                // Keep the original exception as cause so it is not lost.
                return Left.apply(GatewayInternalErrorException.newBuilder().cause(e).build());
            }
        });
    }

    /**
     * Builds the outgoing side of the WebSocket. Three parts are handed to {@code joinOutgoingFlows}:
     * a publisher-backed source of events and responses, a pass-through flow for the
     * {@code DittoRuntimeException}s coming from the incoming side, and a flow that renders each
     * element as a text message.
     *
     * @param str the correlation ID of the connection.
     * @param dittoHeaders headers handed to the WebSocket supervisor.
     * @param protocolAdapter the adapter used to render outgoing elements.
     * @param httpRequest the upgrade request; used for JWT extraction and the outgoing sniffer.
     * @param websocketConfig supplies the publisher backpressure buffer size.
     * @return the outgoing flow.
     */
    private Flow<DittoRuntimeException, Message, NotUsed> createOutgoing(String str, DittoHeaders dittoHeaders, ProtocolAdapter protocolAdapter, HttpRequest httpRequest, WebsocketConfig websocketConfig) {
        Optional<JsonWebToken> extractJwtFromRequestIfPresent = extractJwtFromRequestIfPresent(httpRequest);
        // On materialization the publisher actor is handed to the supervisor and announced to the
        // streaming actor via Connect; the JWT expiration time is passed along, or null without a JWT.
        return joinOutgoingFlows(Source.actorPublisher(EventAndResponsePublisher.props(websocketConfig.getPublisherBackpressureBufferSize())).mapMaterializedValue(actorRef -> {
            this.webSocketSupervisor.supervise(actorRef, str, dittoHeaders);
            this.streamingActor.tell(new Connect(actorRef, str, STREAMING_TYPE_WS, (Instant) extractJwtFromRequestIfPresent.map((v0) -> {
                return v0.getExpirationTime();
            }).orElse(null)), ActorRef.noSender());
            return NotUsed.getInstance();
        // Session-expired and session-closed failures are logged at info level and replaced by an empty source.
        }).map(this::publishResponsePublishedEvent).recoverWithRetries(1, new PFBuilder().match(GatewayWebsocketSessionExpiredException.class, obj -> {
            LogUtil.logWithCorrelationId(LOGGER, str, (Consumer<Logger>) logger -> {
                logger.info("WebSocket connection terminated because JWT expired!");
            });
            return Source.empty();
        }).match(GatewayWebsocketSessionClosedException.class, obj2 -> {
            LogUtil.logWithCorrelationId(LOGGER, str, (Consumer<Logger>) logger -> {
                logger.info("WebSocket connection terminated because authorization context changed!");
            });
            return Source.empty();
        // Second argument: identity flow for errors arriving from the incoming side.
        }).build()), Flow.fromFunction(dittoRuntimeException -> {
            return dittoRuntimeException;
        // Third argument: render to string, log at debug level, sniff, wrap as text message, count.
        }), Flow.fromFunction(jsonifiableToString(protocolAdapter)).via(Flow.fromFunction(str2 -> {
            LogUtil.logWithCorrelationId(LOGGER, str, (Consumer<Logger>) logger -> {
                logger.debug("Sending outgoing WebSocket message: {}", str2);
            });
            return str2;
        })).via(this.outgoingMessageSniffer.toAsyncFlow(httpRequest)).map(TextMessage::create).via(Flow.fromFunction(message -> {
            OUT_COUNTER.increment();
            return message;
        })));
    }

    /**
     * Merges the elements of {@code source} with the output of {@code flow} and feeds the merged
     * stream into {@code flow2}. The resulting flow takes its input from {@code flow} and emits
     * the output of {@code flow2}.
     *
     * @param source the source whose elements are merged in.
     * @param flow the flow converting incoming errors into elements of type {@code T}.
     * @param flow2 the flow converting merged elements into WebSocket messages.
     * @param <T> the element type of the merged stream.
     * @return the joined flow.
     */
    private static <T> Flow<DittoRuntimeException, Message, NotUsed> joinOutgoingFlows(Source<T, NotUsed> source, Flow<DittoRuntimeException, T, NotUsed> flow, Flow<T, Message, NotUsed> flow2) {
        return Flow.fromGraph(GraphDSL.create3(source, flow, flow2,
                // The materialized value of the source is kept.
                (sourceMat, errorFlowMat, messageFlowMat) -> sourceMat,
                (graphBuilder, publisherShape, errorShape, messageShape) -> {
                    UniformFanInShape merge = (UniformFanInShape) graphBuilder.add(Merge.create(2, true));

                    graphBuilder.from(publisherShape).toFanIn(merge);
                    graphBuilder.from(errorShape).toFanIn(merge);
                    graphBuilder.from((Outlet) merge.out()).toInlet(messageShape.in());

                    return FlowShape.of(errorShape.in(), messageShape.out());
                }));
    }

    /**
     * Publishes a {@code ResponsePublished} event on the event stream when the given element is a
     * {@code CommandResponse} or a {@code DittoRuntimeException} that carries a correlation ID.
     * All other elements, and elements without a correlation ID, cause no publication.
     *
     * @param withPredicate the outgoing element.
     * @return the same element, unchanged.
     */
    private Jsonifiable.WithPredicate<JsonObject, JsonField> publishResponsePublishedEvent(Jsonifiable.WithPredicate<JsonObject, JsonField> withPredicate) {
        if ((withPredicate instanceof CommandResponse) || (withPredicate instanceof DittoRuntimeException)) {
            ((WithDittoHeaders) withPredicate).getDittoHeaders()
                    .getCorrelationId()
                    .map(ResponsePublished::new)
                    .ifPresent(this.eventStream::publish);
        }
        return withPredicate;
    }

    /**
     * Creates a rate limiter that rejects elements instead of delaying them. The limit is the
     * configured throttling limit scaled by the rejection factor, but never below the configured limit.
     * A rejection is a {@code TooManyRequestsException} whose retry-after is the throttling interval;
     * when the rejected element is a signal, its headers are copied onto the exception.
     *
     * @param websocketConfig supplies the throttling interval, limit and rejection factor.
     * @param <T> the type of the non-signal alternative.
     * @return the rate limiter graph.
     */
    private static <T> Graph<FanOutShape2<Either<T, Signal>, Either<T, Signal>, DittoRuntimeException>, NotUsed> getRateLimiter(WebsocketConfig websocketConfig) {
        final Duration window = websocketConfig.getThrottlingConfig().getInterval();
        final int configuredLimit = websocketConfig.getThrottlingConfig().getLimit();
        final int scaledLimit = (int) (configuredLimit * websocketConfig.getThrottlingRejectionFactor());
        final int rejectionLimit = Math.max(configuredLimit, scaledLimit);

        return LimitRateByRejection.of(window, rejectionLimit, rejected -> {
            TooManyRequestsException.Builder errorBuilder = TooManyRequestsException.newBuilder().retryAfter(window);
            if (rejected.isRight()) {
                final Signal rejectedSignal = (Signal) rejected.right().get();
                errorBuilder.dittoHeaders(rejectedSignal.getDittoHeaders());
            }
            return errorBuilder.build();
        });
    }

    /**
     * Creates a flow that throttles elements to the configured limit per configured interval.
     *
     * @param websocketConfig supplies the throttling limit and interval.
     * @param <T> the element type.
     * @return the throttling flow.
     */
    private static <T> Flow<T, T, NotUsed> throttle(WebsocketConfig websocketConfig) {
        // The explicit <T> witness keeps the element type; without it the chain is typed as Object.
        return Flow.<T>create()
                .throttle(websocketConfig.getThrottlingConfig().getLimit(),
                        websocketConfig.getThrottlingConfig().getInterval());
    }

    /**
     * Converts an incoming text message into a signal and attaches the merged headers.
     * <p>
     * Headers are added to the builder in this order: initial internal headers (schema version,
     * authorization context, correlation ID and origin set to {@code str2}), then
     * {@code dittoHeaders}, then the headers of the parsed signal. If the parsed signal carries no
     * correlation ID, a random UUID is set as correlation ID afterwards.
     *
     * @param str the text of the incoming message.
     * @param num the schema version number.
     * @param str2 the correlation ID of the connection; also used as origin.
     * @param authorizationContext the authorization context of the connection.
     * @param dittoHeaders additional headers for the connection.
     * @param protocolAdapter the adapter used to convert the parsed message into a signal.
     * @return the signal with the merged headers.
     * @throws DittoJsonException if {@code str} is empty.
     * @throws DittoRuntimeException if the schema version is unsupported or conversion fails;
     * exceptions raised during conversion are rethrown with origin set to {@code str2}.
     */
    private static Signal buildSignal(String str, Integer num, String str2, AuthorizationContext authorizationContext, DittoHeaders dittoHeaders, ProtocolAdapter protocolAdapter) {
        DittoHeaders initialInternalHeaders = DittoHeaders.newBuilder().schemaVersion(JsonSchemaVersion.forInt(num.intValue()).orElseThrow(() -> {
            return CommandNotSupportedException.newBuilder(num.intValue()).build();
        })).authorizationContext(authorizationContext).correlationId(str2).origin(str2).build();
        if (str.isEmpty()) {
            throw new DittoJsonException(new IllegalArgumentException("Empty json."), initialInternalHeaders);
        }
        try {
            Signal<?> signal = protocolAdapter.fromAdaptable((JsonifiableAdaptable) DittoJsonException.wrapJsonRuntimeException(str, DittoHeaders.empty(), (json, ignoredHeaders) -> {
                return ProtocolFactory.jsonifiableAdaptableFromJson(JsonFactory.newObject(json));
            }));
            DittoHeadersBuilder internalHeadersBuilder = DittoHeaders.newBuilder();
            // NOTE(review): the header merge runs inside the logging callback, so it relies on
            // LogUtil.logWithCorrelationId invoking the callback before returning - confirm.
            LogUtil.logWithCorrelationId(LOGGER, str2, (Consumer<Logger>) logger -> {
                logger.debug("WebSocket message has been converted to signal <{}>.", signal);
                DittoHeaders signalHeaders = signal.getDittoHeaders();
                logger.trace("Adding initialInternalHeaders: <{}>.", initialInternalHeaders);
                internalHeadersBuilder.putHeaders(initialInternalHeaders);
                logger.trace("Adding additionalHeaders: <{}>.", dittoHeaders);
                internalHeadersBuilder.putHeaders(dittoHeaders);
                logger.trace("Adding signalHeaders: <{}>.", signalHeaders);
                internalHeadersBuilder.putHeaders(signalHeaders);
                if (!signalHeaders.getCorrelationId().isPresent()) {
                    String generatedCorrelationId = UUID.randomUUID().toString();
                    logger.trace("Adding generated correlationId: <{}>.", generatedCorrelationId);
                    internalHeadersBuilder.correlationId(generatedCorrelationId);
                }
                logger.debug("Generated internalHeaders are: <{}>.", internalHeadersBuilder);
            });
            return (Signal) signal.setDittoHeaders(internalHeadersBuilder.build());
        } catch (DittoRuntimeException e) {
            // Rethrow with the connection's correlation ID as origin.
            throw e.setDittoHeaders(e.getDittoHeaders().toBuilder().origin(str2).build());
        }
    }

    /**
     * Returns a function that renders an outgoing element as a string.
     * <p>
     * A {@code StreamingAck} is rendered via {@code streamingAckToString}. Everything else is
     * converted to an adaptable and serialised as JSON. The channel is taken from the element's
     * channel header if present (falling back to TWIN when the name is not recognised), is LIVE
     * for live signals, and TWIN otherwise.
     *
     * @param protocolAdapter the adapter used for the conversion.
     * @return the rendering function.
     */
    private static akka.japi.function.Function<Jsonifiable.WithPredicate<JsonObject, JsonField>, String> jsonifiableToString(ProtocolAdapter protocolAdapter) {
        return jsonifiable -> {
            if (jsonifiable instanceof StreamingAck) {
                return streamingAckToString((StreamingAck) jsonifiable);
            }

            final TopicPath.Channel channel;
            if ((jsonifiable instanceof WithDittoHeaders) && ((WithDittoHeaders) jsonifiable).getDittoHeaders().getChannel().isPresent()) {
                // An explicit channel header wins over everything else.
                channel = TopicPath.Channel.forName(((WithDittoHeaders) jsonifiable).getDittoHeaders().getChannel().get()).orElse(TopicPath.Channel.TWIN);
            } else if ((jsonifiable instanceof Signal) && isLiveSignal((Signal) jsonifiable)) {
                channel = TopicPath.Channel.LIVE;
            } else {
                channel = TopicPath.Channel.TWIN;
            }

            final Adaptable adaptable = jsonifiableToAdaptable(jsonifiable, channel, protocolAdapter);
            return ProtocolFactory.wrapAsJsonifiableAdaptable(adaptable).toJsonString();
        };
    }

    /**
     * Renders a streaming acknowledgement as the matching START or STOP protocol message followed
     * by {@code PROTOCOL_CMD_ACK_SUFFIX}.
     *
     * @param streamingAck the acknowledgement to render.
     * @return the acknowledgement text.
     * @throws IllegalArgumentException if the streaming type is not one of the handled types.
     */
    private static String streamingAckToString(StreamingAck streamingAck) {
        final StreamingType type = streamingAck.getStreamingType();
        final boolean subscribed = streamingAck.isSubscribed();

        final ProtocolMessages acknowledged;
        switch (type) {
            case EVENTS:
                acknowledged = subscribed ? ProtocolMessages.START_SEND_EVENTS : ProtocolMessages.STOP_SEND_EVENTS;
                break;
            case MESSAGES:
                acknowledged = subscribed ? ProtocolMessages.START_SEND_MESSAGES : ProtocolMessages.STOP_SEND_MESSAGES;
                break;
            case LIVE_COMMANDS:
                acknowledged = subscribed ? ProtocolMessages.START_SEND_LIVE_COMMANDS : ProtocolMessages.STOP_SEND_LIVE_COMMANDS;
                break;
            case LIVE_EVENTS:
                acknowledged = subscribed ? ProtocolMessages.START_SEND_LIVE_EVENTS : ProtocolMessages.STOP_SEND_LIVE_EVENTS;
                break;
            default:
                throw new IllegalArgumentException("Unknown streamingType: " + type);
        }
        return acknowledged.toString() + PROTOCOL_CMD_ACK_SUFFIX;
    }

    /**
     * Delegates to {@code StreamingType.isLiveSignal}.
     *
     * @param signal the signal to check.
     * @return the result of the delegate.
     */
    private static boolean isLiveSignal(final Signal<?> signal) {
        final boolean live = StreamingType.isLiveSignal(signal);
        return live;
    }

    /**
     * Converts an outgoing element into an adaptable for the given channel.
     * <p>
     * Commands, events and command responses are handed to the protocol adapter directly. A
     * {@code DittoRuntimeException} is first wrapped in a {@code ThingErrorResponse} whose headers
     * carry the channel name; the thing ID header, when present, is used as the response's thing ID.
     *
     * @param withPredicate the element to convert.
     * @param channel the channel to convert for.
     * @param protocolAdapter the adapter performing the conversion.
     * @return the adaptable.
     * @throws IllegalArgumentException if the element is none of the supported types.
     */
    private static Adaptable jsonifiableToAdaptable(Jsonifiable.WithPredicate<JsonObject, JsonField> withPredicate, TopicPath.Channel channel, ProtocolAdapter protocolAdapter) {
        if (withPredicate instanceof Command) {
            return protocolAdapter.toAdaptable((Command<?>) withPredicate, channel);
        }
        if (withPredicate instanceof Event) {
            return protocolAdapter.toAdaptable((Event<?>) withPredicate, channel);
        }
        if (withPredicate instanceof CommandResponse) {
            return protocolAdapter.toAdaptable((CommandResponse<?>) withPredicate, channel);
        }
        if (withPredicate instanceof DittoRuntimeException) {
            final DittoRuntimeException error = (DittoRuntimeException) withPredicate;
            final DittoHeaders headersWithChannel = error.getDittoHeaders().toBuilder().channel(channel.getName()).build();
            final String thingId = headersWithChannel.get(MessageHeaderDefinition.THING_ID.getKey());
            final ThingErrorResponse errorResponse = thingId == null
                    ? ThingErrorResponse.of(error, headersWithChannel)
                    : ThingErrorResponse.of(ThingId.of(thingId), error, headersWithChannel);
            return protocolAdapter.toAdaptable(errorResponse, channel);
        }
        throw new IllegalArgumentException("Jsonifiable was neither Command nor CommandResponse nor Event nor DittoRuntimeException: " + withPredicate.getClass().getSimpleName());
    }

    /**
     * Extracts a JWT from the request's authorization header, if that header exists and its
     * value starts with {@code BEARER}.
     *
     * @param httpRequest the request to inspect.
     * @return the parsed token, or an empty optional.
     */
    private static Optional<JsonWebToken> extractJwtFromRequestIfPresent(final HttpRequest httpRequest) {
        final String authorizationHeaderName = HttpHeader.AUTHORIZATION.toString();
        return httpRequest.getHeader(authorizationHeaderName)
                .map(header -> header.value())
                .filter(headerValue -> headerValue.startsWith(BEARER))
                .map(ImmutableJsonWebToken::fromAuthorization);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1882457605:
                if (implMethodName.equals("lambda$getStreamControlOrSignalSink$448d1ef2$1")) {
                    z = 7;
                    break;
                }
                break;
            case -1437992483:
                if (implMethodName.equals("lambda$jsonifiableToString$9995c44$1")) {
                    z = 9;
                    break;
                }
                break;
            case -1352294148:
                if (implMethodName.equals("create")) {
                    z = 12;
                    break;
                }
                break;
            case -1180098185:
                if (implMethodName.equals("isText")) {
                    z = 17;
                    break;
                }
                break;
            case -1175485336:
                if (implMethodName.equals("lambda$createOutgoing$4c0dda84$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1064247808:
                if (implMethodName.equals("lambda$createIncoming$1374a9a8$1")) {
                    z = 6;
                    break;
                }
                break;
            case -744616522:
                if (implMethodName.equals("lambda$createOutgoing$61fb199d$1")) {
                    z = 4;
                    break;
                }
                break;
            case -708446811:
                if (implMethodName.equals("lambda$null$df6c607f$1")) {
                    z = false;
                    break;
                }
                break;
            case -666454077:
                if (implMethodName.equals("lambda$createOutgoing$c148817c$1")) {
                    z = 2;
                    break;
                }
                break;
            case -623320356:
                if (implMethodName.equals("lambda$null$c6f929ac$1")) {
                    z = true;
                    break;
                }
                break;
            case -17409526:
                if (implMethodName.equals("lambda$getStrictifyFlow$1763aadb$1")) {
                    z = 14;
                    break;
                }
                break;
            case -17409525:
                if (implMethodName.equals("lambda$getStrictifyFlow$1763aadb$2")) {
                    z = 15;
                    break;
                }
                break;
            case -17409524:
                if (implMethodName.equals("lambda$getStrictifyFlow$1763aadb$3")) {
                    z = 13;
                    break;
                }
                break;
            case 388121564:
                if (implMethodName.equals("publishResponsePublishedEvent")) {
                    z = 19;
                    break;
                }
                break;
            case 529231487:
                if (implMethodName.equals("lambda$joinOutgoingFlows$2651dd00$1")) {
                    z = 8;
                    break;
                }
                break;
            case 529313172:
                if (implMethodName.equals("lambda$joinOutgoingFlows$2651dd1f$1")) {
                    z = 11;
                    break;
                }
                break;
            case 807994452:
                if (implMethodName.equals("lambda$getStreamingActorSink$7b1f1c5e$1")) {
                    z = 16;
                    break;
                }
                break;
            case 1052180104:
                if (implMethodName.equals("asTextMessage")) {
                    z = 18;
                    break;
                }
                break;
            case 1475660076:
                if (implMethodName.equals("lambda$createOutgoing$b283aaa6$1")) {
                    z = 10;
                    break;
                }
                break;
            case 2131064265:
                if (implMethodName.equals("lambda$getStrictifyFlow$1885ea$1")) {
                    z = 5;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/model/base/exceptions/DittoRuntimeException;)Lorg/eclipse/ditto/model/base/exceptions/DittoRuntimeException;")) {
                    return dittoRuntimeException -> {
                        DROPPED_COUNTER.increment();
                        return dittoRuntimeException;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function2") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;")) {
                    return (str2, str3) -> {
                        return str2 + str3;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/model/base/exceptions/DittoRuntimeException;)Lorg/eclipse/ditto/model/base/json/Jsonifiable$WithPredicate;")) {
                    return dittoRuntimeException2 -> {
                        return dittoRuntimeException2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lorg/eclipse/ditto/model/base/headers/DittoHeaders;Ljava/util/Optional;Lakka/actor/ActorRef;)Lakka/NotUsed;")) {
                    WebSocketRoute webSocketRoute = (WebSocketRoute) serializedLambda.getCapturedArg(0);
                    String str = (String) serializedLambda.getCapturedArg(1);
                    DittoHeaders dittoHeaders = (DittoHeaders) serializedLambda.getCapturedArg(2);
                    Optional optional = (Optional) serializedLambda.getCapturedArg(3);
                    return actorRef -> {
                        this.webSocketSupervisor.supervise(actorRef, str, dittoHeaders);
                        this.streamingActor.tell(new Connect(actorRef, str, STREAMING_TYPE_WS, (Instant) optional.map((v0) -> {
                            return v0.getExpirationTime();
                        }).orElse(null)), ActorRef.noSender());
                        return NotUsed.getInstance();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/http/javadsl/model/ws/Message;)Lakka/http/javadsl/model/ws/Message;")) {
                    return message -> {
                        OUT_COUNTER.increment();
                        return message;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;")) {
                    String str4 = (String) serializedLambda.getCapturedArg(0);
                    return str22 -> {
                        LogUtil.logWithCorrelationId(LOGGER, str4, (Consumer<Logger>) logger -> {
                            logger.debug("Received incoming WebSocket message: {}", str22);
                        });
                        return str22;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/http/javadsl/model/HttpRequest;Ljava/lang/String;Lorg/eclipse/ditto/services/gateway/streaming/WebsocketConfig;Ljava/lang/Integer;Lorg/eclipse/ditto/model/base/auth/AuthorizationContext;Lorg/eclipse/ditto/model/base/headers/DittoHeaders;Lorg/eclipse/ditto/protocoladapter/ProtocolAdapter;Lakka/stream/javadsl/GraphDSL$Builder;)Lakka/stream/FlowShape;")) {
                    WebSocketRoute webSocketRoute2 = (WebSocketRoute) serializedLambda.getCapturedArg(0);
                    HttpRequest httpRequest = (HttpRequest) serializedLambda.getCapturedArg(1);
                    String str5 = (String) serializedLambda.getCapturedArg(2);
                    WebsocketConfig websocketConfig = (WebsocketConfig) serializedLambda.getCapturedArg(3);
                    Integer num = (Integer) serializedLambda.getCapturedArg(4);
                    AuthorizationContext authorizationContext = (AuthorizationContext) serializedLambda.getCapturedArg(5);
                    DittoHeaders dittoHeaders2 = (DittoHeaders) serializedLambda.getCapturedArg(6);
                    ProtocolAdapter protocolAdapter = (ProtocolAdapter) serializedLambda.getCapturedArg(7);
                    return builder -> {
                        FlowShape flowShape = (FlowShape) builder.add(getStrictifyFlow(httpRequest, str5).via(throttle(websocketConfig)));
                        FanOutShape2 fanOutShape2 = (FanOutShape2) builder.add(selectStreamControlOrSignal(num, str5, authorizationContext, dittoHeaders2, protocolAdapter));
                        FanOutShape2 fanOutShape22 = (FanOutShape2) builder.add(getRateLimiter(websocketConfig));
                        FlowShape flowShape2 = (FlowShape) builder.add(Flow.fromFunction(dittoRuntimeException3 -> {
                            DROPPED_COUNTER.increment();
                            return dittoRuntimeException3;
                        }));
                        SinkShape sinkShape = (SinkShape) builder.add(getStreamControlOrSignalSink(websocketConfig));
                        UniformFanInShape uniformFanInShape = (UniformFanInShape) builder.add(Merge.create(2, true));
                        builder.from(flowShape.out()).toInlet(fanOutShape2.in());
                        builder.from(fanOutShape2.out0()).toInlet(fanOutShape22.in());
                        builder.from(fanOutShape2.out1()).toFanIn(uniformFanInShape);
                        builder.from(fanOutShape22.out0()).to(sinkShape);
                        builder.from(fanOutShape22.out1()).via(flowShape2).toFanIn(uniformFanInShape);
                        return FlowShape.of(flowShape.in(), uniformFanInShape.out());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/services/gateway/streaming/WebsocketConfig;Lakka/stream/javadsl/GraphDSL$Builder;)Lakka/stream/SinkShape;")) {
                    WebSocketRoute webSocketRoute3 = (WebSocketRoute) serializedLambda.getCapturedArg(0);
                    WebsocketConfig websocketConfig2 = (WebsocketConfig) serializedLambda.getCapturedArg(1);
                    return builder2 -> {
                        FanOutShape2 fanOutShape2 = (FanOutShape2) builder2.add(Filter.multiplexByEither(Function.identity()));
                        SinkShape sinkShape = (SinkShape) builder2.add(getCommandSubscriberSink(websocketConfig2));
                        SinkShape sinkShape2 = (SinkShape) builder2.add(getStreamingActorSink());
                        builder2.from(fanOutShape2.out0()).to(sinkShape);
                        builder2.from(fanOutShape2.out1()).to(sinkShape2);
                        return SinkShape.of(fanOutShape2.in());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function3") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/NotUsed;Lakka/NotUsed;Lakka/NotUsed;)Lakka/NotUsed;")) {
                    return (notUsed, notUsed2, notUsed3) -> {
                        return notUsed;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/protocoladapter/ProtocolAdapter;Lorg/eclipse/ditto/model/base/json/Jsonifiable$WithPredicate;)Ljava/lang/String;")) {
                    ProtocolAdapter protocolAdapter2 = (ProtocolAdapter) serializedLambda.getCapturedArg(0);
                    return withPredicate -> {
                        if (withPredicate instanceof StreamingAck) {
                            return streamingAckToString((StreamingAck) withPredicate);
                        }
                        return ProtocolFactory.wrapAsJsonifiableAdaptable(((withPredicate instanceof WithDittoHeaders) && ((WithDittoHeaders) withPredicate).getDittoHeaders().getChannel().isPresent()) ? jsonifiableToAdaptable(withPredicate, TopicPath.Channel.forName(((WithDittoHeaders) withPredicate).getDittoHeaders().getChannel().get()).orElse(TopicPath.Channel.TWIN), protocolAdapter2) : ((withPredicate instanceof Signal) && isLiveSignal((Signal) withPredicate)) ? jsonifiableToAdaptable(withPredicate, TopicPath.Channel.LIVE, protocolAdapter2) : jsonifiableToAdaptable(withPredicate, TopicPath.Channel.TWIN, protocolAdapter2)).toJsonString();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/String;)Ljava/lang/String;")) {
                    String str6 = (String) serializedLambda.getCapturedArg(0);
                    return str23 -> {
                        LogUtil.logWithCorrelationId(LOGGER, str6, (Consumer<Logger>) logger -> {
                            logger.debug("Sending outgoing WebSocket message: {}", str23);
                        });
                        return str23;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function4") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/GraphDSL$Builder;Lakka/stream/SourceShape;Lakka/stream/FlowShape;Lakka/stream/FlowShape;)Lakka/stream/FlowShape;")) {
                    return (builder3, sourceShape, flowShape, flowShape2) -> {
                        UniformFanInShape uniformFanInShape = (UniformFanInShape) builder3.add(Merge.create(2, true));
                        builder3.from(sourceShape).toFanIn(uniformFanInShape);
                        builder3.from(flowShape).toFanIn(uniformFanInShape);
                        builder3.from((Outlet) uniformFanInShape.out()).toInlet(flowShape2.in());
                        return FlowShape.of(flowShape.in(), flowShape2.out());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/http/javadsl/model/ws/TextMessage") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Lakka/http/javadsl/model/ws/TextMessage;")) {
                    return TextMessage::create;
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/stream/javadsl/Source;)Lakka/stream/Graph;")) {
                    return source -> {
                        return source.fold("", (str24, str32) -> {
                            return str24 + str32;
                        });
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/http/javadsl/model/ws/Message;)Lakka/http/javadsl/model/ws/Message;")) {
                    return message2 -> {
                        IN_COUNTER.increment();
                        return message2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lakka/http/javadsl/model/ws/TextMessage;)Lakka/stream/javadsl/Source;")) {
                    return textMessage -> {
                        return textMessage.isStrict() ? Source.single(textMessage.getStrictText()) : textMessage.getStreamedText();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/services/gateway/streaming/StreamControlMessage;)V")) {
                    WebSocketRoute webSocketRoute4 = (WebSocketRoute) serializedLambda.getCapturedArg(0);
                    return streamControlMessage -> {
                        this.streamingActor.tell(streamControlMessage, ActorRef.noSender());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Predicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("akka/http/javadsl/model/ws/Message") && serializedLambda.getImplMethodSignature().equals("()Z")) {
                    return (v0) -> {
                        return v0.isText();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/http/javadsl/model/ws/Message") && serializedLambda.getImplMethodSignature().equals("()Lakka/http/javadsl/model/ws/TextMessage;")) {
                    return (v0) -> {
                        return v0.asTextMessage();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/services/gateway/endpoints/routes/websocket/WebSocketRoute") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/model/base/json/Jsonifiable$WithPredicate;)Lorg/eclipse/ditto/model/base/json/Jsonifiable$WithPredicate;")) {
                    WebSocketRoute webSocketRoute5 = (WebSocketRoute) serializedLambda.getCapturedArg(0);
                    return webSocketRoute5::publishResponsePublishedEvent;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
