/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.api.push;

import com.netflix.config.DynamicIntProperty;
import com.netflix.spectator.api.Counter;
import com.netflix.zuul.netty.SpectatorUtils;
import io.mantisrx.api.Util;
import io.mantisrx.api.push.ConnectionBroker;
import io.mantisrx.api.push.MantisSSEHandler;
import io.mantisrx.api.push.PushConnectionDetails;
import io.mantisrx.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.util.ReferenceCountUtil;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Subscription;

public class MantisWebSocketFrameHandler
extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(MantisWebSocketFrameHandler.class);
    private final ConnectionBroker connectionBroker;
    private final DynamicIntProperty queueCapacity = new DynamicIntProperty("io.mantisrx.api.push.queueCapacity", 1000);
    private final DynamicIntProperty writeIntervalMillis = new DynamicIntProperty("io.mantisrx.api.push.writeIntervalMillis", 50);
    private Subscription subscription;
    private String uri;
    private ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().setNameFormat("websocket-handler-drainer-%d").build());
    private ScheduledFuture drainFuture;

    public MantisWebSocketFrameHandler(ConnectionBroker broker) {
        super(true);
        this.connectionBroker = broker;
    }

    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt.getClass() == WebSocketServerProtocolHandler.HandshakeComplete.class) {
            WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete)evt;
            this.uri = complete.requestUri();
            PushConnectionDetails pcd = PushConnectionDetails.from(this.uri);
            log.info("Request to URI '{}' is a WebSSocket upgrade, removing the SSE handler", (Object)this.uri);
            if (ctx.pipeline().get(MantisSSEHandler.class) != null) {
                ctx.pipeline().remove(MantisSSEHandler.class);
            }
            String[] tags = Util.getTaglist(this.uri, pcd.target);
            Counter numDroppedBytesCounter = SpectatorUtils.newCounter((String)"numDroppedSinkBytes", (String)pcd.target, (String[])tags);
            Counter numDroppedMessagesCounter = SpectatorUtils.newCounter((String)"numDroppedSinkMessages", (String)pcd.target, (String[])tags);
            Counter numMessagesCounter = SpectatorUtils.newCounter((String)"numSinkMessages", (String)pcd.target, (String[])tags);
            Counter numBytesCounter = SpectatorUtils.newCounter((String)"numSinkBytes", (String)pcd.target, (String[])tags);
            Counter drainTriggeredCounter = SpectatorUtils.newCounter((String)"drainTriggered", (String)pcd.target, (String[])tags);
            Counter numIncomingMessagesCounter = SpectatorUtils.newCounter((String)"numIncomingMessages", (String)pcd.target, (String[])tags);
            LinkedBlockingQueue queue = new LinkedBlockingQueue(this.queueCapacity.get());
            this.drainFuture = this.scheduledExecutorService.scheduleAtFixedRate(() -> {
                block6: {
                    try {
                        if (queue.size() <= 0 || !ctx.channel().isWritable()) break block6;
                        drainTriggeredCounter.increment();
                        ArrayList items = new ArrayList(queue.size());
                        BlockingQueue blockingQueue = queue;
                        synchronized (blockingQueue) {
                            queue.drainTo(items);
                        }
                        for (String data : items) {
                            ctx.write((Object)new TextWebSocketFrame(data));
                            numMessagesCounter.increment();
                            numBytesCounter.increment((long)data.length());
                        }
                        ctx.flush();
                    }
                    catch (Exception ex) {
                        log.error("Error writing to channel", (Throwable)ex);
                    }
                }
            }, this.writeIntervalMillis.get(), this.writeIntervalMillis.get(), TimeUnit.MILLISECONDS);
            this.subscription = this.connectionBroker.connect(pcd).doOnNext(event -> {
                numIncomingMessagesCounter.increment();
                if (!"DUMMY_TIMER_DATA".equals(event)) {
                    boolean offer = false;
                    BlockingQueue blockingQueue = queue;
                    synchronized (blockingQueue) {
                        offer = queue.offer(event);
                    }
                    if (!offer) {
                        numDroppedBytesCounter.increment((long)event.length());
                        numDroppedMessagesCounter.increment();
                    }
                }
            }).subscribe();
        } else {
            ReferenceCountUtil.retain((Object)evt);
            super.userEventTriggered(ctx, evt);
        }
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel {} is unregistered. URI: {}", (Object)ctx.channel(), (Object)this.uri);
        this.unsubscribeIfSubscribed();
        super.channelUnregistered(ctx);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("Channel {} is inactive. URI: {}", (Object)ctx.channel(), (Object)this.uri);
        this.unsubscribeIfSubscribed();
        super.channelInactive(ctx);
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.warn("Exception caught by channel {}. URI: {}", new Object[]{ctx.channel(), this.uri, cause});
        this.unsubscribeIfSubscribed();
        ctx.close();
    }

    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
    }

    private void unsubscribeIfSubscribed() {
        if (this.subscription != null && !this.subscription.isUnsubscribed()) {
            log.info("WebSocket unsubscribing subscription with URI: {}", (Object)this.uri);
            this.subscription.unsubscribe();
        }
        if (this.drainFuture != null) {
            this.drainFuture.cancel(false);
        }
        if (this.scheduledExecutorService != null) {
            this.scheduledExecutorService.shutdown();
        }
    }
}

