/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.sink;

import io.mantisrx.common.properties.MantisPropertiesLoader;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.PortRequest;
import io.mantisrx.runtime.sink.SelfDocumentingSink;
import io.mantisrx.runtime.sink.ServerSentEventRequestHandler;
import io.mantisrx.runtime.sink.predicate.Predicate;
import io.mantisrx.server.core.ServiceRegistry;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelOption;
import io.reactivex.mantis.network.push.PushServerSse;
import io.reactivex.mantis.network.push.PushServers;
import io.reactivex.mantis.network.push.Routers;
import io.reactivex.mantis.network.push.ServerConfig;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurators;
import mantis.io.reactivex.netty.protocol.http.server.HttpServer;
import mantis.io.reactivex.netty.protocol.http.server.HttpServerBuilder;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.subjects.BehaviorSubject;

public class ServerSentEventsSink<T>
implements SelfDocumentingSink<T> {
    private static final Logger LOG = LoggerFactory.getLogger(ServerSentEventsSink.class);
    private final Func2<Map<String, List<String>>, Context, Void> subscribeProcessor;
    private final BehaviorSubject<Integer> portObservable = BehaviorSubject.create();
    private final Func1<T, String> encoder;
    private final Func1<Throwable, String> errorEncoder;
    private final Predicate<T> predicate;
    private Func2<Map<String, List<String>>, Context, Void> requestPreprocessor;
    private Func2<Map<String, List<String>>, Context, Void> requestPostprocessor;
    private int port = -1;
    private final MantisPropertiesLoader propService;
    private PushServerSse<T, Context> pushServerSse;
    private HttpServer<ByteBuf, ServerSentEvent> httpServer;

    public ServerSentEventsSink(Func1<T, String> encoder) {
        this(encoder, null, null);
    }

    ServerSentEventsSink(Func1<T, String> encoder, Func1<Throwable, String> errorEncoder, Predicate<T> predicate) {
        if (errorEncoder == null) {
            errorEncoder = Throwable::getMessage;
        }
        this.encoder = encoder;
        this.errorEncoder = errorEncoder;
        this.predicate = predicate;
        this.propService = ServiceRegistry.INSTANCE.getPropertiesService();
        this.subscribeProcessor = null;
    }

    ServerSentEventsSink(Builder<T> builder) {
        this.encoder = ((Builder)builder).encoder;
        this.errorEncoder = ((Builder)builder).errorEncoder;
        this.predicate = ((Builder)builder).predicate;
        this.requestPreprocessor = ((Builder)builder).requestPreprocessor;
        this.requestPostprocessor = ((Builder)builder).requestPostprocessor;
        this.subscribeProcessor = ((Builder)builder).subscribeProcessor;
        this.propService = ServiceRegistry.INSTANCE.getPropertiesService();
    }

    @Override
    public Metadata metadata() {
        StringBuilder description = new StringBuilder();
        description.append("HTTP server streaming results using Server-sent events.  The sink supports optional subscription (GET) parameters to change the events emitted by the stream.  A sampling interval can be applied to the stream using the GET parameter sample=numSeconds.  This will limit the stream rate to events-per-numSeconds.");
        if (this.predicate != null && this.predicate.getDescription() != null) {
            description.append("  Predicate description: ").append(this.predicate.getDescription());
        }
        return new Metadata.Builder().name("Server Sent Event Sink").description(description.toString()).build();
    }

    private boolean runNewSseServerImpl(String jobName) {
        String legacyServerString = this.propService.getStringValue("mantis.sse.newServerImpl", "true");
        String legacyServerStringPerJob = this.propService.getStringValue(jobName + ".mantis.sse.newServerImpl", "false");
        return Boolean.parseBoolean(legacyServerString) || Boolean.parseBoolean(legacyServerStringPerJob);
    }

    private int numConsumerThreads() {
        String consumerThreadsString = this.propService.getStringValue("mantis.sse.numConsumerThreads", "1");
        return Integer.parseInt(consumerThreadsString);
    }

    private int maxChunkSize() {
        String maxChunkSize = this.propService.getStringValue("mantis.sse.maxChunkSize", "1000");
        return Integer.parseInt(maxChunkSize);
    }

    private int maxReadTime() {
        String maxChunkSize = this.propService.getStringValue("mantis.sse.maxReadTimeMSec", "250");
        return Integer.parseInt(maxChunkSize);
    }

    private int bufferCapacity() {
        String bufferCapacityString = this.propService.getStringValue("mantis.sse.bufferCapacity", "25000");
        return Integer.parseInt(bufferCapacityString);
    }

    private boolean useSpsc() {
        String useSpsc = this.propService.getStringValue("mantis.sse.spsc", "false");
        return Boolean.parseBoolean(useSpsc);
    }

    public void call(Context context, PortRequest portRequest, Observable<T> observable) {
        this.port = portRequest.getPort();
        if (this.runNewSseServerImpl(context.getWorkerInfo().getJobName())) {
            LOG.info("Serving modern HTTP SSE server sink on port: " + this.port);
            String serverName = "SseSink";
            ServerConfig.Builder config = new ServerConfig.Builder().name(serverName).groupRouter(Routers.roundRobinSse((String)serverName, this.encoder)).port(this.port).metricsRegistry(context.getMetricsRegistry()).maxChunkTimeMSec(this.maxReadTime()).maxChunkSize(this.maxChunkSize()).bufferCapacity(this.bufferCapacity()).numQueueConsumers(this.numConsumerThreads()).useSpscQueue(this.useSpsc()).maxChunkTimeMSec(this.getBatchInterval());
            if (this.predicate != null) {
                config.predicate(this.predicate.getPredicate());
            }
            this.pushServerSse = PushServers.infiniteStreamSse((ServerConfig)config.build(), observable, this.requestPreprocessor, this.requestPostprocessor, this.subscribeProcessor, (Object)context, (boolean)true);
            this.pushServerSse.start();
        } else {
            LOG.info("Serving legacy HTTP SSE server sink on port: " + this.port);
            int batchInterval = this.getBatchInterval();
            this.httpServer = ((HttpServerBuilder)((HttpServerBuilder)((HttpServerBuilder)RxNetty.newHttpServerBuilder((int)this.port, new ServerSentEventRequestHandler<T>(observable, this.encoder, this.errorEncoder, this.predicate, this.requestPreprocessor, this.requestPostprocessor, context, batchInterval)).pipelineConfigurator(PipelineConfigurators.serveSseConfigurator())).channelOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, (Object)0x500000)).channelOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, (Object)0x100000)).build();
            this.httpServer.start();
        }
        this.portObservable.onNext((Object)this.port);
    }

    @Override
    public void close() throws IOException {
        if (this.pushServerSse != null) {
            this.pushServerSse.shutdown();
        } else if (this.httpServer != null) {
            try {
                this.httpServer.shutdown();
            }
            catch (InterruptedException e) {
                throw new IOException(String.format("Failed to shut down the http server %s", this.httpServer), e);
            }
        }
    }

    private int getBatchInterval() {
        String flushIntervalMillisStr = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.sse.batchInterval", "100");
        LOG.info("Read fast property mantis.sse.batchInterval" + flushIntervalMillisStr);
        return Integer.parseInt(flushIntervalMillisStr);
    }

    private int getHighWaterMark() {
        String jobName = this.propService.getStringValue("JOB_NAME", "default");
        int highWaterMark = 0x500000;
        String highWaterMarkStr = this.propService.getStringValue(jobName + ".sse.highwater.mark", Integer.toString(0x500000));
        LOG.info("Read fast property:" + jobName + ".sse.highwater.mark ->" + highWaterMarkStr);
        try {
            highWaterMark = Integer.parseInt(highWaterMarkStr);
        }
        catch (Exception e) {
            LOG.error("Error parsing string " + highWaterMarkStr + " exception " + e.getMessage());
        }
        return highWaterMark;
    }

    public int getServerPort() {
        return this.port;
    }

    public Observable<Integer> portConnections() {
        return this.portObservable;
    }

    public static class Builder<T> {
        private Func1<T, String> encoder;
        private Func2<Map<String, List<String>>, Context, Void> requestPreprocessor;
        private Func2<Map<String, List<String>>, Context, Void> requestPostprocessor;
        private Func1<Throwable, String> errorEncoder = Throwable::getMessage;
        private Predicate<T> predicate;
        private Func2<Map<String, List<String>>, Context, Void> subscribeProcessor;

        public Builder<T> withEncoder(Func1<T, String> encoder) {
            this.encoder = encoder;
            return this;
        }

        public Builder<T> withErrorEncoder(Func1<Throwable, String> errorEncoder) {
            this.errorEncoder = errorEncoder;
            return this;
        }

        public Builder<T> withPredicate(Predicate<T> predicate) {
            this.predicate = predicate;
            return this;
        }

        public Builder<T> withRequestPreprocessor(Func2<Map<String, List<String>>, Context, Void> preProcessor) {
            this.requestPreprocessor = preProcessor;
            return this;
        }

        public Builder<T> withSubscribePreprocessor(Func2<Map<String, List<String>>, Context, Void> subscribeProcessor) {
            this.subscribeProcessor = subscribeProcessor;
            return this;
        }

        public Builder<T> withRequestPostprocessor(Func2<Map<String, List<String>>, Context, Void> postProcessor) {
            this.requestPostprocessor = postProcessor;
            return this;
        }

        public ServerSentEventsSink<T> build() {
            return new ServerSentEventsSink(this);
        }
    }
}

