package org.eclipse.ditto.gateway.service.endpoints.routes.sse;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.http.javadsl.marshalling.sse.EventStreamMarshalling;
import akka.http.javadsl.model.HttpHeader;
import akka.http.javadsl.model.MediaTypes;
import akka.http.javadsl.model.StatusCodes;
import akka.http.javadsl.model.headers.Accept;
import akka.http.javadsl.model.sse.ServerSentEvent;
import akka.http.javadsl.server.PathMatchers;
import akka.http.javadsl.server.RequestContext;
import akka.http.javadsl.server.Route;
import akka.http.javadsl.server.directives.RouteDirectives;
import akka.japi.pf.PFBuilder;
import akka.pattern.Patterns;
import akka.stream.KillSwitch;
import akka.stream.KillSwitches;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Source;
import com.typesafe.config.Config;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.exceptions.SignalEnrichmentFailedException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.FeatureToggle;
import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents;
import org.eclipse.ditto.base.service.UriEncoding;
import org.eclipse.ditto.gateway.service.endpoints.routes.AbstractRoute;
import org.eclipse.ditto.gateway.service.endpoints.routes.things.ThingsParameter;
import org.eclipse.ditto.gateway.service.endpoints.utils.GatewaySignalEnrichmentProvider;
import org.eclipse.ditto.gateway.service.streaming.StreamingAuthorizationEnforcer;
import org.eclipse.ditto.gateway.service.streaming.actors.SessionedJsonifiable;
import org.eclipse.ditto.gateway.service.streaming.actors.SupervisedStream;
import org.eclipse.ditto.gateway.service.streaming.signals.Connect;
import org.eclipse.ditto.gateway.service.streaming.signals.StartStreaming;
import org.eclipse.ditto.gateway.service.util.config.streaming.SseConfig;
import org.eclipse.ditto.gateway.service.util.config.streaming.StreamingConfig;
import org.eclipse.ditto.internal.models.signalenrichment.SignalEnrichmentFacade;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.internal.utils.search.SearchSource;
import org.eclipse.ditto.internal.utils.search.SearchSourceBuilder;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonFieldMarker;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonKey;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.messages.model.Message;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.placeholders.Placeholder;
import org.eclipse.ditto.placeholders.TimePlaceholder;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingFieldSelector;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import scala.PartialFunction;

@NotThreadSafe
/* loaded from: input_file:org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder.class */
public final class ThingsSseRouteBuilder extends RouteDirectives implements SseRouteBuilder {
    // Path segment names of the SSE endpoints.
    private static final String PATH_SEARCH = "search";
    private static final String PATH_THINGS = "things";
    // Connection type sent to the streaming actor inside the Connect message.
    private static final String STREAMING_TYPE_SSE = "SSE";
    // Request header whose value the search route hands to the search source as the last thing ID.
    private static final String LAST_EVENT_ID_HEADER = "Last-Event-ID";
    // Names of the query parameters evaluated by the SSE routes.
    private static final String PARAM_FILTER = "filter";
    private static final String PARAM_OPTION = "option";
    private static final String PARAM_NAMESPACES = "namespaces";
    private static final String PARAM_EXTRA_FIELDS = "extraFields";
    private static final String PARAM_FROM_HISTORICAL_REVISION = "from-historical-revision";
    private static final String PARAM_TO_HISTORICAL_REVISION = "to-historical-revision";
    private static final String PARAM_FROM_HISTORICAL_TIMESTAMP = "from-historical-timestamp";
    private static final String PARAM_TO_HISTORICAL_TIMESTAMP = "to-historical-timestamp";
    // Actor that is asked with a Connect message to open a streaming session.
    private final ActorRef streamingActor;
    private final StreamingConfig streamingConfig;
    // Used to parse the "filter" parameter of the things route before streaming starts.
    private final QueryFilterCriteriaFactory queryFilterCriteriaFactory;
    private final ActorRef pubSubMediator;
    // The following three collaborators are initialized in the constructor and replaceable via the with... methods.
    private SseConnectionSupervisor sseConnectionSupervisor;
    private SseEventSniffer eventSniffer;
    private StreamingAuthorizationEnforcer sseAuthorizationEnforcer;

    // When null, streams are post-processed with a null signal enrichment facade.
    @Nullable
    private GatewaySignalEnrichmentProvider signalEnrichmentProvider;

    // When null, the search SSE route answers with 501 Not Implemented.
    @Nullable
    private ActorRef proxyActor;
    // Sub-paths of a thing matching this pattern are served as message streams instead of thing event streams.
    private static final Pattern INBOX_OUTBOX_PATTERN = Pattern.compile("(/features/[^/]+)?/(inbox|outbox)/messages(/.*)?");
    // Message paths with something after ".../messages/" are filtered by exact path; others by path prefix.
    private static final Pattern INBOX_OUTBOX_WITH_SUBJECT_PATTERN = Pattern.compile("(/features/[^/]+)?/(inbox|outbox)/messages/.+");
    private static final String PARAM_FIELDS = ThingsParameter.FIELDS.toString();
    // Definition of the "_context" field that can be requested via the field selector.
    private static final JsonFieldDefinition<JsonObject> CONTEXT = JsonFactory.newJsonObjectFieldDefinition("_context", new JsonFieldMarker[0]);
    // Matches only Accept headers accepted by matchesTextEventStream.
    private static final PartialFunction<HttpHeader, Accept> ACCEPT_HEADER_EXTRACTOR = newAcceptHeaderExtractor();
    // Counters incremented once per emitted server-sent event.
    private static final Counter THINGS_SSE_COUNTER = getCounterFor("things");
    private static final Counter SEARCH_SSE_COUNTER = getCounterFor("search");
    // Timeout for the ask of the Connect message to the streaming actor.
    private static final Duration LOCAL_ASK_TIMEOUT = Duration.ofSeconds(5);

    /**
     * Creates the builder. The event sniffer, the connection supervisor and the authorization enforcer are
     * obtained from the actor system using its configuration; each of them can be replaced afterwards
     * through the matching {@code with...} method.
     *
     * @param actorSystem the actor system whose configuration is read.
     * @param actorRef the streaming actor.
     * @param streamingConfig the streaming configuration.
     * @param queryFilterCriteriaFactory factory used to parse filter expressions.
     * @param actorRef2 the pub/sub mediator.
     */
    private ThingsSseRouteBuilder(ActorSystem actorSystem, ActorRef actorRef, StreamingConfig streamingConfig, QueryFilterCriteriaFactory queryFilterCriteriaFactory, ActorRef actorRef2) {
        final Config rawConfig = actorSystem.settings().config();
        final Config dittoExtensionConfig = ScopedConfig.dittoExtension(rawConfig);

        streamingActor = actorRef;
        this.streamingConfig = streamingConfig;
        this.queryFilterCriteriaFactory = queryFilterCriteriaFactory;
        pubSubMediator = actorRef2;

        eventSniffer = SseEventSniffer.get(actorSystem, dittoExtensionConfig);
        sseConnectionSupervisor = SseConnectionSupervisor.get(actorSystem, dittoExtensionConfig);
        sseAuthorizationEnforcer = StreamingAuthorizationEnforcer.get(
                actorSystem,
                ScopedConfig.getOrEmpty(rawConfig, "ditto.gateway.streaming.sse"));
    }

