package org.eclipse.ditto.services.concierge.enforcement;

import akka.actor.ActorRef;
import akka.japi.Pair;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.enforcers.AclEnforcer;
import org.eclipse.ditto.model.enforcers.EffectedSubjects;
import org.eclipse.ditto.model.enforcers.Enforcer;
import org.eclipse.ditto.model.messages.MessageFormatInvalidException;
import org.eclipse.ditto.model.messages.MessageSendNotAllowedException;
import org.eclipse.ditto.model.policies.PoliciesResourceType;
import org.eclipse.ditto.model.policies.ResourceKey;
import org.eclipse.ditto.model.things.ThingId;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.protocoladapter.UnknownCommandException;
import org.eclipse.ditto.services.models.concierge.pubsub.LiveSignalPub;
import org.eclipse.ditto.services.models.concierge.streaming.StreamingType;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.cache.Cache;
import org.eclipse.ditto.services.utils.cache.EntityIdWithResourceType;
import org.eclipse.ditto.services.utils.cache.entry.Entry;
import org.eclipse.ditto.services.utils.pubsub.DistributedPub;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.base.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.messages.SendClaimMessage;
import org.eclipse.ditto.signals.commands.things.ThingCommand;
import org.eclipse.ditto.signals.commands.things.exceptions.EventSendNotAllowedException;
import org.eclipse.ditto.signals.commands.things.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.things.ThingEvent;

/* loaded from: input_file:org/eclipse/ditto/services/concierge/enforcement/LiveSignalEnforcement.class */
public final class LiveSignalEnforcement extends AbstractEnforcement<Signal<?>> {
    private final EnforcerRetriever enforcerRetriever;
    private final LiveSignalPub liveSignalPub;

    /* loaded from: input_file:org/eclipse/ditto/services/concierge/enforcement/LiveSignalEnforcement$Provider.class */
    public static final class Provider implements EnforcementProvider<Signal<?>> {
        private final Cache<EntityIdWithResourceType, Entry<EntityIdWithResourceType>> thingIdCache;
        private final Cache<EntityIdWithResourceType, Entry<Enforcer>> policyEnforcerCache;
        private final Cache<EntityIdWithResourceType, Entry<Enforcer>> aclEnforcerCache;
        private final LiveSignalPub liveSignalPub;

        public Provider(Cache<EntityIdWithResourceType, Entry<EntityIdWithResourceType>> cache, Cache<EntityIdWithResourceType, Entry<Enforcer>> cache2, Cache<EntityIdWithResourceType, Entry<Enforcer>> cache3, LiveSignalPub liveSignalPub) {
            this.thingIdCache = (Cache) Objects.requireNonNull(cache);
            this.policyEnforcerCache = (Cache) Objects.requireNonNull(cache2);
            this.aclEnforcerCache = (Cache) Objects.requireNonNull(cache3);
            this.liveSignalPub = liveSignalPub;
        }

        @Override // org.eclipse.ditto.services.concierge.enforcement.EnforcementProvider
        public Class<Signal<?>> getCommandClass() {
            return Signal.class;
        }

        @Override // org.eclipse.ditto.services.concierge.enforcement.EnforcementProvider
        public boolean isApplicable(Signal<?> signal) {
            return LiveSignalEnforcement.isLiveSignal(signal);
        }

        @Override // org.eclipse.ditto.services.concierge.enforcement.EnforcementProvider
        public AbstractEnforcement<Signal<?>> createEnforcement(Contextual<Signal<?>> contextual) {
            return new LiveSignalEnforcement(contextual, this.thingIdCache, this.policyEnforcerCache, this.aclEnforcerCache, this.liveSignalPub);
        }
    }

    private LiveSignalEnforcement(Contextual<Signal<?>> contextual, Cache<EntityIdWithResourceType, Entry<EntityIdWithResourceType>> cache, Cache<EntityIdWithResourceType, Entry<Enforcer>> cache2, Cache<EntityIdWithResourceType, Entry<Enforcer>> cache3, LiveSignalPub liveSignalPub) {
        super(contextual);
        Objects.requireNonNull(cache);
        Objects.requireNonNull(cache2);
        Objects.requireNonNull(cache3);
        this.enforcerRetriever = PolicyOrAclEnforcerRetrieverFactory.create(cache, cache2, cache3);
        this.liveSignalPub = liveSignalPub;
    }

    @Override // org.eclipse.ditto.services.concierge.enforcement.AbstractEnforcement
    public CompletionStage<Contextual<WithDittoHeaders>> enforce() {
        Signal<?> signal = signal();
        LogUtil.enhanceLogWithCorrelationIdOrRandom(signal);
        return this.enforcerRetriever.retrieve(entityId(), (entry, entry2) -> {
            try {
                return doEnforce(signal, entry2).exceptionally(this::handleExceptionally);
            } catch (RuntimeException e) {
                return CompletableFuture.completedFuture(handleExceptionally(e));
            }
        });
    }

