package org.eclipse.ditto.services.concierge.batch.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorKilledException;
import akka.actor.ActorRef;
import akka.actor.OneForOneStrategy;
import akka.actor.Props;
import akka.actor.SupervisorStrategy;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.persistence.AbstractPersistentActor;
import akka.persistence.RecoveryCompleted;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import akka.persistence.SelectedSnapshot;
import akka.persistence.SnapshotOffer;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.bson.BsonValue;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.persistence.SnapshotAdapter;
import org.eclipse.ditto.services.utils.persistence.mongo.DittoBsonJson;
import org.eclipse.ditto.signals.commands.batch.ExecuteBatch;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.batch.BatchExecutionFinished;
import org.eclipse.ditto.signals.events.batch.BatchExecutionStarted;
import scala.Option;

/* loaded from: input_file:org/eclipse/ditto/services/concierge/batch/actors/BatchSupervisorActor.class */
/**
 * Persistent actor which keeps track of the IDs of all batches that are currently being executed.
 * <p>
 * {@link ExecuteBatch} commands are forwarded to a child {@code BatchCoordinatorActor} (one child per batch ID).
 * {@link BatchExecutionStarted} and {@link BatchExecutionFinished} events are persisted, applied to the in-memory
 * set of batch IDs and published via the pub/sub mediator. When recovery has completed, a coordinator child is
 * looked up (and created if missing) for every batch ID which is still contained in the recovered state.
 */
public final class BatchSupervisorActor extends AbstractPersistentActor {

    /**
     * The name of this actor; also used as persistence ID and as pub/sub group.
     */
    public static final String ACTOR_NAME = "batchSupervisor";

    private static final String JOURNAL_PLUGIN_ID = "akka-contrib-mongodb-persistence-batch-supervisor-journal";
    private static final String SNAPSHOT_PLUGIN_ID = "akka-contrib-mongodb-persistence-batch-supervisor-snapshots";

    /**
     * Number of journal entries after the last snapshot which have to be exceeded before a new snapshot is saved.
     */
    private static final int SNAPSHOT_THRESHOLD = 1000;

    private static final String COORDINATOR_NAME_PREFIX = "batch-coordinator-";

    private final ActorRef pubSubMediator;
    private final ActorRef conciergeForwarder;
    private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
    private long snapshotSequenceNr = -1;
    private final SnapshotAdapter<Set<String>> snapshotAdapter = new BatchIdsSnapshotAdapter();
    private Set<String> batchIds = new HashSet<>();

    /**
     * Converts the set of batch IDs to the representation written to the snapshot store and back.
     */
    static final class BatchIdsSnapshotAdapter implements SnapshotAdapter<Set<String>> {

        BatchIdsSnapshotAdapter() {
        }

        /**
         * Converts the given batch IDs into a JSON array of strings and hands it to {@link DittoBsonJson} for
         * conversion into the object that is stored as snapshot.
         *
         * @param set the batch IDs to be stored.
         * @return the object to be written to the snapshot store.
         */
        @Override
        public Object toSnapshotStore(Set<String> set) {
            final JsonArray batchIdsJson = set.stream()
                    .map(JsonValue::of)
                    .collect(JsonCollectors.valuesToArray());
            return DittoBsonJson.getInstance().parse(batchIdsJson);
        }

        /**
         * Restores the batch IDs from the snapshot contained in the given offer.
         *
         * @param snapshotOffer the offer whose snapshot is expected to be a {@link BsonValue}.
         * @return a mutable set containing the restored batch IDs.
         * @throws IllegalArgumentException if the offered snapshot is not a {@code BsonValue}.
         */
        @Override
        public Set<String> fromSnapshotStore(SnapshotOffer snapshotOffer) {
            final Object snapshot = snapshotOffer.snapshot();
            if (snapshot instanceof BsonValue) {
                final String json = DittoBsonJson.getInstance().serialize((BsonValue) snapshot).toString();
                // The owning actor adds to and removes from the restored set, hence an explicitly mutable
                // HashSet is collected (Collectors.toSet() makes no guarantee about mutability).
                return JsonFactory.newArray(json)
                        .stream()
                        .map(JsonValue::asString)
                        .collect(Collectors.toCollection(HashSet::new));
            }
            // Guard against null so that the intended IllegalArgumentException is thrown instead of an NPE.
            final Object actualType = null != snapshot ? snapshot.getClass() : "null";
            throw new IllegalArgumentException(
                    "Unable to fromSnapshotStore a non-'BsonValue' object! Was: " + actualType);
        }

