package org.eclipse.ditto.policies.enforcement;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.eclipse.ditto.base.api.commands.sudo.SudoCommand;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.pekko.actors.AbstractActorWithStashWithTimers;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.tracing.DittoTracing;
import org.eclipse.ditto.internal.utils.tracing.span.SpanOperationName;
import org.eclipse.ditto.internal.utils.tracing.span.StartedSpan;
import org.eclipse.ditto.policies.enforcement.EnforcementReloaded;
import org.eclipse.ditto.policies.enforcement.pre.PreEnforcerProvider;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.policies.model.signals.commands.exceptions.PolicyNotAccessibleException;

/* loaded from: input_file:org/eclipse/ditto/policies/enforcement/AbstractEnforcerActor.class */
/**
 * Base actor that enforces incoming {@link Signal}s against a policy and filters outgoing
 * {@link CommandResponse}s.
 *
 * <p>Message handling as implemented in {@link #createReceive()}:
 * <ul>
 * <li>{@code SubscribeAck}: logged on debug level.</li>
 * <li>{@code SudoCommand}: logged as error and otherwise ignored.</li>
 * <li>{@code CommandResponse}: optionally filtered via the {@link #enforcement} and sent back to the sender,
 * using this actor's parent as the sending actor.</li>
 * <li>any other {@code Signal}: passed through the {@link #preEnforcer}, authorized via the {@link #enforcement}
 * and the outcome (authorized signal or {@link DittoRuntimeException}) is sent back to the sender.</li>
 * <li>everything else: logged as warning.</li>
 * </ul>
 *
 * @param <I> the type of the entity ID this actor enforces signals for.
 * @param <S> the type of the signals to enforce.
 * @param <R> the type of the command responses to filter.
 * @param <E> the type of the enforcement logic applied to {@code S} and {@code R}.
 */
public abstract class AbstractEnforcerActor<I extends EntityId, S extends Signal<?>, R extends CommandResponse<?>, E extends EnforcementReloaded<S, R>> extends AbstractActorWithStashWithTimers {

    /**
     * Timeout of 5 seconds. Not referenced inside this class; presumably intended for local asks performed by
     * subclasses.
     */
    protected static final Duration DEFAULT_LOCAL_ASK_TIMEOUT = Duration.ofSeconds(5);

