package org.eclipse.ditto.internal.utils.pubsub.actors;

import akka.actor.AbstractActor;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.Terminated;
import akka.cluster.ddata.Replicator;
import akka.japi.pf.ReceiveBuilder;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;
import java.util.stream.Stream;
import org.eclipse.ditto.base.model.acks.PubSubTerminatedException;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.gauge.Gauge;
import org.eclipse.ditto.internal.utils.pubsub.api.RemoveSubscriber;
import org.eclipse.ditto.internal.utils.pubsub.api.Request;
import org.eclipse.ditto.internal.utils.pubsub.api.SubAck;
import org.eclipse.ditto.internal.utils.pubsub.api.Subscribe;
import org.eclipse.ditto.internal.utils.pubsub.api.Unsubscribe;
import org.eclipse.ditto.internal.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DDataUpdate;
import org.eclipse.ditto.internal.utils.pubsub.ddata.DDataWriter;
import org.eclipse.ditto.internal.utils.pubsub.ddata.Subscriptions;
import org.eclipse.ditto.internal.utils.pubsub.ddata.SubscriptionsReader;
import org.eclipse.ditto.internal.utils.pubsub.ddata.compressed.CompressedDData;
import org.eclipse.ditto.internal.utils.pubsub.ddata.compressed.CompressedSubscriptions;
import org.eclipse.ditto.internal.utils.pubsub.ddata.literal.LiteralUpdate;

/* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsub/actors/SubUpdater.class */
public final class SubUpdater extends AbstractActorWithTimers {
    public static final String ACTOR_NAME_PREFIX = "subUpdater";
    private final Subscriptions<LiteralUpdate> subscriptions;
    private final DDataWriter<ActorRef, LiteralUpdate> topicsWriter;
    private final ActorRef subscriber;
    private final Gauge topicSizeMetric;
    private final Gauge awaitUpdateMetric;
    private final Gauge awaitSubAckMetric;
    private final ThreadSafeDittoLoggingAdapter log = DittoLoggerFactory.getThreadSafeDittoLoggingAdapter(this);
    private final List<SubAck> awaitUpdate = new ArrayList();
    private final Queue<SubAck> awaitSubAck = new ArrayDeque();
    private Replicator.WriteConsistency nextWriteConsistency = writeLocal();
    private boolean localSubscriptionsChanged = false;
    private int seqNr = 0;
    private LiteralUpdate previousUpdate = LiteralUpdate.empty();

    /* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsub/actors/SubUpdater$Clock.class */
    private enum Clock {
        TICK
    }

    /* loaded from: input_file:org/eclipse/ditto/internal/utils/pubsub/actors/SubUpdater$DDataOpSuccess.class */
    private static final class DDataOpSuccess<P> {
        private final P payload;
        private final int seqNr;

        private DDataOpSuccess(P p, int i) {
            this.payload = p;
            this.seqNr = i;
        }
    }

    private SubUpdater(PubSubConfig pubSubConfig, ActorRef actorRef, Subscriptions<LiteralUpdate> subscriptions, DDataWriter<ActorRef, LiteralUpdate> dDataWriter) {
        this.subscriber = actorRef;
        this.subscriptions = subscriptions;
        this.topicsWriter = dDataWriter;
        String str = getContext().getParent().path().name() + "/subUpdater";
        this.topicSizeMetric = DittoMetrics.gauge("pubsub-topics-size-bytes").tag2("name", str);
        this.awaitUpdateMetric = DittoMetrics.gauge("pubsub-await-update").tag2("name", str);
        this.awaitSubAckMetric = DittoMetrics.gauge("pubsub-await-acknowledge").tag2("name", str);
        getTimers().startTimerAtFixedRate(Clock.TICK, Clock.TICK, pubSubConfig.getUpdateInterval());
    }

    public static Props props(PubSubConfig pubSubConfig, ActorRef actorRef, CompressedDData compressedDData) {
        return Props.create((Class<?>) SubUpdater.class, pubSubConfig, actorRef, CompressedSubscriptions.of(compressedDData.getSeeds()), compressedDData.getWriter());
    }

