/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import io.mantisrx.publish.StreamManager;
import io.mantisrx.publish.SubscriptionTracker;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.core.Subscription;
import io.mantisrx.publish.core.SubscriptionFactory;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.proto.MantisServerSubscription;
import io.mantisrx.publish.proto.MantisServerSubscriptionEnvelope;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractSubscriptionTracker
implements SubscriptionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscriptionTracker.class);
    private static final MantisServerSubscriptionEnvelope DEFAULT_EMPTY_SUB_ENVELOPE = new MantisServerSubscriptionEnvelope(Collections.emptyList());
    private final MrePublishConfiguration mrePublishConfiguration;
    private final Registry registry;
    private final StreamManager streamManager;
    private final Counter refreshSubscriptionSuccessCount;
    private final Counter refreshSubscriptionFailedCount;
    private final Counter staleSubscriptionRemovedCount;
    private volatile Map<String, StreamSubscriptions> previousSubscriptions = new HashMap<String, StreamSubscriptions>();

    public AbstractSubscriptionTracker(MrePublishConfiguration mrePublishConfiguration, Registry registry, StreamManager streamManager) {
        this.mrePublishConfiguration = mrePublishConfiguration;
        this.registry = registry;
        this.streamManager = streamManager;
        this.refreshSubscriptionSuccessCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionSuccessCount");
        this.refreshSubscriptionFailedCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionFailedCount");
        this.staleSubscriptionRemovedCount = SpectatorUtils.buildAndRegisterCounter(registry, "staleSubscriptionRemovedCount");
    }

    void propagateSubscriptionChanges(Set<MantisServerSubscription> prev, Set<MantisServerSubscription> curr) {
        HashSet<MantisServerSubscription> prevSubsNotInCurr = new HashSet<MantisServerSubscription>(prev);
        prevSubsNotInCurr.removeAll(curr);
        prevSubsNotInCurr.stream().forEach(subToRemove -> {
            Optional<Subscription> subscription = SubscriptionFactory.getSubscription(subToRemove.getSubscriptionId(), subToRemove.getQuery());
            if (subscription.isPresent()) {
                this.streamManager.removeStreamSubscription(subscription.get());
            } else {
                LOG.warn("unexpected to find invalid subscription to remove {}", subToRemove);
            }
        });
        HashSet<MantisServerSubscription> currSubsNotInPrev = new HashSet<MantisServerSubscription>(curr);
        currSubsNotInPrev.removeAll(prev);
        currSubsNotInPrev.stream().forEach(subToAdd -> {
            Optional<Subscription> subscription = SubscriptionFactory.getSubscription(subToAdd.getSubscriptionId(), subToAdd.getQuery());
            if (subscription.isPresent()) {
                this.streamManager.addStreamSubscription(subscription.get());
            } else {
                LOG.info("will not add invalid subscription {}", subToAdd);
            }
        });
    }

    private void cleanupStaleSubscriptions(String streamName) {
        StreamSubscriptions streamSubscriptions = this.previousSubscriptions.get(streamName);
        if (streamSubscriptions != null) {
            boolean hasStaleSubscriptionsData;
            boolean bl = hasStaleSubscriptionsData = System.currentTimeMillis() - streamSubscriptions.getCreateTimeMs() > (long)(this.mrePublishConfiguration.subscriptionExpiryIntervalSec() * 1000);
            if (hasStaleSubscriptionsData) {
                LOG.info("removing stale subscriptions data for stream {} ({} created {})", new Object[]{streamName, streamSubscriptions.getSubsEnvelope(), streamSubscriptions.getCreateTimeMs()});
                this.staleSubscriptionRemovedCount.increment();
                StreamSubscriptions removedSubs = this.previousSubscriptions.remove(streamName);
                this.propagateSubscriptionChanges(removedSubs.getSubsEnvelope().getSubscriptions(), Collections.emptySet());
            }
        }
    }

    public abstract Optional<MantisServerSubscriptionEnvelope> fetchSubscriptions(String var1, String var2);

    @Override
    public void refreshSubscriptions() {
        if (this.mrePublishConfiguration.isMREClientEnabled() && !this.streamManager.getRegisteredStreams().isEmpty()) {
            for (Map.Entry<String, String> e : this.mrePublishConfiguration.streamNameToJobClusterMapping().entrySet()) {
                String streamName = e.getKey();
                if (this.streamManager.getRegisteredStreams().contains(streamName) || "__default__".equals(streamName)) {
                    String jobCluster = e.getValue();
                    try {
                        Optional<MantisServerSubscriptionEnvelope> subsEnvelopeO = this.fetchSubscriptions(streamName, jobCluster);
                        if (subsEnvelopeO.isPresent()) {
                            MantisServerSubscriptionEnvelope subsEnvelope = subsEnvelopeO.get();
                            this.propagateSubscriptionChanges(this.previousSubscriptions.getOrDefault(streamName, new StreamSubscriptions(streamName, DEFAULT_EMPTY_SUB_ENVELOPE)).getSubsEnvelope().getSubscriptions(), subsEnvelope.getSubscriptions());
                            LOG.debug("{} subscriptions updated to {}", (Object)streamName, (Object)subsEnvelope);
                            this.previousSubscriptions.put(streamName, new StreamSubscriptions(streamName, subsEnvelope));
                            this.refreshSubscriptionSuccessCount.increment();
                            continue;
                        }
                        this.cleanupStaleSubscriptions(streamName);
                        this.refreshSubscriptionFailedCount.increment();
                    }
                    catch (Exception exc) {
                        LOG.info("refresh subscriptions failed for {} {}", new Object[]{streamName, jobCluster, exc});
                        this.refreshSubscriptionFailedCount.increment();
                    }
                    continue;
                }
                LOG.debug("will not fetch subscriptions for un-registered stream {}", (Object)streamName);
            }
        }
    }

    public Optional<MantisServerSubscriptionEnvelope> getCurrentSubs(String stream) {
        return Optional.ofNullable(this.previousSubscriptions.get(stream)).map(StreamSubscriptions::getSubsEnvelope);
    }

    static class StreamSubscriptions {
        private final String streamName;
        private final MantisServerSubscriptionEnvelope subsEnvelope;
        private final transient long createTimeMs;

        public StreamSubscriptions(String streamName, MantisServerSubscriptionEnvelope subsEnvelope) {
            this.streamName = streamName;
            this.subsEnvelope = subsEnvelope;
            this.createTimeMs = System.currentTimeMillis();
        }

        public String getStreamName() {
            return this.streamName;
        }

        public MantisServerSubscriptionEnvelope getSubsEnvelope() {
            return this.subsEnvelope;
        }

        public long getCreateTimeMs() {
            return this.createTimeMs;
        }
    }
}

