/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.netty.tcp;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.SocketChannelConfig;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.NamedDaemonThreadFactory;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.config.ServerSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.io.net.impl.netty.NettyEventLoopDispatcher;
import reactor.io.net.impl.netty.NettyNetChannelInboundHandler;
import reactor.io.net.impl.netty.NettyServerSocketOptions;
import reactor.io.net.tcp.TcpServer;
import reactor.io.net.tcp.ssl.SSLEngineSupplier;
import reactor.rx.Promise;
import reactor.rx.Promises;

/**
 * A {@link TcpServer} implementation backed by a Netty {@link ServerBootstrap}.
 *
 * <p>The server always creates its own selector (accept) event-loop group. The IO event-loop
 * group is taken from {@link NettyServerSocketOptions#eventLoopGroup()} when the supplied options
 * are {@link NettyServerSocketOptions} and provide one; otherwise the server creates its own.
 * Only groups created by this server are shut down by {@link #shutdown()}.
 *
 * @param <IN>  type of the decoded inbound data
 * @param <OUT> type of the outbound data prior to encoding
 */
public class NettyTcpServer<IN, OUT> extends TcpServer<IN, OUT> {
    private static final Logger log = LoggerFactory.getLogger(NettyTcpServer.class);

    private final NettyServerSocketOptions nettyOptions;
    private final ServerBootstrap bootstrap;
    private final EventLoopGroup selectorGroup;
    private final EventLoopGroup ioGroup;