    @Override // akka.actor.AbstractActor
    public AbstractActor.Receive createReceive() {
        return ReceiveBuilder.create().match(Subscribe.class, this::subscribe).match(Unsubscribe.class, this::unsubscribe).match(Terminated.class, this::terminated).match(RemoveSubscriber.class, this::removeSubscriber).matchEquals(Clock.TICK, this::tick).match(DDataOpSuccess.class, this::ddataOpSuccess).match(Status.Failure.class, this::updateFailure).matchEquals(ActorEvent.PUBSUB_TERMINATED, this::pubSubTerminated).matchAny(this::logUnhandled).build();
    }

    private static boolean isMoreConsistent(Replicator.WriteConsistency writeConsistency, Replicator.WriteConsistency writeConsistency2) {
        return rank(writeConsistency) > rank(writeConsistency2);
    }

    private static int rank(Replicator.WriteConsistency writeConsistency) {
        if (writeLocal().equals(writeConsistency)) {
            return Integer.MIN_VALUE;
        }
        if (writeConsistency instanceof Replicator.WriteAll) {
            return Integer.MAX_VALUE;
        }
        if (writeConsistency instanceof Replicator.WriteMajority) {
            return ((Replicator.WriteMajority) writeConsistency).minCap();
        }
        if (writeConsistency instanceof Replicator.WriteTo) {
            return ((Replicator.WriteTo) writeConsistency).n();
        }
        return 0;
    }

    private static Replicator.WriteConsistency writeLocal() {
        return Replicator.writeLocal();
    }

    private void subscribe(Subscribe subscribe) {
        boolean subscribe2 = this.subscriptions.subscribe(subscribe.getSubscriber(), subscribe.getTopics(), subscribe.getFilter(), subscribe.getGroup().orElse(null));
        enqueueRequest(subscribe, subscribe2, getSender(), this.awaitUpdate, this.awaitUpdateMetric);
        if (subscribe2) {
            getContext().watch(subscribe.getSubscriber());
        }
    }

    private void unsubscribe(Unsubscribe unsubscribe) {
        boolean unsubscribe2 = this.subscriptions.unsubscribe(unsubscribe.getSubscriber(), unsubscribe.getTopics());
        enqueueRequest(unsubscribe, unsubscribe2, getSender(), this.awaitUpdate, this.awaitUpdateMetric);
        if (!unsubscribe2 || this.subscriptions.contains(unsubscribe.getSubscriber())) {
            return;
        }
        getContext().unwatch(unsubscribe.getSubscriber());
    }

    private void ddataOpSuccess(DDataOpSuccess<SubscriptionsReader> dDataOpSuccess) {
        flushSubAcks(((DDataOpSuccess) dDataOpSuccess).seqNr);
        this.subscriber.tell(((DDataOpSuccess) dDataOpSuccess).payload, getSelf());
        if (this.awaitSubAck.isEmpty() && this.awaitUpdate.isEmpty()) {
            this.localSubscriptionsChanged = false;
            this.nextWriteConsistency = writeLocal();
        }
    }

    private void tick(Clock clock) {
        performDDataOp(this.localSubscriptionsChanged, this.nextWriteConsistency).handle(handleDDataWriteResult(getSeqNr()));
        moveAwaitUpdateToAwaitAcknowledge();
    }

