/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.ipc.http.HttpClient;
import com.netflix.spectator.ipc.http.HttpResponse;
import io.mantisrx.discovery.proto.JobDiscoveryInfo;
import io.mantisrx.discovery.proto.MantisWorker;
import io.mantisrx.discovery.proto.StageWorkers;
import io.mantisrx.publish.AbstractSubscriptionTracker;
import io.mantisrx.publish.DefaultObjectMapper;
import io.mantisrx.publish.StreamManager;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.internal.discovery.MantisJobDiscovery;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.proto.MantisServerSubscriptionEnvelope;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultSubscriptionTracker
extends AbstractSubscriptionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultSubscriptionTracker.class);
    private static final String SUBSCRIPTIONS_URL_FORMAT = "http://%s:%d?jobId=%s";
    private final MrePublishConfiguration mrePublishConfiguration;
    private final String subscriptionsFetchQueryParamString;
    private final Counter fetchSubscriptionsFailedCount;
    private final Counter fetchSubscriptionsNon200Count;
    private final HttpClient httpClient;
    private final MantisJobDiscovery jobDiscovery;
    private final Random random = new Random();

    public DefaultSubscriptionTracker(MrePublishConfiguration mrePublishConfiguration, Registry registry, MantisJobDiscovery jobDiscovery, StreamManager streamManager, HttpClient httpClient) {
        super(mrePublishConfiguration, registry, jobDiscovery, streamManager);
        this.mrePublishConfiguration = mrePublishConfiguration;
        this.jobDiscovery = jobDiscovery;
        this.subscriptionsFetchQueryParamString = mrePublishConfiguration.subscriptionFetchQueryParams();
        this.httpClient = httpClient;
        this.fetchSubscriptionsFailedCount = SpectatorUtils.buildAndRegisterCounter(registry, "fetchSubscriptionsFailedCount");
        this.fetchSubscriptionsNon200Count = SpectatorUtils.buildAndRegisterCounter(registry, "fetchSubscriptionsNon200Count");
    }

    private Optional<MantisServerSubscriptionEnvelope> fetchSubscriptions(String jobId, MantisWorker worker) {
        try {
            String uri = !this.subscriptionsFetchQueryParamString.isEmpty() ? String.format(SUBSCRIPTIONS_URL_FORMAT, worker.getHost(), worker.getPort(), jobId + "&" + this.subscriptionsFetchQueryParamString) : String.format(SUBSCRIPTIONS_URL_FORMAT, worker.getHost(), worker.getPort(), jobId);
            LOG.trace("Subscription fetch URL: {}", (Object)uri);
            HttpResponse response = this.httpClient.get(URI.create(uri)).withConnectTimeout(1000).withReadTimeout(1000).send();
            if (response.status() == 200) {
                MantisServerSubscriptionEnvelope subscriptionEnvelope = (MantisServerSubscriptionEnvelope)DefaultObjectMapper.getInstance().readValue(response.entityAsString(), MantisServerSubscriptionEnvelope.class);
                LOG.debug("got subs {} from Mantis worker {}", (Object)subscriptionEnvelope, (Object)worker);
                return Optional.ofNullable(subscriptionEnvelope);
            }
            LOG.info("got {} {} response from Mantis worker {}", new Object[]{response.status(), response.entityAsString(), worker});
            this.fetchSubscriptionsNon200Count.increment();
            return Optional.empty();
        }
        catch (Exception e) {
            LOG.info("caught exception fetching subs from {}", (Object)worker, (Object)e);
            this.fetchSubscriptionsFailedCount.increment();
            return Optional.empty();
        }
    }

    private List<MantisWorker> randomSubset(List<MantisWorker> workers, int subsetSize) {
        int size = workers.size();
        if (subsetSize < 0 || subsetSize >= size) {
            return workers;
        }
        for (int idx = 0; idx < subsetSize; ++idx) {
            int randomWorkerIdx = this.random.nextInt(size);
            MantisWorker tmp = workers.get(idx);
            workers.set(idx, workers.get(randomWorkerIdx));
            workers.set(randomWorkerIdx, tmp);
        }
        return workers.subList(0, subsetSize);
    }

    private Optional<MantisServerSubscriptionEnvelope> subsetSubscriptionsResolver(String jobId, List<MantisWorker> workers) {
        HashMap<MantisServerSubscriptionEnvelope, Integer> subCount = new HashMap<MantisServerSubscriptionEnvelope, Integer>();
        int numWorkers = workers.size();
        int maxWorkersToFetchSubsFrom = Math.min(this.mrePublishConfiguration.maxNumWorkersToFetchSubscriptionsFrom(), numWorkers);
        int threshold = maxWorkersToFetchSubsFrom / 2;
        List<MantisWorker> subset = this.randomSubset(workers, maxWorkersToFetchSubsFrom);
        for (MantisWorker mantisWorker : subset) {
            MantisServerSubscriptionEnvelope subscriptions;
            Integer prevCount;
            Optional<MantisServerSubscriptionEnvelope> subscriptionsO = this.fetchSubscriptions(jobId, mantisWorker);
            if (!subscriptionsO.isPresent() || (prevCount = subCount.putIfAbsent(subscriptions = subscriptionsO.get(), 0)) == null) continue;
            subCount.put(subscriptions, prevCount + 1);
            if (prevCount < threshold) continue;
            return Optional.ofNullable(subscriptions);
        }
        return subCount.entrySet().stream().max(Map.Entry.comparingByValue()).map(Map.Entry::getKey);
    }

    @Override
    public Optional<MantisServerSubscriptionEnvelope> fetchSubscriptions(String jobCluster) {
        Optional<JobDiscoveryInfo> jobDiscoveryInfo = this.jobDiscovery.getCurrentJobWorkers(jobCluster);
        if (jobDiscoveryInfo.isPresent()) {
            JobDiscoveryInfo jdi = jobDiscoveryInfo.get();
            StageWorkers workers = jdi.getIngestStageWorkers();
            if (workers != null) {
                return this.subsetSubscriptionsResolver(jdi.getJobId(), workers.getWorkers());
            }
            LOG.info("Subscription refresh failed, workers null for {}", (Object)jobCluster);
        } else {
            LOG.info("Subscription refresh failed, job discovery info not found for {}", (Object)jobCluster);
        }
        return Optional.empty();
    }
}

