/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.netty.http;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import java.net.InetSocketAddress;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.fn.Consumer;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.config.ServerSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.HttpServer;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.io.net.impl.netty.NettyEventLoopDispatcher;
import reactor.io.net.impl.netty.http.NettyHttpChannel;
import reactor.io.net.impl.netty.http.NettyHttpServerHandler;
import reactor.io.net.impl.netty.tcp.NettyTcpServer;
import reactor.io.net.tcp.TcpServer;
import reactor.rx.Promise;
import reactor.rx.Streams;

/**
 * {@link HttpServer} implementation that sits on top of a {@link NettyTcpServer}.
 *
 * <p>Lifecycle calls ({@link #start()}, {@link #shutdown()}) are forwarded to the wrapped
 * TCP server, and every channel handed to {@link #bindChannel(Object, long)} gets an
 * {@link HttpServerCodec} followed by a {@link NettyHttpServerHandler} appended to its
 * Netty pipeline.
 *
 * @param <IN>  type of the decoded inbound data
 * @param <OUT> type of the outbound data before encoding
 */
public class NettyHttpServer<IN, OUT> extends HttpServer<IN, OUT> {

    private final Logger log = LoggerFactory.getLogger(NettyHttpServer.class);

    /** Wrapped TCP server; {@link #start()} and {@link #shutdown()} delegate to it. */
    protected final TcpServer<IN, OUT> server;

    /**
     * Creates the HTTP server and the TCP server it delegates to.
     *
     * <p>All arguments are passed unchanged to the {@link NettyTcpServer} constructor;
     * {@code env}, {@code dispatcher} and {@code codec} are additionally passed to the
     * {@link HttpServer} super constructor.
     *
     * @param env           environment handed to the super class and the TCP server
     * @param dispatcher    dispatcher handed to the super class and the TCP server
     * @param listenAddress address handed to the TCP server
     * @param options       socket options handed to the TCP server
     * @param sslOptions    SSL options handed to the TCP server
     * @param codec         codec handed to the super class and the TCP server
     */
    protected NettyHttpServer(Environment env, Dispatcher dispatcher, InetSocketAddress listenAddress, ServerSocketOptions options, SslOptions sslOptions, Codec<Buffer, IN, OUT> codec) {
        super(env, dispatcher, codec);

        // The TCP server's channel binding is redirected to this HTTP server; the
        // TCP-level return value is intentionally null.
        server = new NettyTcpServer<IN, OUT>(env, dispatcher, listenAddress, options, sslOptions, codec) {

            @Override
            protected NettyChannelStream<IN, OUT> bindChannel(Object nativeChannel, long prefetch) {
                NettyHttpServer.this.bindChannel(nativeChannel, prefetch);
                return null;
            }
        };

        // Relays errors raised by the TCP server to this server's error notification.
        Consumer<Throwable> errorRelay = new Consumer<Throwable>() {

            @Override
            public void accept(Throwable failure) {
                NettyHttpServer.this.notifyError(failure);
            }
        };

        // Relays the TCP server's Void signal (presumably completion -- confirm against
        // the consume(...) contract) to this server's shutdown notification.
        Consumer<Void> shutdownRelay = new Consumer<Void>() {

            @Override
            public void accept(Void ignored) {
                NettyHttpServer.this.notifyShutdown();
            }
        };

        // No data consumer is registered (first argument is null). The raw casts are
        // kept so that the call resolves exactly as before.
        server.consume(null, (Consumer) errorRelay, (Consumer) shutdownRelay);
    }

    /**
     * Starts the wrapped TCP server.
     *
     * @return the promise returned by the TCP server's {@code start()}
     */
    @Override
    public final Promise<Boolean> start() {
        return server.start();
    }

    /**
     * Shuts down the wrapped TCP server.
     *
     * @return the promise returned by the TCP server's {@code shutdown()}
     */
    @Override
    public final Promise<Boolean> shutdown() {
        return server.shutdown();
    }

    /**
     * Builds the {@link HttpChannel} for one incoming request, subscribes the routed
     * handlers to it, and then subscribes the channel's inbound side to the channel stream.
     *
     * @param channelStream stream of the connection the request arrived on
     * @param content       the Netty HTTP request
     * @return the newly created request channel
     */
    protected HttpChannel<IN, OUT> createServerRequest(NettyChannelStream<IN, OUT> channelStream, HttpRequest content) {
        NettyHttpChannel<IN, OUT> httpChannel =
                new NettyHttpChannel<IN, OUT>(channelStream, server, content, getDefaultCodec());

        // Handlers selected by routing are concatenated into a single publisher.
        Iterable<Publisher<OUT>> routed = routeChannel(httpChannel);
        subscribeChannelHandlers(Streams.concat(routed), httpChannel);

        // Inbound data is wired up only after the handlers have been subscribed.
        channelStream.subscribe(httpChannel.in());
        return httpChannel;
    }

    /**
     * Prepares a freshly accepted Netty channel for HTTP traffic.
     *
     * <p>A {@link NettyChannelStream} is created for the channel, then the pipeline receives
     * (optionally) a {@link LoggingHandler} when debug logging is enabled, an
     * {@link HttpServerCodec}, and a {@link NettyHttpServerHandler} bound to the new stream.
     *
     * @param nativeChannel the channel to bind; cast to {@link SocketChannel}
     * @param prefetch      prefetch to use, or {@code -1} to fall back to {@code getPrefetchSize()}
     * @return always {@code null}
     */
    @Override
    protected HttpChannel<IN, OUT> bindChannel(Object nativeChannel, long prefetch) {
        SocketChannel socket = (SocketChannel) nativeChannel;

        // Raw type and explicit casts are kept so constructor resolution is unchanged.
        // NOTE(review): 256 is the second NettyEventLoopDispatcher argument; presumably a
        // backlog/buffer size -- confirm against that class.
        NettyChannelStream stream = new NettyChannelStream(
                getEnvironment(),
                getDefaultCodec(),
                prefetch == -1L ? getPrefetchSize() : prefetch,
                server,
                (Dispatcher) new NettyEventLoopDispatcher(socket.eventLoop(), 256),
                getDispatcher(),
                (Channel) socket);

        ChannelPipeline handlers = socket.pipeline();
        if (log.isDebugEnabled()) {
            handlers.addLast(new LoggingHandler(getClass()));
        }
        handlers.addLast(new HttpServerCodec())
                .addLast(new NettyHttpServerHandler(stream, this));
        return null;
    }
}

