package org.eclipse.ditto.edge.service.dispatching;

import com.typesafe.config.Config;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.id.WithEntityId;
import org.eclipse.ditto.base.model.exceptions.DittoInternalErrorException;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.CommandResponse;
import org.eclipse.ditto.base.model.signals.commands.streaming.StreamingSubscriptionCommand;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.base.service.signaltransformer.SignalTransformer;
import org.eclipse.ditto.base.service.signaltransformer.SignalTransformers;
import org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants;
import org.eclipse.ditto.connectivity.api.commands.sudo.ConnectivitySudoCommand;
import org.eclipse.ditto.connectivity.model.ConnectivityConstants;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveAllConnectionIds;
import org.eclipse.ditto.edge.service.dispatching.EntityTaskScheduler;
import org.eclipse.ditto.internal.utils.cacheloaders.config.DefaultAskWithRetryConfig;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.config.ScopedConfig;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommand;
import org.eclipse.ditto.messages.model.signals.commands.MessageCommandResponse;
import org.eclipse.ditto.policies.model.PolicyConstants;
import org.eclipse.ditto.policies.model.signals.commands.PolicyCommand;
import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThings;
import org.eclipse.ditto.things.model.ThingConstants;
import org.eclipse.ditto.things.model.signals.commands.ThingCommand;
import org.eclipse.ditto.things.model.signals.commands.ThingCommandResponse;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThings;
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.thingsearch.api.ThingsSearchConstants;
import org.eclipse.ditto.thingsearch.api.commands.sudo.ThingSearchSudoCommand;
import org.eclipse.ditto.thingsearch.model.signals.commands.ThingSearchCommand;

/* loaded from: input_file:org/eclipse/ditto/edge/service/dispatching/EdgeCommandForwarderActor.class */
/**
 * Actor which forwards the signals it receives to the component responsible for handling them:
 * <ul>
 * <li>thing related signals go to the "things" shard region,</li>
 * <li>{@code RetrieveThings} / {@code SudoRetrieveThings} go to a {@code ThingsAggregatorProxyActor} child,</li>
 * <li>policy signals go to the "policies" shard region,</li>
 * <li>connectivity commands go to the "connections" shard region (or, for {@code RetrieveAllConnectionIds},
 * to the connection ID retrieval actor via the pub/sub mediator),</li>
 * <li>search commands go to the search actor via the pub/sub mediator.</li>
 * </ul>
 * Before being forwarded to a shard region or the aggregator proxy, signals are passed through the configured
 * {@link SignalTransformer}.
 */
public class EdgeCommandForwarderActor extends AbstractActor {

    /**
     * Name of this actor. It is also handed to {@code EntityTaskScheduler.props(...)} when the scheduler child is
     * created.
     */
    public static final String ACTOR_NAME = "edgeCommandForwarder";

    private final DittoDiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    private final ActorRef pubSubMediator;
    private final ShardRegions shardRegions;
    private final SignalTransformer signalTransformer;
    private final AskWithRetryCommandForwarder askWithRetryCommandForwarder;
    private final ActorRef aggregatorProxyActor;
    private final ActorRef taskScheduler;

    /**
     * Creates the actor and starts its two children (things aggregator proxy and entity task scheduler).
     * Instantiated reflectively through {@link #props(ActorRef, ShardRegions)}.
     *
     * @param actorRef the pub/sub mediator used for search and connection ID retrieval.
     * @param shardRegions provides the shard regions signals are forwarded to.
     */
    private EdgeCommandForwarderActor(final ActorRef actorRef, final ShardRegions shardRegions) {
        this.pubSubMediator = actorRef;
        this.shardRegions = shardRegions;

        final ActorSystem system = getContext().getSystem();
        final Config config = system.settings().config();
        final DefaultScopedConfig dittoScoped = DefaultScopedConfig.dittoScoped(config);
        final Config dittoExtension = ScopedConfig.dittoExtension(config);

        // NOTE(review): the returned config object is not used by this class. The call is kept because it may
        // validate the "ask-with-retry" section at start-up - confirm, and remove it if it has no such effect.
        DefaultAskWithRetryConfig.of(dittoScoped, "ask-with-retry");

        this.signalTransformer = SignalTransformers.get(system, dittoExtension);
        this.askWithRetryCommandForwarder = AskWithRetryCommandForwarder.get(system);
        this.aggregatorProxyActor = getContext().actorOf(ThingsAggregatorProxyActor.props(actorRef),
                ThingsAggregatorProxyActor.ACTOR_NAME);
        this.taskScheduler = getContext().actorOf(EntityTaskScheduler.props(ACTOR_NAME), "entity-task-scheduler");
    }