        /**
         * Not supported by this adapter.
         *
         * @param selectedSnapshot ignored.
         * @return always {@code null}.
         */
        @Override
        @Nullable
        public Set<String> fromSnapshotStore(SelectedSnapshot selectedSnapshot) {
            return null;
        }
    }

    private BatchSupervisorActor(ActorRef pubSubMediator, ActorRef conciergeForwarder) {
        this.pubSubMediator = pubSubMediator;
        this.conciergeForwarder = conciergeForwarder;
    }

    /**
     * Creates Akka configuration object {@link Props} for this actor.
     *
     * @param actorRef the pub/sub mediator used for subscribing to commands/events and for publishing events.
     * @param actorRef2 the concierge forwarder which is passed on to the created batch coordinator children.
     * @return the Akka configuration Props object.
     */
    public static Props props(ActorRef actorRef, ActorRef actorRef2) {
        return Props.create(BatchSupervisorActor.class, actorRef, actorRef2);
    }

    @Override
    public String persistenceId() {
        return ACTOR_NAME;
    }

    @Override
    public String journalPluginId() {
        return JOURNAL_PLUGIN_ID;
    }

    @Override
    public String snapshotPluginId() {
        return SNAPSHOT_PLUGIN_ID;
    }

    /**
     * Rebuilds the set of batch IDs from an offered snapshot and the replayed events. Once recovery has completed,
     * a coordinator child is looked up or created for each batch ID that is still part of the state.
     */
    @Override
    public AbstractActor.Receive createReceiveRecover() {
        return ReceiveBuilder.create()
                .match(SnapshotOffer.class, offer -> {
                    batchIds = snapshotAdapter.fromSnapshotStore(offer);
                    snapshotSequenceNr = offer.metadata().sequenceNr();
                })
                .match(BatchExecutionStarted.class, started -> batchIds.add(started.getBatchId()))
                .match(BatchExecutionFinished.class, finished -> batchIds.remove(finished.getBatchId()))
                .match(RecoveryCompleted.class, completed -> {
                    log.debug("Recovery completed");
                    batchIds.forEach(this::lookupBatchCoordinatorActor);
                })
                .matchAny(unknown -> log.warning("Unknown recover message: {}", unknown))
                .build();
    }