    /**
     * Configures (but does not bind) the Netty server bootstrap.
     *
     * <p>Thread counts are read from the environment properties
     * {@code reactor.tcp.selectThreadCount} (default {@code Environment.PROCESSORS / 2}) and
     * {@code reactor.tcp.ioThreadCount} (default {@code Environment.PROCESSORS}).
     *
     * @param env           the environment used for property lookup and promise creation
     * @param dispatcher    the dispatcher handed to the parent server and to created promises
     * @param listenAddress the address to bind to; when {@code null} the server binds to port
     *                      3000 on the wildcard address
     * @param options       socket options applied to the server socket and every accepted channel
     * @param sslOptions    SSL configuration; when non-null an {@link SslHandler} is installed
     *                      first in every accepted channel's pipeline
     * @param codec         the codec passed to the parent server
     */
    protected NettyTcpServer(@Nonnull Environment env, @Nonnull Dispatcher dispatcher, @Nullable InetSocketAddress listenAddress, final ServerSocketOptions options, final SslOptions sslOptions, @Nullable Codec<Buffer, IN, OUT> codec) {
        super(env, dispatcher, listenAddress, options, sslOptions, codec);
        this.nettyOptions = options instanceof NettyServerSocketOptions ? (NettyServerSocketOptions) options : null;

        int selectThreadCount = getEnvironment().getProperty("reactor.tcp.selectThreadCount", Integer.class, Environment.PROCESSORS / 2);
        int ioThreadCount = getEnvironment().getProperty("reactor.tcp.ioThreadCount", Integer.class, Environment.PROCESSORS);

        ThreadFactory selectThreadFactory = new NamedDaemonThreadFactory("reactor-tcp-select");
        this.selectorGroup = new NioEventLoopGroup(selectThreadCount, selectThreadFactory);

        // Prefer a caller-supplied IO group; only create (and later own) one when none is given.
        if (null != this.nettyOptions && null != this.nettyOptions.eventLoopGroup()) {
            this.ioGroup = this.nettyOptions.eventLoopGroup();
        } else {
            ThreadFactory ioThreadFactory = new NamedDaemonThreadFactory("reactor-tcp-io");
            this.ioGroup = new NioEventLoopGroup(ioThreadCount, ioThreadFactory);
        }

        InetSocketAddress bindAddress = null == listenAddress ? new InetSocketAddress(3000) : listenAddress;

        this.bootstrap = new ServerBootstrap()
                .group(this.selectorGroup, this.ioGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, options.backlog())
                .option(ChannelOption.SO_RCVBUF, options.rcvbuf())
                .option(ChannelOption.SO_SNDBUF, options.sndbuf())
                .option(ChannelOption.SO_REUSEADDR, options.reuseAddr())
                .localAddress(bindAddress)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                // AUTO_READ is a Boolean option: enabled only when SSL is configured.
                // NOTE(review): presumably reads are otherwise triggered explicitly by the
                // channel stream - confirm against NettyChannelStream.
                .childOption(ChannelOption.AUTO_READ, sslOptions != null)
                .childHandler(new ChannelInitializer<SocketChannel>() {

                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        SocketChannelConfig config = ch.config();
                        config.setReceiveBufferSize(options.rcvbuf());
                        config.setSendBufferSize(options.sndbuf());
                        config.setKeepAlive(options.keepAlive());
                        config.setReuseAddress(options.reuseAddr());
                        config.setSoLinger(options.linger());
                        config.setTcpNoDelay(options.tcpNoDelay());

                        if (log.isDebugEnabled()) {
                            log.debug("CONNECT {}", ch);
                        }

                        // The SSL handler must come first so later handlers see decrypted data.
                        if (null != sslOptions) {
                            SSLEngine ssl = new SSLEngineSupplier(sslOptions, false).get();
                            if (log.isDebugEnabled()) {
                                log.debug("SSL enabled using keystore {}", null != sslOptions.keystoreFile() ? sslOptions.keystoreFile() : "<DEFAULT>");
                            }
                            ch.pipeline().addLast(new SslHandler(ssl));
                        }

                        // Let the user customise the pipeline before the Reactor handler is appended.
                        if (null != NettyTcpServer.this.nettyOptions && null != NettyTcpServer.this.nettyOptions.pipelineConfigurer()) {
                            NettyTcpServer.this.nettyOptions.pipelineConfigurer().accept(ch.pipeline());
                        }

                        NettyTcpServer.this.bindChannel(ch, options.prefetch());
                    }
                });
    }

    /**
     * Binds the server socket asynchronously.
     *
     * @return a promise fulfilled with {@code true} when the bind succeeds, or failed with the
     *         bind future's cause otherwise
     */
    @Override
    public Promise<Boolean> start() {
        final Promise<Boolean> promise = Promises.ready(getEnvironment(), getDispatcher());
        ChannelFuture bindFuture = this.bootstrap.bind();
        bindFuture.addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                log.info("BIND {}", future.channel().localAddress());
                if (future.isSuccess()) {
                    promise.onNext(true);
                } else {
                    promise.onError(future.cause());
                }
            }
        });
        return promise;
    }

    /**
     * Gracefully shuts down the event-loop groups owned by this server.
     *
     * <p>The selector group is always shut down. The IO group is shut down only when it was
     * created by this server; a group supplied through
     * {@link NettyServerSocketOptions#eventLoopGroup()} is left running for its owner to manage.
     *
     * @return a promise fulfilled with {@code true} after every group shut down here has
     *         completed and {@code notifyShutdown()} has been invoked
     */
    @Override
    public Promise<Boolean> shutdown() {
        final Promise<Boolean> shutdownPromise = Promises.ready(getEnvironment(), getDispatcher());

        // Count only the groups a listener is actually registered on. Always waiting for two
        // completions would leave the promise pending forever when the IO group is external.
        final boolean ownsIoGroup = null == this.nettyOptions || null == this.nettyOptions.eventLoopGroup();
        final AtomicInteger groupsToShutdown = new AtomicInteger(ownsIoGroup ? 2 : 1);

        GenericFutureListener<Future<Object>> listener = new GenericFutureListener<Future<Object>>() {

            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (groupsToShutdown.decrementAndGet() == 0) {
                    NettyTcpServer.this.notifyShutdown();
                    shutdownPromise.onNext(true);
                }
            }
        };

        this.selectorGroup.shutdownGracefully().addListener(listener);
        if (ownsIoGroup) {
            this.ioGroup.shutdownGracefully().addListener(listener);
        }
        return shutdownPromise;
    }

    /**
     * Wraps an accepted Netty channel in a {@link NettyChannelStream} and appends the Reactor
     * inbound handler (preceded by a {@link LoggingHandler} when debug logging is enabled) to the
     * channel's pipeline.
     *
     * @param nativeChannel the accepted channel; must be a {@link SocketChannel}
     * @param prefetch      the prefetch size for the stream; {@code -1} selects
     *                      {@code getPrefetchSize()}
     * @return the channel stream bound to {@code nativeChannel}
     */
    @Override
    protected NettyChannelStream<IN, OUT> bindChannel(Object nativeChannel, long prefetch) {
        SocketChannel ch = (SocketChannel) nativeChannel;
        long effectivePrefetch = prefetch == -1L ? getPrefetchSize() : prefetch;

        // NOTE(review): 256 is the second argument of NettyEventLoopDispatcher - presumably a
        // backlog/capacity; confirm against that class.
        Dispatcher ioDispatcher = new NettyEventLoopDispatcher(ch.eventLoop(), 256);

        NettyChannelStream<IN, OUT> netChannel = new NettyChannelStream<IN, OUT>(getEnvironment(), getDefaultCodec(), effectivePrefetch, this, ioDispatcher, getDispatcher(), ch);

        ChannelPipeline pipeline = ch.pipeline();
        if (log.isDebugEnabled()) {
            pipeline.addLast(new LoggingHandler(getClass()));
        }
        pipeline.addLast(new NettyNetChannelInboundHandler(netChannel.in(), netChannel));
        return netChannel;
    }
}

