package org.eclipse.ditto.services.utils.pubsub.actors;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Terminated;
import akka.cluster.ddata.Replicator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.metrics.DittoMetrics;
import org.eclipse.ditto.services.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.services.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.services.utils.pubsub.ddata.DDataWriter;
import org.eclipse.ditto.services.utils.pubsub.ddata.Subscriptions;
import org.eclipse.ditto.services.utils.pubsub.ddata.SubscriptionsReader;

/* loaded from: input_file:org/eclipse/ditto/services/utils/pubsub/actors/SubUpdater.class */
/**
 * Actor that accumulates subscription changes and periodically writes them out through a {@link DDataWriter}.
 *
 * <p>Incoming {@link Subscribe}, {@link Unsubscribe} and {@link RemoveSubscriber} requests are applied to the local
 * {@link Subscriptions} immediately. On every clock tick (period taken from
 * {@link PubSubConfig#getUpdateInterval()}) the actor writes the current subscriptions if they changed since the
 * last write, or if a randomly forced update is due. Once a write succeeds, the snapshot taken at write time is
 * sent to the subscriber actor and all pending {@link Acknowledgement}s are delivered to their requesters.
 *
 * @param <T> type of the exported representation handed to the {@link DDataWriter}.
 */
public final class SubUpdater<T> extends AbstractActorWithTimers {

    /** Prefix for the names of actors of this type. */
    public static final String ACTOR_NAME_PREFIX = "subUpdater";

    private final PubSubConfig config;
    private final Subscriptions<T> subscriptions;
    private final DDataWriter<T> topicBloomFiltersWriter;
    private final ActorRef subscriber;
    private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
    private final Random random = new Random();
    private final Gauge topicMetric = DittoMetrics.gauge("pubsub-topics");
    private final Gauge awaitUpdateMetric = DittoMetrics.gauge("pubsub-await-update");
    private final Gauge awaitAcknowledgeMetric = DittoMetrics.gauge("pubsub-await-acknowledge");

    /** Acknowledgements of requests that were applied locally but not yet included in any write. */
    private final List<Acknowledgement> awaitUpdate = new ArrayList<>();

    /** Acknowledgements of requests that are ready to be confirmed once no write is pending for them. */
    private final List<Acknowledgement> awaitAcknowledge = new ArrayList<>();

    /** Strongest write consistency requested since the last write; reset to local after each write. */
    private Replicator.WriteConsistency nextWriteConsistency = Replicator.writeLocal();

    /** Whether the local subscriptions differ from what was last written. */
    private boolean localSubscriptionsChanged = false;

    /** Whether a write started by {@link #tick(Clock)} is still awaiting its result. */
    private State state = State.WAITING;

    /**
     * Confirmation sent back to the sender of a request that asked to be acknowledged.
     */
    public static final class Acknowledgement {
        private final Request request;
        private final ActorRef sender;

        private Acknowledgement(Request request, ActorRef actorRef) {
            this.request = request;
            this.sender = actorRef;
        }

        /**
         * Creates an acknowledgement for a request.
         *
         * @param request the request being acknowledged.
         * @param actorRef the actor to which the acknowledgement will be sent.
         * @return the acknowledgement.
         */
        public static Acknowledgement of(Request request, ActorRef actorRef) {
            return new Acknowledgement(request, actorRef);
        }

        /**
         * @return the request being acknowledged.
         */
        public Request getRequest() {
            return this.request;
        }

        /**
         * @return the actor that receives this acknowledgement.
         */
        public ActorRef getSender() {
            return this.sender;
        }

        @Override
        public String toString() {
            return getClass().getSimpleName() + "[request=" + this.request + ",sender=" + this.sender + "]";
        }
    }

    /**
     * Timer key and timer message of the periodic update.
     */
    private enum Clock {
        TICK
    }

    /**
     * Request to remove a subscriber together with all of its topics.
     */
    public static final class RemoveSubscriber extends Request {
        private RemoveSubscriber(ActorRef actorRef, Replicator.WriteConsistency writeConsistency, boolean z) {
            // No topics are needed: the subscriber is removed as a whole.
            super(Collections.emptySet(), actorRef, writeConsistency, z);
        }

        /**
         * Creates a request to remove a subscriber.
         *
         * @param actorRef the subscriber to remove.
         * @param writeConsistency write consistency requested for the resulting update.
         * @param z whether the sender wants an {@link Acknowledgement}.
         * @return the request.
         */
        public static RemoveSubscriber of(ActorRef actorRef, Replicator.WriteConsistency writeConsistency, boolean z) {
            return new RemoveSubscriber(actorRef, writeConsistency, z);
        }
    }

