/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.common.metrics;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Gauge;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.measurement.CounterMeasurement;
import io.mantisrx.common.metrics.measurement.GaugeMeasurement;
import io.mantisrx.common.metrics.measurement.Measurements;
import io.mantisrx.common.metrics.spectator.MetricId;
import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurators;
import mantis.io.reactivex.netty.protocol.http.server.HttpServer;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerRequest;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerResponse;
import mantis.io.reactivex.netty.protocol.http.server.RequestHandler;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;

public class MetricsServer {
    private static final Logger logger = LoggerFactory.getLogger(MetricsServer.class);
    private final ObjectMapper mapper = new ObjectMapper();
    private HttpServer<ByteBuf, ServerSentEvent> server;
    private int port;
    private Map<String, String> tags;
    private long publishRateInSeconds;

    public MetricsServer(int port, long publishRateInSeconds, Map<String, String> tags) {
        this.port = port;
        this.publishRateInSeconds = publishRateInSeconds;
        this.tags = tags;
        this.mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        this.mapper.registerModule(new Jdk8Module());
    }

    private Observable<Measurements> measurements(long timeFrequency) {
        final MetricsRegistry registry = MetricsRegistry.getInstance();
        return Observable.interval(0L, timeFrequency, TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<Measurements>>(){

            @Override
            public Observable<Measurements> call(Long t1) {
                long timestamp = System.currentTimeMillis();
                ArrayList<Measurements> measurements = new ArrayList<Measurements>();
                for (Metrics metrics : registry.metrics()) {
                    LinkedList<CounterMeasurement> counters = new LinkedList<CounterMeasurement>();
                    LinkedList<GaugeMeasurement> gauges = new LinkedList<GaugeMeasurement>();
                    for (Map.Entry<MetricId, Counter> entry : metrics.counters().entrySet()) {
                        Counter counter = entry.getValue();
                        counters.add(new CounterMeasurement(entry.getKey().metricName(), counter.value()));
                    }
                    for (Map.Entry<MetricId, Object> entry : metrics.gauges().entrySet()) {
                        gauges.add(new GaugeMeasurement(entry.getKey().metricName(), ((Gauge)entry.getValue()).doubleValue()));
                    }
                    measurements.add(new Measurements(metrics.getMetricGroupId().id(), timestamp, counters, gauges, MetricsServer.this.tags));
                }
                return Observable.from(measurements);
            }
        });
    }

    public void start() {
        final Observable<Measurements> measurements = this.measurements(this.publishRateInSeconds);
        logger.info("Starting metrics server on port: " + this.port);
        this.server = RxNetty.createHttpServer(this.port, new RequestHandler<ByteBuf, ServerSentEvent>(){

            @Override
            public Observable<Void> handle(HttpServerRequest<ByteBuf> request, final HttpServerResponse<ServerSentEvent> response) {
                Map<String, List<String>> queryParameters = request.getQueryParameters();
                final LinkedList namesToFilter = new LinkedList();
                logger.info("got query params {}", (Object)queryParameters);
                if (queryParameters != null && queryParameters.containsKey("name")) {
                    namesToFilter.addAll(queryParameters.get("name"));
                }
                Observable<Measurements> filteredObservable = measurements.filter(new Func1<Measurements, Boolean>(){

                    @Override
                    public Boolean call(Measurements measurements) {
                        if (!namesToFilter.isEmpty()) {
                            for (String name2 : namesToFilter) {
                                if (name2.indexOf(42) != -1) {
                                    if (name2.indexOf(42) == 0 && measurements.getName().endsWith(name2.substring(1))) {
                                        return true;
                                    }
                                    if (name2.indexOf(42) > 0 && measurements.getName().startsWith(name2.substring(0, name2.indexOf(42)))) {
                                        return true;
                                    }
                                }
                                if (!measurements.getName().equals(name2)) continue;
                                return true;
                            }
                            return false;
                        }
                        return true;
                    }
                });
                return filteredObservable.flatMap(new Func1<Measurements, Observable<Void>>(){

                    @Override
                    public Observable<Void> call(Measurements metrics) {
                        response.getHeaders().set("Access-Control-Allow-Origin", (Object)"*");
                        response.getHeaders().set("content-type", (Object)"text/event-stream");
                        ServerSentEvent event = null;
                        try {
                            ByteBuf data2 = response.getAllocator().buffer().writeBytes(MetricsServer.this.mapper.writeValueAsString(metrics).getBytes());
                            event = new ServerSentEvent(data2);
                        }
                        catch (JsonProcessingException e2) {
                            logger.error("Failed to map metrics to JSON", e2);
                        }
                        if (event != null) {
                            response.write(event);
                            return response.writeStringAndFlush("\n");
                        }
                        return null;
                    }
                });
            }
        }, PipelineConfigurators.serveSseConfigurator()).start();
    }

    public void shutdown() {
        if (this.server != null) {
            logger.info("Shutting down metrics server on port");
            logger.info("Waiting (2 x push-period) to flush buffers, before shut down.");
            try {
                TimeUnit.SECONDS.sleep(2L);
                this.server.shutdown();
            }
            catch (InterruptedException e2) {
                logger.warn("Failed to shutdown metrics server", e2);
            }
        }
    }
}