    /**
     * Returns a new builder whose filter criteria factory understands the topic-path, resource and time
     * placeholders.
     *
     * @param actorSystem the actor system.
     * @param actorRef the streaming actor; must not be {@code null}.
     * @param streamingConfig the streaming configuration.
     * @param actorRef2 the pub/sub mediator.
     * @return the new builder instance.
     */
    public static ThingsSseRouteBuilder getInstance(ActorSystem actorSystem, ActorRef actorRef, StreamingConfig streamingConfig, ActorRef actorRef2) {
        ConditionChecker.checkNotNull(actorRef, "streamingActor");
        final QueryFilterCriteriaFactory criteriaFactory = QueryFilterCriteriaFactory.modelBased(
                RqlPredicateParser.getInstance(),
                new Placeholder[]{
                        TopicPathPlaceholder.getInstance(),
                        ResourcePlaceholder.getInstance(),
                        TimePlaceholder.getInstance()
                });
        return new ThingsSseRouteBuilder(actorSystem, actorRef, streamingConfig, criteriaFactory, actorRef2);
    }

    /**
     * Replaces the authorization enforcer applied before a stream is opened.
     *
     * @param streamingAuthorizationEnforcer the enforcer; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public SseRouteBuilder withAuthorizationEnforcer(StreamingAuthorizationEnforcer streamingAuthorizationEnforcer) {
        final StreamingAuthorizationEnforcer checkedEnforcer =
                (StreamingAuthorizationEnforcer) ConditionChecker.checkNotNull(streamingAuthorizationEnforcer, "enforcer");
        sseAuthorizationEnforcer = checkedEnforcer;
        return this;
    }

    /**
     * Replaces the sniffer through which outgoing server-sent events are passed.
     *
     * @param sseEventSniffer the sniffer; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public ThingsSseRouteBuilder withEventSniffer(SseEventSniffer sseEventSniffer) {
        final SseEventSniffer checkedSniffer =
                (SseEventSniffer) ConditionChecker.checkNotNull(sseEventSniffer, "eventSniffer");
        eventSniffer = checkedSniffer;
        return this;
    }

    /**
     * Replaces the supervisor that is handed every newly created SSE stream.
     *
     * @param sseConnectionSupervisor the supervisor; must not be {@code null}.
     * @return this builder.
     */
    @Override
    public SseRouteBuilder withSseConnectionSupervisor(SseConnectionSupervisor sseConnectionSupervisor) {
        final SseConnectionSupervisor checkedSupervisor =
                (SseConnectionSupervisor) ConditionChecker.checkNotNull(sseConnectionSupervisor, "sseConnectionSupervisor");
        this.sseConnectionSupervisor = checkedSupervisor;
        return this;
    }

    /**
     * Sets the provider of the signal enrichment facade used for retrieving extra fields.
     *
     * @param gatewaySignalEnrichmentProvider the provider, or {@code null} to stream with a {@code null} facade.
     * @return this builder.
     */
    @Override
    public SseRouteBuilder withSignalEnrichmentProvider(@Nullable GatewaySignalEnrichmentProvider gatewaySignalEnrichmentProvider) {
        signalEnrichmentProvider = gatewaySignalEnrichmentProvider;
        return this;
    }

    /**
     * Sets the actor used as command forwarder of the search SSE route.
     *
     * @param actorRef the proxy actor, or {@code null} to make the search route answer 501 Not Implemented.
     * @return this builder.
     */
    @Override
    public SseRouteBuilder withProxyActor(@Nullable ActorRef actorRef) {
        proxyActor = actorRef;
        return this;
    }

    /**
     * Builds the SSE route: for GET requests whose Accept header passes {@code ACCEPT_HEADER_EXTRACTOR},
     * the things route is tried first and the search route second.
     *
     * @param requestContext the context of the current request.
     * @param supplier supplies the stage of the request's Ditto headers.
     * @return the combined route.
     */
    @Override
    public Route build(RequestContext requestContext, Supplier<CompletionStage<DittoHeaders>> supplier) {
        return headerValuePF(ACCEPT_HEADER_EXTRACTOR, matchedAccept -> get(() -> {
            final Route thingsRoute = buildThingsSseRoute(requestContext, supplier);
            final Route searchRoute = buildSearchSseRoute(requestContext, supplier);
            return concat(thingsRoute, new Route[]{searchRoute});
        }));
    }

    /**
     * Builds the routes below {@code /things}:
     * <ul>
     * <li>{@code /things}: events of all things, configured by query parameters only;</li>
     * <li>{@code /things/<id>}: events of one thing;</li>
     * <li>{@code /things/<id>/<path>}: messages if the decoded path matches {@code INBOX_OUTBOX_PATTERN},
     * otherwise events of the thing restricted to that path.</li>
     * </ul>
     *
     * @param requestContext the context of the current request.
     * @param supplier supplies the stage of the request's Ditto headers.
     * @return the things SSE route.
     */
    private Route buildThingsSseRoute(RequestContext requestContext, Supplier<CompletionStage<DittoHeaders>> supplier) {
        return rawPathPrefix(PathMatchers.slash().concat(PATH_THINGS), () -> {
            final CompletionStage<DittoHeaders> headersStage =
                    supplier.get().thenApply(ThingsSseRouteBuilder::getDittoHeadersWithCorrelationId);

            final Route allThingsRoute =
                    pathEndOrSingleSlash(() -> buildThingsSseRouteForAllThings(requestContext, headersStage));

            final Route singleThingRoute = rawPathPrefix(PathMatchers.slash().concat(PathMatchers.segment()), thingId ->
                    parameterMap(queryParameters -> {
                        // Work on a copy so the thing ID (and later the path) can be added as parameters.
                        final Map<String, String> parameters = new HashMap<>(queryParameters);
                        parameters.put(ThingsParameter.IDS.toString(), thingId);

                        final Route wholeThingRoute = pathEndOrSingleSlash(() ->
                                createSseRoute(requestContext, headersStage, JsonPointer.empty(), parameters));

                        final Route subPathRoute = rawPathPrefix(
                                PathMatchers.slash().concat(PathMatchers.remaining())
                                        .map(encodedPath -> UriEncoding.decode(encodedPath, UriEncoding.EncodingType.RFC3986))
                                        .map(decodedPath -> "/" + decodedPath),
                                resourcePath -> {
                                    if (!INBOX_OUTBOX_PATTERN.matcher(resourcePath).matches()) {
                                        parameters.put(PARAM_FIELDS, resourcePath);
                                        return createSseRoute(requestContext, headersStage, JsonPointer.of(resourcePath), parameters);
                                    }
                                    return createMessagesSseRoute(requestContext, headersStage, thingId, resourcePath);
                                });

                        return concat(wholeThingRoute, new Route[]{subPathRoute});
                    }));

            return concat(allThingsRoute, new Route[]{singleThingRoute});
        });
    }

    /**
     * Builds the route for {@code /search/things}, which streams search results configured by the
     * request's query parameters.
     *
     * @param requestContext the context of the current request.
     * @param supplier supplies the stage of the request's Ditto headers.
     * @return the search SSE route.
     */
    private Route buildSearchSseRoute(RequestContext requestContext, Supplier<CompletionStage<DittoHeaders>> supplier) {
        return rawPathPrefix(PathMatchers.slash().concat(PATH_SEARCH).slash().concat(PATH_THINGS), () ->
                pathEndOrSingleSlash(() -> {
                    final CompletionStage<DittoHeaders> headersStage =
                            supplier.get().thenApply(ThingsSseRouteBuilder::getDittoHeadersWithCorrelationId);
                    return parameterMap(parameters -> createSearchSseRoute(requestContext, headersStage, parameters));
                }));
    }

