/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.api.push;

import com.netflix.config.DynamicIntProperty;
import com.netflix.spectator.api.Counter;
import com.netflix.zuul.netty.SpectatorUtils;
import io.mantisrx.api.Util;
import io.mantisrx.api.push.ConnectionBroker;
import io.mantisrx.api.push.PushConnectionDetails;
import io.mantisrx.server.core.master.MasterDescription;
import io.mantisrx.server.master.client.HighAvailabilityServices;
import io.mantisrx.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import mantis.io.reactivex.netty.RxNetty;
import mantis.io.reactivex.netty.channel.ContentTransformer;
import mantis.io.reactivex.netty.channel.StringTransformer;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurator;
import mantis.io.reactivex.netty.pipeline.PipelineConfigurators;
import mantis.io.reactivex.netty.protocol.http.client.HttpClient;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientBuilder;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientRequest;
import mantis.io.reactivex.netty.protocol.http.client.HttpResponseHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;

public class MantisSSEHandler
extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static final Logger log = LoggerFactory.getLogger(MantisSSEHandler.class);
    private final DynamicIntProperty queueCapacity = new DynamicIntProperty("io.mantisrx.api.push.queueCapacity", 1000);
    private final DynamicIntProperty writeIntervalMillis = new DynamicIntProperty("io.mantisrx.api.push.writeIntervalMillis", 50);
    private final ConnectionBroker connectionBroker;
    private final HighAvailabilityServices highAvailabilityServices;
    private final List<String> pushPrefixes;
    private Subscription subscription;
    private ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("sse-handler-drainer-%d").build());
    private ScheduledFuture drainFuture;
    private String uri;

    public MantisSSEHandler(ConnectionBroker connectionBroker, HighAvailabilityServices highAvailabilityServices, List<String> pushPrefixes) {
        super(true);
        this.connectionBroker = connectionBroker;
        this.highAvailabilityServices = highAvailabilityServices;
        this.pushPrefixes = pushPrefixes;
    }

    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (Util.startsWithAnyOf(request.uri(), this.pushPrefixes) && !this.isWebsocketUpgrade((HttpRequest)request)) {
            if (HttpUtil.is100ContinueExpected((HttpMessage)request)) {
                MantisSSEHandler.send100Contine(ctx);
            }
            DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            HttpHeaders headers = response.headers();
            headers.add((CharSequence)HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, (Object)"*");
            headers.add((CharSequence)HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, (Object)"Origin, X-Requested-With, Accept, Content-Type, Cache-Control");
            headers.set((CharSequence)HttpHeaderNames.CONTENT_TYPE, (Object)"text/event-stream");
            headers.set((CharSequence)HttpHeaderNames.CACHE_CONTROL, (Object)"no-cache, no-store, max-age=0, must-revalidate");
            headers.set((CharSequence)HttpHeaderNames.PRAGMA, (Object)HttpHeaderValues.NO_CACHE);
            headers.set((CharSequence)HttpHeaderNames.TRANSFER_ENCODING, (Object)HttpHeaderValues.CHUNKED);
            response.headers().set((CharSequence)HttpHeaderNames.CONNECTION, (Object)HttpHeaderValues.KEEP_ALIVE);
            ctx.writeAndFlush((Object)response);
            this.uri = request.uri();
            PushConnectionDetails pcd = this.isSubmitAndConnect((HttpRequest)request) ? new PushConnectionDetails(this.uri, this.jobSubmit(request), PushConnectionDetails.TARGET_TYPE.CONNECT_BY_ID, (io.vavr.collection.List<String>)io.vavr.collection.List.empty()) : PushConnectionDetails.from(this.uri);
            log.info("SSE Connecting for: {}", (Object)pcd);
            boolean tunnelPingsEnabled = this.isTunnelPingsEnabled(this.uri);
            String[] tags = Util.getTaglist(this.uri, pcd.target);
            Counter numDroppedBytesCounter = SpectatorUtils.newCounter((String)"numDroppedSinkBytes", (String)pcd.target, (String[])tags);
            Counter numDroppedMessagesCounter = SpectatorUtils.newCounter((String)"numDroppedSinkMessages", (String)pcd.target, (String[])tags);
            Counter numMessagesCounter = SpectatorUtils.newCounter((String)"numSinkMessages", (String)pcd.target, (String[])tags);
            Counter numBytesCounter = SpectatorUtils.newCounter((String)"numSinkBytes", (String)pcd.target, (String[])tags);
            Counter drainTriggeredCounter = SpectatorUtils.newCounter((String)"drainTriggered", (String)pcd.target, (String[])tags);
            Counter numIncomingMessagesCounter = SpectatorUtils.newCounter((String)"numIncomingMessages", (String)pcd.target, (String[])tags);
            LinkedBlockingQueue queue = new LinkedBlockingQueue(this.queueCapacity.get());
            this.drainFuture = this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                block6: {
                    try {
                        if (queue.size() <= 0 || !ctx.channel().isWritable()) break block6;
                        drainTriggeredCounter.increment();
                        ArrayList items = new ArrayList(queue.size());
                        BlockingQueue blockingQueue = queue;
                        synchronized (blockingQueue) {
                            queue.drainTo(items);
                        }
                        for (String data : items) {
                            ctx.write((Object)Unpooled.copiedBuffer((CharSequence)data, (Charset)StandardCharsets.UTF_8));
                            numMessagesCounter.increment();
                            numBytesCounter.increment((long)data.length());
                        }
                        ctx.flush();
                    }
                    catch (Exception ex) {
                        log.error("Error writing to channel", (Throwable)ex);
                    }
                }
            }, this.writeIntervalMillis.get(), this.writeIntervalMillis.get(), TimeUnit.MILLISECONDS);
            this.subscription = this.connectionBroker.connect(pcd).doOnNext(event -> numIncomingMessagesCounter.increment()).mergeWith(tunnelPingsEnabled ? Observable.interval((long)12L, (long)12L, (TimeUnit)TimeUnit.SECONDS).map(l -> "MantisApiTunnelPing") : Observable.empty()).doOnNext(event -> {
                if (!"DUMMY_TIMER_DATA".equals(event)) {
                    String data = "data: " + event + "\r\n\r\n";
                    boolean offer = false;
                    BlockingQueue blockingQueue = queue;
                    synchronized (blockingQueue) {
                        offer = queue.offer(data);
                    }
                    if (!offer) {
                        numDroppedBytesCounter.increment((long)data.length());
                        numDroppedMessagesCounter.increment();
                    }
                }
            }).subscribe();
        } else {
            ctx.fireChannelRead((Object)request.retain());
        }
    }

    private static void send100Contine(ChannelHandlerContext ctx) {
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush((Object)response);
    }

    private boolean isTunnelPingsEnabled(String uri) {
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);
        return queryStringDecoder.parameters().getOrDefault("MantisApiTunnelPingEnabled", Arrays.asList("false")).get(0).equalsIgnoreCase("true");
    }

    private boolean isWebsocketUpgrade(HttpRequest request) {
        HttpHeaders headers = request.headers();
        String connection = headers.get((CharSequence)HttpHeaderNames.CONNECTION);
        String upgrade = headers.get((CharSequence)HttpHeaderNames.UPGRADE);
        return connection != null && connection.toLowerCase().contains("upgrade") && upgrade != null && upgrade.toLowerCase().equals("websocket");
    }

    private boolean isSubmitAndConnect(HttpRequest request) {
        return request.method().equals((Object)HttpMethod.POST) && request.uri().contains("jobsubmitandconnect");
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel {} is unregistered. URI: {}", (Object)ctx.channel(), (Object)this.uri);
        this.unsubscribeIfSubscribed();
        super.channelUnregistered(ctx);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel {} is inactive. URI: {}", (Object)ctx.channel(), (Object)this.uri);
        this.unsubscribeIfSubscribed();
        super.channelInactive(ctx);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.warn("Exception caught by channel {}. URI: {}", new Object[]{ctx.channel(), this.uri, cause});
        this.unsubscribeIfSubscribed();
        ctx.close();
    }

    private void unsubscribeIfSubscribed() {
        if (this.subscription != null && !this.subscription.isUnsubscribed()) {
            log.info("SSE unsubscribing subscription with URI: {}", (Object)this.uri);
            this.subscription.unsubscribe();
        }
        if (this.drainFuture != null) {
            this.drainFuture.cancel(false);
        }
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdown();
        }
    }

    public String jobSubmit(FullHttpRequest request) {
        String API_JOB_SUBMIT_PATH = "/api/submit";
        String content = request.content().toString(StandardCharsets.UTF_8);
        return (String)MantisSSEHandler.callPostOnMaster((Observable<MasterDescription>)this.highAvailabilityServices.getMasterMonitor().getMasterObservable(), "/api/submit", content).retryWhen(Util.getRetryFunc(log, "/api/submit")).flatMap(masterResponse -> masterResponse.getByteBuf().take(1).map(byteBuf -> {
            String s = byteBuf.toString(StandardCharsets.UTF_8);
            log.info("response: " + s);
            return s;
        })).take(1).toBlocking().first();
    }

    public static Observable<MasterResponse> callPostOnMaster(Observable<MasterDescription> masterObservable, String uri, String content) {
        PipelineConfigurator pipelineConfigurator = PipelineConfigurators.httpClientConfigurator();
        return masterObservable.filter(Objects::nonNull).flatMap(masterDesc -> {
            HttpClient client = (HttpClient)((HttpClientBuilder)RxNetty.newHttpClientBuilder((String)masterDesc.getHostname(), (int)masterDesc.getApiPort()).pipelineConfigurator(pipelineConfigurator)).build();
            HttpClientRequest request = HttpClientRequest.create((HttpMethod)HttpMethod.POST, (String)uri);
            request = request.withHeader(HttpHeaderNames.CONTENT_TYPE.toString(), HttpHeaderValues.APPLICATION_JSON.toString());
            request.withRawContent((Object)content, (ContentTransformer)StringTransformer.DEFAULT_INSTANCE);
            return client.submit(request).map(response -> new MasterResponse(response.getStatus(), (Observable<ByteBuf>)response.getContent(), response.getHeaders()));
        }).take(1);
    }

    public static class MasterResponse {
        private final HttpResponseStatus status;
        private final Observable<ByteBuf> byteBuf;
        private final HttpResponseHeaders responseHeaders;

        public MasterResponse(HttpResponseStatus status, Observable<ByteBuf> byteBuf, HttpResponseHeaders responseHeaders) {
            this.status = status;
            this.byteBuf = byteBuf;
            this.responseHeaders = responseHeaders;
        }

        public HttpResponseStatus getStatus() {
            return this.status;
        }

        public Observable<ByteBuf> getByteBuf() {
            return this.byteBuf;
        }

        public HttpResponseHeaders getResponseHeaders() {
            return this.responseHeaders;
        }
    }
}

