package org.eclipse.ditto.gateway.service.streaming.actors;

import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.pekko.Done;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.AbstractActorWithTimers;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.actor.Cancellable;
import org.apache.pekko.actor.CoordinatedShutdown;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.Terminated;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.KillSwitch;
import org.apache.pekko.stream.SourceRef;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.SourceQueueWithComplete;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotDeclaredException;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.FatalPubSubException;
import org.eclipse.ditto.base.model.auth.AuthorizationContext;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.NamespacedEntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoHeaderInvalidException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.translator.HeaderTranslator;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.namespaces.NamespaceReader;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.streaming.StreamingSubscriptionCommand;
import org.eclipse.ditto.base.model.signals.commands.streaming.SubscribeForPersistedEvents;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.base.model.signals.events.streaming.StreamingSubscriptionEvent;
import org.eclipse.ditto.edge.service.acknowledgements.AcknowledgementAggregatorActorStarter;
import org.eclipse.ditto.edge.service.acknowledgements.AcknowledgementConfig;
import org.eclipse.ditto.edge.service.acknowledgements.AcknowledgementForwarderActor;
import org.eclipse.ditto.edge.service.acknowledgements.message.MessageCommandAckRequestSetter;
import org.eclipse.ditto.edge.service.acknowledgements.message.MessageCommandResponseAcknowledgementProvider;
import org.eclipse.ditto.edge.service.acknowledgements.things.ThingCommandResponseAcknowledgementProvider;
import org.eclipse.ditto.edge.service.acknowledgements.things.ThingLiveCommandAckRequestSetter;
import org.eclipse.ditto.edge.service.acknowledgements.things.ThingModifyCommandAckRequestSetter;
import org.eclipse.ditto.edge.service.placeholders.EntityIdPlaceholder;
import org.eclipse.ditto.edge.service.streaming.StreamingSubscriptionManager;
import org.eclipse.ditto.gateway.api.GatewayInternalErrorException;
import org.eclipse.ditto.gateway.api.GatewayWebsocketSessionAbortedException;
import org.eclipse.ditto.gateway.api.GatewayWebsocketSessionClosedException;
import org.eclipse.ditto.gateway.api.GatewayWebsocketSessionExpiredException;
import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtAuthenticationResultProvider;
import org.eclipse.ditto.gateway.service.security.authentication.jwt.JwtValidator;
import org.eclipse.ditto.gateway.service.streaming.signals.Connect;
import org.eclipse.ditto.gateway.service.streaming.signals.IncomingSignal;
import org.eclipse.ditto.gateway.service.streaming.signals.InvalidJwt;
import org.eclipse.ditto.gateway.service.streaming.signals.Jwt;
import org.eclipse.ditto.gateway.service.streaming.signals.RefreshSession;
import org.eclipse.ditto.gateway.service.streaming.signals.StartStreaming;
import org.eclipse.ditto.gateway.service.streaming.signals.StopStreaming;
import org.eclipse.ditto.gateway.service.util.config.streaming.StreamingConfig;
import org.eclipse.ditto.internal.models.signal.correlation.MatchingValidationResult;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.pubsub.StreamingType;
import org.eclipse.ditto.internal.utils.pubsubthings.DittoProtocolSub;
import org.eclipse.ditto.internal.utils.search.SubscriptionManager;
import org.eclipse.ditto.jwt.model.ImmutableJsonWebToken;
import org.eclipse.ditto.jwt.model.JsonWebToken;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.placeholders.TimePlaceholder;
import org.eclipse.ditto.policies.model.signals.announcements.PolicyAnnouncement;
import org.eclipse.ditto.protocol.placeholders.ResourcePlaceholder;
import org.eclipse.ditto.protocol.placeholders.TopicPathPlaceholder;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.rql.query.criteria.Criteria;
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.thingsearch.model.signals.commands.ThingSearchCommand;
import org.eclipse.ditto.thingsearch.model.signals.events.SubscriptionEvent;
import scala.PartialFunction;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor.class */
public final class StreamingSessionActor extends AbstractActorWithTimers {
    /** Timeout of the self-ask for {@link Control#SERVICE_REQUESTS_DONE} issued by the coordinated-shutdown task. */
    private static final Duration SHUTDOWN_ASK_TIMEOUT = Duration.ofMinutes(2);

    /** Sessions whose remaining lifetime is at or above this value get no session-termination timer. */
    private static final Duration MAX_SESSION_TIMEOUT = Duration.ofDays(100);

    // Values taken from the Connect message this session was created for.
    private final JsonSchemaVersion jsonSchemaVersion;
    private final String connectionCorrelationId;
    private final String type;

    private final DittoProtocolSub dittoProtocolSub;

    /** Queue to which all outgoing responses, errors and signals of this session are offered. */
    private final SourceQueueWithComplete<SessionedJsonifiable> eventAndResponsePublisher;

    private final ActorRef commandForwarder;
    private final StreamingConfig streamingConfig;

    // Child actors created in the constructor.
    private final ActorRef subscriptionManager;
    private final ActorRef streamingSubscriptionManager;

    /** Streaming types for which a subscription was requested via StartStreaming. */
    private final Set<StreamingType> outstandingSubscriptionAcks = EnumSet.noneOf(StreamingType.class);

    /** Active streaming sessions, at most one per streaming type. */
    private final Map<StreamingType, StreamingSession> streamingSessions = new EnumMap<>(StreamingType.class);

    private final JwtValidator jwtValidator;
    private final JwtAuthenticationResultProvider jwtAuthenticationResultProvider;
    private final AcknowledgementAggregatorActorStarter ackregatorStarter;

    /** Acknowledgement labels declared by the connection; acknowledgements with other labels are rejected. */
    private final Set<AcknowledgementLabel> declaredAcks;

    private final ThreadSafeDittoLoggingAdapter logger;

    /** Replaced whenever a StartStreaming or SubscribeForPersistedEvents message arrives. */
    private AuthorizationContext authorizationContext;

    /** Handle of the coordinated-shutdown task registered in preStart and cancelled in postStop. */
    private Cancellable cancellableShutdownTask;

    /** Kill switch supplied by the Connect message, or {@code null} if the message carried none. */
    @Nullable
    private final KillSwitch killSwitch;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor$ConfirmSubscription.class */
    /**
     * Self-message sent once subscribing via {@code DittoProtocolSub} succeeded for a streaming type.
     */
    public static final class ConfirmSubscription extends WithStreamingType {

        private ConfirmSubscription(final StreamingType confirmedType) {
            super(confirmedType);
        }

    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor$ConfirmUnsubscription.class */
    /**
     * Self-message sent once unsubscribing via {@code DittoProtocolSub} completed for a streaming type.
     */
    public static final class ConfirmUnsubscription extends WithStreamingType {

