package org.eclipse.ditto.services.utils.persistence.operations;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.japi.pf.ReceiveBuilder;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.model.base.entity.type.EntityType;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.signals.commands.common.Shutdown;
import org.eclipse.ditto.signals.commands.common.ShutdownReasonFactory;
import org.eclipse.ditto.signals.commands.common.purge.PurgeEntities;
import org.eclipse.ditto.signals.commands.common.purge.PurgeEntitiesResponse;
import org.eclipse.ditto.signals.commands.namespaces.PurgeNamespace;
import org.eclipse.ditto.signals.commands.namespaces.PurgeNamespaceResponse;

/* loaded from: input_file:org/eclipse/ditto/services/utils/persistence/operations/AbstractPersistenceOperationsActor.class */
public abstract class AbstractPersistenceOperationsActor extends AbstractActor {
    protected final DittoDiagnosticLoggingAdapter logger;
    private final ActorRef pubSubMediator;
    private final EntityType entityType;

    @Nullable
    private final NamespacePersistenceOperations namespaceOps;

    @Nullable
    private final EntityPersistenceOperations entitiesOps;
    private final ActorMaterializer materializer;
    private final Collection<Closeable> toCloseWhenStopped;
    private final Duration delayAfterPersistenceActorShutdown;

    private AbstractPersistenceOperationsActor(ActorRef actorRef, EntityType entityType, @Nullable NamespacePersistenceOperations namespacePersistenceOperations, @Nullable EntityPersistenceOperations entityPersistenceOperations, PersistenceOperationsConfig persistenceOperationsConfig, Collection<Closeable> collection) {
        this.pubSubMediator = (ActorRef) ConditionChecker.checkNotNull(actorRef, "pub-sub mediator");
        this.entityType = (EntityType) ConditionChecker.checkNotNull(entityType, "entityType");
        if (namespacePersistenceOperations == null && entityPersistenceOperations == null) {
            throw new IllegalArgumentException("At least one of namespaceOps or entitiesOps must be specified.");
        }
        this.namespaceOps = namespacePersistenceOperations;
        this.entitiesOps = entityPersistenceOperations;
        this.toCloseWhenStopped = Collections.unmodifiableCollection(collection);
        this.materializer = ActorMaterializer.create(getContext());
        this.delayAfterPersistenceActorShutdown = persistenceOperationsConfig.getDelayAfterPersistenceActorShutdown();
        this.logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
    }

    protected AbstractPersistenceOperationsActor(ActorRef actorRef, EntityType entityType, @Nullable NamespacePersistenceOperations namespacePersistenceOperations, @Nullable EntityPersistenceOperations entityPersistenceOperations, PersistenceOperationsConfig persistenceOperationsConfig, Closeable closeable, Closeable... closeableArr) {
        this(actorRef, entityType, namespacePersistenceOperations, entityPersistenceOperations, persistenceOperationsConfig, toList(closeable, closeableArr));
    }

    private static List<Closeable> toList(Closeable closeable, Closeable... closeableArr) {
        ConditionChecker.checkNotNull(closeable, "Closeable");
        ConditionChecker.checkNotNull(closeableArr, "optional Closeables");
        ArrayList arrayList = new ArrayList(1 + closeableArr.length);
        arrayList.add(closeable);
        Collections.addAll(arrayList, closeableArr);
        return arrayList;
    }

    protected AbstractPersistenceOperationsActor(ActorRef actorRef, EntityType entityType, @Nullable NamespacePersistenceOperations namespacePersistenceOperations, @Nullable EntityPersistenceOperations entityPersistenceOperations, PersistenceOperationsConfig persistenceOperationsConfig) {
        this(actorRef, entityType, namespacePersistenceOperations, entityPersistenceOperations, persistenceOperationsConfig, Collections.emptyList());
    }

    public void preStart() {
        subscribeForNamespaceCommands();
        subscribeForEntitiesCommands();
    }

    public void postStop() throws Exception {
        this.toCloseWhenStopped.forEach(closeable -> {
            try {
                closeable.close();
            } catch (IOException e) {
                this.logger.warning("Failed to close: <{}>!", e.getMessage());
            }
        });
        super.postStop();
    }

    private void subscribeForNamespaceCommands() {
        if (null != this.namespaceOps) {
            this.logger.debug("Subscribing for namespace commands.");
            ActorRef self = getSelf();
            this.pubSubMediator.tell(DistPubSubAccess.subscribeViaGroup("namespaces.commands:purgeNamespace", getSubscribeGroup(), self), self);
        }
    }

    private void subscribeForEntitiesCommands() {
        if (null != this.entitiesOps) {
            ActorRef self = getSelf();
            String topic = PurgeEntities.getTopic(this.entityType);
            DistributedPubSubMediator.Subscribe subscribeViaGroup = DistPubSubAccess.subscribeViaGroup(topic, getSubscribeGroup(), self);
            this.logger.debug("Subscribing for entities commands on topic <{}>.", topic);
            this.pubSubMediator.tell(subscribeViaGroup, self);
        }
    }

