/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import io.mantisrx.publish.StreamManager;
import io.mantisrx.publish.SubscriptionTracker;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.core.Subscription;
import io.mantisrx.publish.core.SubscriptionFactory;
import io.mantisrx.publish.internal.discovery.MantisJobDiscovery;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.proto.MantisServerSubscription;
import io.mantisrx.publish.proto.MantisServerSubscriptionEnvelope;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base {@link SubscriptionTracker} that periodically reconciles the subscriptions held by the
 * {@link StreamManager} with the subscriptions reported by {@link #fetchSubscriptions}.
 *
 * <p>Subclasses supply the actual fetch mechanism; this class handles diffing the fetched
 * subscriptions against the currently registered ones, expiring subscriptions of streams that
 * have not been refreshed within the configured expiry interval, and metrics bookkeeping.
 */
public abstract class AbstractSubscriptionTracker
implements SubscriptionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSubscriptionTracker.class);
    private final MrePublishConfiguration mrePublishConfiguration;
    private final Registry registry;
    private final MantisJobDiscovery jobDiscovery;
    private final StreamManager streamManager;
    private final Counter refreshSubscriptionInvokedCount;
    private final Counter refreshSubscriptionSuccessCount;
    private final Counter refreshSubscriptionFailedCount;
    private final Counter staleSubscriptionRemovedCount;
    /**
     * Stream name -> wall-clock time (epoch millis) of the last successful subscription fetch.
     *
     * <p>NOTE(review): the reference is volatile but the map itself is a plain HashMap; this is
     * only safe if refreshSubscriptions() is never invoked concurrently -- confirm against callers.
     */
    private volatile Map<String, Long> streamLastFetchedTs = new HashMap<>();

    /**
     * Creates a tracker and registers its refresh/stale-removal counters with the given registry.
     *
     * @param mrePublishConfiguration client configuration (enabled flag, app name, expiry interval)
     * @param registry                metrics registry the counters are registered with
     * @param jobDiscovery            source of the stream name to job cluster mapping
     * @param streamManager           holder of the registered streams and their subscriptions
     */
    public AbstractSubscriptionTracker(MrePublishConfiguration mrePublishConfiguration, Registry registry, MantisJobDiscovery jobDiscovery, StreamManager streamManager) {
        this.mrePublishConfiguration = mrePublishConfiguration;
        this.registry = registry;
        this.jobDiscovery = jobDiscovery;
        this.streamManager = streamManager;
        this.refreshSubscriptionInvokedCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionInvokedCount");
        this.refreshSubscriptionSuccessCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionSuccessCount");
        this.refreshSubscriptionFailedCount = SpectatorUtils.buildAndRegisterCounter(registry, "refreshSubscriptionFailedCount");
        this.staleSubscriptionRemovedCount = SpectatorUtils.buildAndRegisterCounter(registry, "staleSubscriptionRemovedCount");
    }

    /**
     * Reconciles the stream manager's subscriptions for {@code streamName} with {@code curr}:
     * subscriptions present in {@code curr} but not currently registered are added, and
     * currently registered subscriptions absent from {@code curr} are removed.
     *
     * <p>Failures on individual subscriptions are logged at debug level and do not stop the
     * remaining subscriptions from being processed.
     *
     * @param streamName stream whose subscriptions are being reconciled
     * @param curr       the complete set of subscriptions that should exist for the stream
     */
    void propagateSubscriptionChanges(String streamName, Set<MantisServerSubscription> curr) {
        // Starts as "everything currently registered"; ids still wanted are struck off below,
        // so whatever is left at the end is exactly the set to remove.
        Set<String> previousIds = this.getCurrentSubIds(streamName);
        for (MantisServerSubscription newSub : curr) {
            String subscriptionId = newSub.getSubscriptionId();
            if (!previousIds.contains(subscriptionId)) {
                try {
                    Optional<Subscription> subscription = SubscriptionFactory.getSubscription(subscriptionId, newSub.getQuery());
                    if (subscription.isPresent()) {
                        this.streamManager.addStreamSubscription(subscription.get());
                    } else {
                        LOG.info("will not add invalid subscription {}", newSub);
                    }
                } catch (Throwable t) {
                    // Best effort: one bad subscription must not block the rest of the update.
                    LOG.debug("failed to add subscription {}", newSub, t);
                }
            }
            previousIds.remove(subscriptionId);
        }
        for (String prevId : previousIds) {
            try {
                this.streamManager.removeStreamSubscription(prevId);
            } catch (Throwable t) {
                // Best effort, same as the add path; keep the cause so the failure is diagnosable.
                LOG.debug("failed to remove subscription {}", prevId, t);
            }
        }
    }

    /**
     * Removes all subscriptions of {@code streamName} if its last successful fetch is older than
     * the configured subscription expiry interval. Does nothing if the stream has no recorded
     * fetch timestamp.
     *
     * @param streamName stream to check for stale subscription data
     */
    private void cleanupStaleSubscriptions(String streamName) {
        Long lastFetched = this.streamLastFetchedTs.get(streamName);
        if (lastFetched == null) {
            return;
        }
        // Multiply as long: an int product would overflow for large intervals and could turn
        // negative, which would make every stream look stale.
        long expiryMillis = this.mrePublishConfiguration.subscriptionExpiryIntervalSec() * 1000L;
        boolean hasStaleSubscriptionsData = System.currentTimeMillis() - lastFetched > expiryMillis;
        if (hasStaleSubscriptionsData) {
            LOG.info("removing stale subscriptions data for stream {} (created {})", streamName, lastFetched);
            this.staleSubscriptionRemovedCount.increment();
            this.streamLastFetchedTs.remove(streamName);
            // An empty "current" set causes every registered subscription to be removed.
            this.propagateSubscriptionChanges(streamName, Collections.emptySet());
        }
    }

    /**
     * Fetches the current subscriptions for a stream from the given job cluster.
     *
     * @param streamName name of the stream to fetch subscriptions for
     * @param jobCluster job cluster mapped to the stream
     * @return the fetched subscriptions; an empty Optional is treated by
     *         {@link #refreshSubscriptions()} as a failed refresh
     */
    public abstract Optional<MantisServerSubscriptionEnvelope> fetchSubscriptions(String streamName, String jobCluster);

    /**
     * Refreshes subscriptions for every mapped stream that is registered with the stream manager
     * (plus the {@code __default__} stream), provided the client is enabled and at least one
     * stream is registered.
     *
     * <p>For each such stream, a present fetch result is propagated to the stream manager and the
     * stream's last-fetched timestamp is updated; an absent result triggers the stale
     * subscription check. Exceptions for one stream are logged and counted as failures without
     * aborting the refresh of the remaining streams.
     */
    @Override
    public void refreshSubscriptions() {
        this.refreshSubscriptionInvokedCount.increment();
        boolean mantisPublishEnabled = this.mrePublishConfiguration.isMREClientEnabled();
        Set<String> registeredStreams = this.streamManager.getRegisteredStreams();
        if (!mantisPublishEnabled || registeredStreams.isEmpty()) {
            LOG.debug("subscription refresh skipped (client enabled {} registered streams {})", mantisPublishEnabled, registeredStreams);
            return;
        }

        boolean subscriptionsFetchedForStream = false;
        Map<String, String> streamJobClusterMap = this.jobDiscovery.getStreamNameToJobClusterMapping(this.mrePublishConfiguration.appName());
        for (Map.Entry<String, String> e : streamJobClusterMap.entrySet()) {
            String streamName = e.getKey();
            LOG.debug("processing stream {} and currently registered Streams {}", streamName, registeredStreams);
            if (!registeredStreams.contains(streamName) && !"__default__".equals(streamName)) {
                LOG.debug("will not fetch subscriptions for un-registered stream {}", streamName);
                continue;
            }
            subscriptionsFetchedForStream = true;
            String jobCluster = e.getValue();
            try {
                Optional<MantisServerSubscriptionEnvelope> subsEnvelopeO = this.fetchSubscriptions(streamName, jobCluster);
                if (subsEnvelopeO.isPresent()) {
                    MantisServerSubscriptionEnvelope subsEnvelope = subsEnvelopeO.get();
                    this.propagateSubscriptionChanges(streamName, subsEnvelope.getSubscriptions());
                    LOG.debug("{} subscriptions updated to {}", streamName, subsEnvelope);
                    this.streamLastFetchedTs.put(streamName, System.currentTimeMillis());
                    this.refreshSubscriptionSuccessCount.increment();
                } else {
                    // Nothing fetched: drop the stream's subscriptions once they are past expiry.
                    this.cleanupStaleSubscriptions(streamName);
                    this.refreshSubscriptionFailedCount.increment();
                }
            } catch (Exception exc) {
                LOG.info("refresh subscriptions failed for {} {}", streamName, jobCluster, exc);
                this.refreshSubscriptionFailedCount.increment();
            }
        }
        if (!subscriptionsFetchedForStream) {
            LOG.warn("No server side mappings found for one or more streams {} ", registeredStreams);
        }
    }

    /**
     * Returns the ids of the subscriptions the stream manager currently holds for a stream.
     *
     * @param streamName stream to look up
     * @return a newly collected set of subscription ids
     */
    protected Set<String> getCurrentSubIds(String streamName) {
        return this.streamManager.getStreamSubscriptions(streamName).stream().map(Subscription::getSubscriptionId).collect(Collectors.toSet());
    }
}