    /**
     * Creates the {@code Props} for this actor.
     *
     * @param actorRef the pub/sub mediator.
     * @param shardRegions the shard regions to forward signals to.
     * @return the Props.
     */
    public static Props props(final ActorRef actorRef, final ShardRegions shardRegions) {
        return Props.create(EdgeCommandForwarderActor.class, actorRef, shardRegions);
    }

    /**
     * Tells whether the passed command is treated as idempotent, judged solely by its category.
     *
     * @param command the command to check.
     * @return {@code true} if the category of the command is QUERY, MERGE, MODIFY or DELETE, {@code false} for
     * every other category.
     */
    public static boolean isIdempotent(final Command<?> command) {
        switch (command.getCategory()) {
            case QUERY:
            case MERGE:
            case MODIFY:
            case DELETE:
                return true;
            default:
                return false;
        }
    }

    /**
     * Builds the message handling of this actor: the {@code Receive} obtained from the
     * {@code EdgeCommandForwarderExtension} combined via {@code orElse} with the default routing defined below.
     * The order of the {@code match} clauses matters: the more specific or guarded clauses are registered before
     * the unguarded clause for the same type, and {@code Signal} / {@code matchAny} act as catch-alls at the end.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        final ActorSystem system = getContext().getSystem();
        final Config dittoExtension = ScopedConfig.dittoExtension(system.settings().config());

        return EdgeCommandForwarderExtension.get(system, dittoExtension)
                .getReceiveExtension(getContext())
                .orElse(ReceiveBuilder.create()
                        .match(MessageCommand.class, this::forwardToThings)
                        .match(MessageCommandResponse.class, this::forwardToThings)
                        .match(ThingCommand.class, this::forwardToThings)
                        // only "live" responses are forwarded; everything else is dropped with a warning
                        .match(ThingCommandResponse.class, CommandResponse::isLiveCommandResponse,
                                this::forwardToThings)
                        .match(ThingCommandResponse.class, thingCommandResponse ->
                                this.log.withCorrelationId(thingCommandResponse)
                                        .warning("Ignoring ThingCommandResponse received on 'twin' channel: <{}>",
                                                thingCommandResponse))
                        // only "live" events are forwarded; everything else is dropped with a warning
                        .match(ThingEvent.class, Event::isLiveEvent, this::forwardToThings)
                        .match(ThingEvent.class, thingEvent ->
                                this.log.withCorrelationId(thingEvent)
                                        .warning("Ignoring ThingEvent received on 'twin' channel: <{}>", thingEvent))
                        .match(RetrieveThings.class, this::forwardToThingsAggregatorProxy)
                        .match(SudoRetrieveThings.class, this::forwardToThingsAggregatorProxy)
                        .match(PolicyCommand.class, this::forwardToPolicies)
                        // must be registered before the generic ConnectivityCommand clause
                        .match(RetrieveAllConnectionIds.class, this::forwardToConnectivityPubSub)
                        .match(ConnectivityCommand.class, this::forwardToConnectivity)
                        .match(ConnectivitySudoCommand.class, this::forwardToConnectivity)
                        .match(ThingSearchCommand.class, this::forwardToThingSearch)
                        .match(ThingSearchSudoCommand.class, this::forwardToThingSearch)
                        // streaming subscription commands are routed by the entity type they refer to
                        .match(StreamingSubscriptionCommand.class,
                                command -> command.getEntityType().equals(ThingConstants.ENTITY_TYPE),
                                this::forwardToThings)
                        .match(StreamingSubscriptionCommand.class,
                                command -> command.getEntityType().equals(PolicyConstants.ENTITY_TYPE),
                                this::forwardToPolicies)
                        .match(StreamingSubscriptionCommand.class,
                                command -> command.getEntityType().equals(ConnectivityConstants.ENTITY_TYPE),
                                this::forwardToConnectivity)
                        .match(Signal.class, this::handleUnknownSignal)
                        .matchAny(message -> this.log.warning("Got unknown message: {}", message))
                        .build());
    }

    /**
     * Transforms the signal and forwards the result to the "things" shard region on behalf of the current sender.
     * Idempotent commands which are neither on the live nor on the smart channel are handed to the
     * {@code AskWithRetryCommandForwarder}; all other signals are sent with a plain {@code tell}.
     *
     * @param signal the signal to forward.
     */
    private void forwardToThings(final Signal<?> signal) {
        // capture the sender now: the callbacks below run after this message has been processed
        final ActorRef sender = getSender();
        // the transformation is started right away; only the forwarding step is scheduled
        final CompletionStage<Signal<?>> transformationStage = applySignalTransformation(signal, sender);
        scheduleTask(signal, () -> transformationStage.thenAccept(transformed -> {
            this.log.withCorrelationId(transformed)
                    .info("Forwarding thing signal with ID <{}> and type <{}> to 'things' shard region",
                            entityIdOrNull(transformed), transformed.getType());

            final boolean twinChannel = !Signal.isChannelLive(transformed) && !Signal.isChannelSmart(transformed);
            if (twinChannel && transformed instanceof Command) {
                final Command<?> command = (Command<?>) transformed;
                if (isIdempotent(command)) {
                    this.askWithRetryCommandForwarder.forwardCommand(command, this.shardRegions.things(), sender);
                    return;
                }
            }
            this.shardRegions.things().tell(transformed, sender);
        }));
    }

