package org.eclipse.ditto.services.utils.pubsub.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import java.util.Iterator;
import java.util.Set;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.services.utils.pubsub.ddata.SubscriptionsReader;
import org.eclipse.ditto.services.utils.pubsub.extractors.PubSubTopicExtractor;

/* loaded from: input_file:org/eclipse/ditto/services/utils/pubsub/actors/Subscriber.class */
public final class Subscriber<T> extends AbstractActor {
    public static final String ACTOR_NAME_PREFIX = "subscriber";
    private final Class<T> messageClass;
    private final PubSubTopicExtractor<T> topicExtractor;
    private SubscriptionsReader localSubscriptions = SubscriptionsReader.empty();
    private Counter truePositiveCounter = DittoMetrics.counter("pubsub-true-positive");
    private Counter falsePositiveCounter = DittoMetrics.counter("pubsub-false-positive");

    private Subscriber(Class<T> cls, PubSubTopicExtractor<T> pubSubTopicExtractor) {
        this.messageClass = cls;
        this.topicExtractor = pubSubTopicExtractor;
    }

    public static <T> Props props(Class<T> cls, PubSubTopicExtractor<T> pubSubTopicExtractor) {
        return Props.create((Class<?>) Subscriber.class, cls, pubSubTopicExtractor);
    }

    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(this.messageClass, this::broadcastToLocalSubscribers).match(SubscriptionsReader.class, this::updateLocalSubscriptions).build();
    }

    private void broadcastToLocalSubscribers(T t) {
        Set<ActorRef> subscribers = this.localSubscriptions.getSubscribers(this.topicExtractor.getTopics(t));
        if (subscribers.isEmpty()) {
            this.falsePositiveCounter.increment();
            return;
        }
        this.truePositiveCounter.increment();
        Iterator<ActorRef> it = subscribers.iterator();
        while (it.hasNext()) {
            it.next().tell(t, getSender());
        }
    }

    private void updateLocalSubscriptions(SubscriptionsReader subscriptionsReader) {
        this.localSubscriptions = subscriptionsReader;
    }
}
