/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.netty.http;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.logging.LoggingHandler;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.tuple.Tuple2;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.ChannelStream;
import reactor.io.net.ReactorChannelHandler;
import reactor.io.net.Reconnect;
import reactor.io.net.config.ClientSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.HttpClient;
import reactor.io.net.http.model.Method;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.io.net.impl.netty.http.NettyHttpChannel;
import reactor.io.net.impl.netty.http.NettyHttpClientHandler;
import reactor.io.net.impl.netty.tcp.NettyTcpClient;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;
import reactor.rx.Streams;

/**
 * Netty-backed {@link HttpClient} that delegates connection handling to an
 * internal {@link NettyTcpClient} and installs an HTTP client codec on every
 * channel it binds.
 *
 * <p>When no explicit connect address is supplied at construction time, the
 * remote address is derived from the URL of the most recent
 * {@link #request(Method, String, ReactorChannelHandler)} call.
 *
 * @param <IN>  type of the decoded inbound payload
 * @param <OUT> type of the outbound payload before encoding
 */
public class NettyHttpClient<IN, OUT>
extends HttpClient<IN, OUT> {
    private static final Logger log = LoggerFactory.getLogger(NettyHttpClient.class);

    /** Underlying TCP client; its channel binding is redirected to {@link #bindChannel}. */
    private final NettyTcpClient<IN, OUT> client;

    /**
     * Promise returned by {@link #request}.
     * NOTE(review): a single instance is shared by every request() call, so a second
     * request appears to receive an already-completed promise - confirm whether this
     * client is intended to be single-use.
     */
    private final Promise<NettyHttpChannel<IN, OUT>> reply;

    /** URL of the most recent request; used to resolve the connect address when none was given. */
    private String lastURL = "http://localhost:8080";

    /**
     * Creates the client and its backing TCP client.
     *
     * @param env            environment passed to the parent and to the TCP client
     * @param dispatcher     dispatcher passed to the parent and to the TCP client
     * @param connectAddress fixed remote address, or {@code null} to derive it from the request URL
     * @param options        client socket options
     * @param sslOptions     SSL options handed to the TCP client
     * @param codec          codec handed to the parent and to the TCP client
     */
    public NettyHttpClient(Environment env, Dispatcher dispatcher, final InetSocketAddress connectAddress, ClientSocketOptions options, SslOptions sslOptions, Codec<Buffer, IN, OUT> codec) {
        super(env, dispatcher, codec, options);
        this.client = new NettyTcpClient<IN, OUT>(env, dispatcher, connectAddress, options, sslOptions, codec){

            @Override
            protected void bindChannel(ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> handler, Object nativeChannel) {
                NettyHttpClient.this.bindChannel(handler, nativeChannel);
            }

            @Override
            public InetSocketAddress getConnectAddress() {
                if (connectAddress != null) {
                    return connectAddress;
                }
                try {
                    URL url = new URL(NettyHttpClient.this.lastURL);
                    int port = url.getPort();
                    if (port == -1) {
                        // URL.getPort() yields -1 when the URL carries no explicit port;
                        // fall back to the protocol default (80 for http, 443 for https).
                        port = url.getDefaultPort();
                    }
                    return new InetSocketAddress(url.getHost(), port);
                }
                catch (Exception e) {
                    throw new IllegalArgumentException(e);
                }
            }
        };
        this.reply = Promises.prepare();
    }

    /**
     * Starts the underlying TCP client with the HTTP handler adapted to a channel-stream handler.
     *
     * @param handler handler invoked with each established HTTP channel
     * @return the promise returned by the TCP client's {@code start}
     */
    @Override
    protected Promise<Void> doStart(final ReactorChannelHandler<IN, OUT, HttpChannel<IN, OUT>> handler) {
        return this.client.start(this.adapt(handler));
    }

    /**
     * Starts the underlying TCP client with a reconnect strategy.
     *
     * @param handler   handler invoked with each established HTTP channel
     * @param reconnect reconnect strategy forwarded to the TCP client
     * @return the stream returned by the TCP client's {@code start}
     */
    @Override
    protected Stream<Tuple2<InetSocketAddress, Integer>> doStart(final ReactorChannelHandler<IN, OUT, HttpChannel<IN, OUT>> handler, Reconnect reconnect) {
        return this.client.start(this.adapt(handler), reconnect);
    }

    /**
     * Issues a request: records the URL, then starts the client with a handler that sets the
     * request URI and method on the channel's Netty request before delegating to {@code handler}.
     *
     * @param method  HTTP method; must not be {@code null}
     * @param url     absolute request URL; must not be {@code null}
     * @param handler optional handler applied to the channel; when {@code null} the channel
     *                is published on the reply promise and a never-completing stream is returned
     * @return the reply promise, fulfilled with the channel or failed with the handler's error
     */
    @Override
    public Promise<? extends HttpChannel<IN, OUT>> request(final Method method, final String url, final ReactorChannelHandler<IN, OUT, HttpChannel<IN, OUT>> handler) {
        // Validate before touching state so a rejected call cannot overwrite lastURL.
        Assert.isTrue(method != null && url != null);
        this.lastURL = url;
        this.start(new ReactorChannelHandler<IN, OUT, HttpChannel<IN, OUT>>(){

            @Override
            public Publisher<Void> apply(HttpChannel<IN, OUT> inoutHttpChannel) {
                NettyHttpChannel<IN, OUT> ch = (NettyHttpChannel<IN, OUT>)inoutHttpChannel;
                ch.getNettyRequest().setUri(NettyHttpClient.requestTarget(url)).setMethod(new HttpMethod(method.getName()));
                if (handler == null) {
                    NettyHttpClient.this.reply.onNext(ch);
                    return Streams.never();
                }
                try {
                    Publisher<Void> p = handler.apply(ch);
                    NettyHttpClient.this.reply.onNext(ch);
                    return p;
                }
                catch (Throwable t) {
                    NettyHttpClient.this.reply.onError(t);
                    return Promises.error(t);
                }
            }
        });
        return this.reply;
    }

    /**
     * Shuts down the underlying TCP client.
     *
     * @return the promise returned by the TCP client's {@code shutdown}
     */
    @Override
    protected final Promise<Void> doShutdown() {
        return this.client.shutdown();
    }

    /**
     * Wraps the native channel in a {@link NettyChannelStream} and populates its pipeline with
     * an optional logging handler (debug level only), the HTTP client codec and the
     * {@link NettyHttpClientHandler} bridging to {@code handler}.
     *
     * @param handler       handler to receive the channel stream
     * @param nativeChannel the native channel; cast to {@link SocketChannel}
     */
    protected void bindChannel(ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> handler, Object nativeChannel) {
        SocketChannel ch = (SocketChannel)nativeChannel;
        NettyChannelStream<IN, OUT> netChannel = new NettyChannelStream<IN, OUT>(this.getDefaultEnvironment(), this.getDefaultCodec(), this.getDefaultPrefetchSize(), this.getDefaultDispatcher(), ch);
        ChannelPipeline pipeline = ch.pipeline();
        if (log.isDebugEnabled()) {
            pipeline.addLast(new LoggingHandler(NettyHttpClient.class));
        }
        pipeline.addLast(new HttpClientCodec()).addLast(new NettyHttpClientHandler<IN, OUT>(handler, netChannel));
    }

    /** Adapts an HTTP-channel handler to the channel-stream handler type the TCP client expects. */
    private ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>> adapt(final ReactorChannelHandler<IN, OUT, HttpChannel<IN, OUT>> handler) {
        return new ReactorChannelHandler<IN, OUT, ChannelStream<IN, OUT>>(){

            @Override
            public Publisher<Void> apply(ChannelStream<IN, OUT> inoutChannelStream) {
                NettyHttpChannel<IN, OUT> ch = (NettyHttpChannel<IN, OUT>)inoutChannelStream;
                return handler.apply(ch);
            }
        };
    }

    /**
     * Builds the origin-form request target (path plus optional query) for {@code url}.
     * Raw components are used so percent-escapes are sent unchanged; an absent or empty
     * path becomes {@code "/"} so the request line is never left without a target.
     */
    private static String requestTarget(String url) {
        URI uri = URI.create(url);
        String path = uri.getRawPath();
        if (path == null || path.isEmpty()) {
            path = "/";
        }
        String query = uri.getRawQuery();
        return query == null ? path : path + "?" + query;
    }
}

