/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.mantis.network.push;

import com.netflix.spectator.api.BasicTag;
import com.netflix.spectator.api.Tag;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.mql.jvm.core.Query;
import io.mantisrx.mql.shaded.clojure.java.api.Clojure;
import io.mantisrx.mql.shaded.clojure.lang.IFn;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOption;
import io.netty.channel.WriteBufferWaterMark;
import io.reactivex.mantis.network.push.PushServer;
import io.reactivex.mantis.network.push.PushTrigger;
import io.reactivex.mantis.network.push.ServerConfig;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurators;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerBuilder;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerRequest;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerResponse;
import mantis.io.reactivex.netty.protocol.http.server.RequestHandler;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import mantis.io.reactivex.netty.server.AbstractServer;
import mantis.io.reactivex.netty.server.RxServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.PublishSubject;
import rx.subjects.SerializedSubject;

/**
 * {@link PushServer} variant that streams data to HTTP clients as Server-Sent Events.
 *
 * <p>Each incoming request is configured through its query parameters (see
 * {@link #handleSseRequest}); the connection itself is then handed to
 * {@code manageConnectionWithCompression} of the parent class.
 *
 * @param <T> type of the data items delivered by the {@link PushTrigger}
 * @param <S> type of the state object passed to the request/subscribe processors
 */
public class PushServerSse<T, S> extends PushServer<T, ServerSentEvent> {
    private static final Logger logger = LoggerFactory.getLogger(PushServerSse.class);
    public static final String PROCESSED_COUNTER_METRIC_NAME = "processedCounter";
    public static final String DROPPED_COUNTER_METRIC_NAME = "droppedCounter";
    private static final IFn require = Clojure.var("io.mantisrx.mql.shaded.clojure.core", "require");
    // Both MQL entry points are resolved in the static initializer at the bottom of the class.
    private static final IFn mqlMakeQuery;
    private static final IFn mqlParses;

    private final Func2<Map<String, List<String>>, S, Void> requestPreprocessor;
    private final Func2<Map<String, List<String>>, S, Void> requestPostprocessor;
    private final Func2<Map<String, List<String>>, S, Void> subscribeProcessor;
    private final S processorState;
    private final Func1<Map<String, List<String>>, Func1<T, Boolean>> predicate;
    private final boolean supportLegacyMetrics;
    private final MetricsRegistry metricsRegistry;

    /**
     * Creates the server.
     *
     * @param trigger              source of the data to push
     * @param config               server configuration; also supplies the metrics registry and the
     *                             optional predicate factory
     * @param serverSignals        subject forwarded to the parent class
     * @param requestPreprocessor  optional; called with the query parameters and {@code state}
     *                             after the response headers have been flushed
     * @param requestPostprocessor optional; passed to the parent as the connection-closed callback
     * @param subscribeProcessor   optional; passed to the parent as the subscribe callback
     * @param state                state object handed to the three processors above
     * @param supportLegacyMetrics when {@code true} the per-connection metrics are registered under
     *                             the group name {@code ServerSentEventRequestHandler} instead of
     *                             {@code PushServerSse}
     */
    public PushServerSse(PushTrigger<T> trigger, ServerConfig<T> config, PublishSubject<String> serverSignals, Func2<Map<String, List<String>>, S, Void> requestPreprocessor, Func2<Map<String, List<String>>, S, Void> requestPostprocessor, Func2<Map<String, List<String>>, S, Void> subscribeProcessor, S state, boolean supportLegacyMetrics) {
        super(trigger, config, serverSignals);
        this.metricsRegistry = config.getMetricsRegistry();
        this.predicate = config.getPredicate();
        this.processorState = state;
        this.requestPostprocessor = requestPostprocessor;
        this.requestPreprocessor = requestPreprocessor;
        this.subscribeProcessor = subscribeProcessor;
        this.supportLegacyMetrics = supportLegacyMetrics;
    }