        private ConfirmUnsubscription(final StreamingType confirmedType) {
            super(confirmedType);
        }

    }

    /* loaded from: input_file:org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor$Control.class */
    /**
     * Control messages understood by this actor.
     */
    public enum Control {
        // Sent to self when the event-and-response publisher completes; also used as the
        // completion message of the persisted-events stream sink. Stops the session.
        TERMINATED,
        // Sent to self (directly or via single timer) when the session lifetime is over.
        SESSION_TERMINATION,
        // Triggers the resubscribe handler.
        RESUBSCRIBE,
        // Asked of self by the coordinated-shutdown task registered in preStart.
        SERVICE_REQUESTS_DONE
    }

    /* loaded from: input_file:org/eclipse/ditto/gateway/service/streaming/actors/StreamingSessionActor$WithStreamingType.class */
    /**
     * Base class of internal messages that refer to exactly one {@link StreamingType}.
     */
    private static abstract class WithStreamingType {

        private final StreamingType streamingType;

        private WithStreamingType(final StreamingType type) {
            streamingType = type;
        }

        /**
         * @return the streaming type this message refers to.
         */
        StreamingType getStreamingType() {
            return streamingType;
        }

    }

    /**
     * Creates the session actor from the {@code Connect} message and its collaborators.
     *
     * @param connect the message describing the connection this session serves.
     * @param dittoProtocolSub pub/sub access used for subscribing and unsubscribing.
     * @param actorRef the command forwarder to which incoming signals are sent.
     * @param streamingConfig the streaming configuration.
     * @param headerTranslator header translator handed to the acknowledgement aggregator starter.
     * @param props props of the child actor created under {@code SubscriptionManager.ACTOR_NAME}.
     * @param props2 props of the child actor created under {@code StreamingSubscriptionManager.ACTOR_NAME}.
     * @param jwtValidator validator for JWTs received during the session.
     * @param jwtAuthenticationResultProvider provider of authentication results for JWTs.
     */
    private StreamingSessionActor(final Connect connect,
            final DittoProtocolSub dittoProtocolSub,
            final ActorRef actorRef,
            final StreamingConfig streamingConfig,
            final HeaderTranslator headerTranslator,
            final Props props,
            final Props props2,
            final JwtValidator jwtValidator,
            final JwtAuthenticationResultProvider jwtAuthenticationResultProvider) {

        // state taken from the Connect message
        jsonSchemaVersion = connect.getJsonSchemaVersion();
        connectionCorrelationId = connect.getConnectionCorrelationId();
        type = connect.getType();
        eventAndResponsePublisher = connect.getEventAndResponsePublisher();
        authorizationContext = connect.getConnectionAuthContext();
        killSwitch = connect.getKillSwitch().orElse(null);
        declaredAcks = connect.getDeclaredAcknowledgementLabels();

        // collaborators
        this.dittoProtocolSub = dittoProtocolSub;
        commandForwarder = actorRef;
        this.streamingConfig = streamingConfig;
        this.jwtValidator = jwtValidator;
        this.jwtAuthenticationResultProvider = jwtAuthenticationResultProvider;

        ackregatorStarter = AcknowledgementAggregatorActorStarter.of(
                getContext(),
                streamingConfig.getAcknowledgementConfig(),
                headerTranslator,
                (Consumer<MatchingValidationResult.Failure>) null,
                List.of(ThingModifyCommandAckRequestSetter.getInstance(),
                        ThingLiveCommandAckRequestSetter.getInstance(),
                        MessageCommandAckRequestSetter.getInstance()),
                List.of(ThingCommandResponseAcknowledgementProvider.getInstance(),
                        MessageCommandResponseAcknowledgementProvider.getInstance()));

        // the logger must exist before the session timeout is started because starting it logs
        logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this)
                .withCorrelationId(connectionCorrelationId);
        connect.getSessionExpirationTime().ifPresent(this::startSessionTimeout);

        subscriptionManager = getContext().actorOf(props, SubscriptionManager.ACTOR_NAME);
        streamingSubscriptionManager = getContext().actorOf(props2, StreamingSubscriptionManager.ACTOR_NAME);

        startSubscriptionRefreshTimer();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the {@code Props} for this actor; the arguments are passed on to the constructor unchanged.
     *
     * @return the Props.
     */
    public static Props props(final Connect connect,
            final DittoProtocolSub dittoProtocolSub,
            final ActorRef actorRef,
            final StreamingConfig streamingConfig,
            final HeaderTranslator headerTranslator,
            final Props props,
            final Props props2,
            final JwtValidator jwtValidator,
            final JwtAuthenticationResultProvider jwtAuthenticationResultProvider) {

        return Props.create(StreamingSessionActor.class,
                connect,
                dittoProtocolSub,
                actorRef,
                streamingConfig,
                headerTranslator,
                props,
                props2,
                jwtValidator,
                jwtAuthenticationResultProvider);
    }

    /**
     * Declares the acknowledgement labels, registers a cancellable coordinated-shutdown task which asks this
     * actor for {@link Control#SERVICE_REQUESTS_DONE}, and arranges for {@link Control#TERMINATED} to be sent to
     * self when the publisher queue completes (successfully or not).
     */
    @Override
    public void preStart() {
        declareAcknowledgementLabels(declaredAcks);

        final CoordinatedShutdown coordinatedShutdown = CoordinatedShutdown.get(getContext().getSystem());
        cancellableShutdownTask = coordinatedShutdown.addCancellableTask(
                CoordinatedShutdown.PhaseServiceRequestsDone(),
                "service-requests-done-streaming-session-actor",
                () -> Patterns.ask(getSelf(), Control.SERVICE_REQUESTS_DONE, SHUTDOWN_ASK_TIMEOUT)
                        .thenApply(reply -> Done.done()));

        eventAndResponsePublisher.watchCompletion()
                .whenComplete((done, error) -> getSelf().tell(Control.TERMINATED, getSelf()));
    }

    /**
     * Cancels the shutdown task and the session timer, then completes the publisher queue.
     */
    @Override
    public void postStop() {
        logger.info("Closing <{}> streaming session.", type);

        cancellableShutdownTask.cancel();
        cancelSessionTimeout();

        eventAndResponsePublisher.complete();
    }