    private CompletionStage<Contextual<WithDittoHeaders>> doEnforce(Signal<?> signal, Entry<Enforcer> entry) {
        Optional<String> correlationId = signal.getDittoHeaders().getCorrelationId();
        if (!entry.exists() || !correlationId.isPresent()) {
            log(signal).info("Command of type <{}> with ID <{}> could not be dispatched as no enforcer could be looked up! Answering with ThingNotAccessibleException.", signal.getType(), signal.getEntityId());
            throw ThingNotAccessibleException.newBuilder(ThingId.of(entityId().getId())).dittoHeaders(signal.getDittoHeaders()).build();
        }
        Enforcer valueOrThrow = entry.getValueOrThrow();
        if (signal instanceof SendClaimMessage) {
            return publishMessageCommand((SendClaimMessage) signal, valueOrThrow);
        }
        if (signal instanceof CommandResponse) {
            return enforceLiveCommandResponse(signal, correlationId.get());
        }
        Optional<StreamingType> fromSignal = StreamingType.fromSignal(signal);
        if (fromSignal.isPresent()) {
            return enforceLiveSignal(fromSignal.get(), signal, valueOrThrow);
        }
        log().error("Unsupported Signal in LiveSignalEnforcement: <{}>", signal);
        throw GatewayInternalErrorException.newBuilder().dittoHeaders(signal.getDittoHeaders()).build();
    }

    private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveCommandResponse(Signal<?> signal, String str) {
        Optional<Cache<String, ActorRef>> responseReceivers = this.context.getResponseReceivers();
        if (responseReceivers.isPresent()) {
            Cache<String, ActorRef> cache = responseReceivers.get();
            return cache.get(str).thenApply(optional -> {
                if (optional.isPresent()) {
                    cache.invalidate(str);
                    log().debug("Scheduling CommandResponse <{}> to original sender <{}>", signal, (ActorRef) optional.get());
                    return withMessageToReceiver(signal, (ActorRef) optional.get());
                }
                if (log().isDebugEnabled()) {
                    log().debug("Got <{}> with unknown correlation ID: <{}>", signal.getType(), signal);
                } else {
                    log().info("Got <{}> with unknown correlation ID: <{}>", signal.getType(), str);
                }
                return withMessageToReceiver(null, null);
            });
        }
        if (log().isDebugEnabled()) {
            log().debug("Got live response when global dispatching is inactive: <{}>", signal);
        } else {
            log().info("Got live response when global dispatching is inactive: <{}> with correlation ID <{}>", signal.getType(), signal.getDittoHeaders().getCorrelationId().orElse(""));
        }
        return CompletableFuture.completedFuture(withMessageToReceiver(null, null));
    }

    private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveSignal(StreamingType streamingType, Signal<?> signal, Enforcer enforcer) {
        switch (streamingType) {
            case MESSAGES:
                return enforceMessageCommand((MessageCommand) signal, enforcer);
            case LIVE_EVENTS:
                return enforceLiveEvent(signal, enforcer);
            case LIVE_COMMANDS:
                if (!(enforcer instanceof AclEnforcer ? ThingCommandEnforcement.authorizeByAcl(enforcer, (ThingCommand) signal).isPresent() : ThingCommandEnforcement.authorizeByPolicy(enforcer, (ThingCommand) signal).isPresent())) {
                    log(signal).info("Live Command was NOT authorized: <{}>", signal);
                    throw ThingCommandEnforcement.errorForThingCommand((ThingCommand) signal);
                }
                Command command = (Command) addEffectedReadSubjectsToThingSignal((Command) signal, enforcer);
                log(command).info("Live Command was authorized: <{}>", command);
                return publishLiveSignal(command, this.liveSignalPub.command());
            default:
                log(signal).warning("Ignoring unsupported command signal: <{}>", signal);
                throw UnknownCommandException.newBuilder(signal.getName()).message("The sent command is not supported as live command").dittoHeaders(signal.getDittoHeaders()).build();
        }
    }