    /**
     * Guarantees that the returned headers carry a correlation ID.
     *
     * @param dittoHeaders the headers to inspect.
     * @return {@code dittoHeaders} itself if it already has a correlation ID, otherwise a copy with a
     * random UUID as correlation ID.
     */
    private static DittoHeaders getDittoHeadersWithCorrelationId(DittoHeaders dittoHeaders) {
        if (dittoHeaders.getCorrelationId().isPresent()) {
            return dittoHeaders;
        }
        final String generatedCorrelationId = String.valueOf(UUID.randomUUID());
        return dittoHeaders.toBuilder().correlationId(generatedCorrelationId).build();
    }

    /**
     * Builds the thing event route that is not bound to a thing ID from the path; the query parameters
     * are passed on unchanged and the whole thing (empty pointer) is streamed.
     *
     * @param requestContext the context of the current request.
     * @param completionStage the stage of the request's Ditto headers.
     * @return the route.
     */
    private Route buildThingsSseRouteForAllThings(RequestContext requestContext, CompletionStage<DittoHeaders> completionStage) {
        return parameterMap(queryParameters ->
                createSseRoute(requestContext, completionStage, JsonPointer.empty(), queryParameters));
    }

    /**
     * Creates the route that streams thing events (or persisted historical events) as server-sent events.
     * <p>
     * The parameters {@code filter}, {@code namespaces}, the IDs parameter, the fields parameter,
     * {@code extraFields} and the four {@code from-/to-historical-*} parameters are read from {@code map}.
     * After the authorization enforcer has completed, a queue-backed source is connected to the streaming
     * actor and either a persisted-events subscription (if a historical "from" parameter is present) or a
     * regular event stream is started. A heartbeat is emitted after one second without events.
     *
     * @param requestContext the context of the current request.
     * @param completionStage the stage of the request's Ditto headers.
     * @param jsonPointer the path inside the thing to stream; empty for the whole thing.
     * @param map the request parameters.
     * @return the route completing with the event stream.
     */
    private Route createSseRoute(RequestContext requestContext, CompletionStage<DittoHeaders> completionStage, JsonPointer jsonPointer, Map<String, String> map) {
        String str = map.get(PARAM_FILTER);
        List<String> namespaces = getNamespaces(map.get(PARAM_NAMESPACES));
        List<ThingId> thingIds = getThingIds(map.get(ThingsParameter.IDS.toString()));
        ThingFieldSelector fieldSelector = getFieldSelector(map.get(PARAM_FIELDS));
        ThingFieldSelector fieldSelector2 = getFieldSelector(map.get(PARAM_EXTRA_FIELDS));
        // NOTE(review): the four historical parameters are parsed eagerly while the route is built; a malformed
        // value makes Long.parseLong / Instant.parse throw from this method - confirm this is mapped to a
        // client error by the surrounding exception handling.
        Long l = (Long) Optional.ofNullable(map.get(PARAM_FROM_HISTORICAL_REVISION)).map(Long::parseLong).orElse(null);
        Long l2 = (Long) Optional.ofNullable(map.get(PARAM_TO_HISTORICAL_REVISION)).map(Long::parseLong).orElse(null);
        Instant instant = (Instant) Optional.ofNullable(map.get(PARAM_FROM_HISTORICAL_TIMESTAMP)).map((v0) -> {
            return Instant.parse(v0);
        }).orElse(null);
        Instant instant2 = (Instant) Optional.ofNullable(map.get(PARAM_TO_HISTORICAL_TIMESTAMP)).map((v0) -> {
            return Instant.parse(v0);
        }).orElse(null);
        // Without a signal enrichment provider the stream is post-processed with a null facade.
        return completeOKWithFuture((this.signalEnrichmentProvider == null ? CompletableFuture.completedStage(null) : this.signalEnrichmentProvider.getFacade(requestContext.getRequest())).thenCompose(signalEnrichmentFacade -> {
            return completionStage.thenCompose(dittoHeaders -> {
                return this.sseAuthorizationEnforcer.checkAuthorization(requestContext, dittoHeaders).thenApply(dittoHeaders -> {
                    SubscribeForPersistedEvents build;
                    if (str != null) {
                        // The result is discarded; presumably this only validates the filter expression
                        // before the stream is opened.
                        this.queryFilterCriteriaFactory.filterCriteria(str, dittoHeaders);
                    }
                    String str2 = (String) dittoHeaders.getCorrelationId().orElseThrow(() -> {
                        return new IllegalStateException("Expected correlation-id in SSE DittoHeaders: " + dittoHeaders);
                    });
                    AuthorizationContext authorizationContext = dittoHeaders.getAuthorizationContext();
                    // NOTE(review): both historical branches use thingIds.get(0), which throws
                    // IndexOutOfBoundsException when the IDs parameter is absent (thingIds is then empty).
                    if (null != l) {
                        FeatureToggle.checkHistoricalApiAccessFeatureEnabled("streaming.subscription.commands:subscribeForPersistedEvents", dittoHeaders);
                        // An absent "to" revision is replaced by Long.MAX_VALUE.
                        build = SubscribeForPersistedEvents.of((EntityId) thingIds.get(0), jsonPointer, l.longValue(), null != l2 ? l2.longValue() : Long.MAX_VALUE, dittoHeaders);
                    } else if (null != instant) {
                        FeatureToggle.checkHistoricalApiAccessFeatureEnabled("streaming.subscription.commands:subscribeForPersistedEvents", dittoHeaders);
                        build = SubscribeForPersistedEvents.of((EntityId) thingIds.get(0), jsonPointer, instant, instant2, dittoHeaders);
                    } else {
                        // Regular event streaming; only extraFields (not the fields selector) is sent along.
                        build = StartStreaming.getBuilder(StreamingType.EVENTS, str2, authorizationContext).withNamespaces(namespaces).withFilter(str).withExtraFields(fieldSelector2).build();
                    }
                    SubscribeForPersistedEvents subscribeForPersistedEvents = build;
                    return SupervisedStream.sourceQueue(10).viaMat(KillSwitches.single(), Keep.both()).mapMaterializedValue(pair -> {
                        SupervisedStream.WithQueue withQueue = (SupervisedStream.WithQueue) pair.first();
                        KillSwitch killSwitch = (KillSwitch) pair.second();
                        JsonSchemaVersion jsonSchemaVersion = (JsonSchemaVersion) dittoHeaders.getSchemaVersion().orElse(JsonSchemaVersion.LATEST);
                        this.sseConnectionSupervisor.supervise(withQueue.getSupervisedStream(), str2, dittoHeaders);
                        // Ask the streaming actor to connect the queue; its reply is cast to ActorRef and
                        // receives the start/subscribe command.
                        CompletionStage ask = Patterns.ask(this.streamingActor, new Connect(withQueue.getSourceQueue(), str2, STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(), authorizationContext, null), LOCAL_ASK_TIMEOUT);
                        Class<ActorRef> cls = ActorRef.class;
                        Objects.requireNonNull(ActorRef.class);
                        ask.thenApply(cls::cast).thenAccept(actorRef -> {
                            actorRef.tell(subscribeForPersistedEvents, ActorRef.noSender());
                        }).exceptionally(th -> {
                            // Connecting failed: abort the stream with the cause.
                            killSwitch.abort(th);
                            return null;
                        });
                        return NotUsed.getInstance();
                    }).mapAsync(this.streamingConfig.getParallelism(), sessionedJsonifiable -> {
                        return postprocess(sessionedJsonifiable, signalEnrichmentFacade, thingIds, namespaces, jsonPointer, fieldSelector);
                    }).mapConcat(collection -> {
                        return collection;
                    }).map(jsonValue -> {
                        THINGS_SSE_COUNTER.increment();
                        return ServerSentEvent.create(jsonValue.toString());
                    }).log("SSE things").viaMat(this.eventSniffer.toAsyncFlow(requestContext.getRequest()), Keep.none()).keepAlive(Duration.ofSeconds(1L), ServerSentEvent::heartbeat);
                });
            });
        }), EventStreamMarshalling.toEventStream());
    }