    private String getSubscribeGroup() {
        return getSelf().path().toStringWithoutAddress();
    }

    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(PurgeNamespace.class, this::purgeNamespace).match(PurgeEntities.class, this::purgeEntities).match(DistributedPubSubMediator.SubscribeAck.class, this::handleSubscribeAck).matchAny(obj -> {
            this.logger.warning("unhandled: <{}>", obj);
        }).build();
    }

    private void purgeNamespace(PurgeNamespace purgeNamespace) {
        if (null == this.namespaceOps) {
            this.logger.withCorrelationId(purgeNamespace).warning("Cannot handle namespace command: <{}>!", purgeNamespace);
            return;
        }
        this.logger.withCorrelationId(purgeNamespace).info("Running <{}>.", purgeNamespace);
        String namespace = purgeNamespace.getNamespace();
        ActorRef sender = getSender();
        ((CompletionStage) this.namespaceOps.purge(purgeNamespace.getNamespace()).runWith(Sink.head(), this.materializer)).thenAccept(list -> {
            PurgeNamespaceResponse failed;
            if (list.isEmpty()) {
                failed = PurgeNamespaceResponse.successful(namespace, this.entityType, purgeNamespace.getDittoHeaders());
            } else {
                this.logger.setCorrelationId(purgeNamespace);
                list.forEach(th -> {
                    this.logger.error(th, "Error purging namespace <{}>!", namespace);
                });
                this.logger.discardCorrelationId();
                failed = PurgeNamespaceResponse.failed(namespace, this.entityType, purgeNamespace.getDittoHeaders());
            }
            sender.tell(failed, getSelf());
            this.logger.withCorrelationId(purgeNamespace).info("Successfully purged namespace <{}>.", namespace);
        }).exceptionally(th -> {
            this.logger.withCorrelationId(purgeNamespace).error(th, "Unexpected error when purging namespace <{}>!", purgeNamespace.getNamespace());
            return null;
        });
    }

    private void purgeEntities(PurgeEntities purgeEntities) {
        if (null == this.entitiesOps) {
            this.logger.withCorrelationId(purgeEntities).warning("Cannot handle entities command: <{}>.", purgeEntities);
        } else if (!this.entityType.equals(purgeEntities.getEntityType())) {
            this.logger.withCorrelationId(purgeEntities).warning("Expected command with entityType <{}>, but got: <{}>.", this.entityType, purgeEntities);
        } else {
            shutDownPersistenceActorsOfEntitiesToPurge(purgeEntities);
            schedulePurgingEntitiesIn(this.delayAfterPersistenceActorShutdown, purgeEntities);
        }
    }

    private void shutDownPersistenceActorsOfEntitiesToPurge(PurgeEntities purgeEntities) {
        Shutdown shutdown = Shutdown.getInstance(ShutdownReasonFactory.getPurgeEntitiesReason(purgeEntities.getEntityIds()), purgeEntities.getDittoHeaders());
        this.pubSubMediator.tell(DistPubSubAccess.publish(shutdown.getType(), shutdown), getSelf());
    }

    private void schedulePurgingEntitiesIn(Duration duration, PurgeEntities purgeEntities) {
        ActorRef sender = getSender();
        getContext().system().scheduler().scheduleOnce(duration, () -> {
            doPurgeEntities(purgeEntities, sender);
        }, getContext().dispatcher());
    }

    private void doPurgeEntities(PurgeEntities purgeEntities, ActorRef actorRef) {
        if (null == this.entitiesOps) {
            this.logger.withCorrelationId(purgeEntities).warning("Cannot handle entities command: <{}>", purgeEntities);
            return;
        }
        this.logger.withCorrelationId(purgeEntities).info("Running <{}>.", purgeEntities);
        EntityType entityType = purgeEntities.getEntityType();
        List entityIds = purgeEntities.getEntityIds();
        ((CompletionStage) this.entitiesOps.purgeEntities(purgeEntities.getEntityIds()).runWith(Sink.head(), this.materializer)).thenAccept(list -> {
            PurgeEntitiesResponse failed;
            if (list.isEmpty()) {
                failed = PurgeEntitiesResponse.successful(entityType, purgeEntities.getDittoHeaders());
            } else {
                this.logger.setCorrelationId(purgeEntities);
                list.forEach(th -> {
                    this.logger.error(th, "Error purging entities of type <{}>: <{}>", entityType, entityIds);
                });
                this.logger.discardCorrelationId();
                failed = PurgeEntitiesResponse.failed(entityType, purgeEntities.getDittoHeaders());
            }
            actorRef.tell(failed, getSelf());
            this.logger.withCorrelationId(purgeEntities).info("Successfully purged entities of type <{}>: <{}>", entityType, entityIds);
        }).exceptionally(th -> {
            this.logger.withCorrelationId(purgeEntities).error(th, "Unexpected error when purging entities <{}>!", purgeEntities.getEntityIds());
            return null;
        });
    }

    private void handleSubscribeAck(DistributedPubSubMediator.SubscribeAck subscribeAck) {
        this.logger.debug("Got subscribeAck <{}>.", subscribeAck);
    }
}