    /**
     * Handles commands and events at runtime: forwards {@link ExecuteBatch}, persists batch lifecycle events and
     * reacts to pub/sub and snapshot acknowledgements.
     */
    @Override
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(ExecuteBatch.class, this::forwardCommand)
                .match(BatchExecutionStarted.class, started ->
                        // The callback parameter needs its own name: a lambda parameter must not redeclare the
                        // parameter of the enclosing lambda.
                        persistEvent(started, persistedStarted -> {
                            // Set.add returns false for an already known ID; only newly added IDs are published.
                            if (batchIds.add(persistedStarted.getBatchId())) {
                                publishEvent(persistedStarted);
                            }
                        }))
                .match(BatchExecutionFinished.class, finished ->
                        persistEvent(finished, persistedFinished -> {
                            // Set.remove returns false for an unknown ID; only actual removals are published.
                            if (batchIds.remove(persistedFinished.getBatchId())) {
                                publishEvent(persistedFinished);
                            }
                        }))
                .match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck ->
                        log.debug("Successfully subscribed to distributed pub/sub on topic '{}'",
                                subscribeAck.subscribe().topic()))
                .match(SaveSnapshotSuccess.class, saveSnapshotSuccess -> {
                    snapshotSequenceNr = saveSnapshotSuccess.metadata().sequenceNr();
                    // Journal entries before the snapshot are no longer needed for recovery.
                    deleteMessages(snapshotSequenceNr - 1);
                })
                .match(SaveSnapshotFailure.class, saveSnapshotFailure ->
                        log.error(saveSnapshotFailure.cause(), "Failed to save Snapshot. Cause: {}.",
                                saveSnapshotFailure.cause().getMessage()))
                .matchAny(unknown ->
                        log.warning("Got unknown message, expected an 'ExecuteBatch' command: {}", unknown))
                .build();
    }

    /**
     * Subscribes this actor at the pub/sub mediator for {@link ExecuteBatch} commands and
     * {@link BatchExecutionFinished} events, using {@link #ACTOR_NAME} as group.
     */
    @Override
    public void preStart() {
        final ActorRef self = getSelf();
        pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(ExecuteBatch.TYPE, ACTOR_NAME, self), self);
        pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(BatchExecutionFinished.TYPE, ACTOR_NAME, self),
                self);
    }

    /**
     * Supervision of the children: restart on {@link NullPointerException}, stop on {@link ActorKilledException},
     * escalate everything else.
     */
    @Override
    public SupervisorStrategy supervisorStrategy() {
        return new OneForOneStrategy(true, DeciderBuilder
                .match(NullPointerException.class, e -> SupervisorStrategy.restart())
                .match(ActorKilledException.class, e -> SupervisorStrategy.stop())
                .matchAny(e -> SupervisorStrategy.escalate())
                .build());
    }

    /**
     * Publishes the given event via the pub/sub mediator on the topic named like the event's type.
     */
    private void publishEvent(Event event) {
        pubSubMediator.tell(new DistributedPubSubMediator.Publish(event.getType(), event, true), getSelf());
    }

    /**
     * Forwards the command to the coordinator child identified by the command's correlation ID. If the command
     * has no correlation ID, a random UUID is used instead.
     */
    private void forwardCommand(ExecuteBatch executeBatch) {
        // orElseGet: only generate a UUID if there actually is no correlation ID.
        final String batchId = executeBatch.getDittoHeaders()
                .getCorrelationId()
                .orElseGet(() -> UUID.randomUUID().toString());
        lookupBatchCoordinatorActor(batchId).forward(executeBatch, getContext());
    }

    /**
     * Returns the coordinator child for the given batch ID, creating it if it does not exist yet.
     */
    private ActorRef lookupBatchCoordinatorActor(String str) {
        final String coordinatorName = COORDINATOR_NAME_PREFIX + str;
        final Option<ActorRef> existingChild = getContext().child(coordinatorName);
        if (existingChild.isDefined()) {
            return existingChild.get();
        }
        return getContext().actorOf(BatchCoordinatorActor.props(str, getSelf(), conciergeForwarder),
                coordinatorName);
    }

    /**
     * Persists the given event, afterwards passes it to the given consumer and finally saves a snapshot if more
     * than {@link #SNAPSHOT_THRESHOLD} journal entries were written since the last snapshot.
     *
     * @param e the event to persist.
     * @param consumer applies the persisted event to the actor's state.
     * @param <E> the type of the event.
     */
    private <E extends Event> void persistEvent(E e, Consumer<E> consumer) {
        log.debug("Persisting Event '{}'", e.getType());
        persist(e, persistedEvent -> {
            log.debug("Successfully persisted Event '{}'", e.getType());
            // Apply the event BEFORE snapshotting: the snapshot is stored under the current sequence number, so
            // recovery replays only later events. A snapshot taken before the state change would silently lose
            // the effect of this event.
            consumer.accept(persistedEvent);
            if (lastSequenceNr() - snapshotSequenceNr > SNAPSHOT_THRESHOLD) {
                saveSnapshot(snapshotAdapter.toSnapshotStore(batchIds));
            }
        });
    }
}