    /**
     * Creates the route that streams messages of a single thing as server-sent events.
     * <p>
     * After the authorization enforcer has completed, a queue-backed source is connected to the streaming
     * actor and message streaming is started with a filter on the thing ID and the resource path. Each
     * emitted event carries the message's raw payload decoded as text (empty string if there is none) and
     * the message subject as second argument of {@code ServerSentEvent.create}. A heartbeat is emitted
     * after one second without events.
     *
     * @param requestContext the context of the current request.
     * @param completionStage the stage of the request's Ditto headers.
     * @param str the thing ID taken from the request path.
     * @param str2 the decoded inbox/outbox message path, starting with a slash.
     * @return the route completing with the event stream.
     */
    private Route createMessagesSseRoute(RequestContext requestContext, CompletionStage<DittoHeaders> completionStage, String str, String str2) {
        List of = List.of(ThingId.of(str));
        // Without a signal enrichment provider the stream is post-processed with a null facade.
        return completeOKWithFuture((this.signalEnrichmentProvider == null ? CompletableFuture.completedStage(null) : this.signalEnrichmentProvider.getFacade(requestContext.getRequest())).thenCompose(signalEnrichmentFacade -> {
            return completionStage.thenCompose(dittoHeaders -> {
                return this.sseAuthorizationEnforcer.checkAuthorization(requestContext, dittoHeaders).thenApply(dittoHeaders -> {
                    return SupervisedStream.sourceQueue(10).viaMat(KillSwitches.single(), Keep.both()).mapMaterializedValue(pair -> {
                        SupervisedStream.WithQueue withQueue = (SupervisedStream.WithQueue) pair.first();
                        KillSwitch killSwitch = (KillSwitch) pair.second();
                        String str3 = (String) dittoHeaders.getCorrelationId().orElseThrow(() -> {
                            return new IllegalStateException("Expected correlation-id in SSE DittoHeaders: " + dittoHeaders);
                        });
                        JsonSchemaVersion jsonSchemaVersion = (JsonSchemaVersion) dittoHeaders.getSchemaVersion().orElse(JsonSchemaVersion.LATEST);
                        this.sseConnectionSupervisor.supervise(withQueue.getSupervisedStream(), str3, dittoHeaders);
                        AuthorizationContext authorizationContext = dittoHeaders.getAuthorizationContext();
                        Connect connect = new Connect(withQueue.getSourceQueue(), str3, STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(), authorizationContext, null);
                        // Filter: and(eq(entity:id,'<id>'),<path criterion>); the path criterion is an exact
                        // match when the path has something after ".../messages/", otherwise a prefix match.
                        // NOTE(review): str2 comes from the request path and is inserted into the filter
                        // without escaping quotes - verify this cannot be abused to alter the filter.
                        StartStreaming build = StartStreaming.getBuilder(StreamingType.MESSAGES, str3, authorizationContext).withFilter(MessageFormat.format("and(eq(entity:id,''{0}''),{1})", str, INBOX_OUTBOX_WITH_SUBJECT_PATTERN.matcher(str2).matches() ? String.format("eq(resource:path,'%s')", str2) : String.format("like(resource:path,'%s*')", str2))).build();
                        CompletionStage ask = Patterns.ask(this.streamingActor, connect, LOCAL_ASK_TIMEOUT);
                        Class<ActorRef> cls = ActorRef.class;
                        Objects.requireNonNull(ActorRef.class);
                        ask.thenApply(cls::cast).thenAccept(actorRef -> {
                            actorRef.tell(build, ActorRef.noSender());
                        }).exceptionally(th -> {
                            // Connecting failed: abort the stream with the cause.
                            killSwitch.abort(th);
                            return null;
                        });
                        return NotUsed.getInstance();
                    }).filter(sessionedJsonifiable -> {
                        // Only message commands are forwarded; everything else is dropped.
                        return sessionedJsonifiable.getJsonifiable() instanceof MessageCommand;
                    }).mapAsync(this.streamingConfig.getParallelism(), sessionedJsonifiable2 -> {
                        return postprocessMessages(of, sessionedJsonifiable2.getJsonifiable(), signalEnrichmentFacade, sessionedJsonifiable2);
                    }).mapConcat(list -> {
                        return list;
                    }).map(message -> {
                        THINGS_SSE_COUNTER.increment();
                        // The payload is decoded with the charset of the content type, falling back to UTF-8.
                        Optional<Charset> determineCharsetFromContentType = determineCharsetFromContentType(message.getContentType());
                        return ServerSentEvent.create((String) message.getRawPayload().map(byteBuffer -> {
                            // NOTE(review): ByteBuffer.array() ignores position/limit and fails for buffers
                            // without an accessible backing array - confirm payload buffers are always
                            // plain wrapped arrays.
                            return new String(byteBuffer.array(), (Charset) determineCharsetFromContentType.orElse(StandardCharsets.UTF_8));
                        }).orElse(""), message.getSubject());
                    }).log("SSE things/" + str2).keepAlive(Duration.ofSeconds(1L), ServerSentEvent::heartbeat);
                });
            });
        }), EventStreamMarshalling.toEventStream());
    }

    /**
     * Extracts the charset named by the {@code charset} parameter of a content type such as
     * {@code text/plain; charset=ISO-8859-1}.
     * <p>
     * All parameters after the media type are inspected (not just the first one), the parameter name is
     * matched case-insensitively, surrounding whitespace and double quotes around the value are ignored,
     * and malformed input (trailing {@code ;}, missing value, illegal charset name) yields an empty result
     * instead of an exception.
     *
     * @param optional the content type, if any.
     * @return the supported charset named by the content type, or an empty optional if there is no content
     * type, no usable {@code charset} parameter, or the named charset is not supported.
     */
    private static Optional<Charset> determineCharsetFromContentType(Optional<String> optional) {
        if (optional.isEmpty()) {
            return Optional.empty();
        }
        final String[] segments = optional.get().split(";");
        // Segment 0 is the media type itself; parameters start at index 1.
        for (int i = 1; i < segments.length; i++) {
            final String parameter = segments[i].trim();
            final int separatorIndex = parameter.indexOf('=');
            if (separatorIndex < 0 || !"charset".equalsIgnoreCase(parameter.substring(0, separatorIndex).trim())) {
                continue;
            }
            String charsetName = parameter.substring(separatorIndex + 1).trim();
            if (charsetName.length() >= 2 && charsetName.startsWith("\"") && charsetName.endsWith("\"")) {
                charsetName = charsetName.substring(1, charsetName.length() - 1).trim();
            }
            try {
                return Charset.isSupported(charsetName) ? Optional.of(Charset.forName(charsetName)) : Optional.empty();
            } catch (final IllegalArgumentException ignored) {
                // Empty or syntactically illegal charset name: treat like an unsupported charset.
                return Optional.empty();
            }
        }
        return Optional.empty();
    }

