package org.eclipse.ditto.internal.utils.persistence.operations;

import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import org.apache.pekko.Done;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.CoordinatedShutdown;
import org.apache.pekko.cluster.pubsub.DistributedPubSubMediator;
import org.apache.pekko.japi.pf.ReceiveBuilder;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.stream.KillSwitches;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.SharedKillSwitch;
import org.apache.pekko.stream.javadsl.Sink;
import org.eclipse.ditto.base.api.common.Shutdown;
import org.eclipse.ditto.base.api.common.ShutdownReasonFactory;
import org.eclipse.ditto.base.api.common.purge.PurgeEntities;
import org.eclipse.ditto.base.api.common.purge.PurgeEntitiesResponse;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespace;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespaceResponse;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.pekko.actors.AbstractActorWithShutdownBehavior;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.pekko.logging.ThreadSafeDittoLoggingAdapter;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/persistence/operations/AbstractPersistenceOperationsActor.class */
public abstract class AbstractPersistenceOperationsActor extends AbstractActorWithShutdownBehavior {
    /** Logger of this actor; assigned in the private constructor and also usable by subclasses. */
    protected final ThreadSafeDittoLoggingAdapter logger;
    // Cause handed to the kill switch when running purge streams are aborted in serviceRequestsDone.
    private static final Throwable KILL_SWITCH_EXCEPTION = new IllegalStateException("Aborting persistence operations stream because of graceful shutdown.");
    // Receives the subscribe, unsubscribe and publish messages sent by this actor.
    private final ActorRef pubSubMediator;
    // Entity type this actor is responsible for; determines the PurgeEntities topic and is
    // compared against the entity type of incoming PurgeEntities commands.
    private final EntityType entityType;

    // Null when namespace purging is not supported; then no PurgeNamespace subscription is made.
    @Nullable
    private final NamespacePersistenceOperations namespaceOps;

    // Null when entity purging is not supported; then no PurgeEntities subscription is made.
    @Nullable
    private final EntityPersistenceOperations entitiesOps;
    // Runs the purge streams started by this actor.
    private final Materializer materializer;
    // Resources closed in postStop; an immutable copy of the collection given to the constructor.
    private final Collection<Closeable> toCloseWhenStopped;
    // Delay between publishing the Shutdown command and actually purging the entities.
    private final Duration delayAfterPersistenceActorShutdown;
    // Commands currently being processed, mapped to their original sender. Entries are removed
    // on OpComplete and re-published via pub/sub in serviceRequestsDone.
    private final Map<Command<?>, ActorRef> lastCommandsAndSender;
    // Attached to every purge stream so that all of them can be aborted at once.
    private final SharedKillSwitch killSwitch;

    /**
     * Message this actor sends to itself once the purge operation started for a command has
     * finished, successfully or not, so that the command can be removed from the map of
     * commands in progress.
     *
     * <p>Declared as a record: the compiler generates the canonical constructor, the
     * {@code command()} and {@code sender()} accessors and component-based
     * {@code equals}/{@code hashCode}/{@code toString}.
     *
     * @param command the command whose processing has completed.
     * @param sender the original sender of that command.
     */
    public record OpComplete(Command<?> command, ActorRef sender) {
    }