    /**
     * Composes the behaviours in priority order: incoming signals, pub/sub control, self-termination,
     * outgoing signals, and finally a catch-all which logs unknown messages.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        final AbstractActor.Receive incomingSignals = createIncomingSignalBehavior();
        final AbstractActor.Receive pubSub = createPubSubBehavior();
        final AbstractActor.Receive selfTermination = createSelfTerminationBehavior();
        final AbstractActor.Receive outgoingSignals = createOutgoingSignalBehavior();
        final AbstractActor.Receive unknown = logUnknownMessage();

        return incomingSignals
                .orElse(pubSub)
                .orElse(selfTermination)
                .orElse(outgoingSignals)
                .orElse(unknown);
    }

    /**
     * Builds the behaviour for signals arriving from the client.
     * <p>
     * Two preprocessors run before the actual handlers: the first unwraps {@code IncomingSignal} envelopes,
     * the second passes every {@code Signal} through {@link #startAckregatorAndForward(Signal)} and lets all
     * other messages through unchanged.
     *
     * @return the behaviour.
     */
    private AbstractActor.Receive createIncomingSignalBehavior() {
        // Typed builders are required: with raw types the lambda parameters degrade to Object.
        final PartialFunction<Object, Object> stripEnvelope = new PFBuilder<Object, Object>()
                .match(IncomingSignal.class, IncomingSignal::getSignal)
                .build();

        final PartialFunction<Object, Object> startAckregator = new PFBuilder<Object, Object>()
                .match(Signal.class, this::startAckregatorAndForward)
                .matchAny(message -> message)
                .build();

        final AbstractActor.Receive signalBehavior = ReceiveBuilder.create()
                .match(Acknowledgement.class, this::isWeakAckForBuiltInAckLabel,
                        this::dropWeakAckForBuiltInAckLabelAcknowledgement)
                .match(Acknowledgement.class, this::hasUndeclaredAckLabel, this::ackLabelNotDeclared)
                .match(Acknowledgement.class, this::forwardAcknowledgementOrLiveCommandResponse)
                .match(CommandResponse.class, CommandResponse::isLiveCommandResponse,
                        liveResponse -> commandForwarder.forward(liveResponse, getContext()))
                .match(CommandResponse.class, this::forwardAcknowledgementOrLiveCommandResponse)
                .match(ThingSearchCommand.class, this::forwardSearchCommand)
                .match(StreamingSubscriptionCommand.class, this::forwardStreamingSubscriptionCommand)
                .match(Signal.class, signal -> commandForwarder.tell(signal, getReturnAddress(signal)))
                .matchEquals(Done.getInstance(), done -> {
                    // result of a preprocessor which already handled the message; nothing left to do
                })
                .build();

        return addPreprocessors(List.of(stripEnvelope, startAckregator), signalBehavior);
    }

    /**
     * Builds the behaviour for messages which are to be published to the client.
     * <p>
     * A preprocessor passes every {@code Signal} through {@link #startAckForwarder(Signal)} and lets
     * {@code DittoRuntimeException}s through unchanged. Signals that originate from this very connection are
     * not published; other signals are published only if a session exists for their streaming type and
     * the session is allowed to receive them.
     *
     * @return the behaviour.
     */
    private AbstractActor.Receive createOutgoingSignalBehavior() {
        final PartialFunction<Object, Object> startAckForwarder = new PFBuilder<Object, Object>()
                .match(Signal.class, this::startAckForwarder)
                .match(DittoRuntimeException.class, error -> error)
                .build();

        final AbstractActor.Receive publishBehavior = ReceiveBuilder.create()
                .match(SubscriptionEvent.class, subscriptionEvent -> {
                    logger.debug("Got SubscriptionEvent in <{}> session, publishing: {}", type, subscriptionEvent);
                    eventAndResponsePublisher.offer(SessionedJsonifiable.subscription(subscriptionEvent));
                })
                .match(StreamingSubscriptionEvent.class, streamingSubscriptionEvent -> {
                    logger.debug("Got StreamingSubscriptionEvent in <{}> session, publishing: {}", type,
                            streamingSubscriptionEvent);
                    eventAndResponsePublisher.offer(
                            SessionedJsonifiable.streamingSubscription(streamingSubscriptionEvent));
                })
                .match(CommandResponse.class, this::publishResponseOrError)
                .match(DittoRuntimeException.class, this::publishResponseOrError)
                .match(Signal.class, this::isSameOrigin, signal ->
                        logger.withCorrelationId(signal).debug("Got Signal of type <{}> in <{}> session, but this was issued by  this connection itself, not publishing", signal.getType(), type))
                .match(Signal.class, signal -> {
                    final StreamingType streamingType = determineStreamingType(signal);
                    final StreamingSession session = streamingSessions.get(streamingType);
                    if (null != session && isSessionAllowedToReceiveSignal(signal, session, streamingType)) {
                        final ThreadSafeDittoLoggingAdapter l = logger.withCorrelationId(signal);
                        l.info("Publishing Signal of type <{}> in <{}> session", signal.getType(), type);
                        // argument order matches the placeholders: signal type first, session type second
                        l.debug("Publishing Signal of type <{}> in <{}> session: {}", signal.getType(), type,
                                signal);
                        final DittoHeaders sessionHeaders = DittoHeaders.newBuilder()
                                .authorizationContext(authorizationContext)
                                .schemaVersion(jsonSchemaVersion)
                                .build();
                        eventAndResponsePublisher.offer(SessionedJsonifiable.signal(signal, sessionHeaders, session));
                    }
                })
                .matchEquals(Done.getInstance(), done -> {
                    // result of the preprocessor; nothing left to do
                })
                .build();

        return addPreprocessors(List.of(startAckForwarder), publishBehavior);
    }