    /**
     * Creates the route that streams search results as server-sent events.
     * <p>
     * The stream is only built after the authorization enforcer's stage has completed successfully; a
     * failed authorization fails the returned future instead of being silently dropped. The headers
     * produced by the enforcer are the ones handed to the search source, in line with the other SSE routes
     * of this class. If the request carries a {@code Last-Event-ID} header, its value is passed to the
     * search source as last thing ID. A {@link DittoRuntimeException} failing the stream is turned into one
     * final event containing the exception's JSON.
     *
     * @param requestContext the context of the current request.
     * @param completionStage the stage of the request's Ditto headers.
     * @param map the request parameters ({@code filter}, {@code option}, fields and {@code namespaces} are read).
     * @return 501 Not Implemented if no proxy actor is set, otherwise the route completing with the event stream.
     */
    private Route createSearchSseRoute(RequestContext requestContext, CompletionStage<DittoHeaders> completionStage, Map<String, String> map) {
        if (this.proxyActor == null) {
            return complete(StatusCodes.NOT_IMPLEMENTED);
        }
        return completeOKWithFuture(completionStage.thenCompose(dittoHeaders ->
                // Compose (not fire-and-forget) so that an asynchronous authorization failure aborts the request.
                this.sseAuthorizationEnforcer.checkAuthorization(requestContext, dittoHeaders).thenApply(authorizedHeaders -> {
                    final SearchSourceBuilder searchSourceBuilder = SearchSource.newBuilder()
                            .pubSubMediator(this.pubSubMediator)
                            .commandForwarder(ActorSelection.apply(this.proxyActor, ""))
                            .filter(map.get(PARAM_FILTER))
                            .options(map.get(PARAM_OPTION))
                            .fields(map.get(PARAM_FIELDS))
                            .namespaces(map.get(PARAM_NAMESPACES))
                            .dittoHeaders(authorizedHeaders);
                    requestContext.getRequest().getHeader(LAST_EVENT_ID_HEADER)
                            .ifPresent(lastEventIdHeader -> searchSourceBuilder.lastThingId(lastEventIdHeader.value()));
                    return searchSourceBuilder.build()
                            .startAsPair(resumeSourceBuilder -> {
                                // Intentionally empty: the resume source is used as provided.
                            })
                            .via(AbstractRoute.throttleByConfig(this.streamingConfig.getSseConfig().getThrottlingConfig()))
                            .map(pair -> {
                                SEARCH_SSE_COUNTER.increment();
                                // The first pair element is sent as the event ID, the second as the event data.
                                return ServerSentEvent.create(pair.second().toString(), Optional.empty(), Optional.of(pair.first()), OptionalInt.empty());
                            })
                            .recoverWithRetries(1, new PFBuilder<Throwable, Source<ServerSentEvent, NotUsed>>()
                                    .match(DittoRuntimeException.class, dittoRuntimeException ->
                                            Source.single(ServerSentEvent.create(dittoRuntimeException.toJsonString())))
                                    .build())
                            .log("SSE search")
                            .via(this.eventSniffer.toAsyncFlow(requestContext.getRequest()));
                })), EventStreamMarshalling.toEventStream());
    }

    /**
     * Turns one streamed element into the JSON values to emit for it.
     * <p>
     * An empty collection results when the element is not a {@code ThingEvent}, is a live signal, does not
     * match the given namespaces or thing IDs, has no session, or does not match the session's filter
     * after being merged with its extra fields. If retrieving the extra fields fails, a warning is logged
     * and the JSON of the resulting {@code DittoRuntimeException} is emitted instead.
     *
     * @param sessionedJsonifiable the streamed element together with its session.
     * @param signalEnrichmentFacade the facade for retrieving extra fields; may be {@code null}.
     * @param collection the thing IDs to accept; empty accepts all.
     * @param collection2 the namespaces to accept; empty accepts all.
     * @param jsonPointer the path inside the thing to emit; empty for the whole thing.
     * @param jsonFieldSelector the fields to emit; may be {@code null}.
     * @return a stage of the values to emit, possibly empty.
     */
    private CompletionStage<Collection<JsonValue>> postprocess(SessionedJsonifiable sessionedJsonifiable, @Nullable SignalEnrichmentFacade signalEnrichmentFacade, Collection<ThingId> collection, Collection<String> collection2, JsonPointer jsonPointer, @Nullable JsonFieldSelector jsonFieldSelector) {
        // Shared "emit nothing" result.
        Supplier supplier = () -> {
            return CompletableFuture.completedFuture(Collections.emptyList());
        };
        ThingEvent jsonifiable = sessionedJsonifiable.getJsonifiable();
        if (jsonifiable instanceof ThingEvent) {
            ThingEvent thingEvent = jsonifiable;
            if (!StreamingType.isLiveSignal(thingEvent) && namespaceMatches(thingEvent, collection2) && targetThingIdMatches(thingEvent, collection)) {
                return (CompletionStage) sessionedJsonifiable.getSession().map(streamingSession -> {
                    return sessionedJsonifiable.retrieveExtraFields(signalEnrichmentFacade).thenApply(jsonObject -> {
                        // Merge event and extra fields into a thing, apply the session filter, then project.
                        return (Collection) Optional.of(streamingSession.mergeThingWithExtra(thingEvent, jsonObject)).filter(thing -> {
                            return streamingSession.matchesFilter(thing, thingEvent);
                        }).map(thing2 -> {
                            return toNonemptyValue(thing2, thingEvent, jsonPointer, jsonFieldSelector);
                        }).orElseGet(Collections::emptyList);
                    }).exceptionally(th -> {
                        // Non-Ditto failures are wrapped into a SignalEnrichmentFailedException.
                        DittoRuntimeException asDittoRuntimeException = DittoRuntimeException.asDittoRuntimeException(th, th -> {
                            return SignalEnrichmentFailedException.newBuilder().cause(th).build();
                        });
                        sessionedJsonifiable.getSession().map((v0) -> {
                            return v0.getLogger();
                        }).ifPresent(threadSafeDittoLoggingAdapter -> {
                            threadSafeDittoLoggingAdapter.withCorrelationId(thingEvent).warning("During extra fields retrieval in <SSE> session got exception <{}>: <{}> - emitting: <{}>", th.getClass().getSimpleName(), th.getMessage(), asDittoRuntimeException);
                        });
                        // The error itself is emitted to the client as a JSON value.
                        return Collections.singletonList(asDittoRuntimeException.toJson());
                    });
                }).orElseGet(supplier);
            }
        }
        return (CompletionStage) supplier.get();
    }

    /**
     * Turns one streamed message command into the messages to emit for it.
     * <p>
     * An empty list results when the command's entity ID is not contained in {@code list}, when there is
     * no session, or when the command does not match the session's filter after being merged with its
     * extra fields. If retrieving the extra fields fails, a warning is logged and an empty list is
     * returned.
     *
     * @param list the thing IDs to accept.
     * @param m the streamed message command.
     * @param signalEnrichmentFacade the facade for retrieving extra fields; may be {@code null}.
     * @param sessionedJsonifiable the streamed element together with its session.
     * @param <P> the payload type of the message.
     * @param <M> the type of the message command.
     * @return a stage of the messages to emit: a single-element list or an empty list.
     */
    private <P, M extends MessageCommand<P, ?>> CompletionStage<List<Message<P>>> postprocessMessages(List<ThingId> list, M m, @Nullable SignalEnrichmentFacade signalEnrichmentFacade, SessionedJsonifiable sessionedJsonifiable) {
        // Shared "emit nothing" result.
        Supplier supplier = () -> {
            return CompletableFuture.completedStage(List.of());
        };
        return list.contains(m.getEntityId()) ? (CompletionStage) sessionedJsonifiable.getSession().map(streamingSession -> {
            return sessionedJsonifiable.retrieveExtraFields(signalEnrichmentFacade).thenApply(jsonObject -> {
                // The merged thing is only used for evaluating the session filter; the message itself is emitted.
                return (List) Optional.of(streamingSession.mergeThingWithExtra(m, jsonObject)).filter(thing -> {
                    return streamingSession.matchesFilter(thing, m);
                }).map(thing2 -> {
                    return List.of(m.getMessage());
                }).orElseGet(List::of);
            }).exceptionally(th -> {
                // Non-Ditto failures are wrapped into a SignalEnrichmentFailedException.
                DittoRuntimeException asDittoRuntimeException = DittoRuntimeException.asDittoRuntimeException(th, th -> {
                    return SignalEnrichmentFailedException.newBuilder().cause(th).build();
                });
                sessionedJsonifiable.getSession().map((v0) -> {
                    return v0.getLogger();
                }).ifPresent(threadSafeDittoLoggingAdapter -> {
                    threadSafeDittoLoggingAdapter.withCorrelationId(m).warning("During extra fields retrieval in <SSE> session got exception <{}>: <{}> - emitting: <{}>", th.getClass().getSimpleName(), th.getMessage(), asDittoRuntimeException);
                });
                // NOTE(review): the log text says "emitting" but nothing is emitted here - the message is dropped.
                return List.of();
            });
        }).orElseGet(supplier) : (CompletionStage) supplier.get();
    }