    /**
     * Common super class of all subscription-changing requests handled by this actor.
     */
    public static abstract class Request {
        private final Set<String> topics;
        private final ActorRef subscriber;
        private final Replicator.WriteConsistency writeConsistency;
        private final boolean acknowledge;

        private Request(Set<String> set, ActorRef actorRef, Replicator.WriteConsistency writeConsistency, boolean z) {
            this.topics = set;
            this.subscriber = actorRef;
            this.writeConsistency = writeConsistency;
            this.acknowledge = z;
        }

        /**
         * @return the topics this request refers to.
         */
        public Set<String> getTopics() {
            return this.topics;
        }

        /**
         * @return the subscriber this request refers to.
         */
        public ActorRef getSubscriber() {
            return this.subscriber;
        }

        /**
         * @return the write consistency requested for the update caused by this request.
         */
        public Replicator.WriteConsistency getWriteConsistency() {
            return this.writeConsistency;
        }

        /**
         * @return whether the sender of this request wants an {@link Acknowledgement}.
         */
        public boolean shouldAcknowledge() {
            return this.acknowledge;
        }

        @Override
        public String toString() {
            return getClass().getSimpleName() + "[topics=" + this.topics + ", subscriber=" + this.subscriber
                    + ", writeConsistency=" + this.writeConsistency + ", acknowledge=" + this.acknowledge + "]";
        }
    }

    /**
     * Whether a write is currently pending.
     */
    private enum State {
        /** No write result is pending. */
        WAITING,
        /** A write was started and its result has not arrived yet. */
        UPDATING
    }

    /**
     * Request to subscribe an actor to a set of topics, optionally with a filter.
     */
    public static final class Subscribe extends Request {
        /** Default filter: accepts every collection of topics. */
        private static final Predicate<Collection<String>> CONSTANT_TRUE = topics -> true;

        private final Predicate<Collection<String>> filter;

        private Subscribe(Set<String> set, ActorRef actorRef, Replicator.WriteConsistency writeConsistency, boolean z,
                Predicate<Collection<String>> predicate) {
            super(set, actorRef, writeConsistency, z);
            this.filter = predicate;
        }

        /**
         * Creates a subscribe request whose filter accepts everything.
         *
         * @param set the topics to subscribe to.
         * @param actorRef the subscriber.
         * @param writeConsistency write consistency requested for the resulting update.
         * @param z whether the sender wants an {@link Acknowledgement}.
         * @return the request.
         */
        public static Subscribe of(Set<String> set, ActorRef actorRef, Replicator.WriteConsistency writeConsistency,
                boolean z) {
            return new Subscribe(set, actorRef, writeConsistency, z, CONSTANT_TRUE);
        }

        /**
         * Creates a subscribe request with an explicit filter.
         *
         * @param set the topics to subscribe to.
         * @param actorRef the subscriber.
         * @param writeConsistency write consistency requested for the resulting update.
         * @param z whether the sender wants an {@link Acknowledgement}.
         * @param predicate the filter passed to the subscriptions together with the topics.
         * @return the request.
         */
        public static Subscribe of(Set<String> set, ActorRef actorRef, Replicator.WriteConsistency writeConsistency,
                boolean z, Predicate<Collection<String>> predicate) {
            return new Subscribe(set, actorRef, writeConsistency, z, predicate);
        }

        /**
         * @return the filter of this subscription.
         */
        public Predicate<Collection<String>> getFilter() {
            return this.filter;
        }
    }

    /**
     * Request to unsubscribe an actor from a set of topics.
     */
    public static final class Unsubscribe extends Request {
        private Unsubscribe(Set<String> set, ActorRef actorRef, Replicator.WriteConsistency writeConsistency,
                boolean z) {
            super(set, actorRef, writeConsistency, z);
        }

        /**
         * Creates an unsubscribe request.
         *
         * @param set the topics to unsubscribe from.
         * @param actorRef the subscriber.
         * @param writeConsistency write consistency requested for the resulting update.
         * @param z whether the sender wants an {@link Acknowledgement}.
         * @return the request.
         */
        public static Unsubscribe of(Set<String> set, ActorRef actorRef, Replicator.WriteConsistency writeConsistency,
                boolean z) {
            return new Unsubscribe(set, actorRef, writeConsistency, z);
        }
    }

    private SubUpdater(PubSubConfig pubSubConfig, ActorRef actorRef, Subscriptions<T> subscriptions,
            DDataWriter<T> dDataWriter) {
        this.config = pubSubConfig;
        this.subscriber = actorRef;
        this.subscriptions = subscriptions;
        this.topicBloomFiltersWriter = dDataWriter;
        // Drive the write cycle with a periodic tick to self.
        getTimers().startPeriodicTimer(Clock.TICK, Clock.TICK, pubSubConfig.getUpdateInterval());
    }