    private void flushSubAcks(int i) {
        for (SubAck subAck : exportAwaitSubAck(i)) {
            subAck.getSender().tell(subAck, getSelf());
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private CompletionStage<SubscriptionsReader> performDDataOp(boolean z, Replicator.WriteConsistency writeConsistency) {
        SubscriptionsReader snapshot;
        CompletionStage<Void> put;
        if (!z) {
            snapshot = this.subscriptions.snapshot();
            put = CompletableFuture.completedStage(null);
        } else if (this.subscriptions.isEmpty()) {
            snapshot = this.subscriptions.snapshot();
            put = this.topicsWriter.removeSubscriber(this.subscriber, writeConsistency);
            this.previousUpdate = LiteralUpdate.empty();
            this.topicSizeMetric.set(0L);
        } else {
            LiteralUpdate export = this.subscriptions.export();
            snapshot = this.subscriptions.snapshot();
            put = this.topicsWriter.put(this.subscriber, export.diff2((DDataUpdate<String>) this.previousUpdate), writeConsistency);
            this.previousUpdate = export;
            this.topicSizeMetric.set(Long.valueOf(this.subscriptions.estimateSize()));
        }
        SubscriptionsReader subscriptionsReader = snapshot;
        return put.thenApply(r3 -> {
            return subscriptionsReader;
        });
    }

    private void updateFailure(Status.Failure failure) {
        this.log.error(failure.cause(), "Failure updating Ditto pub/sub subscription - trying again next clock tick");
        this.localSubscriptionsChanged = true;
    }

    private void enqueueRequest(Request request, boolean z, ActorRef actorRef, Collection<SubAck> collection, Gauge gauge) {
        this.localSubscriptionsChanged |= z;
        upgradeWriteConsistency(request.getWriteConsistency());
        if (request.shouldAcknowledge()) {
            int i = this.seqNr + 1;
            this.seqNr = i;
            collection.add(SubAck.of(request, actorRef, i));
            gauge.increment();
        }
    }

    private int getSeqNr() {
        return this.seqNr;
    }

    private List<SubAck> exportAwaitSubAck(int i) {
        ArrayList arrayList = new ArrayList(this.awaitSubAck.size());
        while (!this.awaitSubAck.isEmpty()) {
            SubAck poll = this.awaitSubAck.poll();
            arrayList.add(poll);
            if (poll.getSeqNr() == i) {
                break;
            }
        }
        this.awaitSubAckMetric.set(Long.valueOf(this.awaitSubAck.size()));
        return Collections.unmodifiableList(arrayList);
    }

    private void moveAwaitUpdateToAwaitAcknowledge() {
        if (this.awaitUpdate.isEmpty()) {
            return;
        }
        this.awaitSubAck.addAll(this.awaitUpdate);
        this.awaitUpdate.clear();
        this.awaitSubAckMetric.set(Long.valueOf(this.awaitSubAck.size()));
        this.awaitUpdateMetric.set(0L);
    }

    private BiFunction<SubscriptionsReader, Throwable, Void> handleDDataWriteResult(int i) {
        return (subscriptionsReader, th) -> {
            if (th != null) {
                getSelf().tell(new Status.Failure(th), ActorRef.noSender());
                return null;
            }
            if (subscriptionsReader == null) {
                return null;
            }
            getSelf().tell(new DDataOpSuccess(subscriptionsReader, i), ActorRef.noSender());
            return null;
        };
    }

    private void logUnhandled(Object obj) {
        this.log.warning("Unhandled: <{}>", obj);
    }

    private void terminated(Terminated terminated) {
        doRemoveSubscriber(terminated.actor());
    }

    private void removeSubscriber(RemoveSubscriber removeSubscriber) {
        doRemoveSubscriber(removeSubscriber.getSubscriber());
    }

    private void doRemoveSubscriber(ActorRef actorRef) {
        this.localSubscriptionsChanged |= this.subscriptions.removeSubscriber(actorRef);
        getContext().unwatch(actorRef);
    }

    private void pubSubTerminated(ActorEvent actorEvent) {
        HashSet hashSet = new HashSet();
        Stream.concat(Stream.concat(this.awaitUpdate.stream(), this.awaitSubAck.stream()).map((v0) -> {
            return v0.getSender();
        }), this.subscriptions.getSubscribers().stream()).forEach(actorRef -> {
            if (hashSet.add(actorRef)) {
                actorRef.tell(PubSubTerminatedException.getInstance(), getSelf());
            }
        });
        this.subscriptions.clear();
        this.awaitUpdate.clear();
        this.awaitSubAck.clear();
        this.nextWriteConsistency = writeLocal();
    }

    private void upgradeWriteConsistency(Replicator.WriteConsistency writeConsistency) {
        if (isMoreConsistent(writeConsistency, this.nextWriteConsistency)) {
            this.nextWriteConsistency = writeConsistency;
        }
    }
}
