package org.eclipse.ditto.services.utils.pubsub.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.cluster.ddata.Replicator;
import akka.japi.pf.ReceiveBuilder;
import javax.annotation.Nullable;
import org.eclipse.ditto.services.utils.pubsub.actors.AbstractUpdater;
import org.eclipse.ditto.services.utils.pubsub.ddata.DData;
import org.eclipse.ditto.services.utils.pubsub.ddata.literal.LiteralUpdate;
import org.eclipse.ditto.services.utils.pubsub.extractors.PubSubTopicExtractor;

/* loaded from: input_file:org/eclipse/ditto/services/utils/pubsub/actors/SubSupervisor.class */
public final class SubSupervisor<T, U> extends AbstractPubSubSupervisor {
    private final Class<T> messageClass;
    private final PubSubTopicExtractor<T> topicExtractor;
    private final DData<?, U> topicsDData;

    @Nullable
    private final DData<String, LiteralUpdate> acksDData;

    @Nullable
    private ActorRef updater;

    @Nullable
    private ActorRef acksUpdater;

    private SubSupervisor(Class<T> cls, PubSubTopicExtractor<T> pubSubTopicExtractor, DData<?, U> dData, @Nullable DData<String, LiteralUpdate> dData2) {
        this.messageClass = cls;
        this.topicExtractor = pubSubTopicExtractor;
        this.topicsDData = dData;
        this.acksDData = dData2;
    }

    public static <T, U> Props props(Class<T> cls, PubSubTopicExtractor<T> pubSubTopicExtractor, DData<?, U> dData, @Nullable DData<String, LiteralUpdate> dData2) {
        return Props.create((Class<?>) SubSupervisor.class, cls, pubSubTopicExtractor, dData, dData2);
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.actors.AbstractPubSubSupervisor
    protected AbstractActor.Receive createPubSubBehavior() {
        return ReceiveBuilder.create().match(AbstractUpdater.DeclareAckLabels.class, this::isAcksUpdaterAvailable, this::declareAckLabels).match(AbstractUpdater.DeclareAckLabels.class, this::acksUpdaterUnavailable).match(AbstractUpdater.RemoveSubscriber.class, (v0) -> {
            return v0.isForAcknowledgementLabelDeclaration();
        }, this::removeAcknowledgementLabelDeclaration).match(AbstractUpdater.Request.class, this::isUpdaterAvailable, this::request).match(AbstractUpdater.Request.class, this::updaterUnavailable).match(Terminated.class, this::subscriberTerminated).build();
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.actors.AbstractPubSubSupervisor
    protected void onChildFailure() {
        this.updater = null;
        this.log.error("All local subscriptions lost.");
    }

    @Override // org.eclipse.ditto.services.utils.pubsub.actors.AbstractPubSubSupervisor
    protected void startChildren() {
        ActorRef startChild = startChild(Subscriber.props(this.messageClass, this.topicExtractor), Subscriber.ACTOR_NAME_PREFIX);
        getContext().watch(startChild);
        this.updater = startChild(SubUpdater.props(this.config, startChild, this.topicsDData), SubUpdater.ACTOR_NAME_PREFIX);
        if (this.acksDData != null) {
            this.acksUpdater = startChild(AcksUpdater.props(this.config, startChild, this.acksDData), AcksUpdater.ACTOR_NAME_PREFIX);
        }
    }

    private void subscriberTerminated(Terminated terminated) {
        this.log.error("Subscriber terminated. Removing subscriber from DData: <{}>", terminated.getActor());
        this.topicsDData.getWriter().removeSubscriber(terminated.getActor(), Replicator.writeLocal());
        if (this.acksDData != null) {
            this.acksDData.getWriter().removeSubscriber(terminated.getActor(), Replicator.writeLocal());
        }
    }

    private boolean isUpdaterAvailable() {
        return this.updater != null;
    }

    private void request(AbstractUpdater.Request request) {
        this.updater.tell(request, getSender());
    }

    private boolean isAcksUpdaterAvailable() {
        return this.acksUpdater != null;
    }

    private void declareAckLabels(AbstractUpdater.DeclareAckLabels declareAckLabels) {
        this.acksUpdater.tell(declareAckLabels.toSubscribe(), getSender());
    }

    private void updaterUnavailable(AbstractUpdater.Request request) {
        this.log.error("SubUpdater unavailable. Dropping <{}>", request);
        getSender().tell(new IllegalStateException("AcksUpdater not available"), getSelf());
    }

    private void acksUpdaterUnavailable(AbstractUpdater.DeclareAckLabels declareAckLabels) {
        this.log.error("AcksUpdater unavailable. Failing <{}>", declareAckLabels);
        getSender().tell(new IllegalStateException("AcksUpdater not available"), getSelf());
    }

    private void removeAcknowledgementLabelDeclaration(AbstractUpdater.RemoveSubscriber removeSubscriber) {
        if (isAcksUpdaterAvailable()) {
            this.acksUpdater.tell(removeSubscriber, getSender());
        } else {
            updaterUnavailable(removeSubscriber);
        }
    }
}