    /**
     * Runs the supplied task. Signals which do not implement {@link WithEntityId} have their task started
     * immediately; for all others the task is wrapped into an {@code EntityTaskScheduler.Task} for the signal's
     * entity ID and handed to the task scheduler child actor (presumably so that tasks of the same entity are
     * processed in order - confirm in {@code EntityTaskScheduler}).
     *
     * @param signal the signal the task belongs to; used for the entity ID and for logging.
     * @param supplier starts the task and provides the stage signalling its completion.
     */
    private void scheduleTask(final Signal<?> signal, final Supplier<CompletionStage<Void>> supplier) {
        if (signal instanceof WithEntityId) {
            final EntityId entityId = ((WithEntityId) signal).getEntityId();
            this.log.withCorrelationId(signal)
                    .debug("Scheduling signal transformation task for entityId <{}>", entityId);
            scheduleTaskForEntity(new EntityTaskScheduler.Task<>(entityId, supplier));
            return;
        }

        this.log.withCorrelationId(signal).debug("Scheduling signal transformation task without entity");
        supplier.get().whenComplete((result, error) ->
                this.log.withCorrelationId(signal)
                        .debug("Scheduled task without entityId completed successfully: <{}>", error == null));
    }

    /**
     * Applies the configured {@link SignalTransformer} to the signal. If the transformation fails, the failure is
     * sent to {@code actorRef} as a {@link DittoRuntimeException}; a failure which is not already a
     * {@code DittoRuntimeException} is wrapped into a {@link DittoInternalErrorException} carrying the headers of
     * the original signal. The returned stage still completes exceptionally in that case.
     *
     * @param signal the signal to transform.
     * @param actorRef the actor to inform about a failed transformation.
     * @return the stage of the transformed signal.
     */
    private CompletionStage<Signal<?>> applySignalTransformation(final Signal<?> signal, final ActorRef actorRef) {
        return this.signalTransformer.apply(signal).whenComplete((transformed, error) -> {
            if (error != null) {
                final DittoRuntimeException dittoRuntimeException =
                        DittoRuntimeException.asDittoRuntimeException(error,
                                cause -> DittoInternalErrorException.newBuilder()
                                        .dittoHeaders(signal.getDittoHeaders())
                                        .cause(cause)
                                        .build());
                actorRef.tell(dittoRuntimeException, ActorRef.noSender());
            }
        });
    }

    /**
     * Hands the task over to the entity task scheduler child actor.
     *
     * @param task the task to schedule.
     */
    private void scheduleTaskForEntity(final EntityTaskScheduler.Task<Void> task) {
        this.taskScheduler.tell(task, ActorRef.noSender());
    }

    /**
     * Transforms the command and forwards the result to the things aggregator proxy child actor on behalf of the
     * current sender.
     *
     * @param command the command to forward.
     */
    private void forwardToThingsAggregatorProxy(final Command<?> command) {
        final ActorRef sender = getSender();
        final CompletionStage<Signal<?>> transformationStage = applySignalTransformation(command, sender);
        scheduleTask(command, () -> transformationStage.thenAccept(
                transformed -> this.aggregatorProxyActor.tell(transformed, sender)));
    }

