/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.netty.http;

import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.concurrent.GenericFutureListener;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.io.buffer.Buffer;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.impl.netty.NettyChannelHandlerBridge;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.io.net.impl.netty.http.NettyHttpChannel;
import reactor.rx.action.support.DefaultSubscriber;

/**
 * Channel handler for the HTTP server side of the Netty bridge.
 *
 * <p>The first {@link HttpRequest} read on the channel is wrapped in a
 * {@link NettyHttpChannel} and handed to the configured handler; the publisher the
 * handler returns is subscribed to in order to close the response once it terminates.
 * Every {@link HttpContent} message has its payload forwarded to the parent bridge, and
 * the inbound subscription is completed when the last content chunk is seen.
 *
 * @param <IN>  type of the inbound data exposed by the channel stream
 * @param <OUT> type of the outbound data accepted by the channel stream
 */
public class NettyHttpServerHandler<IN, OUT>
extends NettyChannelHandlerBridge<IN, OUT> {
    private final NettyChannelStream<IN, OUT> tcpStream;

    /** Channel created for the first request read; stays {@code null} until then. */
    private NettyHttpChannel<IN, OUT> request;

    /**
     * Creates the handler.
     *
     * @param handler   handler applied to the {@link NettyHttpChannel} built for the request
     * @param tcpStream underlying channel stream that the HTTP channel wraps
     */
    public NettyHttpServerHandler(ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> handler, NettyChannelStream<IN, OUT> tcpStream) {
        super(handler, tcpStream);
        this.tcpStream = tcpStream;
    }

    /**
     * Propagates the channel-active event down the pipeline and then requests a read.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
        ctx.read();
    }

    /**
     * Handles a decoded HTTP message.
     *
     * <p>If no request has been seen yet and {@code msg} is an {@link HttpRequest}, the
     * request channel is created and passed to the handler. If {@code msg} is an
     * {@link HttpContent}, its content is forwarded to the parent bridge, and when it is
     * also a {@link LastHttpContent} the inbound subscription is completed and cleared.
     * A message may match both branches.
     *
     * @param ctx the channel handler context
     * @param msg the message read from the pipeline
     */
    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        if (this.request == null && msg instanceof HttpRequest) {
            this.request = new NettyHttpChannel<IN, OUT>(this.tcpStream, (HttpRequest) msg);

            // The cast mirrors the original code; the declared return type of
            // handler.apply(...) is not visible from this class.
            @SuppressWarnings("unchecked")
            final Publisher<Void> closePublisher = (Publisher<Void>) this.handler.apply(this.request);
            final DefaultSubscriber<Void> closeSub = newCloseSubscriber(ctx);

            if (this.request.checkHeader()) {
                // Write the response head first; only subscribe to the handler's
                // publisher once that write has succeeded.
                ctx.writeAndFlush(this.request.getNettyResponse()).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (future.isSuccess()) {
                            closePublisher.subscribe(closeSub);
                        } else {
                            closeSub.onError(future.cause());
                        }
                    }
                });
            } else {
                closePublisher.subscribe(closeSub);
            }
        }

        if (msg instanceof HttpContent) {
            super.channelRead(ctx, ((HttpContent) msg).content());

            // Any LastHttpContent ends the request body. Matching only the exact
            // DefaultLastHttpContent class would miss other implementations such as the
            // shared LastHttpContent.EMPTY_LAST_CONTENT instance, leaving the inbound
            // subscription uncompleted.
            if (this.channelSubscription != null && msg instanceof LastHttpContent) {
                this.channelSubscription.onComplete();
                this.channelSubscription = null;
            }
        }
    }

    /**
     * Writes outbound data to the channel. A value whose class is exactly {@link Buffer}
     * is converted and wrapped in a {@link DefaultHttpContent}; anything else is written
     * unchanged.
     *
     * @param data the outbound value
     * @param ctx  the channel handler context to write to
     * @return the future of the write operation
     */
    @Override
    protected ChannelFuture doOnWrite(Object data, ChannelHandlerContext ctx) {
        if (data.getClass().equals(Buffer.class)) {
            return ctx.write(new DefaultHttpContent(convertBufferToByteBuff(ctx, (Buffer) data)));
        }
        return ctx.write(data);
    }

    /**
     * Builds the subscriber that reacts to termination of the handler's publisher.
     * On error the failure is logged; on completion the subscription is cancelled.
     * In both cases the channel is closed only when the inbound subscription has already
     * been cleared and the channel is still open; on completion an empty last HTTP
     * content is written and flushed before closing.
     */
    private DefaultSubscriber<Void> newCloseSubscriber(final ChannelHandlerContext ctx) {
        return new DefaultSubscriber<Void>() {
            private Subscription subscription;

            @Override
            public void onSubscribe(Subscription s) {
                this.subscription = s;
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onError(Throwable t) {
                log.error("Error processing connection. Closing the channel.", t);
                if (NettyHttpServerHandler.this.channelSubscription == null && ctx.channel().isOpen()) {
                    ctx.channel().close();
                }
            }

            @Override
            public void onComplete() {
                this.subscription.cancel();
                if (NettyHttpServerHandler.this.channelSubscription == null && ctx.channel().isOpen()) {
                    if (log.isDebugEnabled()) {
                        log.debug("Close Http Response ");
                    }
                    ctx.channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
                }
            }
        };
    }
}