    /**
     * Builds and registers the per-connection metrics group holding the processed and dropped
     * counters. A {@code null} tag value is replaced by {@code "none"}.
     *
     * @param uniqueClientId value of the {@code clientId} tag
     * @param socketAddrStr  value of the {@code sockAddr} tag
     * @return the metrics instance returned by the registry
     */
    private Metrics registerSseMetrics(String uniqueClientId, String socketAddrStr) {
        BasicTag clientIdTag = new BasicTag("clientId", Optional.ofNullable(uniqueClientId).orElse("none"));
        BasicTag sockAddrTag = new BasicTag("sockAddr", Optional.ofNullable(socketAddrStr).orElse("none"));
        String metricGroup = this.supportLegacyMetrics ? "ServerSentEventRequestHandler" : "PushServerSse";
        Metrics sseSinkMetrics = new Metrics.Builder()
                .id(metricGroup, new Tag[]{clientIdTag, sockAddrTag})
                .addCounter(PROCESSED_COUNTER_METRIC_NAME)
                .addCounter(DROPPED_COUNTER_METRIC_NAME)
                .build();
        return this.metricsRegistry.registerAndGet(sseSinkMetrics);
    }

    /**
     * Returns the first value of the given query parameter.
     *
     * @param params query parameters, may be {@code null}
     * @param key    parameter name
     * @return the first value, or {@code null} when the map is {@code null}, the key is absent or
     *         its value list is {@code null} or empty
     */
    private static String firstValue(Map<String, List<String>> params, String key) {
        if (params == null) {
            return null;
        }
        List<String> values = params.get(key);
        return (values == null || values.isEmpty()) ? null : values.get(0);
    }

    /**
     * Tells whether the first value of the given query parameter equals {@code "true"}
     * (case-insensitive).
     */
    private static boolean isTrue(Map<String, List<String>> params, String key) {
        return "true".equalsIgnoreCase(firstValue(params, key));
    }