    /**
     * Transforms the signal and forwards the result to the "policies" shard region on behalf of the current
     * sender. Idempotent commands are handed to the {@code AskWithRetryCommandForwarder}; all other signals are
     * sent with a plain {@code tell}.
     *
     * @param signal the signal to forward.
     */
    private void forwardToPolicies(final Signal<?> signal) {
        final ActorRef sender = getSender();
        final CompletionStage<Signal<?>> transformationStage = applySignalTransformation(signal, sender);
        scheduleTask(signal, () -> transformationStage.thenAccept(transformed -> {
            this.log.withCorrelationId(transformed)
                    .info("Forwarding policy command with ID <{}> and type <{}> to 'policies' shard region",
                            entityIdOrNull(transformed), transformed.getType());

            if (transformed instanceof Command) {
                final Command<?> command = (Command<?>) transformed;
                if (isIdempotent(command)) {
                    this.askWithRetryCommandForwarder.forwardCommand(command, this.shardRegions.policies(), sender);
                    return;
                }
            }
            this.shardRegions.policies().tell(transformed, sender);
        }));
    }

    /**
     * Sends the command - without signal transformation - via the pub/sub mediator to the connection ID retrieval
     * actor, keeping the current sender.
     *
     * @param retrieveAllConnectionIds the command to forward.
     */
    public void forwardToConnectivityPubSub(final RetrieveAllConnectionIds retrieveAllConnectionIds) {
        this.pubSubMediator.tell(
                DistPubSubAccess.send(ConnectivityMessagingConstants.CONNECTION_ID_RETRIEVAL_ACTOR_PATH,
                        retrieveAllConnectionIds),
                getSender());
    }

    /**
     * Transforms the command and forwards the result to the "connections" shard region on behalf of the current
     * sender. Commands not implementing {@link WithEntityId} are not forwarded; an error is logged instead and
     * the sender receives no reply.
     *
     * @param command the command to forward.
     */
    private void forwardToConnectivity(final Command<?> command) {
        if (!(command instanceof WithEntityId)) {
            this.log.withCorrelationId(command)
                    .error("Could not forward ConnectivityCommand not implementing WithEntityId to 'connections' shard region");
            return;
        }

        final WithEntityId withEntityId = (WithEntityId) command;
        final ActorRef sender = getSender();
        final CompletionStage<Signal<?>> transformationStage = applySignalTransformation(command, sender);
        scheduleTask(command, () -> transformationStage.thenAccept(transformed -> {
            final Command<?> transformedCommand = (Command<?>) transformed;
            // the logged entity ID is the one of the command as received, not of the transformed command
            this.log.withCorrelationId(transformedCommand)
                    .info("Forwarding connectivity command with ID <{}> and type <{}> to 'connections' shard region",
                            withEntityId.getEntityId(), transformedCommand.getType());
            this.shardRegions.connections().tell(transformedCommand, sender);
        }));
    }

    /**
     * Sends the command - without signal transformation - via the pub/sub mediator to the search actor, keeping
     * the current sender.
     *
     * @param command the search command to forward.
     */
    private void forwardToThingSearch(final Command<?> command) {
        this.pubSubMediator.tell(DistPubSubAccess.send(ThingsSearchConstants.SEARCH_ACTOR_PATH, command),
                getSender());
    }

    /**
     * Handles a signal for which no routing exists: the signal is transformed (so a failing transformation is
     * still reported to the sender) and an error is logged. A successfully transformed signal is not answered.
     *
     * @param signal the signal which cannot be routed.
     */
    private void handleUnknownSignal(final Signal<?> signal) {
        applySignalTransformation(signal, getSender()).thenAccept(transformed ->
                this.log.withCorrelationId(transformed)
                        .error("Received signal <{}> which is not known how to be handled: {}",
                                transformed.getType(), transformed));
    }

    /**
     * Extracts the entity ID of a signal for logging purposes.
     *
     * @param signal the signal to inspect.
     * @return the entity ID if the signal implements {@link WithEntityId}, otherwise {@code null}.
     */
    private static EntityId entityIdOrNull(final Signal<?> signal) {
        return signal instanceof WithEntityId ? ((WithEntityId) signal).getEntityId() : null;
    }
}