    /**
     * Builds the behaviour for starting and stopping subscriptions.
     * <ul>
     * <li>{@code SubscribeForPersistedEvents}: registers an EVENTS session, asks the command forwarder for a
     * {@code SourceRef} and streams its elements to this actor; on failure or a missing source the session
     * is terminated.</li>
     * <li>{@code StartStreaming}: registers a session for the requested type and subscribes via
     * {@code DittoProtocolSub}; a {@code DittoRuntimeException} thrown on the way is reported to the client
     * without terminating the session.</li>
     * <li>{@code StopStreaming}: removes the session and unsubscribes according to the streaming type.</li>
     * <li>{@code ConfirmSubscription} / {@code ConfirmUnsubscription} / {@code Control.RESUBSCRIBE}: delegated
     * to the corresponding handler methods.</li>
     * </ul>
     *
     * @return the behaviour.
     */
    private AbstractActor.Receive createPubSubBehavior() {
        return ReceiveBuilder.create()
                .match(SubscribeForPersistedEvents.class, subscribeForPersistedEvents -> {
                    authorizationContext = subscribeForPersistedEvents.getDittoHeaders().getAuthorizationContext();
                    final EntityId entityId = subscribeForPersistedEvents.getEntityId();
                    // restrict the session to the entity's namespace if it has one
                    final List<String> namespaces = entityId instanceof NamespacedEntityId
                            ? List.of(((NamespacedEntityId) entityId).getNamespace())
                            : List.of();
                    streamingSessions.put(StreamingType.EVENTS,
                            StreamingSession.of(namespaces, null, null, getSelf(), logger));

                    final Duration askTimeout = subscribeForPersistedEvents.getDittoHeaders()
                            .getTimeout()
                            .orElse(Duration.ofSeconds(5L));
                    Patterns.ask(commandForwarder, subscribeForPersistedEvents, askTimeout)
                            .thenApply(response -> (SourceRef<?>) response)
                            .whenComplete((sourceRef, error) -> {
                                if (null != sourceRef) {
                                    sourceRef.getSource()
                                            .toMat(Sink.actorRef(getSelf(), Control.TERMINATED), Keep.left())
                                            .run(getContext().getSystem());
                                } else {
                                    if (null != error) {
                                        eventAndResponsePublisher.offer(SessionedJsonifiable.error(
                                                DittoRuntimeException.asDittoRuntimeException(error, cause ->
                                                        GatewayInternalErrorException.newBuilder()
                                                                .dittoHeaders(DittoHeaders.newBuilder()
                                                                        .correlationId(connectionCorrelationId)
                                                                        .build())
                                                                .cause(cause)
                                                                .build())));
                                    }
                                    // neither a source nor an error: nothing to stream, so close as well
                                    terminateWebsocketStream();
                                }
                            });
                })
                .match(StartStreaming.class, startStreaming -> {
                    authorizationContext = startStreaming.getAuthorizationContext();
                    final StreamingType streamingType = startStreaming.getStreamingType();
                    try {
                        // parsing the filter may throw a DittoRuntimeException which is reported below
                        final Criteria criteria = startStreaming.getFilter()
                                .map(filter -> parseCriteria(filter, DittoHeaders.newBuilder()
                                        .correlationId(startStreaming.getCorrelationId()
                                                .orElse(startStreaming.getConnectionCorrelationId()))
                                        .build()))
                                .orElse(null);
                        streamingSessions.put(streamingType,
                                StreamingSession.of(startStreaming.getNamespaces(), criteria,
                                        startStreaming.getExtraFields().orElse(null), getSelf(), logger));
                        logger.debug("Got 'StartStreaming' message in <{}> session, subscribing for <{}> in Cluster ...", type, streamingType.name());
                        outstandingSubscriptionAcks.add(streamingType);
                        final ConfirmSubscription confirmSubscription = new ConfirmSubscription(streamingType);
                        dittoProtocolSub.subscribe(streamingSessions.keySet(),
                                        authorizationContext.getAuthorizationSubjectIds(), getSelf())
                                .whenComplete((unused, error) -> {
                                    if (null == error) {
                                        logger.debug("subscription to Ditto pubsub succeeded");
                                        getSelf().tell(confirmSubscription, getSelf());
                                    } else {
                                        logger.error(error, "subscription to Ditto pubsub failed: {}",
                                                error.getMessage());
                                        eventAndResponsePublisher.offer(SessionedJsonifiable.error(
                                                DittoRuntimeException.asDittoRuntimeException(error, cause ->
                                                        GatewayInternalErrorException.newBuilder()
                                                                .dittoHeaders(DittoHeaders.newBuilder()
                                                                        .correlationId(startStreaming
                                                                                .getConnectionCorrelationId())
                                                                        .build())
                                                                .cause(cause)
                                                                .build())));
                                        terminateWebsocketStream();
                                    }
                                });
                    } catch (final DittoRuntimeException e) {
                        logger.info("Got 'DittoRuntimeException' <{}> session during 'StartStreaming' processing: {}: <{}>", type, e.getClass().getSimpleName(), e.getMessage());
                        eventAndResponsePublisher.offer(SessionedJsonifiable.error(e));
                    }
                })
                .match(StopStreaming.class, stopStreaming -> {
                    final StreamingType streamingType = stopStreaming.getStreamingType();
                    logger.debug("Got 'StopStreaming' message in <{}> session, unsubscribing from <{}> in Cluster ...", type, streamingType.name());
                    streamingSessions.remove(streamingType);
                    final ConfirmUnsubscription confirmUnsubscription = new ConfirmUnsubscription(streamingType);
                    switch (streamingType) {
                        case EVENTS:
                            dittoProtocolSub.removeTwinSubscriber(getSelf(),
                                            authorizationContext.getAuthorizationSubjectIds())
                                    .thenAccept(unused -> getSelf().tell(confirmUnsubscription, getSelf()));
                            break;
                        case POLICY_ANNOUNCEMENTS:
                            dittoProtocolSub.removePolicyAnnouncementSubscriber(getSelf(),
                                            authorizationContext.getAuthorizationSubjectIds())
                                    .thenAccept(unused -> getSelf().tell(confirmUnsubscription, getSelf()));
                            break;
                        case LIVE_COMMANDS:
                        case LIVE_EVENTS:
                        case MESSAGES:
                        default:
                            // live subscriptions are updated to the streaming types which remain active
                            dittoProtocolSub.updateLiveSubscriptions(streamingSessions.keySet(),
                                            authorizationContext.getAuthorizationSubjectIds(), getSelf())
                                    .thenAccept(unused -> getSelf().tell(confirmUnsubscription, getSelf()));
                            break;
                    }
                })
                .match(ConfirmSubscription.class,
                        confirmSubscription -> confirmSubscription(confirmSubscription.getStreamingType()))
                .match(ConfirmUnsubscription.class,
                        confirmUnsubscription -> confirmUnsubscription(confirmUnsubscription.getStreamingType()))
                .matchEquals(Control.RESUBSCRIBE, this::resubscribe)
                .build();
    }

    /**
     * Builds the behaviour for session refresh, session expiry, pub/sub failure, termination and shutdown
     * control messages.
     *
     * @return the behaviour.
     */
    private AbstractActor.Receive createSelfTerminationBehavior() {
        final ReceiveBuilder builder = ReceiveBuilder.create();

        // session refresh and expiry
        builder.match(Jwt.class, this::refreshWebSocketSession)
                .match(RefreshSession.class, refreshSession -> {
                    cancelSessionTimeout();
                    checkAuthorizationContextAndStartSessionTimer(refreshSession);
                })
                .match(InvalidJwt.class, invalidJwt -> cancelSessionTimeout());

        // failures and termination
        builder.match(FatalPubSubException.class, this::pubsubFailed)
                .match(Terminated.class, this::handleTerminated)
                .matchEquals(Control.TERMINATED, this::handleTerminated)
                .matchEquals(Control.SESSION_TERMINATION, this::handleSessionTermination)
                .matchEquals(Control.SERVICE_REQUESTS_DONE, this::serviceRequestsDone);

        return builder.build();
    }