    /**
     * Creates the HTTP server. Every request is served by {@link #handleSseRequest}; the pipeline
     * uses the SSE configurator and the channel write-buffer water marks are set to 1 MiB (low)
     * and 5 MiB (high).
     */
    @Override
    public RxServer<?, ?> createServer() {
        return RxNetty.newHttpServerBuilder(this.port, new RequestHandler<String, ServerSentEvent>() {

            @Override
            public Observable<Void> handle(HttpServerRequest<String> request, HttpServerResponse<ServerSentEvent> response) {
                return handleSseRequest(request, response);
            }
        })
                .pipelineConfigurator(PipelineConfigurators.<String>serveSseConfigurator())
                .channelOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 1024, 5 * 1024 * 1024))
                .build();
    }

    /**
     * Handles one SSE request: reads the options from the query string, registers the connection
     * metrics, writes the response headers, optionally starts meta-message and heartbeat
     * publishing, and finally delegates to {@code manageConnectionWithCompression}.
     *
     * <p>Recognized query parameters:
     * <ul>
     *   <li>{@code id}, {@code slotId}, {@code groupId} - forwarded to the parent class</li>
     *   <li>{@code clientId} - overrides {@code groupId} when present</li>
     *   <li>{@code heartbeatSec} - heartbeat interval in seconds (minimum 1); enables heartbeats</li>
     *   <li>{@code enablePings=true} - enables heartbeats with the default interval of 2 seconds</li>
     *   <li>{@code mantis.EnableCompressedBinary=true} - requests binary compressed output</li>
     *   <li>{@code enableMetaMessages=true} - enables meta messages (default interval 1000 ms)</li>
     *   <li>{@code metaMessagesSec} - meta message interval (minimum 250); enables meta messages</li>
     *   <li>{@code sample} (seconds) / {@code sampleMSec} (milliseconds) - enables sampling;
     *       the resulting interval must be at least 50 ms</li>
     *   <li>{@code delimiter} - replaces the default {@code $$$} delimiter when non-empty</li>
     *   <li>{@code mql} - when the query parses, replaces the predicate with an MQL matcher</li>
     * </ul>
     *
     * @throws IllegalArgumentException if a numeric option is below its minimum or is not a
     *                                  valid number ({@link NumberFormatException})
     */
    private Observable<Void> handleSseRequest(HttpServerRequest<String> request, HttpServerResponse<ServerSentEvent> response) {
        final Map<String, List<String>> queryParameters = request.getQueryParameters();
        boolean enableHeartbeats = false;
        boolean enableBinaryOutput = false;
        boolean enableMetaMessages = false;
        boolean enableSampling = false;
        long samplingTimeMsec = 0L;
        final AtomicLong heartBeatReadIdleSec = new AtomicLong(2L);
        final AtomicLong metaMessagesFreqMSec = new AtomicLong(1000L);
        final AtomicLong lastWriteTime = new AtomicLong();
        final SerializedSubject<String, String> metaMsgSubject = PublishSubject.<String>create().toSerialized();
        String groupId = null;
        String slotId = null;
        String id = null;

        Func1<T, Boolean> predicateFunction = null;
        if (this.predicate != null) {
            predicateFunction = this.predicate.call(queryParameters);
        }

        byte[] delimiter = "$$$".getBytes();
        if (queryParameters != null && !queryParameters.isEmpty()) {
            id = firstValue(queryParameters, "id");
            slotId = firstValue(queryParameters, "slotId");
            groupId = firstValue(queryParameters, "groupId");
            // clientId is accepted as an alternative to groupId and wins when both are given.
            String clientId = firstValue(queryParameters, "clientId");
            if (clientId != null) {
                groupId = clientId;
            }

            String heartbeatSec = firstValue(queryParameters, "heartbeatSec");
            if (heartbeatSec != null) {
                heartBeatReadIdleSec.set(Long.parseLong(heartbeatSec));
                if (heartBeatReadIdleSec.get() < 1L) {
                    throw new IllegalArgumentException("Heartbeat interval too low: " + heartBeatReadIdleSec.get());
                }
                enableHeartbeats = true;
            }
            if (isTrue(queryParameters, "mantis.EnableCompressedBinary")) {
                logger.info("Binary compression requested");
                enableBinaryOutput = true;
            }
            if (isTrue(queryParameters, "enablePings")) {
                enableHeartbeats = true;
            }
            if (isTrue(queryParameters, "enableMetaMessages")) {
                enableMetaMessages = true;
            }

            // NOTE(review): despite the "Sec" suffix the value is stored unscaled and later used
            // as a millisecond interval - confirm this is intended before changing it.
            String metaMessagesSec = firstValue(queryParameters, "metaMessagesSec");
            if (metaMessagesSec != null) {
                metaMessagesFreqMSec.set(Long.parseLong(metaMessagesSec));
                if (metaMessagesFreqMSec.get() < 250L) {
                    throw new IllegalArgumentException("Meta message frequence rate too low: " + metaMessagesFreqMSec.get());
                }
                enableMetaMessages = true;
            }

            String sampleSec = firstValue(queryParameters, "sample");
            if (sampleSec != null) {
                samplingTimeMsec = Long.parseLong(sampleSec) * 1000L;
                if (samplingTimeMsec < 50L) {
                    throw new IllegalArgumentException("Sampling rate too low: " + samplingTimeMsec);
                }
                enableSampling = true;
            }
            // sampleMSec is evaluated after sample, so it takes precedence when both are given.
            String sampleMSec = firstValue(queryParameters, "sampleMSec");
            if (sampleMSec != null) {
                samplingTimeMsec = Long.parseLong(sampleMSec);
                if (samplingTimeMsec < 50L) {
                    throw new IllegalArgumentException("Sampling rate too low: " + samplingTimeMsec);
                }
                enableSampling = true;
            }

            String rawDelimiter = firstValue(queryParameters, "delimiter");
            if (rawDelimiter != null && !rawDelimiter.isEmpty()) {
                delimiter = rawDelimiter.getBytes();
            }

            String mqlQuery = firstValue(queryParameters, "mql");
            // A query that does not parse is ignored and the configured predicate stays in effect.
            if (mqlQuery != null && ((Boolean)mqlParses.invoke(mqlQuery)).booleanValue()) {
                Query q = (Query)mqlMakeQuery.invoke(groupId, mqlQuery);
                // Items that are not maps cannot be matched and are always let through.
                predicateFunction = datum -> datum instanceof Map ? q.matches((Map)datum) : true;
            }
        }

        InetSocketAddress socketAddress = (InetSocketAddress)response.getChannel().remoteAddress();
        String remoteAddress = socketAddress.getAddress().toString();
        // Without a group/client id the remote address doubles as the client id tag.
        Metrics metrics = this.registerSseMetrics(groupId == null ? remoteAddress : groupId, remoteAddress);
        Counter sseProcessedCounter = metrics.getCounter(PROCESSED_COUNTER_METRIC_NAME);
        Counter sseDroppedCounter = metrics.getCounter(DROPPED_COUNTER_METRIC_NAME);

        response.getHeaders().set("Access-Control-Allow-Origin", "*");
        response.getHeaders().set("content-type", "text/event-stream");
        response.getHeaders().set("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
        response.getHeaders().set("Pragma", "no-cache");
        response.flush();

        if (queryParameters != null && this.requestPreprocessor != null) {
            this.requestPreprocessor.call(queryParameters, this.processorState);
        }

        Subscription metaMsgSubscription = null;
        if (enableMetaMessages && metaMessagesFreqMSec.get() > 0L) {
            logger.info("Enabling Meta messages, interval : {} ms", metaMessagesFreqMSec.get());
            // At most one (the latest) meta message is written per interval.
            metaMsgSubscription = metaMsgSubject.throttleLast(metaMessagesFreqMSec.get(), TimeUnit.MILLISECONDS).doOnNext(message -> {
                if (message != null && !message.isEmpty()) {
                    long currentTime = System.currentTimeMillis();
                    ByteBuf data = response.getAllocator().buffer().writeBytes(message.getBytes());
                    response.writeAndFlush(new ServerSentEvent(data));
                    lastWriteTime.set(currentTime);
                }
            }).subscribe();
        }

        Subscription heartbeatSubscription = null;
        if (enableHeartbeats && heartBeatReadIdleSec.get() > 0L) {
            logger.info("Enabling hearts, interval: {}", heartBeatReadIdleSec);
            // First check after 2 seconds, then once per interval; a ping is only written when
            // nothing else has been written for longer than the interval.
            heartbeatSubscription = Observable.interval(2L, heartBeatReadIdleSec.get(), TimeUnit.SECONDS).doOnNext(tick -> {
                long currentTime = System.currentTimeMillis();
                long idleSec = (currentTime - lastWriteTime.get()) / 1000L;
                if (idleSec > heartBeatReadIdleSec.get()) {
                    ByteBuf data = response.getAllocator().buffer().writeBytes("ping".getBytes());
                    response.writeAndFlush(new ServerSentEvent(data));
                    lastWriteTime.set(currentTime);
                }
            }).subscribe();
        }

        Action0 connectionClosedCallback = null;
        if (queryParameters != null && this.requestPostprocessor != null) {
            connectionClosedCallback = () -> this.requestPostprocessor.call(queryParameters, this.processorState);
        }
        // The subscribe callback is always supplied; it is a no-op without parameters or processor.
        Action0 subscribeCallback = () -> {
            if (queryParameters != null && this.subscribeProcessor != null) {
                this.subscribeProcessor.call(queryParameters, this.processorState);
            }
        };

        return this.manageConnectionWithCompression(response, socketAddress.getHostString(), socketAddress.getPort(), groupId, slotId, id, lastWriteTime, enableHeartbeats, heartbeatSubscription, enableSampling, samplingTimeMsec, metaMsgSubject, metaMsgSubscription, predicateFunction, connectionClosedCallback, sseProcessedCounter, sseDroppedCounter, subscribeCallback, enableBinaryOutput, true, delimiter);
    }

    static {
        // Load the MQL namespaces before resolving the vars used for parsing and query creation.
        require.invoke(Clojure.read("io.mantisrx.mql.jvm.interfaces.core"));
        require.invoke(Clojure.read("io.mantisrx.mql.jvm.interfaces.server"));
        mqlMakeQuery = Clojure.var("io.mantisrx.mql.jvm.interfaces.server", "make-query");
        mqlParses = Clojure.var("io.mantisrx.mql.jvm.interfaces.core", "parses?");
    }
}

