/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.api.tunnel;

import com.netflix.config.DynamicIntProperty;
import com.netflix.config.DynamicStringProperty;
import com.netflix.spectator.api.Counter;
import com.netflix.zuul.netty.SpectatorUtils;
import io.mantisrx.api.Util;
import io.mantisrx.api.push.ConnectionBroker;
import io.mantisrx.api.push.PushConnectionDetails;
import io.mantisrx.api.tunnel.MantisCrossRegionalClient;
import io.mantisrx.api.tunnel.RegionData;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.QueryStringEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import mantis.io.reactivex.netty.channel.ContentTransformer;
import mantis.io.reactivex.netty.channel.StringTransformer;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientRequest;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;

/**
 * Netty inbound handler for requests addressed to other regions through URIs of
 * the form {@code /region/<region>/<path>}.
 *
 * <p>Requests whose tail path starts with one of the configured push prefixes are
 * served as a server-sent-event stream obtained from the {@link ConnectionBroker}.
 * HEAD requests are answered locally. GET and POST requests are relayed to the
 * REST client of every target region and the per-region answers are returned as a
 * single JSON array. Any other method is passed to the next handler.
 *
 * <p>The handler keeps the active SSE subscription in an instance field, so an
 * instance is tied to one channel.
 */
public class CrossRegionHandler
extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static final Logger log = LoggerFactory.getLogger(CrossRegionHandler.class);

    // Metadata keys stamped onto every per-region JSON answer.
    private static final String REGION_KEY = "mantis.meta.origin";
    private static final String ERROR_KEY = "mantis.meta.errorString";
    private static final String CODE_KEY = "mantis.meta.origin.response.code";

    private static final String TUNNEL_PING = "MantisApiTunnelPing";
    private static final String TUNNEL_PING_PARAM = "MantisApiTunnelPingEnabled";
    // Marker element injected by the write timer; never sent to the client.
    private static final String TIMER_EVENT = "DUMMY_TIMER_DATA";

    private static final String ALLOWED_HEADERS = "Origin, X-Requested-With, Accept, Content-Type, Cache-Control";
    private static final String ALLOWED_METHODS = "GET, OPTIONS, PUT, POST, DELETE, CONNECT";

    private final List<String> pushPrefixes;
    private final MantisCrossRegionalClient mantisCrossRegionalClient;
    private final ConnectionBroker connectionBroker;
    private final Scheduler scheduler;
    private Subscription subscription = null;
    private final DynamicIntProperty queueCapacity = new DynamicIntProperty("io.mantisrx.api.push.queueCapacity", 1000);
    private final DynamicIntProperty writeIntervalMillis = new DynamicIntProperty("io.mantisrx.api.push.writeIntervalMillis", 50);
    private final DynamicStringProperty tunnelRegionsProperty = new DynamicStringProperty("io.mantisrx.api.tunnel.regions", Util.getLocalRegion());

    /**
     * @param pushPrefixes              path prefixes that identify streaming (SSE) requests
     * @param mantisCrossRegionalClient source of per-region REST clients
     * @param connectionBroker          source of the event stream for SSE requests
     * @param scheduler                 scheduler on which relayed REST results are collected and written
     */
    public CrossRegionHandler(List<String> pushPrefixes, MantisCrossRegionalClient mantisCrossRegionalClient, ConnectionBroker connectionBroker, Scheduler scheduler) {
        super(true);
        this.pushPrefixes = pushPrefixes;
        this.mantisCrossRegionalClient = mantisCrossRegionalClient;
        this.connectionBroker = connectionBroker;
        this.scheduler = scheduler;
    }

    /**
     * Returns the configured tunnel regions, trimmed and lower-cased. Blank
     * entries (empty property, doubled or trailing commas) are dropped so they
     * are never used as a region name.
     */
    private List<String> getTunnelRegions() {
        return Arrays.stream(this.tunnelRegionsProperty.get().split(","))
                .map(String::trim)
                .map(String::toLowerCase)
                .filter(region -> !region.isEmpty())
                .collect(Collectors.toList());
    }

    /**
     * Resolves the regions a request targets: the configured tunnel regions when
     * the URI's region segment means "all", otherwise just that one region.
     */
    private List<String> resolveRegions(String requestUri) {
        String region = CrossRegionHandler.getRegion(requestUri);
        return Util.isAllRegion(region) ? this.getTunnelRegions() : Collections.singletonList(region);
    }

    /**
     * Dispatches the request: streaming paths first, then by HTTP method.
     * Unhandled methods are retained and forwarded down the pipeline.
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        if (HttpUtil.is100ContinueExpected(request)) {
            CrossRegionHandler.send100Continue(ctx);
        }
        HttpMethod method = request.method();
        if (this.isCrossRegionStreamingPath(request.uri())) {
            this.handleRemoteSse(ctx, request);
        } else if (HttpMethod.HEAD.equals(method)) {
            this.handleHead(ctx, request);
        } else if (HttpMethod.GET.equals(method)) {
            this.handleRestGet(ctx, request);
        } else if (HttpMethod.POST.equals(method)) {
            this.handleRestPost(ctx, request);
        } else {
            // This handler auto-releases the message, so retain it for the next handler.
            ctx.fireChannelRead(request.retain());
        }
    }

    /** Answers a HEAD request locally with an empty 200 response, then closes the channel. */
    private void handleHead(ChannelHandlerContext ctx, FullHttpRequest request) {
        DefaultHttpHeaders headers = new DefaultHttpHeaders();
        headers.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
        headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
        headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS);
        headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS);
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER, headers, new DefaultHttpHeaders());
        ctx.writeAndFlush(response).addListener(done -> ctx.close());
    }

    /** Relays a GET for the URI's tail path to every target region. */
    private void handleRestGet(ChannelHandlerContext ctx, FullHttpRequest request) {
        List<String> regions = this.resolveRegions(request.uri());
        String uri = CrossRegionHandler.getTail(request.uri());
        this.relayToRegions(ctx, regions, uri, HttpMethod.GET, null);
    }

    /** Relays a POST, including the request body, for the URI's tail path to every target region. */
    private void handleRestPost(ChannelHandlerContext ctx, FullHttpRequest request) {
        String uri = CrossRegionHandler.getTail(request.uri());
        List<String> regions = this.resolveRegions(request.uri());
        log.info("Relaying POST URI {} to {}.", uri, regions);
        // Read the body now: the request buffer is released once channelRead0 returns.
        // Decoded with the platform default charset, unchanged from the previous behaviour.
        String content = request.content().toString(Charset.defaultCharset());
        this.relayToRegions(ctx, regions, uri, method(HttpMethod.POST), content);
    }

    // Identity helper kept trivial on purpose; makes the call sites above read symmetrically.
    private static HttpMethod method(HttpMethod method) {
        return method;
    }

    /**
     * Sends the same request to each region, gathers one {@link RegionData} per
     * region and writes the combined result to the channel.
     *
     * @param content request body to attach, or {@code null} for no body
     */
    private void relayToRegions(ChannelHandlerContext ctx, List<String> regions, String uri, HttpMethod method, String content) {
        Observable.from(regions)
                .flatMap(region -> {
                    HttpClientRequest<String> rq = HttpClientRequest.create(method, uri);
                    if (content != null) {
                        rq.withRawContent(content, StringTransformer.DEFAULT_INSTANCE);
                    }
                    return this.submitWithRetry(region, uri, rq);
                })
                .reduce(new ArrayList<RegionData>(), (regionDatas, regionData) -> {
                    regionDatas.add(regionData);
                    return regionDatas;
                })
                .observeOn(this.scheduler)
                .subscribeOn(this.scheduler)
                .take(1)
                .subscribe(
                        result -> this.writeDataAndCloseChannel(ctx, result),
                        error -> {
                            // Per-region errors are already converted to RegionData; anything
                            // reaching here is unexpected, so do not leave the client hanging.
                            log.error("Unexpected error relaying {} to {}", uri, regions, error);
                            ctx.close();
                        });
    }

    /**
     * Submits {@code rq} to one region's REST client and maps the outcome to a
     * {@link RegionData}. A 5xx status or a transport error is retried according
     * to {@code Util.getRetryFunc}; when retries are exhausted a failed
     * {@code RegionData} carrying the error message is emitted instead.
     *
     * <p>The last-error holder is local to this region and cleared at the start
     * of every attempt, so a failure in one attempt or region can neither fail a
     * later successful attempt nor leak into another region.
     */
    private Observable<RegionData> submitWithRetry(String region, String uri, HttpClientRequest<String> rq) {
        AtomicReference<Throwable> lastError = new AtomicReference<>();
        return Observable
                .create((Observable.OnSubscribe<mantis.io.reactivex.netty.protocol.http.client.HttpClient<String, ByteBuf>>) subscriber ->
                        subscriber.onNext(this.mantisCrossRegionalClient.getSecureRestClient(region)))
                .flatMap(client -> {
                    lastError.set(null);
                    return client.submit(rq)
                            .flatMap(resp -> {
                                int code = resp.getStatus().code();
                                if (code >= 500) {
                                    throw new RuntimeException(resp.getStatus().toString() + " in " + region);
                                }
                                return this.responseToRegionData(region, resp);
                            })
                            .onErrorReturn(t -> {
                                log.warn("Error getting response from remote master: " + t.getMessage());
                                lastError.set(t);
                                return new RegionData(region, false, t.getMessage(), 0);
                            });
                })
                .map(data -> {
                    // Re-raise the recorded failure so retryWhen sees it.
                    Throwable t = lastError.get();
                    if (t != null) {
                        throw new RuntimeException(t);
                    }
                    return data;
                })
                .retryWhen(Util.getRetryFunc(log, uri + " in " + region))
                .take(1)
                .onErrorReturn(t -> new RegionData(region, false, t.getMessage(), 0));
    }

    /**
     * Starts a server-sent-event response and streams events from the connection
     * broker to the client.
     *
     * <p>Events are buffered in a bounded queue; a periodic timer drains the queue
     * to the channel when the channel is writable. Events that do not fit in the
     * queue are dropped and counted.
     */
    private void handleRemoteSse(ChannelHandlerContext ctx, FullHttpRequest request) {
        DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        HttpHeaders headers = response.headers();
        headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
        headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS);
        headers.set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream");
        headers.set(HttpHeaderNames.CACHE_CONTROL, "no-cache, no-store, max-age=0, must-revalidate");
        headers.set(HttpHeaderNames.PRAGMA, HttpHeaderValues.NO_CACHE);
        headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
        headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        ctx.writeAndFlush(response);

        // Checked on the incoming URI, before the tunnel parameters are appended below.
        boolean sendThroughTunnelPings = this.hasTunnelPingParam(request.uri());
        String uri = this.uriWithTunnelParamsAdded(CrossRegionHandler.getTail(request.uri()));
        List<String> regions = this.resolveRegions(request.uri());
        log.info("Initiating remote SSE connection to {} in {}.", uri, regions);
        PushConnectionDetails pcd = PushConnectionDetails.from(uri, regions);

        String[] tags = Util.getTaglist(request.uri(), pcd.target, CrossRegionHandler.getRegion(request.uri()));
        Counter numDroppedBytesCounter = SpectatorUtils.newCounter("numDroppedSinkBytes", pcd.target, tags);
        Counter numDroppedMessagesCounter = SpectatorUtils.newCounter("numDroppedSinkMessages", pcd.target, tags);
        Counter numMessagesCounter = SpectatorUtils.newCounter("numSinkMessages", pcd.target, tags);
        Counter numBytesCounter = SpectatorUtils.newCounter("numSinkBytes", pcd.target, tags);

        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(this.queueCapacity.get());

        // Do not orphan a stream started by an earlier request on this channel.
        Subscription previous = this.subscription;
        if (previous != null && !previous.isUnsubscribed()) {
            previous.unsubscribe();
        }

        this.subscription = this.connectionBroker.connect(pcd)
                .filter(event -> sendThroughTunnelPings || !TUNNEL_PING.equalsIgnoreCase(event))
                .mergeWith(Observable.interval((long) this.writeIntervalMillis.get(), TimeUnit.MILLISECONDS).map(tick -> TIMER_EVENT))
                .doOnNext(event -> {
                    if (TIMER_EVENT.equals(event)) {
                        return;
                    }
                    String data = "data: " + event + "\r\n\r\n";
                    if (!queue.offer(data)) {
                        numDroppedBytesCounter.increment((long) data.length());
                        numDroppedMessagesCounter.increment();
                    }
                })
                .filter(TIMER_EVENT::equals)
                .doOnNext(tick -> {
                    if (!ctx.channel().isWritable()) {
                        return;
                    }
                    List<String> items = new ArrayList<>(queue.size());
                    queue.drainTo(items);
                    if (items.isEmpty()) {
                        return;
                    }
                    for (String data : items) {
                        ctx.write(Unpooled.copiedBuffer(data, StandardCharsets.UTF_8));
                        numMessagesCounter.increment();
                        numBytesCounter.increment((long) data.length());
                    }
                    // One flush per drained batch instead of one per event.
                    ctx.flush();
                })
                .subscribe();
    }

    /** Stops the SSE stream, if one is active, when the channel goes away. */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
        if (this.subscription != null && !this.subscription.isUnsubscribed()) {
            this.subscription.unsubscribe();
        }
    }

    /** Returns whether the URI mentions the tunnel-ping parameter name anywhere. */
    private boolean hasTunnelPingParam(String uri) {
        return uri != null && uri.contains(TUNNEL_PING_PARAM);
    }

    /**
     * Collects the full response body and wraps it, with the status code, in a
     * successful {@link RegionData}; a failure while reading the body yields a
     * failed {@code RegionData} carrying the error message.
     */
    private Observable<RegionData> responseToRegionData(String region, HttpClientResponse<ByteBuf> resp) {
        int code = resp.getStatus().code();
        return resp.getContent()
                .collect(Unpooled::buffer, ByteBuf::writeBytes)
                .map(byteBuf -> new RegionData(region, true, byteBuf.toString(StandardCharsets.UTF_8), code))
                .onErrorReturn(t -> new RegionData(region, false, t.getMessage(), code));
    }

    /**
     * Serializes the per-region results as a JSON array, writes them as a 200
     * response and closes the channel. If serialization fails, a 500 response is
     * sent instead so the client is not left waiting.
     */
    private void writeDataAndCloseChannel(ChannelHandlerContext ctx, ArrayList<RegionData> result) {
        try {
            String serialized = CrossRegionHandler.responseToString(result);
            // Encode as UTF-8 to match the declared charset, and take Content-Length
            // from the encoded bytes rather than from the Java char count.
            ByteBuf body = Unpooled.copiedBuffer(serialized, StandardCharsets.UTF_8);
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, body);
            HttpHeaders headers = response.headers();
            headers.add(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON + "; charset=utf-8");
            headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
            headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS);
            headers.add(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS);
            headers.add(HttpHeaderNames.CONTENT_LENGTH, body.readableBytes());
            ctx.writeAndFlush(response).addListener(done -> ctx.close());
        }
        catch (Exception ex) {
            log.error("Error serializing cross regional response: {}", ex.getMessage(), ex);
            DefaultFullHttpResponse failure = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
            failure.headers().set(HttpHeaderNames.CONTENT_LENGTH, 0);
            ctx.writeAndFlush(failure).addListener(done -> ctx.close());
        }
    }

    /**
     * Rebuilds the URI with its existing query parameters and appends the
     * tunnel-ping flag and an origin-region tag.
     */
    private String uriWithTunnelParamsAdded(String uri) {
        QueryStringDecoder decoder = new QueryStringDecoder(uri);
        QueryStringEncoder encoder = new QueryStringEncoder(decoder.path());
        decoder.parameters().forEach((key, values) -> values.forEach(value -> encoder.addParam(key, value)));
        encoder.addParam(TUNNEL_PING_PARAM, "true");
        encoder.addParam("MantisApiTag", "originRegion:" + Util.getLocalRegion());
        return encoder.toString();
    }

    /** Writes an interim {@code 100 Continue} response. */
    private static void send100Continue(ChannelHandlerContext ctx) {
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.writeAndFlush(response);
    }

    /** Returns whether the URI's tail path starts with one of the push prefixes. */
    private boolean isCrossRegionStreamingPath(String uri) {
        return Util.startsWithAnyOf(CrossRegionHandler.getTail(uri), this.pushPrefixes);
    }

    /** Strips the leading {@code /region/<region>} segment, e.g. {@code /region/x/a/b} becomes {@code /a/b}. */
    private static String getTail(String uri) {
        return uri.replaceFirst("^/region/.*?/", "/");
    }

    /** Extracts the lower-cased region segment, e.g. {@code /region/X/a/b} yields {@code x}. */
    private static String getRegion(String uri) {
        return uri.replaceFirst("^/region/", "").replaceFirst("/.*$", "").trim().toLowerCase();
    }

    /** Renders the per-region results as a JSON array of wrapped objects. */
    private static String responseToString(List<RegionData> dataList) {
        return dataList.stream()
                .map(CrossRegionHandler::wrapRegionData)
                .collect(Collectors.joining(",", "[", "]"));
    }

    /**
     * Wraps one region's result: successful results carry their payload, failed
     * results carry an empty payload and the failure text as the error string.
     */
    private static String wrapRegionData(RegionData data) {
        if (data.isSuccess()) {
            return CrossRegionHandler.getForceWrappedJson(data.getData(), data.getRegion(), data.getResponseCode(), null);
        }
        return CrossRegionHandler.getForceWrappedJson("", data.getRegion(), data.getResponseCode(), data.getData());
    }

    /**
     * Adds region (and, if non-empty, error) metadata to {@code data} when it is
     * a JSON object; any other input is returned unchanged.
     *
     * @param data   payload text, expected to be JSON
     * @param region region name recorded as the origin
     * @param err    error text, or {@code null}/empty for none
     */
    public static String getWrappedJson(String data, String region, String err) {
        return CrossRegionHandler.getWrappedJsonIntl(data, region, err, 0, false);
    }

    /**
     * Like {@link #getWrappedJson}, but always returns a JSON object: input that
     * is not a JSON object is placed under a {@code response} key of a new object.
     *
     * @param code origin response code; recorded only when greater than zero
     */
    public static String getForceWrappedJson(String data, String region, int code, String err) {
        return CrossRegionHandler.getWrappedJsonIntl(data, region, err, code, true);
    }

    private static String getWrappedJsonIntl(String data, String region, String err, int code, boolean forceJson) {
        try {
            return CrossRegionHandler.addMeta(new JSONObject(data), region, err, code).toString();
        }
        catch (JSONException notAnObject) {
            if (!forceJson) {
                // Arrays and non-JSON text are passed through untouched.
                return data;
            }
            JSONObject wrapper = CrossRegionHandler.addMeta(new JSONObject(), region, err, code);
            try {
                // accumulate (not put) is kept deliberately so the emitted shape for
                // array payloads stays exactly as before.
                wrapper.accumulate("response", new JSONArray(data));
            }
            catch (JSONException notAnArray) {
                wrapper.put("response", data);
            }
            return wrapper.toString();
        }
    }

    /** Stamps the origin region and, when present, the error text and response code onto {@code target}. */
    private static JSONObject addMeta(JSONObject target, String region, String err, int code) {
        target.put(REGION_KEY, region);
        if (err != null && !err.isEmpty()) {
            target.put(ERROR_KEY, err);
        }
        if (code > 0) {
            target.put(CODE_KEY, String.valueOf(code));
        }
        return target;
    }
}

