package org.eclipse.ditto.services.utils.pubsub.actors;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.services.utils.pubsub.ddata.DDataReader;

/* loaded from: input_file:org/eclipse/ditto/services/utils/pubsub/actors/Publisher.class */
public final class Publisher<T> extends AbstractActor {
    public static final String ACTOR_NAME_PREFIX = "publisher";
    private final DDataReader<T> ddataReader;
    private DiagnosticLoggingAdapter log = LogUtil.obtain(this);
    private final Counter messageCounter = DittoMetrics.counter("pubsub-published-messages");
    private final Counter topicCounter = DittoMetrics.counter("pubsub-published-topics");
    private CompletionStage<Void> currentPublication = CompletableFuture.completedFuture(null);

    /* loaded from: input_file:org/eclipse/ditto/services/utils/pubsub/actors/Publisher$Publish.class */
    public static final class Publish {
        private final Collection<String> topics;
        private final Object message;

        private Publish(Collection<String> collection, Object obj) {
            this.topics = collection;
            this.message = obj;
        }

        public static Publish of(Collection<String> collection, Object obj) {
            return new Publish(collection, obj);
        }

        public Collection<String> getTopics() {
            return this.topics;
        }

        public Object getMessage() {
            return this.message;
        }
    }

    private Publisher(DDataReader<T> dDataReader) {
        this.ddataReader = dDataReader;
    }

    public static <T> Props props(DDataReader<T> dDataReader) {
        return Props.create((Class<?>) Publisher.class, dDataReader);
    }

    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Publish.class, this::publish).matchAny(this::logUnhandled).build();
    }

    private void publish(Publish publish) {
        this.messageCounter.increment();
        this.topicCounter.increment(publish.getTopics().size());
        Stream<String> stream = publish.getTopics().stream();
        DDataReader<T> dDataReader = this.ddataReader;
        Objects.requireNonNull(dDataReader);
        List list = (List) stream.map(dDataReader::approximate).collect(Collectors.toList());
        Object message = publish.getMessage();
        ActorRef sender = getSender();
        this.currentPublication = this.currentPublication.thenCompose(r9 -> {
            return this.ddataReader.getSubscribers(list).thenAccept(collection -> {
                collection.forEach(actorRef -> {
                    actorRef.tell(message, sender);
                });
            }).exceptionally(th -> {
                this.log.error(th, "Failed: <{}>", publish);
                return null;
            });
        });
    }

    private void logUnhandled(Object obj) {
        this.log.warning("Unhandled: <{}>", obj);
    }
}