    /**
     * Logs the termination trigger and stops this session.
     *
     * @param obj the message which signalled the termination; only used for logging.
     */
    private void handleTerminated(final Object obj) {
        logger.debug("EventAndResponsePublisher was terminated: {}", obj);
        logger.info("<{}> connection was closed, unsubscribing from Streams in Cluster ...", type);

        terminateWebsocketStream();
    }

    /**
     * Builds the catch-all behaviour which logs any message at warning level.
     *
     * @return the behaviour.
     */
    private AbstractActor.Receive logUnknownMessage() {
        return ReceiveBuilder.create()
                .matchAny(unknown -> logger.warning("Got unknown message in '{}' session: {} '{}'", type,
                        unknown.getClass().getName(), unknown))
                .build();
    }

    /**
     * Chains the given preprocessors in list order and puts them in front of a receive behaviour, so that the
     * behaviour sees the output of the last preprocessor.
     *
     * @param list the preprocessors; may be empty.
     * @param receive the behaviour to run after the preprocessors.
     * @return the combined behaviour, or {@code receive} itself if there are no preprocessors.
     */
    private static AbstractActor.Receive addPreprocessors(final List<PartialFunction<Object, Object>> list,
            final AbstractActor.Receive receive) {

        return list.stream()
                .reduce((first, second) -> first.andThen(second))
                .map(combined -> new AbstractActor.Receive(combined.andThen(receive.onMessage())))
                .orElse(receive);
    }

    /**
     * Tests whether an acknowledgement is weak and carries one of the {@code DittoAcknowledgementLabel}s.
     *
     * @param acknowledgement the acknowledgement to test.
     * @return whether both conditions hold.
     */
    private boolean isWeakAckForBuiltInAckLabel(final Acknowledgement acknowledgement) {
        if (!acknowledgement.isWeak()) {
            return false;
        }
        final AcknowledgementLabel label = acknowledgement.getLabel();
        return Stream.of(DittoAcknowledgementLabel.values()).anyMatch(label::equals);
    }

    /**
     * Drops the acknowledgement; the only effect is an info log entry.
     *
     * @param acknowledgement the dropped acknowledgement.
     */
    private void dropWeakAckForBuiltInAckLabelAcknowledgement(final Acknowledgement acknowledgement) {
        final AcknowledgementLabel droppedLabel = acknowledgement.getLabel();
        logger.withCorrelationId(acknowledgement)
                .info("Dropping weak ack for built-in ack label <{}>", droppedLabel);
    }

    /**
     * @param acknowledgement the acknowledgement to test.
     * @return {@code true} if the label of the acknowledgement is not among the declared labels.
     */
    private boolean hasUndeclaredAckLabel(final Acknowledgement acknowledgement) {
        final boolean declared = declaredAcks.contains(acknowledgement.getLabel());
        return !declared;
    }

    /**
     * Publishes an {@code AcknowledgementLabelNotDeclaredException} built from the label and headers of the
     * acknowledgement.
     *
     * @param acknowledgement the acknowledgement with the undeclared label.
     */
    private void ackLabelNotDeclared(final Acknowledgement acknowledgement) {
        final DittoRuntimeException notDeclared = AcknowledgementLabelNotDeclaredException.of(
                acknowledgement.getLabel(), acknowledgement.getDittoHeaders());
        publishResponseOrError(notDeclared);
    }

    /**
     * Determines the sender to use when forwarding a signal.
     *
     * @param signal the signal about to be forwarded.
     * @return this actor for commands which require a response, otherwise {@code ActorRef.noSender()}.
     */
    private ActorRef getReturnAddress(final Signal<?> signal) {
        if (Command.isCommand(signal) && signal.getDittoHeaders().isResponseRequired()) {
            return getSelf();
        }
        return ActorRef.noSender();
    }

    /**
     * Tests whether a signal was issued by this very connection.
     *
     * @param signal the signal to test.
     * @return {@code true} if the origin header of the signal is present and equals the correlation ID of this
     * connection.
     */
    private boolean isSameOrigin(final Signal<?> signal) {
        return signal.getDittoHeaders()
                .getOrigin()
                .stream()
                .anyMatch(connectionCorrelationId::equals);
    }

    /**
     * Reports a fatal pub/sub failure to the client and stops this session.
     *
     * @param fatalPubSubException the failure.
     */
    private void pubsubFailed(final FatalPubSubException fatalPubSubException) {
        final DittoRuntimeException cause = fatalPubSubException.asDittoRuntimeException();
        logger.withCorrelationId(cause).info("pubsubFailed cause=<{}>", cause);

        eventAndResponsePublisher.offer(SessionedJsonifiable.error(cause));
        terminateWebsocketStream();
    }

    /**
     * Removes this actor as pub/sub subscriber and stops it.
     */
    private void terminateWebsocketStream() {
        final ActorRef self = getSelf();
        dittoProtocolSub.removeSubscriber(self);
        getContext().stop(getSelf());
    }

    /**
     * Preprocesses an incoming signal with the acknowledgement aggregator starter.
     * <p>
     * If the starter decides that an aggregator is to be started, the signal has an entity ID and is a
     * {@code Command}, the command is first checked for requested acks without response; a resulting error is
     * published, otherwise the aggregator is started and the adjusted signal is sent to the command forwarder
     * with the aggregator as sender. In all other cases the preprocessed signal is returned unchanged.
     * Errors raised by the preprocessing itself are published.
     *
     * @param signal the incoming signal.
     * @return the preprocessed signal, or the result of publishing/forwarding.
     */
    private Object startAckregatorAndForward(final Signal<?> signal) {
        return ackregatorStarter.preprocess(signal,
                (preprocessed, shouldStart) -> {
                    final Optional<EntityId> entityId = WithEntityId.getEntityId(preprocessed);
                    if (!shouldStart || !entityId.isPresent() || !(preprocessed instanceof Command)) {
                        return doNothing(preprocessed);
                    }
                    final Command<?> command = (Command<?>) preprocessed;
                    return checkForAcksWithoutResponse(command)
                            .map(this::publishResponseOrError)
                            .orElseGet(() -> ackregatorStarter.doStart(entityId.get(), command, null,
                                    this::publishResponseOrError,
                                    // distinct parameter names: a lambda must not redeclare enclosing locals
                                    (ackregator, adjustedSignal) -> {
                                        commandForwarder.tell(adjustedSignal, ackregator);
                                        return Done.getInstance();
                                    }));
                },
                this::publishResponseOrError);
    }

    /**
     * Identity function used where a result of type {@code Object} is required.
     *
     * @param t the value.
     * @param <T> the type of the value.
     * @return the very same value.
     */
    private static <T> Object doNothing(final T t) {
        final Object unchanged = t;
        return unchanged;
    }