    private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveEvent(Signal<?> signal, Enforcer enforcer) {
        if (enforcer.hasUnrestrictedPermissions(PoliciesResourceType.thingResource(TopicPath.PATH_DELIMITER), signal.getDittoHeaders().getAuthorizationContext(), "WRITE", new String[0])) {
            log(signal).info("Live Event was authorized: <{}>", signal);
            return publishLiveSignal((Event) addEffectedReadSubjectsToThingSignal((Event) signal, enforcer), this.liveSignalPub.event());
        }
        log(signal).info("Live Event was NOT authorized: <{}>", signal);
        throw EventSendNotAllowedException.newBuilder(((ThingEvent) signal).getThingEntityId()).dittoHeaders(signal.getDittoHeaders()).build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean isLiveSignal(Signal<?> signal) {
        return StreamingType.isLiveSignal(signal);
    }

    private CompletionStage<Contextual<WithDittoHeaders>> enforceMessageCommand(MessageCommand<?, ?> messageCommand, Enforcer enforcer) {
        return isAuthorized(messageCommand, enforcer) ? publishMessageCommand(messageCommand, enforcer) : CompletableFuture.failedStage(rejectMessageCommand(messageCommand));
    }

    private CompletionStage<Contextual<WithDittoHeaders>> publishMessageCommand(MessageCommand<?, ?> messageCommand, Enforcer enforcer) {
        EffectedSubjects subjectsWithPermission = enforcer.getSubjectsWithPermission(ResourceKey.newInstance("message", messageCommand.getResourcePath()), "READ", new String[0]);
        return publishLiveSignal(messageCommand.setDittoHeaders2(DittoHeaders.newBuilder(messageCommand.getDittoHeaders()).readGrantedSubjects(subjectsWithPermission.getGranted()).readRevokedSubjects(subjectsWithPermission.getRevoked()).build()), this.liveSignalPub.message());
    }

    private MessageSendNotAllowedException rejectMessageCommand(MessageCommand<?, ?> messageCommand) {
        MessageSendNotAllowedException build = MessageSendNotAllowedException.newBuilder(messageCommand.getThingEntityId()).dittoHeaders(messageCommand.getDittoHeaders()).build();
        log(messageCommand).info("The command <{}> was not forwarded due to insufficient rights {}: {} - AuthorizationContext: {}", messageCommand.getType(), build.getClass().getSimpleName(), build.getMessage(), messageCommand.getDittoHeaders().getAuthorizationContext());
        return build;
    }

    private <T extends Signal<?>> CompletionStage<Contextual<WithDittoHeaders>> publishLiveSignal(T t, DistributedPub<T> distributedPub) {
        log(t).debug("Publish message to pub-sub");
        return addToResponseReceiver(t).thenApply(signal -> {
            return withMessageToReceiver(signal, distributedPub.getPublisher(), obj -> {
                return distributedPub.wrapForPublication((Signal) obj);
            });
        });
    }

    private CompletionStage<Signal<?>> addToResponseReceiver(Signal<?> signal) {
        Optional<Cache<String, ActorRef>> responseReceivers = this.context.getResponseReceivers();
        return (responseReceivers.isPresent() && (signal instanceof Command) && signal.getDittoHeaders().isResponseRequired()) ? insertResponseReceiverConflictFree(responseReceivers.get(), signal, sender()) : CompletableFuture.completedStage(signal);
    }

    private boolean isAuthorized(MessageCommand<?, ?> messageCommand, Enforcer enforcer) {
        return enforcer.hasUnrestrictedPermissions(extractMessageResourceKey(messageCommand), messageCommand.getDittoHeaders().getAuthorizationContext(), "WRITE", new String[0]);
    }

    private ResourceKey extractMessageResourceKey(MessageCommand<?, ?> messageCommand) {
        try {
            return PoliciesResourceType.messageResource(messageCommand.getResourcePath());
        } catch (IllegalArgumentException e) {
            throw MessageFormatInvalidException.newBuilder(JsonFactory.nullArray()).message("Unable to determine message resource path.").description("Please verify that the thing ID, message subject and direction are set correctly.").dittoHeaders(messageCommand.getDittoHeaders()).build();
        }
    }

    private static CompletionStage<Signal<?>> insertResponseReceiverConflictFree(Cache<String, ActorRef> cache, Signal<?> signal, ActorRef actorRef) {
        return setUniqueCorrelationIdForGlobalDispatching(cache, signal).thenApply(pair -> {
            cache.put((String) pair.first(), actorRef);
            return (Signal) pair.second();
        });
    }

    private static CompletionStage<Pair<String, Signal<?>>> setUniqueCorrelationIdForGlobalDispatching(Cache<String, ?> cache, Signal<?> signal) {
        String orElseGet = signal.getDittoHeaders().getCorrelationId().orElseGet(() -> {
            return UUID.randomUUID().toString();
        });
        return cache.get(orElseGet).thenCompose(optional -> {
            return (optional.isPresent() ? findUniqueCorrelationId(cache, orElseGet, getNextSuffix()) : CompletableFuture.completedStage(orElseGet)).thenApply(str -> {
                return Pair.create(str, (Signal) signal.setDittoHeaders2(signal.getDittoHeaders().toBuilder().correlationId(str).build()));
            });
        });
    }

    private static String getNextSuffix() {
        return Long.toHexString(Double.doubleToRawLongBits(Math.random()));
    }

    private static CompletionStage<String> findUniqueCorrelationId(Cache<String, ?> cache, String str, String str2) {
        String str3 = str + "#x" + str2;
        return cache.get(str3).thenCompose(optional -> {
            return optional.isPresent() ? findUniqueCorrelationId(cache, str, getNextSuffix()) : CompletableFuture.completedStage(str3);
        });
    }
}
