/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.connectors.publish.source.http;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.mantisrx.common.metrics.Counter;
import io.mantisrx.common.metrics.Metrics;
import io.mantisrx.common.metrics.MetricsRegistry;
import io.mantisrx.common.metrics.spectator.MetricGroupId;
import io.mantisrx.connectors.publish.core.QueryRegistry;
import io.mantisrx.publish.proto.MantisServerSubscription;
import io.mantisrx.publish.proto.MantisServerSubscriptionEnvelope;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.AsciiString;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.List;
import mantis.io.reactivex.netty.protocol.http.server.UriInfoHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.subjects.Subject;

/**
 * Netty inbound handler for the publish HTTP source.
 *
 * <ul>
 *   <li>{@code GET}: replies with the subscriptions returned by the {@link QueryRegistry} for the
 *       request's query parameters, serialized as a JSON {@link MantisServerSubscriptionEnvelope}.</li>
 *   <li>{@code POST}: decodes the request body as UTF-8, pushes it to the event subject and
 *       replies with a plain-text {@code OK}.</li>
 *   <li>Any other method: replies with {@code 405 Method Not Allowed}.</li>
 * </ul>
 *
 * <p>Responses are only written in {@code channelRead0}; they are flushed in
 * {@link #channelReadComplete(ChannelHandlerContext)}.
 */
public class HttpSourceServerHandler
extends SimpleChannelInboundHandler<HttpObject> {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpSourceServerHandler.class);
    /** ASCII bytes of {@code "OK"}, the body sent to acknowledge a POST. */
    private static final byte[] CONTENT = new byte[]{'O', 'K'};
    private static final AsciiString CONTENT_TYPE = AsciiString.cached("Content-Type");
    private static final AsciiString CONTENT_LENGTH = AsciiString.cached("Content-Length");
    private static final AsciiString CONNECTION = AsciiString.cached("Connection");
    private static final AsciiString KEEP_ALIVE = AsciiString.cached("keep-alive");
    private static final AsciiString ALLOW = AsciiString.cached("Allow");
    /** Methods advertised in the {@code Allow} header of a 405 response. */
    private static final String ALLOWED_METHODS = "GET, POST";

    // Visibility and mutability of these two fields are kept as in the original class because
    // other code in this package may read or replace them.
    ObjectMapper mapper = new ObjectMapper();
    private final Counter getRequestCount;
    private final Counter unknownRequestCount;
    private final Counter postRequestCount;
    MetricGroupId metricGroupId;
    private final QueryRegistry registry;
    private final Subject<String, String> eventSubject;

    /**
     * Creates a handler and registers its request counters under the
     * {@code PushServer_incoming} metric group.
     *
     * @param queryRegistry source of the current subscriptions returned to GET requests
     * @param eventSubject  subject that receives the body of every POST request
     */
    public HttpSourceServerHandler(QueryRegistry queryRegistry, Subject<String, String> eventSubject) {
        this.registry = queryRegistry;
        this.eventSubject = eventSubject;
        this.metricGroupId = new MetricGroupId("PushServer_incoming");
        Metrics m = new Metrics.Builder()
                .id(this.metricGroupId)
                .addCounter("GetRequestCount")
                .addCounter("PostRequestCount")
                .addCounter("UnknownRequestCount")
                .build();
        // Use the instance handed back by the registry for the counters below.
        m = MetricsRegistry.getInstance().registerAndGet(m);
        this.getRequestCount = m.getCounter("GetRequestCount");
        this.unknownRequestCount = m.getCounter("UnknownRequestCount");
        this.postRequestCount = m.getCounter("PostRequestCount");
    }

    /**
     * Flushes the responses that {@code channelRead0} wrote without flushing.
     *
     * @param ctx channel context to flush
     */
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    /**
     * Dispatches an incoming HTTP request by method. Messages that are not an
     * {@link HttpRequest} are ignored.
     *
     * @param ctx channel context used to write the response
     * @param msg inbound HTTP object
     */
    protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {
        if (!(msg instanceof HttpRequest)) {
            return;
        }
        HttpRequest req = (HttpRequest) msg;
        boolean keepAlive = HttpUtil.isKeepAlive(req);
        if (req.method().equals(HttpMethod.GET)) {
            this.getRequestCount.increment();
            handleGet(ctx, req, keepAlive);
        } else if (req.method().equals(HttpMethod.POST)) {
            this.postRequestCount.increment();
            // Assumes an upstream aggregator delivers complete messages; if msg is not a
            // FullHttpMessage this cast throws ClassCastException -- TODO confirm pipeline setup.
            handlePost(ctx, (FullHttpMessage) msg, keepAlive);
        } else {
            this.unknownRequestCount.increment();
            handleUnsupportedMethod(ctx, keepAlive);
        }
    }

    /** Replies with the current subscriptions as JSON, or with a 500 if they cannot be serialized. */
    private void handleGet(ChannelHandlerContext ctx, HttpRequest req, boolean keepAlive) {
        UriInfoHolder uriInfoHolder = new UriInfoHolder(req.uri());
        List<MantisServerSubscription> currentSubscriptions =
                this.registry.getCurrentSubscriptions(uriInfoHolder.getQueryParameters());
        byte[] serializedSubs;
        try {
            serializedSubs = this.mapper.writeValueAsBytes(new MantisServerSubscriptionEnvelope(currentSubscriptions));
        } catch (Exception e) {
            LOGGER.error("failed to serialize current subscriptions", e);
            // Always answer: without a response the client would wait until it times out.
            writeResponse(ctx, emptyResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR), keepAlive);
            return;
        }
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(serializedSubs));
        response.headers().set(CONTENT_TYPE, "application/json");
        writeResponse(ctx, response, keepAlive);
    }

    /** Publishes the UTF-8 decoded request body to the event subject and acknowledges with "OK". */
    private void handlePost(ChannelHandlerContext ctx, FullHttpMessage request, boolean keepAlive) {
        ByteBuf content = request.content();
        String data = content.toString(CharsetUtil.UTF_8);
        LOGGER.debug("got data {}", data);
        this.eventSubject.onNext(data);
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(CONTENT));
        response.headers().set(CONTENT_TYPE, "text/plain");
        writeResponse(ctx, response, keepAlive);
    }

    /** Replies with 405 and an Allow header so the client is not left waiting for a response. */
    private void handleUnsupportedMethod(ChannelHandlerContext ctx, boolean keepAlive) {
        DefaultFullHttpResponse response = emptyResponse(HttpResponseStatus.METHOD_NOT_ALLOWED);
        response.headers().set(ALLOW, ALLOWED_METHODS);
        writeResponse(ctx, response, keepAlive);
    }

    /** Builds an HTTP/1.1 response with the given status and an empty body. */
    private static DefaultFullHttpResponse emptyResponse(HttpResponseStatus status) {
        return new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, Unpooled.EMPTY_BUFFER);
    }

    /**
     * Sets Content-Length from the response body and writes the response without flushing.
     * For keep-alive requests a {@code Connection: keep-alive} header is added; otherwise the
     * channel is closed once the write completes.
     */
    private static void writeResponse(ChannelHandlerContext ctx, DefaultFullHttpResponse response, boolean keepAlive) {
        response.headers().setInt(CONTENT_LENGTH, response.content().readableBytes());
        if (keepAlive) {
            response.headers().set(CONNECTION, KEEP_ALIVE);
            ctx.write(response);
        } else {
            ctx.write(response).addListener(ChannelFutureListener.CLOSE);
        }
    }
}