    /**
     * Starts an acknowledgement forwarder for an outgoing signal which has an entity ID.
     *
     * @param signal the outgoing signal.
     * @return the signal returned by {@code AcknowledgementForwarderActor.startAcknowledgementForwarder}, or the
     * given signal unchanged if it has no entity ID.
     */
    private Signal<?> startAckForwarder(final Signal<?> signal) {
        final Optional<EntityId> entityId = WithEntityId.getEntityId(signal);
        if (entityId.isPresent()) {
            return AcknowledgementForwarderActor.startAcknowledgementForwarder(getContext(),
                    getSelf(),
                    getContext().actorSelection(commandForwarder.path()),
                    entityId.get(),
                    signal,
                    streamingConfig.getAcknowledgementConfig(),
                    declaredAcks::contains);
        }
        return signal;
    }

    /**
     * Offers a command response or an error to the publisher queue; anything else is logged as an error.
     *
     * @param obj the object to publish.
     * @return always {@code Done.getInstance()}.
     */
    private Object publishResponseOrError(final Object obj) {
        if (obj instanceof CommandResponse) {
            final CommandResponse response = (CommandResponse) obj;
            logger.withCorrelationId(response)
                    .debug("Got 'CommandResponse' message in <{}> session, telling EventAndResponsePublisher about it: {}", type, response);
            eventAndResponsePublisher.offer(SessionedJsonifiable.response(response));
            return Done.getInstance();
        }

        if (obj instanceof DittoRuntimeException) {
            final DittoRuntimeException error = (DittoRuntimeException) obj;
            logger.withCorrelationId(error)
                    .debug("Got 'DittoRuntimeException' message in <{}> session, telling EventAndResponsePublisher about it: {}", type, error);
            eventAndResponsePublisher.offer(SessionedJsonifiable.error(error));
            return Done.getInstance();
        }

        logger.error("Unexpected result from AcknowledgementAggregatorActor: <{}>", obj);
        return Done.getInstance();
    }

    /**
     * Hands an acknowledgement or live command response to the matching acknowledgement forwarder child, or
     * to the command forwarder if no such child exists. A {@code DittoRuntimeException} thrown on the way is
     * published to the client.
     *
     * @param commandResponse the acknowledgement or live response.
     */
    private void forwardAcknowledgementOrLiveCommandResponse(final CommandResponse<?> commandResponse) {
        final ActorRef sender = getSender();
        try {
            final String forwarderName =
                    AcknowledgementForwarderActor.determineActorName(commandResponse.getDittoHeaders());
            final Optional<ActorRef> forwarder = getContext().findChild(forwarderName);
            if (forwarder.isPresent()) {
                forwarder.get().tell(commandResponse, sender);
                return;
            }

            // log the complete response only on debug level
            if (logger.isDebugEnabled()) {
                logger.withCorrelationId(commandResponse)
                        .debug("No AcknowledgementForwarderActor found, forwarding to command router: <{}>", commandResponse);
            } else {
                logger.withCorrelationId(commandResponse)
                        .info("No AcknowledgementForwarderActor found, forwarding to command router: <{}>", commandResponse.getType());
            }
            commandForwarder.tell(commandResponse, ActorRef.noSender());
        } catch (final DittoRuntimeException e) {
            eventAndResponsePublisher.offer(SessionedJsonifiable.error(e));
        }
    }

    /**
     * Sends a search command to the subscription manager child with this actor as sender.
     *
     * @param thingSearchCommand the search command.
     */
    private void forwardSearchCommand(final ThingSearchCommand<?> thingSearchCommand) {
        final ActorRef replyTo = getSelf();
        subscriptionManager.tell(thingSearchCommand, replyTo);
    }

    /**
     * Sends a streaming subscription command to the streaming subscription manager child with this actor as
     * sender.
     *
     * @param streamingSubscriptionCommand the command.
     */
    private void forwardStreamingSubscriptionCommand(
            final StreamingSubscriptionCommand<?> streamingSubscriptionCommand) {

        final ActorRef replyTo = getSelf();
        streamingSubscriptionManager.tell(streamingSubscriptionCommand, replyTo);
    }

    /**
     * Decides whether a signal may be published in a session.
     *
     * @param signal the signal.
     * @param streamingSession the session for the streaming type of the signal.
     * @param streamingType the streaming type of the signal.
     * @return {@code true} for policy announcements; otherwise whether the authorization context of this
     * session is authorized by the read subjects of the signal and the signal matches the session's namespaces.
     */
    private boolean isSessionAllowedToReceiveSignal(final Signal<?> signal,
            final StreamingSession streamingSession,
            final StreamingType streamingType) {

        if (StreamingType.POLICY_ANNOUNCEMENTS == streamingType) {
            return true;
        }

        final DittoHeaders headers = signal.getDittoHeaders();
        if (!authorizationContext.isAuthorized(headers.getReadGrantedSubjects(),
                headers.getReadRevokedSubjects())) {
            return false;
        }
        return matchesNamespaces(signal, streamingSession);
    }

    /**
     * Arranges for the session to terminate at the given instant.
     * <p>
     * An instant which is not in the future terminates the session right away; a remaining lifetime of
     * {@code MAX_SESSION_TIMEOUT} or more starts no timer at all.
     *
     * @param instant the instant at which the session expires.
     */
    private void startSessionTimeout(final Instant instant) {
        final Duration remaining = Duration.between(Instant.now(), instant);

        final boolean alreadyExpired = remaining.isNegative() || remaining.isZero();
        if (alreadyExpired) {
            logger.debug("Session expired already. Closing WS.");
            getSelf().tell(Control.SESSION_TERMINATION, ActorRef.noSender());
            return;
        }

        if (remaining.compareTo(MAX_SESSION_TIMEOUT) >= 0) {
            logger.warning("Session lifetime <{}> is more than the maximum <{}>. Keeping session open indefinitely.", remaining, MAX_SESSION_TIMEOUT);
            return;
        }

        logger.debug("Starting session timeout - session will expire in {}", remaining);
        getTimers().startSingleTimer(Control.SESSION_TERMINATION, Control.SESSION_TERMINATION, remaining);
    }

    /**
     * Fails the event-and-response publisher with a {@code GatewayWebsocketSessionExpiredException} that carries the
     * connection correlation ID in its headers.
     *
     * @param control the received control message; not inspected.
     */
    private void handleSessionTermination(Control control) {
        logger.info("Stopping WebSocket session for connection with ID <{}>.", connectionCorrelationId);
        final DittoHeaders headers = DittoHeaders.newBuilder()
                .correlationId(connectionCorrelationId)
                .build();
        eventAndResponsePublisher.fail(GatewayWebsocketSessionExpiredException.newBuilder()
                .dittoHeaders(headers)
                .build());
    }