    /**
     * Common constructor all protected constructors delegate to.
     *
     * @param actorRef the pub-sub mediator; must not be {@code null}.
     * @param entityType the entity type this actor handles; must not be {@code null}.
     * @param namespacePersistenceOperations operations for purging namespaces, or {@code null} if unsupported.
     * @param entityPersistenceOperations operations for purging entities, or {@code null} if unsupported.
     * @param persistenceOperationsConfig provides the delay applied before entities are purged.
     * @param collection resources to close when this actor stops; copied defensively.
     * @throws IllegalArgumentException if both operations arguments are {@code null}.
     */
    private AbstractPersistenceOperationsActor(ActorRef actorRef, EntityType entityType, @Nullable NamespacePersistenceOperations namespacePersistenceOperations, @Nullable EntityPersistenceOperations entityPersistenceOperations, PersistenceOperationsConfig persistenceOperationsConfig, Collection<Closeable> collection) {
        this.killSwitch = KillSwitches.shared(getClass().getSimpleName());
        // checkNotNull is generic and returns its argument, so no cast is required.
        this.pubSubMediator = ConditionChecker.checkNotNull(actorRef, "pub-sub mediator");
        this.entityType = ConditionChecker.checkNotNull(entityType, "entityType");
        if (namespacePersistenceOperations == null && entityPersistenceOperations == null) {
            throw new IllegalArgumentException("At least one of namespaceOps or entitiesOps must be specified.");
        }
        this.namespaceOps = namespacePersistenceOperations;
        this.entitiesOps = entityPersistenceOperations;
        this.toCloseWhenStopped = List.copyOf(collection);
        this.materializer = Materializer.createMaterializer(this::getContext);
        this.delayAfterPersistenceActorShutdown = persistenceOperationsConfig.getDelayAfterPersistenceActorShutdown();
        // Diamond instead of the raw type keeps the map type-checked as Map<Command<?>, ActorRef>.
        this.lastCommandsAndSender = new HashMap<>();
        this.logger = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);
    }

    /**
     * Creates the actor with one or more resources that are closed when the actor stops.
     *
     * @param actorRef the pub-sub mediator.
     * @param entityType the entity type this actor handles.
     * @param namespacePersistenceOperations operations for purging namespaces, or {@code null}.
     * @param entityPersistenceOperations operations for purging entities, or {@code null}.
     * @param persistenceOperationsConfig the persistence operations configuration.
     * @param closeable the first resource to close on stop.
     * @param closeableArr further resources to close on stop.
     */
    protected AbstractPersistenceOperationsActor(
            ActorRef actorRef,
            EntityType entityType,
            @Nullable NamespacePersistenceOperations namespacePersistenceOperations,
            @Nullable EntityPersistenceOperations entityPersistenceOperations,
            PersistenceOperationsConfig persistenceOperationsConfig,
            Closeable closeable,
            Closeable... closeableArr) {
        this(
                actorRef,
                entityType,
                namespacePersistenceOperations,
                entityPersistenceOperations,
                persistenceOperationsConfig,
                toList(closeable, closeableArr));
    }

    /**
     * Combines the mandatory and the optional closeables into one list, mandatory one first.
     *
     * @param closeable the mandatory closeable; must not be {@code null}.
     * @param closeableArr the optional closeables; the array itself must not be {@code null}.
     * @return a new mutable list holding {@code closeable} followed by the elements of {@code closeableArr}.
     */
    private static List<Closeable> toList(Closeable closeable, Closeable... closeableArr) {
        ConditionChecker.checkNotNull(closeable, "Closeable");
        ConditionChecker.checkNotNull(closeableArr, "optional Closeables");
        // Parameterized instead of raw so the compiler verifies the element type.
        final List<Closeable> closeables = new ArrayList<>(1 + closeableArr.length);
        closeables.add(closeable);
        Collections.addAll(closeables, closeableArr);
        return closeables;
    }

    /**
     * Creates the actor without any resources to close when the actor stops.
     *
     * @param actorRef the pub-sub mediator.
     * @param entityType the entity type this actor handles.
     * @param namespacePersistenceOperations operations for purging namespaces, or {@code null}.
     * @param entityPersistenceOperations operations for purging entities, or {@code null}.
     * @param persistenceOperationsConfig the persistence operations configuration.
     */
    protected AbstractPersistenceOperationsActor(
            ActorRef actorRef,
            EntityType entityType,
            @Nullable NamespacePersistenceOperations namespacePersistenceOperations,
            @Nullable EntityPersistenceOperations entityPersistenceOperations,
            PersistenceOperationsConfig persistenceOperationsConfig) {
        this(
                actorRef,
                entityType,
                namespacePersistenceOperations,
                entityPersistenceOperations,
                persistenceOperationsConfig,
                Collections.emptyList());
    }

    /**
     * Returns the name of the concrete actor. Within this class the value is appended to the
     * names of the coordinated-shutdown tasks registered in {@code preStart} and is included in
     * log messages.
     *
     * @return the actor name.
     */
    protected abstract String getActorName();

    /**
     * Subscribes for the supported purge commands and registers two coordinated-shutdown tasks
     * that ask this actor to unbind (phase service-unbind) and to finish its requests (phase
     * service-requests-done). Each task completes with {@code Done} once the actor has replied.
     */
    @Override // org.apache.pekko.actor.AbstractActor, org.apache.pekko.actor.Actor
    public void preStart() {
        subscribeForNamespaceCommands();
        subscribeForEntitiesCommands();

        // The shutdown tasks run outside of this actor's message processing, so both use the
        // reference captured here instead of calling back into the actor instance.
        final ActorRef self = getSelf();
        final CoordinatedShutdown coordinatedShutdown = CoordinatedShutdown.get(getContext().getSystem());

        final String serviceUnbindTask = "service-unbind-" + getActorName();
        coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind(), serviceUnbindTask,
                () -> Patterns.ask(self, AbstractActorWithShutdownBehavior.Control.SERVICE_UNBIND, SHUTDOWN_ASK_TIMEOUT)
                        .thenApply(reply -> Done.done()));

        final String serviceRequestsDoneTask = "service-requests-done-" + getActorName();
        coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceRequestsDone(), serviceRequestsDoneTask,
                () -> Patterns.ask(self, AbstractActorWithShutdownBehavior.Control.SERVICE_REQUESTS_DONE, SHUTDOWN_ASK_TIMEOUT)
                        .thenApply(reply -> Done.done()));
    }

    /**
     * Closes every registered resource, forgets all commands in progress and then delegates to
     * the superclass. An {@code IOException} raised while closing a resource is logged as a
     * warning and does not prevent the remaining resources from being closed.
     *
     * @throws Exception if the superclass implementation fails.
     */
    @Override
    public void postStop() throws Exception {
        for (final Closeable resource : this.toCloseWhenStopped) {
            try {
                resource.close();
            } catch (final IOException closeFailure) {
                this.logger.warning("Failed to close: <{}>!", closeFailure.getMessage());
            }
        }
        this.lastCommandsAndSender.clear();
        super.postStop();
    }

    /**
     * Subscribes this actor for {@code PurgeNamespace} commands via the pub-sub mediator.
     * Does nothing when no namespace operations were provided.
     */
    private void subscribeForNamespaceCommands() {
        if (this.namespaceOps == null) {
            return;
        }
        this.logger.debug("Subscribing for namespace commands.");
        final ActorRef subscriber = getSelf();
        final String group = getSubscribeGroup();
        this.pubSubMediator.tell(DistPubSubAccess.subscribeViaGroup(PurgeNamespace.TYPE, group, subscriber), subscriber);
    }

    /**
     * Subscribes this actor for {@code PurgeEntities} commands on the topic of its entity type.
     * Does nothing when no entity operations were provided.
     */
    private void subscribeForEntitiesCommands() {
        if (this.entitiesOps == null) {
            return;
        }
        final ActorRef subscriber = getSelf();
        final String purgeEntitiesTopic = PurgeEntities.getTopic(this.entityType);
        final DistributedPubSubMediator.Subscribe subscription =
                DistPubSubAccess.subscribeViaGroup(purgeEntitiesTopic, getSubscribeGroup(), subscriber);
        this.logger.debug("Subscribing for entities commands on topic <{}>.", purgeEntitiesTopic);
        this.pubSubMediator.tell(subscription, subscriber);
    }

    /**
     * Returns the pub-sub group used for all subscriptions of this actor: the actor's own path
     * rendered without the address part.
     */
    private String getSubscribeGroup() {
        final ActorRef self = getSelf();
        return self.path().toStringWithoutAddress();
    }

    /**
     * Builds the receive behavior: purge commands are dispatched to their handlers,
     * {@code OpComplete} clears finished commands, subscribe acknowledgements are logged and
     * anything else is logged as unhandled.
     */
    @Override
    public AbstractActor.Receive handleMessage() {
        return ReceiveBuilder.create()
                .match(PurgeNamespace.class, this::purgeNamespace)
                .match(PurgeEntities.class, this::purgeEntities)
                .match(OpComplete.class, this::opComplete)
                .match(DistributedPubSubMediator.SubscribeAck.class, this::handleSubscribeAck)
                .matchAny(message -> this.logger.warning("Unhandled message: <{}>", message))
                .build();
    }

    /**
     * Unsubscribes this actor from every pub-sub topic it subscribed to and pipes {@code Done}
     * to the sender once all unsubscriptions were answered.
     *
     * @param control the control message that triggered the unbind; only used for logging.
     */
    @Override
    public void serviceUnbind(AbstractActorWithShutdownBehavior.Control control) {
        this.logger.info("{}: unsubscribing from pubsub for {} actor", control, getActorName());
        final ActorRef self = getSelf();

        // An operation that was never subscribed counts as already unsubscribed.
        final CompletableFuture<?> namespaceUnsubscribed;
        if (null != this.namespaceOps) {
            namespaceUnsubscribed = Patterns.ask(this.pubSubMediator,
                    DistPubSubAccess.unsubscribeViaGroup(PurgeNamespace.TYPE, getSubscribeGroup(), self),
                    SHUTDOWN_ASK_TIMEOUT).toCompletableFuture();
        } else {
            namespaceUnsubscribed = CompletableFuture.completedFuture(Done.getInstance());
        }

        final CompletableFuture<?> entitiesUnsubscribed;
        if (null != this.entitiesOps) {
            entitiesUnsubscribed = Patterns.ask(this.pubSubMediator,
                    DistPubSubAccess.unsubscribeViaGroup(PurgeEntities.getTopic(this.entityType), getSubscribeGroup(), self),
                    SHUTDOWN_ASK_TIMEOUT).toCompletableFuture();
        } else {
            entitiesUnsubscribed = CompletableFuture.completedFuture(Done.getInstance());
        }

        final CompletableFuture<Done> allUnsubscribed =
                CompletableFuture.allOf(namespaceUnsubscribed, entitiesUnsubscribed).thenApply(ignored -> {
                    this.logger.info("Unsubscribed successfully from pubsub for {} actor", getActorName());
                    return Done.getInstance();
                });
        Patterns.pipe(allUnsubscribed, getContext().getDispatcher()).to(getSender());
    }

    /**
     * Aborts all running purge streams and re-publishes every command still in progress via
     * pub-sub, using the original sender as sender of the published message. Afterwards replies
     * with {@code Done} to the sender of the control message.
     *
     * @param control the control message that triggered this call; not evaluated.
     */
    @Override
    public void serviceRequestsDone(AbstractActorWithShutdownBehavior.Control control) {
        final int pendingCommands = this.lastCommandsAndSender.size();
        this.logger.info("Re-schedule/Publish <{}> commands for <{}> via PubSub.", pendingCommands, getActorName());
        this.killSwitch.abort(KILL_SWITCH_EXCEPTION);

        for (final Map.Entry<Command<?>, ActorRef> pending : this.lastCommandsAndSender.entrySet()) {
            final Command<?> pendingCommand = pending.getKey();
            final ActorRef originalSender = pending.getValue();
            if (pendingCommand instanceof PurgeNamespace) {
                final PurgeNamespace namespaceCommand = (PurgeNamespace) pendingCommand;
                final Object republish = DistPubSubAccess.publishViaGroup(namespaceCommand.getType(), namespaceCommand);
                this.pubSubMediator.tell(republish, originalSender);
            } else if (pendingCommand instanceof PurgeEntities) {
                final PurgeEntities entitiesCommand = (PurgeEntities) pendingCommand;
                final PurgeEntities copy = PurgeEntities.of(this.entityType, entitiesCommand.getEntityIds(),
                        entitiesCommand.getDittoHeaders());
                final Object republish = DistPubSubAccess.publishViaGroup(PurgeEntities.getTopic(this.entityType), copy);
                this.pubSubMediator.tell(republish, originalSender);
            }
        }

        getSender().tell(Done.getInstance(), getSelf());
    }

    /**
     * Purges the namespace named by the command and answers the sender with a
     * {@code PurgeNamespaceResponse}: successful if the purge reported no errors, failed
     * otherwise. In both cases, and also if the purge stream itself fails, an {@code OpComplete}
     * is sent to this actor so the command is removed from the commands in progress.
     *
     * <p>If no namespace operations were provided the command is only logged as a warning.
     *
     * @param purgeNamespace the command to execute.
     */
    private void purgeNamespace(PurgeNamespace purgeNamespace) {
        final ThreadSafeDittoLoggingAdapter l = this.logger.withCorrelationId(purgeNamespace);
        if (null == this.namespaceOps) {
            l.warning("Cannot handle namespace command: <{}>!", purgeNamespace);
            return;
        }

        // Capture actor references once, before going asynchronous; the callbacks below run
        // after this message has been processed.
        final ActorRef self = getSelf();
        final ActorRef sender = getSender();
        final String namespace = purgeNamespace.getNamespace();

        rememberCommandAndSender(purgeNamespace, sender);
        l.info("Running <{}>.", purgeNamespace);

        // Properly parameterized instead of a raw CompletionStage so the collected errors are
        // statically known to be a list of throwables.
        final CompletionStage<List<Throwable>> purgeResult = this.namespaceOps.purge(namespace)
                .via(this.killSwitch.flow())
                .runWith(Sink.head(), this.materializer);

        purgeResult.thenAccept(errors -> {
            final PurgeNamespaceResponse response;
            if (errors.isEmpty()) {
                l.info("Successfully purged namespace <{}>.", namespace);
                response = PurgeNamespaceResponse.successful(namespace, this.entityType, purgeNamespace.getDittoHeaders());
            } else {
                errors.forEach(error -> l.error(error, "Error purging namespace <{}>!", namespace));
                response = PurgeNamespaceResponse.failed(namespace, this.entityType, purgeNamespace.getDittoHeaders());
            }
            sender.tell(response, self);
            self.tell(new OpComplete(purgeNamespace, sender), ActorRef.noSender());
        }).exceptionally(error -> {
            // No response is sent here; only the bookkeeping entry is cleared.
            l.error(error, "Unexpected error when purging namespace <{}>!", namespace);
            self.tell(new OpComplete(purgeNamespace, sender), ActorRef.noSender());
            return null;
        });
    }

    /**
     * Entry point for {@code PurgeEntities} commands. The command is dropped with a warning if
     * entity purging is unsupported or the command targets another entity type. Otherwise the
     * command is remembered, a shutdown of the affected persistence actors is published and the
     * actual purge is scheduled after the configured delay.
     *
     * @param purgeEntities the received command.
     */
    private void purgeEntities(PurgeEntities purgeEntities) {
        final ThreadSafeDittoLoggingAdapter l = this.logger.withCorrelationId(purgeEntities);
        if (this.entitiesOps == null) {
            l.warning("Cannot handle entities command: <{}>.", purgeEntities);
            return;
        }
        final boolean entityTypeMatches = this.entityType.equals(purgeEntities.getEntityType());
        if (!entityTypeMatches) {
            l.warning("Expected command with entityType <{}>, but got: <{}>.", this.entityType, purgeEntities);
            return;
        }
        rememberCommandAndSender(purgeEntities, getSender());
        shutDownPersistenceActorsOfEntitiesToPurge(purgeEntities);
        schedulePurgingEntitiesIn(this.delayAfterPersistenceActorShutdown, purgeEntities);
    }

    /**
     * Publishes a {@code Shutdown} command, carrying the IDs of the entities about to be purged
     * as reason, on the pub-sub topic equal to the command's type.
     *
     * @param purgeEntities the command naming the entities to purge.
     */
    private void shutDownPersistenceActorsOfEntitiesToPurge(PurgeEntities purgeEntities) {
        final Shutdown shutdownCommand = Shutdown.getInstance(
                ShutdownReasonFactory.getPurgeEntitiesReason(purgeEntities.getEntityIds()),
                purgeEntities.getDittoHeaders());
        final Object publish = DistPubSubAccess.publish(shutdownCommand.getType(), shutdownCommand);
        this.pubSubMediator.tell(publish, getSelf());
    }

    /**
     * Schedules the purge of the command's entities to run once after the given delay. The
     * current sender is captured now because the scheduled task runs later.
     *
     * @param duration how long to wait before purging.
     * @param purgeEntities the command to execute after the delay.
     */
    private void schedulePurgingEntitiesIn(Duration duration, PurgeEntities purgeEntities) {
        final ActorRef initiator = getSender();
        final Runnable purgeTask = () -> doPurgeEntities(purgeEntities, initiator);
        getContext().system().scheduler().scheduleOnce(duration, purgeTask, getContext().dispatcher());
    }

    /**
     * Purges the entities named by the command and answers {@code actorRef} with a
     * {@code PurgeEntitiesResponse}: successful if the purge reported no errors, failed
     * otherwise. In both cases, and also if the purge stream itself fails, an {@code OpComplete}
     * is sent to this actor so the command is removed from the commands in progress.
     *
     * <p>If no entity operations were provided the command is only logged as a warning.
     *
     * @param purgeEntities the command to execute.
     * @param actorRef the original sender of the command, which receives the response.
     */
    private void doPurgeEntities(PurgeEntities purgeEntities, ActorRef actorRef) {
        final ThreadSafeDittoLoggingAdapter l = this.logger.withCorrelationId(purgeEntities);
        if (null == this.entitiesOps) {
            l.warning("Cannot handle entities command: <{}>", purgeEntities);
            return;
        }
        l.info("Running <{}>.", purgeEntities);

        final ActorRef self = getSelf();
        final EntityType purgedEntityType = purgeEntities.getEntityType();
        final List<EntityId> entityIds = purgeEntities.getEntityIds();

        // Properly parameterized instead of a raw CompletionStage so the collected errors are
        // statically known to be a list of throwables.
        final CompletionStage<List<Throwable>> purgeResult = this.entitiesOps.purgeEntities(entityIds)
                .via(this.killSwitch.flow())
                .runWith(Sink.head(), this.materializer);

        purgeResult.thenAccept(errors -> {
            final PurgeEntitiesResponse response;
            if (errors.isEmpty()) {
                l.info("Successfully purged entities of type <{}>: <{}>", purgedEntityType, entityIds);
                response = PurgeEntitiesResponse.successful(purgedEntityType, purgeEntities.getDittoHeaders());
            } else {
                errors.forEach(error ->
                        l.error(error, "Error purging entities of type <{}>: <{}>", purgedEntityType, entityIds));
                response = PurgeEntitiesResponse.failed(purgedEntityType, purgeEntities.getDittoHeaders());
            }
            actorRef.tell(response, self);
            self.tell(new OpComplete(purgeEntities, actorRef), ActorRef.noSender());
        }).exceptionally(error -> {
            // No response is sent here; only the bookkeeping entry is cleared.
            l.error(error, "Unexpected error when purging entities <{}>!", entityIds);
            self.tell(new OpComplete(purgeEntities, actorRef), ActorRef.noSender());
            return null;
        });
    }

    /**
     * Records a command as being in progress together with its sender. An existing entry for an
     * equal command is overwritten.
     *
     * @param command the command whose processing starts.
     * @param actorRef the sender of the command.
     */
    private void rememberCommandAndSender(Command<?> command, ActorRef actorRef) {
        final Map<Command<?>, ActorRef> inProgress = this.lastCommandsAndSender;
        inProgress.put(command, actorRef);
    }

    /**
     * Removes a finished command from the commands in progress. The entry is removed only if it
     * is still mapped to the sender carried by the message.
     *
     * @param opComplete the completion notification.
     */
    private void opComplete(OpComplete opComplete) {
        final Command<?> finishedCommand = opComplete.command();
        final ActorRef originalSender = opComplete.sender();
        this.logger.debug("Operation complete remove lastCommand {} and sender {} from map.", finishedCommand, originalSender);
        this.lastCommandsAndSender.remove(finishedCommand, originalSender);
    }

    /**
     * Logs the acknowledgement of a pub-sub subscription at debug level; no further action.
     *
     * @param subscribeAck the acknowledgement received from the pub-sub mediator.
     */
    private void handleSubscribeAck(DistributedPubSubMediator.SubscribeAck subscribeAck) {
        final ThreadSafeDittoLoggingAdapter log = this.logger;
        log.debug("Got subscribeAck <{}>.", subscribeAck);
    }
}