    protected final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);
    protected final I entityId;
    protected final E enforcement;
    protected final PreEnforcerProvider preEnforcer;

    /**
     * Creates the actor and resolves the {@link PreEnforcerProvider} from the actor system's Ditto extension config.
     *
     * @param i the ID of the entity this actor is responsible for.
     * @param e the enforcement logic used for authorizing signals and filtering responses.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractEnforcerActor(I i, E e) {
        this.entityId = i;
        this.enforcement = e;
        final ActorSystem system = getContext().getSystem();
        this.preEnforcer = PreEnforcerProvider.get(system, ScopedConfig.dittoExtension(system.settings().config()));
    }

    /**
     * Provides the ID of the policy which should be used for enforcing the passed signal.
     *
     * @param signal the signal to determine the policy ID for.
     * @return a stage completing with the policy ID.
     */
    protected abstract CompletionStage<PolicyId> providePolicyIdForEnforcement(Signal<?> signal);

    /**
     * Provides the policy enforcer for the passed policy ID.
     *
     * @param policyId the policy ID to provide the enforcer for, may be {@code null}.
     * @return a stage completing with the enforcer, or with an empty optional if none is available.
     */
    protected abstract CompletionStage<Optional<PolicyEnforcer>> providePolicyEnforcer(@Nullable PolicyId policyId);

    /**
     * Builds the message handling of this actor; see the class documentation for the dispatch order.
     *
     * @return the receive behavior.
     */
    @Override // org.apache.pekko.actor.AbstractActor
    @SuppressWarnings("unchecked") // match(Class, ...) only yields raw types; S and R are erased at runtime
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck -> {
                    this.log.debug("Got subscribeAck <{}>.", subscribeAck);
                })
                .match(SudoCommand.class, sudoCommand -> {
                    this.log.withCorrelationId(sudoCommand).error("Received SudoCommand in enforcer which should never happen: <{}>", sudoCommand);
                })
                // CommandResponse must be matched before Signal: the handler order decides which branch wins.
                .match(CommandResponse.class, commandResponse -> {
                    replyWithFilteredCommandResponse((R) commandResponse);
                })
                .match(Signal.class, signal -> {
                    enforceSignal((S) signal);
                })
                .matchAny(obj -> {
                    this.log.withCorrelationId(obj instanceof WithDittoHeaders ? (WithDittoHeaders) obj : null).warning("Got unknown message: '{}'", obj);
                })
                .build();
    }

    /**
     * Resolves the policy ID for the signal and then loads the matching policy enforcer.
     *
     * @param signal the signal to load the policy enforcer for.
     * @return a stage completing with the enforcer, or with an empty optional if none is available.
     */
    protected CompletionStage<Optional<PolicyEnforcer>> loadPolicyEnforcer(Signal<?> signal) {
        return providePolicyIdForEnforcement(signal).thenCompose(this::providePolicyEnforcer);
    }

    /**
     * Enforces the signal on behalf of the current sender.
     */
    private void enforceSignal(S s) {
        doEnforceSignal(s, getSender());
    }

    /**
     * Runs pre-enforcement, loads the policy enforcer and authorizes the signal, reporting the outcome to
     * {@code actorRef}. The steps are recorded as marks on an "enforce" tracing span.
     *
     * @param s the signal to enforce.
     * @param actorRef the actor to which the authorized signal or the failure is sent.
     */
    @SuppressWarnings("unchecked") // the pre-enforcer hands back a Signal<?> which is narrowed to S again
    private void doEnforceSignal(S s, ActorRef actorRef) {
        final StartedSpan span = DittoTracing.newPreparedSpan(s.getDittoHeaders(), SpanOperationName.of("enforce")).start();
        final Signal<?> tracedSignal = (Signal<?>) s.setDittoHeaders(DittoHeaders.of(span.propagateContext(s.getDittoHeaders())));
        // getSelf() is captured here because the callbacks below may run outside of the actor's message handling.
        final ActorRef self = getSelf();
        try {
            this.preEnforcer.apply(tracedSignal)
                    .thenApply(preEnforced -> (S) preEnforced)
                    .thenCompose(preEnforced -> {
                        span.mark("pre_enforced");
                        return loadPolicyEnforcer(preEnforced).thenCompose(optionalEnforcer -> {
                            span.mark("enforcer_loaded");
                            return optionalEnforcer
                                    .map(policyEnforcer -> this.enforcement.authorizeSignal(preEnforced, policyEnforcer))
                                    .orElseGet(() -> this.enforcement.authorizeSignalWithMissingEnforcer(preEnforced));
                        });
                    })
                    .whenComplete((authorizedSignal, throwable) -> {
                        if (null != authorizedSignal) {
                            span.mark("enforce_success").finish();
                            this.log.withCorrelationId(authorizedSignal).info("Completed enforcement of message type <{}> with outcome 'success'", authorizedSignal.getType());
                            actorRef.tell(authorizedSignal, self);
                        } else if (null != throwable) {
                            span.mark("enforce_failed").tagAsFailed(throwable).finish();
                            handleAuthorizationFailure(tracedSignal, throwable, actorRef);
                        } else {
                            // Neither a result nor a failure: only logged, nothing is sent to actorRef.
                            span.mark("enforce_error").tagAsFailed("unknown-outcome").finish();
                            this.log.withCorrelationId(tracedSignal).warning("Neither authorizedSignal nor throwable were present during enforcement of signal: <{}>", tracedSignal);
                        }
                    });
        } catch (DittoRuntimeException e) {
            // Covers DittoRuntimeExceptions thrown synchronously while setting up the pipeline.
            span.mark("enforce_failed").tagAsFailed(e).finish();
            handleAuthorizationFailure(tracedSignal, e, actorRef);
        }
    }

    /**
     * Converts the failure to a {@link DittoRuntimeException}, logs it and sends it to {@code actorRef}.
     * Throwables which cannot be converted are wrapped in a {@link DittoInternalErrorException} carrying the
     * headers of the signal.
     *
     * @param signal the signal whose enforcement failed.
     * @param th the cause of the failure.
     * @param actorRef the actor to notify about the failure.
     */
    private void handleAuthorizationFailure(Signal<?> signal, Throwable th, ActorRef actorRef) {
        final DittoHeaders dittoHeaders = signal.getDittoHeaders();
        // NOTE(review): "cause2" looks like a decompiler-renamed builder method - confirm against the builder class.
        final DittoRuntimeException dittoRuntimeException = DittoRuntimeException.asDittoRuntimeException(th, cause -> {
            return DittoInternalErrorException.newBuilder().cause2(cause).dittoHeaders(dittoHeaders).build();
        });
        this.log.withCorrelationId(dittoRuntimeException).info("Completed enforcement of message type <{}> with outcome 'failed' and headers: <{}>", signal.getType(), dittoHeaders);
        actorRef.tell(dittoRuntimeException, getSelf());
    }

    /**
     * Sends the command response back to the sender, filtered beforehand if the enforcement requires it.
     * In both cases the parent of this actor is used as the sending actor.
     *
     * @param r the command response to reply with.
     */
    private void replyWithFilteredCommandResponse(R r) {
        final ActorRef sender = getSender();
        final ActorRef parent = getContext().parent();
        if (this.enforcement.shouldFilterCommandResponse(r)) {
            Patterns.pipe(filterResponse(r), getContext().dispatcher()).to(sender, parent);
        } else {
            sender.tell(r, parent);
        }
    }

    /**
     * Filters the command response with the policy enforcer resolved for it.
     *
     * @param r the command response to filter.
     * @return a stage completing with the filtered response, or with {@code r} itself if no filtering is required.
     * The stage fails with a {@link PolicyNotAccessibleException} if no policy enforcer could be provided.
     */
    private CompletionStage<R> filterResponse(R r) {
        if (!this.enforcement.shouldFilterCommandResponse(r)) {
            return CompletableFuture.completedFuture(r);
        }
        return providePolicyIdForEnforcement(r)
                // the policy ID is carried along because it is needed for the error in case the enforcer is missing
                .thenCompose(policyId -> providePolicyEnforcer(policyId)
                        .thenApply(optionalEnforcer -> Pair.apply(policyId, optionalEnforcer)))
                .thenApply(pair -> pair.second().orElseThrow(() -> {
                    this.log.withCorrelationId(r).debug("Could not filter command response because policyEnforcer was missing. Likely the policy was deleted during command processing.");
                    return PolicyNotAccessibleException.newBuilder(pair.first()).build();
                }))
                .thenCompose(policyEnforcer -> doFilterResponse(r, policyEnforcer));
    }

    /**
     * Delegates the filtering to the {@link #enforcement} and normalizes failures to
     * {@link DittoRuntimeException}s.
     *
     * @param r the command response to filter.
     * @param policyEnforcer the enforcer to filter with.
     * @return a stage completing with the filtered response or failing with a {@link DittoRuntimeException}.
     * @throws DittoRuntimeException if the enforcement throws one synchronously; it is logged and rethrown.
     */
    private CompletionStage<R> doFilterResponse(R r, PolicyEnforcer policyEnforcer) {
        try {
            return this.enforcement.filterResponse(r, policyEnforcer).handle((filteredResponse, throwable) -> {
                if (null != filteredResponse) {
                    this.log.withCorrelationId(filteredResponse).info("Completed filtering of command response type <{}>", filteredResponse.getType());
                    return filteredResponse;
                }
                if (null == throwable) {
                    this.log.withCorrelationId(r).error("Neither filteredResponse nor throwable were present during filtering of commandResponse: <{}>", r);
                    throw DittoInternalErrorException.newBuilder().dittoHeaders(r.getDittoHeaders()).build();
                }
                // The lambda parameter must not reuse the name of the enclosing lambda's throwable parameter.
                final DittoRuntimeException dittoRuntimeException = DittoRuntimeException.asDittoRuntimeException(throwable, cause -> {
                    return DittoInternalErrorException.newBuilder().cause2(cause).dittoHeaders(r.getDittoHeaders()).build();
                });
                this.log.withCorrelationId(dittoRuntimeException).info("Exception during filtering of command response type <{}> and headers: <{}>", r.getType(), r.getDittoHeaders());
                throw dittoRuntimeException;
            });
        } catch (DittoRuntimeException e) {
            this.log.withCorrelationId(e).info("Exception during filtering of command response type <{}> and headers: <{}>", r.getType(), r.getDittoHeaders());
            throw e;
        }
    }
}