    /**
     * Cancels the timer registered under the key {@code Control.SESSION_TERMINATION}.
     */
    private void cancelSessionTimeout() {
        final Control timerKey = Control.SESSION_TERMINATION;
        getTimers().cancel(timerKey);
    }

    /**
     * Handles a session refresh: if the refreshed authorization context differs from the one of this session, the
     * event-and-response publisher is failed with a {@code GatewayWebsocketSessionClosedException}; otherwise the
     * session timeout is (re)started with the refreshed expiration.
     *
     * @param refreshSession the refresh request carrying the new authorization context and session timeout.
     */
    private void checkAuthorizationContextAndStartSessionTimer(RefreshSession refreshSession) {
        final boolean contextChanged = !authorizationContext.equals(refreshSession.getAuthorizationContext());
        if (contextChanged) {
            logger.debug("Authorization Context changed for WebSocket session <{}>. Terminating the session.", connectionCorrelationId);
            final DittoHeaders headers = DittoHeaders.newBuilder()
                    .correlationId(connectionCorrelationId)
                    .build();
            eventAndResponsePublisher.fail(GatewayWebsocketSessionClosedException.newBuilder()
                    .dittoHeaders(headers)
                    .build());
        } else {
            startSessionTimeout(refreshSession.getSessionTimeout());
        }
    }

    /**
     * Checks whether the signal's namespace is accepted by the streaming session.
     * <p>
     * A session with an empty namespace list accepts every signal. Otherwise the namespace derived from the signal's
     * entity ID must be contained in the session's namespace list; a mismatch is logged on debug level.
     *
     * @param signal the signal whose namespace is checked.
     * @param streamingSession the session providing the accepted namespaces.
     * @return {@code true} if the signal matches, {@code false} otherwise.
     */
    private boolean matchesNamespaces(Signal<?> signal, StreamingSession streamingSession) {
        final List<String> acceptedNamespaces = streamingSession.getNamespaces();
        if (acceptedNamespaces.isEmpty() || acceptedNamespaces.contains(namespaceFromId(signal))) {
            return true;
        }
        logger.withCorrelationId(signal).debug("Signal does not match namespaces.");
        return false;
    }

    /**
     * Validates a refreshed JWT asynchronously and reports the outcome back to this actor.
     * <ul>
     * <li>Invalid token: the event-and-response publisher is failed with a
     * {@code GatewayWebsocketSessionClosedException} and {@code InvalidJwt} is sent to this actor.</li>
     * <li>Valid token: the authentication result is resolved and a {@code RefreshSession} message is sent to this
     * actor; if resolving fails, {@code InvalidJwt} is sent instead.</li>
     * </ul>
     *
     * @param jwt the message carrying the refreshed token and its connection correlation ID.
     */
    private void refreshWebSocketSession(Jwt jwt) {
        final String jwtConnectionCorrelationId = jwt.getConnectionCorrelationId();
        final JsonWebToken jsonWebToken = ImmutableJsonWebToken.fromToken(jwt.toString());

        jwtValidator.validate(jsonWebToken).thenAccept(validationResult -> {
            if (!validationResult.isValid()) {
                // Note: this branch reports the session's own correlation ID, not the one carried by the Jwt message.
                logger.debug("Received invalid JWT for WebSocket session <{}>. Terminating the session.", this.connectionCorrelationId);
                final DittoHeaders headers = DittoHeaders.newBuilder()
                        .correlationId(this.connectionCorrelationId)
                        .build();
                eventAndResponsePublisher.fail(GatewayWebsocketSessionClosedException.newBuilderForInvalidToken()
                        .dittoHeaders(headers)
                        .build());
                getSelf().tell(InvalidJwt.getInstance(), ActorRef.noSender());
                return;
            }

            jwtAuthenticationResultProvider.getAuthenticationResult(jsonWebToken, DittoHeaders.empty())
                    .thenAccept(authenticationResult -> getSelf().tell(
                            new RefreshSession(jwtConnectionCorrelationId, jsonWebToken.getExpirationTime(), authenticationResult.getAuthorizationContext()),
                            ActorRef.noSender()))
                    .exceptionally(error -> {
                        logger.info("Got exception when handling refreshed JWT for WebSocket session <{}>: {}", jwtConnectionCorrelationId, error.getMessage());
                        getSelf().tell(InvalidJwt.getInstance(), ActorRef.noSender());
                        return null;
                    });
        });
    }

    /**
     * Declares the given acknowledgement labels for this actor via {@code dittoProtocolSub}.
     * <p>
     * Nothing happens for an empty collection. On failure the error is converted to a
     * {@link DittoRuntimeException} (falling back to an {@code AcknowledgementLabelNotUniqueException} built from the
     * cause) and sent to this actor as a message.
     *
     * @param collection the acknowledgement labels to declare.
     */
    private void declareAcknowledgementLabels(Collection<AcknowledgementLabel> collection) {
        if (collection.isEmpty()) {
            return;
        }
        // Resolve the self reference up front so the asynchronous callbacks below only use a captured local.
        final ActorRef self = getSelf();
        logger.info("Declaring acknowledgement labels <{}>", collection);
        dittoProtocolSub.declareAcknowledgementLabels(collection, self, null)
                .thenAccept(unused ->
                        logger.info("Acknowledgement label declaration successful for labels: <{}>", collection))
                .exceptionally(error -> {
                    // The inner lambda parameter needs its own name: a lambda parameter must not redeclare a
                    // parameter of the enclosing lambda.
                    // NOTE(review): `cause2` looks like a decompiler-renamed `cause` builder method - confirm
                    // against the builder API.
                    final DittoRuntimeException dittoRuntimeException =
                            DittoRuntimeException.asDittoRuntimeException(error,
                                    cause -> AcknowledgementLabelNotUniqueException.newBuilder().cause2(cause).build());
                    logger.info("Acknowledgement label declaration failed for labels: <{}> - cause: {} {}", collection, error.getClass().getSimpleName(), error.getMessage());
                    self.tell(dittoRuntimeException, ActorRef.noSender());
                    return null;
                });
    }

    /**
     * Maps a signal to its streaming type.
     * <ul>
     * <li>Events map to {@code LIVE_EVENTS} when sent on the live channel, otherwise to {@code EVENTS}.</li>
     * <li>Message commands map to {@code MESSAGES}.</li>
     * <li>Policy announcements map to {@code POLICY_ANNOUNCEMENTS}.</li>
     * <li>Everything else maps to {@code LIVE_COMMANDS}.</li>
     * </ul>
     *
     * @param signal the signal to classify.
     * @return the streaming type of the signal.
     */
    private static StreamingType determineStreamingType(Signal<?> signal) {
        if (signal instanceof Event) {
            return Signal.isChannelLive(signal) ? StreamingType.LIVE_EVENTS : StreamingType.EVENTS;
        }
        if (MessageCommand.isMessageCommand(signal)) {
            return StreamingType.MESSAGES;
        }
        if (signal instanceof PolicyAnnouncement) {
            return StreamingType.POLICY_ANNOUNCEMENTS;
        }
        return StreamingType.LIVE_COMMANDS;
    }