    /**
     * Checks the event's namespace against the requested namespaces.
     *
     * @param thingEvent the event to check.
     * @param collection the requested namespaces; an empty collection accepts every namespace.
     * @return {@code true} if the event is to be accepted.
     */
    private static boolean namespaceMatches(ThingEvent<?> thingEvent, Collection<String> collection) {
        if (collection.isEmpty()) {
            return true;
        }
        return collection.contains(namespaceFromId(thingEvent));
    }

    /**
     * Checks the event's entity ID against the requested thing IDs.
     *
     * @param thingEvent the event to check.
     * @param collection the requested thing IDs; an empty collection accepts every thing.
     * @return {@code true} if the event is to be accepted.
     */
    private static boolean targetThingIdMatches(ThingEvent<?> thingEvent, Collection<ThingId> collection) {
        if (collection.isEmpty()) {
            return true;
        }
        return collection.contains(thingEvent.getEntityId());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Projects a thing to the JSON value to emit for it.
     * <p>
     * The thing is serialized with the schema version of the event's headers (falling back to the event's
     * implemented schema version), restricted to {@code jsonFieldSelector} if one is given. The emitted
     * value then is:
     * <ul>
     * <li>for a non-empty {@code jsonPointer}: the value at that pointer, if present;</li>
     * <li>otherwise, if the field selector contains a pointer whose root equals the root of the
     * {@code _context} field: the JSON with the context added, projected by the field selector;</li>
     * <li>otherwise: the serialized JSON as is.</li>
     * </ul>
     *
     * @param thing the thing to project.
     * @param thingEvent the event the thing was built from.
     * @param jsonPointer the path to emit; empty for the whole thing.
     * @param jsonFieldSelector the fields to emit; may be {@code null}.
     * @return a singleton collection with the value, or an empty collection if the serialized thing is
     * empty or no value was found.
     */
    public static Collection<JsonValue> toNonemptyValue(Thing thing, ThingEvent<?> thingEvent, JsonPointer jsonPointer, @Nullable JsonFieldSelector jsonFieldSelector) {
        JsonSchemaVersion jsonSchemaVersion = (JsonSchemaVersion) thingEvent.getDittoHeaders().getSchemaVersion().orElse(thingEvent.getImplementedSchemaVersion());
        JsonObject json = null != jsonFieldSelector ? thing.toJson(jsonSchemaVersion, jsonFieldSelector) : thing.toJson(jsonSchemaVersion);
        // Nested ternary: pointer lookup first, then the "_context requested" case, else the plain JSON.
        JsonObject jsonObject = !jsonPointer.isEmpty() ? (JsonValue) json.getValue(jsonPointer).orElse(null) : Optional.ofNullable(jsonFieldSelector).filter(jsonFieldSelector2 -> {
            return jsonFieldSelector2.getPointers().stream().map((v0) -> {
                return v0.getRoot();
            }).anyMatch(optional -> {
                return optional.equals(CONTEXT.getPointer().getRoot());
            });
        }).isPresent() ? addContext(json.toBuilder(), thingEvent).get(jsonFieldSelector) : json;
        return (json.isEmpty() || null == jsonObject) ? Collections.emptyList() : Collections.singletonList(jsonObject);
    }

    /**
     * Splits a comma-separated namespaces parameter.
     *
     * @param str the parameter value; may be {@code null}.
     * @return the namespaces in the given order, or an empty list if the parameter is {@code null}.
     */
    private static List<String> getNamespaces(@Nullable String str) {
        if (str == null) {
            return Collections.emptyList();
        }
        final String[] namespaces = str.split(",");
        return Arrays.asList(namespaces);
    }

    /**
     * Splits a comma-separated IDs parameter and converts every part to a {@code ThingId}.
     *
     * @param str the parameter value; may be {@code null}.
     * @return the thing IDs in the given order, or an empty list if the parameter is {@code null}.
     */
    private static List<ThingId> getThingIds(@Nullable String str) {
        if (str == null) {
            return Collections.emptyList();
        }
        return Arrays.stream(str.split(","))
                .map(thingId -> ThingId.of(thingId))
                .toList();
    }

    /**
     * Parses a field selector parameter.
     *
     * @param str the parameter value; may be {@code null}.
     * @return the parsed selector, or {@code null} if the parameter is {@code null}.
     */
    @Nullable
    private static ThingFieldSelector getFieldSelector(@Nullable String str) {
        return str == null ? null : ThingFieldSelector.fromString(str);
    }

    /**
     * Returns the namespace of the event's entity ID.
     *
     * @param thingEvent the event.
     * @return the namespace.
     */
    private static String namespaceFromId(ThingEvent<?> thingEvent) {
        final ThingId thingId = thingEvent.getEntityId();
        return thingId.getNamespace();
    }

    /**
     * Creates the partial function that is defined only for {@code Accept} headers satisfying
     * {@link #matchesTextEventStream(Accept)} and returns the header unchanged.
     *
     * @return the partial function.
     */
    private static PartialFunction<HttpHeader, Accept> newAcceptHeaderExtractor() {
        final PFBuilder<HttpHeader, Accept> extractorBuilder = new PFBuilder<>();
        return extractorBuilder
                .match(Accept.class, ThingsSseRouteBuilder::matchesTextEventStream, acceptHeader -> acceptHeader)
                .build();
    }

