/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import io.mantisrx.publish.StreamManager;
import io.mantisrx.publish.SubscriptionTracker;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.core.Subscription;
import io.mantisrx.publish.core.SubscriptionFactory;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.proto.MantisServerSubscription;
import io.mantisrx.publish.proto.MantisServerSubscriptionEnvelope;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link SubscriptionTracker} that periodically reconciles the subscriptions
 * reported for each configured stream with the {@link StreamManager}.
 *
 * <p>Subclasses supply {@link #fetchSubscriptions(String, String)}; this class keeps the
 * last successfully fetched envelope per stream, diffs it against each new fetch, and
 * pushes the resulting additions/removals to the stream manager. When a fetch yields no
 * result, the cached entry is dropped once it is older than the configured expiry interval.
 *
 * <p>NOTE(review): the cache is a plain {@link HashMap} behind a {@code volatile} field;
 * {@code volatile} only covers the reference, not mutations of the map. Confirm that
 * {@link #refreshSubscriptions()} and {@link #getCurrentSubs(String)} are not invoked
 * concurrently, or switch to a concurrent map.
 */
public abstract class AbstractSubscriptionTracker
implements SubscriptionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscriptionTracker.class);
    /** Shared "no subscriptions" envelope used when a stream has no cached entry yet. */
    private static final MantisServerSubscriptionEnvelope DEFAULT_EMPTY_SUB_ENVELOPE = new MantisServerSubscriptionEnvelope(Collections.emptyList());
    private final MrePublishConfiguration mrePublishConfiguration;
    private final Registry registry;
    private final StreamManager streamManager;
    private final Counter fetchSubscriptionsSuccessCount;
    private final Counter staleSubscriptionsRemovedCount;
    /** Last successfully fetched subscriptions, keyed by stream name. */
    private volatile Map<String, StreamSubscriptions> previousSubscriptions = new HashMap<String, StreamSubscriptions>();

    /**
     * Stores the collaborators and registers the two counters used by this tracker.
     *
     * @param mrePublishConfiguration configuration consulted on every refresh
     * @param registry                registry the counters are registered with
     * @param streamManager           receiver of subscription additions and removals
     */
    public AbstractSubscriptionTracker(MrePublishConfiguration mrePublishConfiguration, Registry registry, StreamManager streamManager) {
        this.mrePublishConfiguration = mrePublishConfiguration;
        this.registry = registry;
        this.streamManager = streamManager;
        this.fetchSubscriptionsSuccessCount = SpectatorUtils.buildAndRegisterCounter(registry, "fetchSubscriptionsSuccessCount");
        this.staleSubscriptionsRemovedCount = SpectatorUtils.buildAndRegisterCounter(registry, "staleSubscriptionsRemovedCount");
    }

    /**
     * Applies the difference between two subscription sets to the stream manager:
     * subscriptions only in {@code prev2} are removed first, then subscriptions only in
     * {@code curr} are added. Neither input set is modified.
     *
     * @param prev2 the previously known subscriptions
     * @param curr  the subscriptions that should now be active
     */
    void propagateSubscriptionChanges(Set<MantisServerSubscription> prev2, Set<MantisServerSubscription> curr) {
        // Work on copies so the callers' sets are left untouched.
        Set<MantisServerSubscription> prevSubsNotInCurr = new HashSet<MantisServerSubscription>(prev2);
        prevSubsNotInCurr.removeAll(curr);
        prevSubsNotInCurr.forEach(subToRemove -> {
            Subscription subscription = SubscriptionFactory.getSubscription(subToRemove.getSubscriptionId(), subToRemove.getQuery());
            this.streamManager.removeStreamSubscription(subscription);
        });

        Set<MantisServerSubscription> currSubsNotInPrev = new HashSet<MantisServerSubscription>(curr);
        currSubsNotInPrev.removeAll(prev2);
        currSubsNotInPrev.forEach(subToAdd -> {
            Subscription subscription = SubscriptionFactory.getSubscription(subToAdd.getSubscriptionId(), subToAdd.getQuery());
            this.streamManager.addStreamSubscription(subscription);
        });
    }

    /**
     * Drops the cached subscriptions for {@code streamName} if they are older than the
     * configured expiry interval, and propagates their removal to the stream manager.
     * Does nothing when there is no cached entry or the entry is still fresh.
     *
     * @param streamName stream whose cached entry should be checked
     */
    private void cleanupStaleSubscriptions(String streamName) {
        StreamSubscriptions streamSubscriptions = this.previousSubscriptions.get(streamName);
        if (streamSubscriptions == null) {
            return;
        }

        long ageMs = System.currentTimeMillis() - streamSubscriptions.getCreateTimeMs();
        // Widen to long BEFORE multiplying: an int product overflows for intervals
        // above ~2.1 million seconds and would make every entry look stale (or never stale).
        long expiryMs = (long) this.mrePublishConfiguration.subscriptionExpiryIntervalSec() * 1000L;
        if (ageMs <= expiryMs) {
            return;
        }

        LOG.info("removing stale subscriptions data for stream {} ({} created {})", streamName, streamSubscriptions.getSubsEnvelope(), streamSubscriptions.getCreateTimeMs());
        this.staleSubscriptionsRemovedCount.increment();
        StreamSubscriptions removedSubs = this.previousSubscriptions.remove(streamName);
        // remove() returns null if the entry vanished between the get() above and now.
        if (removedSubs != null) {
            this.propagateSubscriptionChanges(removedSubs.getSubsEnvelope().getSubscriptions(), Collections.emptySet());
        }
    }

    /**
     * Fetches the current subscriptions for one stream. {@link #refreshSubscriptions()}
     * passes the stream name as the first argument and the mapped job cluster as the second.
     *
     * @param var1 stream name
     * @param var2 job cluster mapped to that stream
     * @return the fetched envelope, or an empty {@link Optional} when nothing could be
     *         fetched (which triggers the stale-entry check for that stream)
     */
    public abstract Optional<MantisServerSubscriptionEnvelope> fetchSubscriptions(String var1, String var2);

    /**
     * Refreshes subscriptions for every stream in the configured stream-to-job-cluster
     * mapping. No-op when the MRE client is disabled in the configuration.
     *
     * <p>For each stream: on a successful fetch the diff against the cached envelope is
     * propagated, the cache is replaced, and the success counter is incremented; on an
     * empty fetch the cached entry is checked for staleness.
     */
    @Override
    public void refreshSubscriptions() {
        if (!this.mrePublishConfiguration.isMREClientEnabled()) {
            return;
        }

        for (Map.Entry<String, String> mapping : this.mrePublishConfiguration.streamNameToJobClusterMapping().entrySet()) {
            String streamName = mapping.getKey();
            String jobCluster = mapping.getValue();

            Optional<MantisServerSubscriptionEnvelope> subsEnvelopeO = this.fetchSubscriptions(streamName, jobCluster);
            if (!subsEnvelopeO.isPresent()) {
                this.cleanupStaleSubscriptions(streamName);
                continue;
            }

            MantisServerSubscriptionEnvelope subsEnvelope = subsEnvelopeO.get();
            // Fall back to the shared empty envelope instead of allocating a throwaway
            // StreamSubscriptions on every refresh.
            StreamSubscriptions cached = this.previousSubscriptions.get(streamName);
            MantisServerSubscriptionEnvelope previousEnvelope = cached != null ? cached.getSubsEnvelope() : DEFAULT_EMPTY_SUB_ENVELOPE;

            this.propagateSubscriptionChanges(previousEnvelope.getSubscriptions(), subsEnvelope.getSubscriptions());
            LOG.debug("{} subscriptions updated to {}", streamName, subsEnvelope);
            this.previousSubscriptions.put(streamName, new StreamSubscriptions(streamName, subsEnvelope));
            this.fetchSubscriptionsSuccessCount.increment();
        }
    }

    /**
     * Returns the most recently cached subscription envelope for a stream.
     *
     * @param stream stream name to look up
     * @return the cached envelope, or empty if nothing is cached for {@code stream}
     */
    public Optional<MantisServerSubscriptionEnvelope> getCurrentSubs(String stream) {
        return Optional.ofNullable(this.previousSubscriptions.get(stream)).map(StreamSubscriptions::getSubsEnvelope);
    }

    /**
     * Immutable cache entry: a stream's subscription envelope plus the wall-clock time
     * (milliseconds, from {@link System#currentTimeMillis()}) at which the entry was created.
     */
    static class StreamSubscriptions {
        private final String streamName;
        private final MantisServerSubscriptionEnvelope subsEnvelope;
        private final transient long createTimeMs;

        /**
         * Creates an entry stamped with the current time.
         *
         * @param streamName   stream the envelope belongs to
         * @param subsEnvelope subscriptions fetched for that stream
         */
        public StreamSubscriptions(String streamName, MantisServerSubscriptionEnvelope subsEnvelope) {
            this.streamName = streamName;
            this.subsEnvelope = subsEnvelope;
            this.createTimeMs = System.currentTimeMillis();
        }

        /** @return the stream name this entry was created for */
        public String getStreamName() {
            return this.streamName;
        }

        /** @return the subscription envelope held by this entry */
        public MantisServerSubscriptionEnvelope getSubsEnvelope() {
            return this.subsEnvelope;
        }

        /** @return creation time of this entry in epoch milliseconds */
        public long getCreateTimeMs() {
            return this.createTimeMs;
        }
    }
}