    /**
     * Extracts the namespace from the entity ID of the given signal.
     *
     * @param signal the signal to read the entity ID from.
     * @return the namespace, or {@code null} if the signal has no entity ID or no namespace could be read from it.
     */
    @Nullable
    private static String namespaceFromId(Signal<?> signal) {
        final Optional<String> namespace = WithEntityId.getEntityId(signal)
                .flatMap(NamespaceReader::fromEntityId);
        return namespace.orElse(null);
    }

    /**
     * Parses a filter string into {@code Criteria} using a model-based {@code QueryFilterCriteriaFactory} that is
     * configured with the RQL predicate parser and the topic-path, entity-ID, resource and time placeholders.
     * <p>
     * A decompiler note (JADX) records that the access modifier of this method was changed from private.
     *
     * @param str the filter string to parse.
     * @param dittoHeaders the headers passed along to the criteria factory.
     * @return the parsed criteria.
     */
    public static Criteria parseCriteria(String str, DittoHeaders dittoHeaders) {
        final Criteria criteria = QueryFilterCriteriaFactory.modelBased(
                        RqlPredicateParser.getInstance(),
                        TopicPathPlaceholder.getInstance(),
                        EntityIdPlaceholder.getInstance(),
                        ResourcePlaceholder.getInstance(),
                        TimePlaceholder.getInstance())
                .filterCriteria(str, dittoHeaders);
        return criteria;
    }

    /**
     * Acknowledges a subscription for the given streaming type, at most once per outstanding acknowledgement.
     * <p>
     * If an acknowledgement is still outstanding for the type, it is removed from the outstanding set and a positive
     * ack is offered to the event-and-response publisher. Otherwise only a debug message is logged.
     *
     * @param streamingType the streaming type whose subscription is confirmed.
     */
    private void confirmSubscription(StreamingType streamingType) {
        if (outstandingSubscriptionAcks.contains(streamingType)) {
            outstandingSubscriptionAcks.remove(streamingType);
            eventAndResponsePublisher.offer(SessionedJsonifiable.ack(streamingType, true, connectionCorrelationId));
            logger.debug("Subscribed to Cluster <{}> in <{}> session.", streamingType, type);
        } else {
            logger.debug("Subscription already acked for type <{}> in <{}> session.", streamingType, type);
        }
    }

    /**
     * Offers a negative (unsubscribed) ack for the given streaming type to the event-and-response publisher.
     *
     * @param streamingType the streaming type that was unsubscribed.
     */
    private void confirmUnsubscription(StreamingType streamingType) {
        final SessionedJsonifiable unsubscribedAck =
                SessionedJsonifiable.ack(streamingType, false, connectionCorrelationId);
        eventAndResponsePublisher.offer(unsubscribedAck);
        logger.debug("Unsubscribed from Cluster <{}> in <{}> session.", streamingType, type);
    }

    /**
     * Starts a single timer that delivers {@code Control.RESUBSCRIBE} to this actor.
     * <p>
     * The timer fires after the configured subscription refresh delay plus a random extra of up to (but excluding)
     * one further delay, i.e. somewhere in {@code [delay, 2 * delay)}.
     */
    private void startSubscriptionRefreshTimer() {
        // Bind the configured delay to a local: it is needed both as the base delay and as the jitter bound.
        final Duration refreshDelay = streamingConfig.getSubscriptionRefreshDelay();
        final Duration jitter = Duration.ofMillis((long) (refreshDelay.toMillis() * Math.random()));
        timers().startSingleTimer(Control.RESUBSCRIBE, Control.RESUBSCRIBE, refreshDelay.plus(jitter));
    }

    /**
     * Renews the subscription for all current streaming sessions and restarts the refresh timer.
     * <p>
     * The subscription is only renewed when there is at least one streaming session and no subscription
     * acknowledgement is outstanding. The refresh timer is restarted in every case.
     *
     * @param control the received control message; not inspected.
     */
    private void resubscribe(Control control) {
        final boolean hasSessions = !streamingSessions.isEmpty();
        if (hasSessions && outstandingSubscriptionAcks.isEmpty()) {
            dittoProtocolSub.subscribe(
                    streamingSessions.keySet(),
                    authorizationContext.getAuthorizationSubjectIds(),
                    getSelf(),
                    null,
                    true);
        }
        startSubscriptionRefreshTimer();
    }

    /**
     * Detects signals that request acknowledgements although no response is required.
     *
     * @param signal the signal whose headers are checked.
     * @return an exception describing the invalid header combination, or an empty optional if the signal requires a
     * response or requests no acknowledgements.
     */
    private static Optional<DittoHeaderInvalidException> checkForAcksWithoutResponse(Signal<?> signal) {
        final DittoHeaders headers = signal.getDittoHeaders();
        final boolean acksRequestedWithoutResponse =
                !headers.isResponseRequired() && !headers.getAcknowledgementRequests().isEmpty();
        if (!acksRequestedWithoutResponse) {
            return Optional.empty();
        }

        final String message = String.format("For WebSocket, it is forbidden to request acknowledgements while '%s' is set to false.", DittoHeaderDefinition.RESPONSE_REQUIRED.getKey());
        final String requestedAcksKey = DittoHeaderDefinition.REQUESTED_ACKS.getKey();
        final String description = String.format("Please set '%s' to [] or '%s' to true.", requestedAcksKey, DittoHeaderDefinition.RESPONSE_REQUIRED.getKey());

        final DittoHeaderInvalidException invalidHeaderException = DittoHeaderInvalidException.newBuilder()
                .withInvalidHeaderKey(requestedAcksKey)
                .message(message)
                .description(description)
                .dittoHeaders(signal.getDittoHeaders())
                .build();
        return Optional.of(invalidHeaderException);
    }

    /**
     * Aborts the ongoing session stream, if a kill switch is present, and replies {@code Done} to the sender.
     *
     * @param control the received control message; only used for logging.
     */
    private void serviceRequestsDone(Control control) {
        logger.info("{}: abort ongoing websocket session", control);
        if (null != killSwitch) {
            // Abort with empty headers; the exception is only created when there is a stream to abort.
            killSwitch.abort(GatewayWebsocketSessionAbortedException.of(DittoHeaders.empty()));
        }
        final ActorRef requester = getSender();
        requester.tell(Done.getInstance(), ActorRef.noSender());
    }
}
