/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import io.mantisrx.publish.api.Event;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.core.Subscription;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.internal.metrics.StreamMetrics;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamManager {
    private static final Logger LOG = LoggerFactory.getLogger(StreamManager.class);
    private final Registry registry;
    private final MrePublishConfiguration config;
    private final Counter mantisStreamCreateFailed;
    private final ConcurrentMap<String, ConcurrentSkipListSet<Subscription>> streamSubscriptionsMap;
    private final ConcurrentMap<String, List<String>> subscriptionIdToStreamsMap;
    private final ConcurrentMap<String, BlockingQueue<Event>> streamQueuesMap;
    private final ConcurrentMap<String, StreamMetrics> streamMetricsMap;

    public StreamManager(Registry registry, MrePublishConfiguration mrePublishConfiguration) {
        this.registry = registry;
        this.config = mrePublishConfiguration;
        this.mantisStreamCreateFailed = SpectatorUtils.buildAndRegisterCounter(this.registry, "mantisStreamCreateFailed");
        this.streamSubscriptionsMap = new ConcurrentHashMap<String, ConcurrentSkipListSet<Subscription>>();
        this.subscriptionIdToStreamsMap = new ConcurrentHashMap<String, List<String>>();
        this.streamQueuesMap = new ConcurrentHashMap<String, BlockingQueue<Event>>();
        this.streamMetricsMap = new ConcurrentHashMap<String, StreamMetrics>();
    }

    synchronized Optional<BlockingQueue<Event>> createIfAbsentQueueForStream(String streamName) {
        if (!this.streamQueuesMap.containsKey(streamName)) {
            this.cleanupInactiveStreamQueues();
            if (this.streamQueuesMap.keySet().size() >= this.config.maxNumStreams()) {
                LOG.debug("failed to create queue for stream {} (MAX_NUM_STREAMS {} exceeded)", (Object)streamName, (Object)this.config.maxNumStreams());
                this.mantisStreamCreateFailed.increment();
                return Optional.empty();
            }
            int qSize = this.config.streamQueueSize(streamName);
            LOG.info("creating queue for stream {} (size: {})", (Object)streamName, (Object)qSize);
            this.streamQueuesMap.putIfAbsent(streamName, new LinkedBlockingQueue(qSize));
            this.streamMetricsMap.putIfAbsent(streamName, new StreamMetrics(this.registry, streamName));
        }
        return Optional.ofNullable(this.streamQueuesMap.get(streamName));
    }

    private boolean isStreamInactive(long lastEventTs) {
        long secondsSinceLastEvent = TimeUnit.SECONDS.convert(System.nanoTime() - lastEventTs, TimeUnit.NANOSECONDS);
        return secondsSinceLastEvent > this.config.streamInactiveDurationThreshold();
    }

    private void cleanupInactiveStreamQueues() {
        ArrayList streamsToRemove = new ArrayList(5);
        this.streamQueuesMap.keySet().stream().forEach(streamName -> this.getStreamMetrics((String)streamName).ifPresent(m -> {
            long lastEventTs = m.getLastEventOnStreamTimestamp();
            if (lastEventTs != 0L && this.isStreamInactive(lastEventTs)) {
                streamsToRemove.add(streamName);
            }
        }));
        streamsToRemove.stream().forEach(stream -> {
            this.streamQueuesMap.remove(stream);
            this.streamMetricsMap.remove(stream);
        });
    }

    Optional<BlockingQueue<Event>> getQueueForStream(String streamName) {
        return Optional.ofNullable(this.streamQueuesMap.get(streamName));
    }

    Optional<StreamMetrics> getStreamMetrics(String streamName) {
        return Optional.ofNullable(this.streamMetricsMap.get(streamName));
    }

    boolean hasSubscriptions(String streamName) {
        return Optional.ofNullable(this.streamSubscriptionsMap.get(streamName)).map(subs2 -> !subs2.isEmpty()).orElse(false);
    }

    private List<String> sanitizeStreamSubjects(List<String> subjects) {
        return subjects.stream().map(s -> {
            if (s.toLowerCase().equals("observable") || s.toLowerCase().equals("stream") || s.equals("__default__")) {
                return "defaultStream";
            }
            return s;
        }).collect(Collectors.toList());
    }

    private void handleDuplicateSubscriptionId(Subscription sub) {
        String subId = sub.getSubscriptionId();
        Optional.ofNullable(this.subscriptionIdToStreamsMap.get(subId)).ifPresent(streams -> this.removeSubscriptionId((List<String>)streams, subId));
    }

    synchronized void addStreamSubscription(Subscription sub) {
        List<String> streams = this.sanitizeStreamSubjects(sub.getSubjects());
        LOG.info("adding subscription {} with streams {}", (Object)sub, (Object)streams);
        this.handleDuplicateSubscriptionId(sub);
        for (String stream : streams) {
            this.streamSubscriptionsMap.putIfAbsent(stream, new ConcurrentSkipListSet());
            ConcurrentSkipListSet subs2 = (ConcurrentSkipListSet)this.streamSubscriptionsMap.get(stream);
            int maxSubs = this.config.maxSubscriptions(stream);
            subs2.removeIf(s -> s.getSubscriptionId().equals(sub.getSubscriptionId()));
            subs2.add(sub);
            int numSubs = subs2.size();
            if (numSubs > maxSubs) {
                this.removeStreamSubscription(sub);
                LOG.warn("QUERY SUBSCRIPTION REJECTED: Number of subscriptions for stream {} exceeded max {}. Increase (default mre.publish.max.subscriptions.per.stream.default or  mre.publish.max.subscriptions.stream.<streamName>) to allow more queries. Removed {}", stream, maxSubs, sub);
                this.getStreamMetrics(stream).ifPresent(m -> m.getMantisQueryRejectedCounter().increment());
            }
            this.getStreamMetrics(stream).ifPresent(m -> m.getMantisActiveQueryCountGauge().set(numSubs));
        }
        this.subscriptionIdToStreamsMap.put(sub.getSubscriptionId(), streams);
    }

    private void removeSubscriptionId(List<String> streams, String subscriptionId) {
        for (String stream : streams) {
            ConcurrentSkipListSet subs2;
            if (!this.streamSubscriptionsMap.containsKey(stream) || (subs2 = (ConcurrentSkipListSet)this.streamSubscriptionsMap.get(stream)) == null) continue;
            subs2.removeIf(sub -> sub.getSubscriptionId().equals(subscriptionId));
            this.getStreamMetrics(stream).ifPresent(m -> m.getMantisActiveQueryCountGauge().set(subs2.size()));
            if (!subs2.isEmpty()) continue;
            this.streamSubscriptionsMap.remove(stream);
        }
    }

    synchronized boolean removeStreamSubscription(String subscriptionId) {
        LOG.info("removing subscription {}", (Object)subscriptionId);
        List<String> streams = this.subscriptionIdToStreamsMap.getOrDefault(subscriptionId, Collections.emptyList());
        this.removeSubscriptionId(streams, subscriptionId);
        this.subscriptionIdToStreamsMap.remove(subscriptionId);
        return true;
    }

    synchronized boolean removeStreamSubscription(Subscription sub) {
        LOG.info("removing subscription {}", (Object)sub);
        List<String> streams = this.sanitizeStreamSubjects(sub.getSubjects());
        this.removeSubscriptionId(streams, sub.getSubscriptionId());
        this.subscriptionIdToStreamsMap.remove(sub.getSubscriptionId());
        return true;
    }

    Set<Subscription> getStreamSubscriptions(String streamName) {
        return this.streamSubscriptionsMap.getOrDefault(streamName, new ConcurrentSkipListSet());
    }

    Set<String> getAllStreams() {
        return this.streamQueuesMap.keySet();
    }
}

