/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.api.push;

import com.netflix.config.DynamicIntProperty;
import com.netflix.spectator.api.Counter;
import com.netflix.zuul.netty.SpectatorUtils;
import io.mantisrx.api.Util;
import io.mantisrx.api.push.ConnectionBroker;
import io.mantisrx.api.push.MantisSSEHandler;
import io.mantisrx.api.push.PushConnectionDetails;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscription;

public class MantisWebSocketFrameHandler
extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(MantisWebSocketFrameHandler.class);
    private final ConnectionBroker connectionBroker;
    private final DynamicIntProperty queueCapacity = new DynamicIntProperty("io.mantisrx.api.push.queueCapacity", 1000);
    private final DynamicIntProperty writeIntervalMillis = new DynamicIntProperty("io.mantisrx.api.push.writeIntervalMillis", 50);
    private Subscription subscription;
    private String uri;

    public MantisWebSocketFrameHandler(ConnectionBroker broker) {
        super(true);
        this.connectionBroker = broker;
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt.getClass() == WebSocketServerProtocolHandler.HandshakeComplete.class) {
            WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete)evt;
            this.uri = complete.requestUri();
            PushConnectionDetails pcd = PushConnectionDetails.from(this.uri);
            log.info("Request to URI '{}' is a WebSSocket upgrade, removing the SSE handler", (Object)this.uri);
            if (ctx.pipeline().get(MantisSSEHandler.class) != null) {
                ctx.pipeline().remove(MantisSSEHandler.class);
            }
            String[] tags = Util.getTaglist(this.uri, pcd.target);
            Counter numDroppedBytesCounter = SpectatorUtils.newCounter((String)"numDroppedSinkBytes", (String)pcd.target, (String[])tags);
            Counter numDroppedMessagesCounter = SpectatorUtils.newCounter((String)"numDroppedSinkMessages", (String)pcd.target, (String[])tags);
            Counter numMessagesCounter = SpectatorUtils.newCounter((String)"numSinkMessages", (String)pcd.target, (String[])tags);
            Counter numBytesCounter = SpectatorUtils.newCounter((String)"numSinkBytes", (String)pcd.target, (String[])tags);
            LinkedBlockingQueue queue = new LinkedBlockingQueue(this.queueCapacity.get());
            this.subscription = this.connectionBroker.connect(pcd).mergeWith(Observable.interval((long)this.writeIntervalMillis.get(), (TimeUnit)TimeUnit.MILLISECONDS).map(__ -> "DUMMY_TIMER_DATA")).doOnNext(event -> {
                if (!"DUMMY_TIMER_DATA".equals(event) && !queue.offer(event)) {
                    numDroppedBytesCounter.increment((long)event.length());
                    numDroppedMessagesCounter.increment();
                }
            }).filter("DUMMY_TIMER_DATA"::equals).doOnNext(__ -> {
                if (ctx.channel().isWritable()) {
                    ArrayList items = new ArrayList(queue.size());
                    queue.drainTo(items);
                    for (String event : items) {
                        ctx.writeAndFlush((Object)new TextWebSocketFrame(event));
                        numMessagesCounter.increment();
                        numBytesCounter.increment((long)event.length());
                    }
                }
            }).subscribe();
        } else {
            ReferenceCountUtil.retain((Object)evt);
            super.userEventTriggered(ctx, evt);
        }
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel {} is unregistered. URI: {}", (Object)ctx.channel(), (Object)this.uri);
        this.unsubscribeIfSubscribed();
        super.channelUnregistered(ctx);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel {} is inactive. URI: {}", (Object)ctx.channel(), (Object)this.uri);
        this.unsubscribeIfSubscribed();
        super.channelInactive(ctx);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.warn("Exception caught by channel {}. URI: {}", new Object[]{ctx.channel(), this.uri, cause});
        this.unsubscribeIfSubscribed();
        ctx.close();
    }

    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
    }

    private void unsubscribeIfSubscribed() {
        if (this.subscription != null && !this.subscription.isUnsubscribed()) {
            log.info("WebSocket unsubscribing subscription with URI: {}", (Object)this.uri);
            this.subscription.unsubscribe();
        }
    }
}