    /**
     * Checks whether the header explicitly accepts {@code text/event-stream}. Media ranges whose main
     * type is the wildcard {@code *} are not considered.
     *
     * @param accept the Accept header.
     * @return {@code true} if a media range with a non-wildcard main type matches {@code text/event-stream}.
     */
    private static boolean matchesTextEventStream(Accept accept) {
        for (var mediaRange : accept.getMediaRanges()) {
            final boolean hasWildcardMainType = "*".equals(mediaRange.mainType());
            if (!hasWildcardMainType && mediaRange.matches(MediaTypes.TEXT_EVENT_STREAM)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Creates the {@code streaming_messages} counter for outgoing SSE events of the given path.
     *
     * @param str the value of the counter's {@code path} tag.
     * @return the tagged counter.
     */
    private static Counter getCounterFor(String str) {
        return DittoMetrics.counter("streaming_messages")
                .tag("type", SseConfig.CONFIG_PATH)
                .tag("direction", "out")
                .tag("path", str);
    }

    /**
     * Sets the {@code CONTEXT} field on the given builder and builds the resulting object. The context value is a
     * JSON object with a single {@code "headers"} entry holding the JSON form of the Ditto headers.
     *
     * @param jsonObjectBuilder the builder to which the context field is added.
     * @param withDittoHeaders the source of the Ditto headers.
     * @return the JSON object built from {@code jsonObjectBuilder} after the context field was set.
     */
    private static JsonObject addContext(JsonObjectBuilder jsonObjectBuilder, WithDittoHeaders withDittoHeaders) {
        final JsonObject headersJson = dittoHeadersToJson(withDittoHeaders.getDittoHeaders());
        final JsonObject contextJson = JsonObject.newBuilder()
                .set("headers", headersJson)
                .build();
        jsonObjectBuilder.set(CONTEXT, contextJson);
        return jsonObjectBuilder.build();
    }

    /**
     * Converts the given Ditto headers into a JSON object with one string-valued field per header entry.
     *
     * @param dittoHeaders the headers to convert.
     * @return the JSON object collected from all header entries.
     */
    private static JsonObject dittoHeadersToJson(DittoHeaders dittoHeaders) {
        return dittoHeaders.entrySet()
                .stream()
                .map(header -> {
                    final JsonKey headerName = JsonKey.of(header.getKey());
                    return JsonFactory.newField(headerName, JsonFactory.newValue(header.getValue()));
                })
                .collect(JsonCollectors.fieldsToObject());
    }

    /**
     * Synthetic {@code $deserializeLambda$} hook (see {@link SerializedLambda}): re-creates one of the serializable
     * lambdas / method references of this class from its serialized form.
     * <p>
     * The method works in two steps: it first maps the implementation method name to a dispatch index and then,
     * for that index, verifies the recorded method kind, functional interface and signatures before returning an
     * equivalent lambda built from the captured arguments.
     * </p>
     * <p>
     * NOTE(review): this is decompiler output and not valid Java as written: {@code z} is declared
     * {@code boolean} but assigned integer values, the second switch repeats {@code case true}, and several
     * lambdas use {@code this} although the method is static. Presumably {@code z} was an {@code int} index and
     * {@code this} stands for captured argument 0 in the original bytecode; confirm before relying on it.
     * </p>
     *
     * @param serializedLambda the serialized form of the lambda to re-create.
     * @return the re-created lambda or method reference.
     * @throws IllegalArgumentException if the serialized form matches none of the known lambdas.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Step 1: select a dispatch index from the implementation method name (hash code first, then equals).
        switch (implMethodName.hashCode()) {
            case -1416837255:
                if (implMethodName.equals("lambda$createMessagesSseRoute$b350d52f$1")) {
                    z = 4;
                    break;
                }
                break;
            case -1416837254:
                if (implMethodName.equals("lambda$createMessagesSseRoute$b350d52f$2")) {
                    z = 6;
                    break;
                }
                break;
            case -676403458:
                if (implMethodName.equals("lambda$createMessagesSseRoute$9a6c7e2e$1")) {
                    z = false;
                    break;
                }
                break;
            case -645499383:
                if (implMethodName.equals("lambda$createSseRoute$c306fe33$1")) {
                    z = 10;
                    break;
                }
                break;
            case -645499382:
                if (implMethodName.equals("lambda$createSseRoute$c306fe33$2")) {
                    z = 9;
                    break;
                }
                break;
            case 140874691:
                if (implMethodName.equals("lambda$createSseRoute$ecff288c$1")) {
                    z = 5;
                    break;
                }
                break;
            case 200896764:
                if (implMethodName.equals("heartbeat")) {
                    z = 3;
                    break;
                }
                break;
            case 657035291:
                if (implMethodName.equals("lambda$createSearchSseRoute$a3ad01c$1")) {
                    z = 2;
                    break;
                }
                break;
            case 959321074:
                if (implMethodName.equals("lambda$createSseRoute$8ac84532$1")) {
                    z = 8;
                    break;
                }
                break;
            case 1007499942:
                if (implMethodName.equals("lambda$createMessagesSseRoute$ffb67c42$1")) {
                    z = 7;
                    break;
                }
                break;
            case 1575578155:
                if (implMethodName.equals("lambda$createMessagesSseRoute$946a7f3a$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Step 2: verify the serialized lambda's shape for the selected index and rebuild the lambda.
        // The method kinds checked below are 5 and 6 (MethodHandleInfo.REF_invokeVirtual / REF_invokeStatic);
        // every kind-5 case reads captured argument 0 as the ThingsSseRouteBuilder instance.
        switch (z) {
            case false:
                // lambda$createMessagesSseRoute$9a6c7e2e$1: delegates to postprocessMessages.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;Lorg/eclipse/ditto/internal/models/signalenrichment/SignalEnrichmentFacade;Lorg/eclipse/ditto/gateway/service/streaming/actors/SessionedJsonifiable;)Ljava/util/concurrent/CompletionStage;")) {
                    ThingsSseRouteBuilder thingsSseRouteBuilder = (ThingsSseRouteBuilder) serializedLambda.getCapturedArg(0);
                    List list = (List) serializedLambda.getCapturedArg(1);
                    SignalEnrichmentFacade signalEnrichmentFacade = (SignalEnrichmentFacade) serializedLambda.getCapturedArg(2);
                    return sessionedJsonifiable2 -> {
                        return postprocessMessages(list, sessionedJsonifiable2.getJsonifiable(), signalEnrichmentFacade, sessionedJsonifiable2);
                    };
                }
                break;
            case true:
                // lambda$createMessagesSseRoute$946a7f3a$1: supervises the stream, asks the streaming actor to
                // connect and then sends it a StartStreaming for MESSAGES; aborts via the kill switch on failure.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/base/model/headers/DittoHeaders;Ljava/lang/String;Ljava/lang/String;Lakka/japi/Pair;)Lakka/NotUsed;")) {
                    ThingsSseRouteBuilder thingsSseRouteBuilder2 = (ThingsSseRouteBuilder) serializedLambda.getCapturedArg(0);
                    DittoHeaders dittoHeaders = (DittoHeaders) serializedLambda.getCapturedArg(1);
                    String str = (String) serializedLambda.getCapturedArg(2);
                    String str2 = (String) serializedLambda.getCapturedArg(3);
                    return pair -> {
                        SupervisedStream.WithQueue withQueue = (SupervisedStream.WithQueue) pair.first();
                        KillSwitch killSwitch = (KillSwitch) pair.second();
                        String str3 = (String) dittoHeaders.getCorrelationId().orElseThrow(() -> {
                            return new IllegalStateException("Expected correlation-id in SSE DittoHeaders: " + dittoHeaders);
                        });
                        JsonSchemaVersion jsonSchemaVersion = (JsonSchemaVersion) dittoHeaders.getSchemaVersion().orElse(JsonSchemaVersion.LATEST);
                        this.sseConnectionSupervisor.supervise(withQueue.getSupervisedStream(), str3, dittoHeaders);
                        AuthorizationContext authorizationContext = dittoHeaders.getAuthorizationContext();
                        Connect connect = new Connect(withQueue.getSourceQueue(), str3, STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(), authorizationContext, null);
                        // Filter: entity ID equals str2, and resource path either equals str (when it matches
                        // INBOX_OUTBOX_WITH_SUBJECT_PATTERN) or starts with str.
                        StartStreaming build = StartStreaming.getBuilder(StreamingType.MESSAGES, str3, authorizationContext).withFilter(MessageFormat.format("and(eq(entity:id,''{0}''),{1})", str2, INBOX_OUTBOX_WITH_SUBJECT_PATTERN.matcher(str).matches() ? String.format("eq(resource:path,'%s')", str) : String.format("like(resource:path,'%s*')", str))).build();
                        CompletionStage ask = Patterns.ask(this.streamingActor, connect, LOCAL_ASK_TIMEOUT);
                        Class<ActorRef> cls = ActorRef.class;
                        Objects.requireNonNull(ActorRef.class);
                        ask.thenApply(cls::cast).thenAccept(actorRef -> {
                            actorRef.tell(build, ActorRef.noSender());
                        }).exceptionally(th -> {
                            killSwitch.abort(th);
                            return null;
                        });
                        return NotUsed.getInstance();
                    };
                }
                break;
            case true:
                // lambda$createSearchSseRoute$a3ad01c$1: counts the event and wraps the pair into a ServerSentEvent.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Lakka/japi/Pair;)Lakka/http/javadsl/model/sse/ServerSentEvent;")) {
                    return pair2 -> {
                        SEARCH_SSE_COUNTER.increment();
                        return ServerSentEvent.create(((JsonObject) pair2.second()).toString(), Optional.empty(), Optional.of((String) pair2.first()), OptionalInt.empty());
                    };
                }
                break;
            case true:
                // Method reference ServerSentEvent::heartbeat.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/http/javadsl/model/sse/ServerSentEvent") && serializedLambda.getImplMethodSignature().equals("()Lakka/http/javadsl/model/sse/ServerSentEvent;")) {
                    return ServerSentEvent::heartbeat;
                }
                // NOTE(review): this condition is textually identical to the one above; presumably the method
                // reference is used at two call sites.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Creator") && serializedLambda.getFunctionalInterfaceMethodName().equals("create") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("akka/http/javadsl/model/sse/ServerSentEvent") && serializedLambda.getImplMethodSignature().equals("()Lakka/http/javadsl/model/sse/ServerSentEvent;")) {
                    return ServerSentEvent::heartbeat;
                }
                break;
            case true:
                // lambda$createMessagesSseRoute$b350d52f$1: identity mapping of a List to an Iterable.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;)Ljava/lang/Iterable;")) {
                    return list2 -> {
                        return list2;
                    };
                }
                break;
            case true:
                // lambda$createSseRoute$ecff288c$1: supervises the stream, asks the streaming actor to connect
                // and then forwards the captured message to it; aborts via the kill switch on failure.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/base/model/headers/DittoHeaders;Ljava/lang/String;Lorg/eclipse/ditto/base/model/auth/AuthorizationContext;Ljava/lang/Object;Lakka/japi/Pair;)Lakka/NotUsed;")) {
                    ThingsSseRouteBuilder thingsSseRouteBuilder3 = (ThingsSseRouteBuilder) serializedLambda.getCapturedArg(0);
                    DittoHeaders dittoHeaders2 = (DittoHeaders) serializedLambda.getCapturedArg(1);
                    String str3 = (String) serializedLambda.getCapturedArg(2);
                    AuthorizationContext authorizationContext = (AuthorizationContext) serializedLambda.getCapturedArg(3);
                    Object capturedArg = serializedLambda.getCapturedArg(4);
                    return pair3 -> {
                        SupervisedStream.WithQueue withQueue = (SupervisedStream.WithQueue) pair3.first();
                        KillSwitch killSwitch = (KillSwitch) pair3.second();
                        JsonSchemaVersion jsonSchemaVersion = (JsonSchemaVersion) dittoHeaders2.getSchemaVersion().orElse(JsonSchemaVersion.LATEST);
                        this.sseConnectionSupervisor.supervise(withQueue.getSupervisedStream(), str3, dittoHeaders2);
                        CompletionStage ask = Patterns.ask(this.streamingActor, new Connect(withQueue.getSourceQueue(), str3, STREAMING_TYPE_SSE, jsonSchemaVersion, null, Set.of(), authorizationContext, null), LOCAL_ASK_TIMEOUT);
                        Class<ActorRef> cls = ActorRef.class;
                        Objects.requireNonNull(ActorRef.class);
                        ask.thenApply(cls::cast).thenAccept(actorRef -> {
                            actorRef.tell(capturedArg, ActorRef.noSender());
                        }).exceptionally(th -> {
                            killSwitch.abort(th);
                            return null;
                        });
                        return NotUsed.getInstance();
                    };
                }
                break;
            case true:
                // lambda$createMessagesSseRoute$b350d52f$2: counts the event and turns a Message into a
                // ServerSentEvent whose data is the raw payload decoded with the content type's charset
                // (UTF-8 if none was determined) and whose type is the message subject.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/messages/model/Message;)Lakka/http/javadsl/model/sse/ServerSentEvent;")) {
                    return message -> {
                        THINGS_SSE_COUNTER.increment();
                        Optional determineCharsetFromContentType = determineCharsetFromContentType(message.getContentType());
                        return ServerSentEvent.create((String) message.getRawPayload().map(byteBuffer -> {
                            return new String(byteBuffer.array(), (Charset) determineCharsetFromContentType.orElse(StandardCharsets.UTF_8));
                        }).orElse(""), message.getSubject());
                    };
                }
                break;
            case true:
                // lambda$createMessagesSseRoute$ffb67c42$1: predicate accepting only MessageCommand jsonifiables.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Predicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("test") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/gateway/service/streaming/actors/SessionedJsonifiable;)Z")) {
                    return sessionedJsonifiable -> {
                        return sessionedJsonifiable.getJsonifiable() instanceof MessageCommand;
                    };
                }
                break;
            case true:
                // lambda$createSseRoute$8ac84532$1: delegates to postprocess.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/internal/models/signalenrichment/SignalEnrichmentFacade;Ljava/util/List;Ljava/util/List;Lorg/eclipse/ditto/json/JsonPointer;Lorg/eclipse/ditto/things/model/ThingFieldSelector;Lorg/eclipse/ditto/gateway/service/streaming/actors/SessionedJsonifiable;)Ljava/util/concurrent/CompletionStage;")) {
                    ThingsSseRouteBuilder thingsSseRouteBuilder4 = (ThingsSseRouteBuilder) serializedLambda.getCapturedArg(0);
                    SignalEnrichmentFacade signalEnrichmentFacade2 = (SignalEnrichmentFacade) serializedLambda.getCapturedArg(1);
                    List list3 = (List) serializedLambda.getCapturedArg(2);
                    List list4 = (List) serializedLambda.getCapturedArg(3);
                    JsonPointer jsonPointer = (JsonPointer) serializedLambda.getCapturedArg(4);
                    ThingFieldSelector thingFieldSelector = (ThingFieldSelector) serializedLambda.getCapturedArg(5);
                    return sessionedJsonifiable3 -> {
                        return postprocess(sessionedJsonifiable3, signalEnrichmentFacade2, list3, list4, jsonPointer, thingFieldSelector);
                    };
                }
                break;
            case true:
                // lambda$createSseRoute$c306fe33$2: counts the event and wraps the JSON value's string form
                // into a ServerSentEvent.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Lorg/eclipse/ditto/json/JsonValue;)Lakka/http/javadsl/model/sse/ServerSentEvent;")) {
                    return jsonValue -> {
                        THINGS_SSE_COUNTER.increment();
                        return ServerSentEvent.create(jsonValue.toString());
                    };
                }
                break;
            case true:
                // lambda$createSseRoute$c306fe33$1: identity mapping of a Collection to an Iterable.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/eclipse/ditto/gateway/service/endpoints/routes/sse/ThingsSseRouteBuilder") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Collection;)Ljava/lang/Iterable;")) {
                    return collection -> {
                        return collection;
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