    /**
     * Creates the Props of this actor.
     *
     * @param pubSubConfig the pub-sub configuration (update interval and force-update probability are read from it).
     * @param actorRef the subscriber actor that receives a subscriptions snapshot after each successful write.
     * @param subscriptions the local subscriptions modified by incoming requests.
     * @param dDataWriter the writer used to publish the exported subscriptions.
     * @param <T> type of the exported representation of the subscriptions.
     * @return the Props.
     */
    public static <T> Props props(PubSubConfig pubSubConfig, ActorRef actorRef, Subscriptions<T> subscriptions,
            DDataWriter<T> dDataWriter) {
        return Props.create((Class<?>) SubUpdater.class, pubSubConfig, actorRef, subscriptions, dDataWriter);
    }

    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create()
                .match(Subscribe.class, this::subscribe)
                .match(Unsubscribe.class, this::unsubscribe)
                .match(Terminated.class, this::terminated)
                .match(RemoveSubscriber.class, this::removeSubscriber)
                .matchEquals(Clock.TICK, this::tick)
                .match(SubscriptionsReader.class, this::updateSuccess)
                .match(Status.Failure.class, this::updateFailure)
                .matchAny(this::logUnhandled)
                .build();
    }

    /**
     * Handles a clock tick: writes the subscriptions if they changed or an update is forced; otherwise only
     * delivers acknowledgements that no longer wait for a write.
     */
    private void tick(Clock clock) {
        final boolean forceUpdate = forceUpdate();
        if (!this.localSubscriptionsChanged && !forceUpdate) {
            moveAwaitUpdateToAwaitAcknowledge();
            // Do not acknowledge while an earlier write is still in flight: its requests are already in
            // awaitAcknowledge and must only be confirmed by updateSuccess. They are flushed there, or on a later
            // idle tick after the pending write has completed or failed.
            if (this.state == State.WAITING) {
                flushAcknowledgements();
            }
            return;
        }

        final SubscriptionsReader snapshot;
        final CompletionStage<Void> writeResult;
        if (this.subscriptions.isEmpty()) {
            // Nothing left to advertise: remove this subscriber's entry instead of writing an empty value.
            snapshot = this.subscriptions.snapshot();
            writeResult = this.topicBloomFiltersWriter.removeSubscriber(this.subscriber, this.nextWriteConsistency);
            this.topicMetric.set(0L);
        } else {
            // Keep the order "export, then snapshot".
            // NOTE(review): presumably snapshot() may reset state that export() relies on - confirm against
            // the Subscriptions implementations before reordering.
            final T exported = this.subscriptions.export(forceUpdate);
            snapshot = this.subscriptions.snapshot();
            writeResult = this.topicBloomFiltersWriter.put(this.subscriber, exported, this.nextWriteConsistency);
            this.topicMetric.set(Long.valueOf(this.subscriptions.countTopics()));
        }
        writeResult.handle(handleDDataWriteResult(snapshot));

        moveAwaitUpdateToAwaitAcknowledge();
        this.localSubscriptionsChanged = false;
        this.nextWriteConsistency = Replicator.writeLocal();
        this.state = State.UPDATING;
    }

    /**
     * Decides randomly, with the configured probability, whether to write even without local changes.
     */
    private boolean forceUpdate() {
        return this.random.nextDouble() < this.config.getForceUpdateProbability();
    }

    /**
     * Handles a successful write: delivers pending acknowledgements and passes the snapshot to the subscriber.
     */
    private void updateSuccess(SubscriptionsReader subscriptionsReader) {
        flushAcknowledgements();
        this.state = State.WAITING;
        this.subscriber.tell(subscriptionsReader, getSelf());
    }

    /**
     * Sends every acknowledgement in {@code awaitAcknowledge} to its recipient and empties the list.
     */
    private void flushAcknowledgements() {
        for (final Acknowledgement acknowledgement : this.awaitAcknowledge) {
            acknowledgement.getSender().tell(acknowledgement, getSelf());
        }
        this.awaitAcknowledge.clear();
        this.awaitAcknowledgeMetric.set(0L);
    }

    /**
     * Handles a failed write: logs the cause and marks the subscriptions as changed so that the next tick retries.
     */
    private void updateFailure(Status.Failure failure) {
        this.log.error(failure.cause(), "updateFailure");
        this.localSubscriptionsChanged = true;
        this.state = State.WAITING;
    }

    /**
     * Moves all entries of {@code awaitUpdate} to {@code awaitAcknowledge} and refreshes both gauges.
     */
    private void moveAwaitUpdateToAwaitAcknowledge() {
        this.awaitAcknowledge.addAll(this.awaitUpdate);
        this.awaitUpdate.clear();
        this.awaitAcknowledgeMetric.set(Long.valueOf(this.awaitAcknowledge.size()));
        this.awaitUpdateMetric.set(0L);
    }

    /**
     * Creates the completion handler of a write: it reports the outcome back to this actor as a message, either the
     * given snapshot on success or a {@link Status.Failure} wrapping the error.
     */
    private BiFunction<Void, Throwable, Void> handleDDataWriteResult(SubscriptionsReader subscriptionsReader) {
        // Resolve the self reference once, outside the callback.
        final ActorRef self = getSelf();
        return (result, error) -> {
            if (error == null) {
                self.tell(subscriptionsReader, ActorRef.noSender());
            } else {
                self.tell(new Status.Failure(error), ActorRef.noSender());
            }
            return result;
        };
    }

    private void logUnhandled(Object obj) {
        this.log.warning("Unhandled: <{}>", obj);
    }

    /**
     * Applies a subscribe request and starts watching the subscriber if the subscriptions changed.
     */
    private void subscribe(Subscribe subscribe) {
        final boolean changed = this.subscriptions.subscribe(subscribe.getSubscriber(), subscribe.getTopics(),
                subscribe.getFilter());
        enqueueRequest(subscribe, changed);
        if (changed) {
            getContext().watch(subscribe.getSubscriber());
        }
    }

    /**
     * Applies an unsubscribe request and stops watching the subscriber once it has no subscription left.
     */
    private void unsubscribe(Unsubscribe unsubscribe) {
        final boolean changed = this.subscriptions.unsubscribe(unsubscribe.getSubscriber(), unsubscribe.getTopics());
        enqueueRequest(unsubscribe, changed);
        if (changed && !this.subscriptions.contains(unsubscribe.getSubscriber())) {
            getContext().unwatch(unsubscribe.getSubscriber());
        }
    }

    /**
     * Removes a terminated subscriber from the local subscriptions.
     */
    private void terminated(Terminated terminated) {
        doRemoveSubscriber(terminated.actor());
    }

    /**
     * Applies a remove-subscriber request. Like the other requests it is enqueued, so that its write consistency
     * is honoured and its sender is acknowledged if requested.
     */
    private void removeSubscriber(RemoveSubscriber removeSubscriber) {
        final boolean changed = this.subscriptions.removeSubscriber(removeSubscriber.getSubscriber());
        enqueueRequest(removeSubscriber, changed);
        if (changed) {
            // The subscriber has no subscription left; its termination is no longer of interest.
            getContext().unwatch(removeSubscriber.getSubscriber());
        }
    }

    private void doRemoveSubscriber(ActorRef actorRef) {
        this.localSubscriptionsChanged |= this.subscriptions.removeSubscriber(actorRef);
    }

    /**
     * Records the effect of a request: remembers whether a write is needed, raises the consistency of the next
     * write if the request demands more, and queues an acknowledgement for the current sender if requested.
     *
     * @param request the request that was just applied.
     * @param z whether applying the request changed the local subscriptions.
     */
    private void enqueueRequest(Request request, boolean z) {
        this.localSubscriptionsChanged |= z;
        upgradeWriteConsistency(request.getWriteConsistency());
        if (request.shouldAcknowledge()) {
            this.awaitUpdate.add(Acknowledgement.of(request, getSender()));
            this.awaitUpdateMetric.increment();
        }
    }

    private void upgradeWriteConsistency(Replicator.WriteConsistency writeConsistency) {
        if (isMoreConsistent(writeConsistency, this.nextWriteConsistency)) {
            this.nextWriteConsistency = writeConsistency;
        }
    }

    private static boolean isMoreConsistent(Replicator.WriteConsistency writeConsistency,
            Replicator.WriteConsistency writeConsistency2) {
        return rank(writeConsistency) > rank(writeConsistency2);
    }

    /**
     * Maps a write consistency to a number used for comparison: local is lowest, write-all is highest,
     * write-majority uses its {@code minCap}, write-to uses its {@code n}, anything else is 0.
     * NOTE(review): {@code minCap} and {@code n} are compared on the same scale - confirm this is intended.
     */
    private static int rank(Replicator.WriteConsistency writeConsistency) {
        if (Replicator.writeLocal().equals(writeConsistency)) {
            return Integer.MIN_VALUE;
        }
        if (writeConsistency instanceof Replicator.WriteAll) {
            return Integer.MAX_VALUE;
        }
        if (writeConsistency instanceof Replicator.WriteMajority) {
            return ((Replicator.WriteMajority) writeConsistency).minCap();
        }
        if (writeConsistency instanceof Replicator.WriteTo) {
            return ((Replicator.WriteTo) writeConsistency).n();
        }
        return 0;
    }
}
