/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.netty.http;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.logging.LoggingHandler;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.Environment;
import reactor.core.Dispatcher;
import reactor.core.support.Assert;
import reactor.fn.Consumer;
import reactor.fn.Function;
import reactor.io.buffer.Buffer;
import reactor.io.codec.Codec;
import reactor.io.net.Reconnect;
import reactor.io.net.config.ClientSocketOptions;
import reactor.io.net.config.SslOptions;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.HttpClient;
import reactor.io.net.http.model.Method;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.io.net.impl.netty.NettyEventLoopDispatcher;
import reactor.io.net.impl.netty.http.NettyHttpChannel;
import reactor.io.net.impl.netty.http.NettyHttpClientHandler;
import reactor.io.net.impl.netty.tcp.NettyTcpClient;
import reactor.rx.Promise;
import reactor.rx.Promises;
import reactor.rx.Stream;

public class NettyHttpClient<IN, OUT>
extends HttpClient<IN, OUT> {
    private final Logger log = LoggerFactory.getLogger(NettyHttpClient.class);
    private final NettyTcpClient<IN, OUT> client;
    private String lastURL = "http://localhost:8080";

    public NettyHttpClient(Environment env, Dispatcher dispatcher, final InetSocketAddress connectAddress, ClientSocketOptions options, SslOptions sslOptions, Codec<Buffer, IN, OUT> codec) {
        super(env, dispatcher, codec);
        this.client = new NettyTcpClient<IN, OUT>(env, dispatcher, connectAddress, options, sslOptions, codec){

            @Override
            protected NettyChannelStream<IN, OUT> bindChannel(Object nativeChannel, long prefetch) {
                NettyHttpClient.this.bindChannel(nativeChannel, prefetch);
                return null;
            }

            @Override
            public InetSocketAddress getConnectAddress() {
                if (connectAddress != null) {
                    return connectAddress;
                }
                try {
                    URL url = new URL(NettyHttpClient.this.lastURL);
                    String host = url.getHost();
                    int port = url.getPort();
                    return new InetSocketAddress(host, port);
                }
                catch (Exception e) {
                    throw new IllegalArgumentException(e);
                }
            }
        };
    }

    @Override
    public Promise<? extends HttpChannel<IN, OUT>> request(final Method method, final String url, final Function<HttpChannel<IN, OUT>, ? extends Publisher<? extends OUT>> handler) {
        this.lastURL = url;
        Assert.isTrue((method != null && url != null ? 1 : 0) != 0);
        final Promise p = Promises.prepare();
        this.take(1L).consume(new Consumer<HttpChannel<IN, OUT>>(){

            public void accept(HttpChannel<IN, OUT> inoutHttpChannel) {
                final NettyHttpClientChannel ch = (NettyHttpClientChannel)inoutHttpChannel;
                ch.getNettyRequest().setUri(URI.create(url).getPath()).setMethod(new HttpMethod(method.getName()));
                ch.promise.onComplete((Consumer)new Consumer<Promise<Object>>(){

                    public void accept(Promise<Object> promise) {
                        if (promise.isError()) {
                            p.onError(promise.reason());
                        } else {
                            p.onNext((Object)ch);
                        }
                    }
                });
                if (handler != null) {
                    NettyHttpClient.this.addWritePublisher((Publisher)handler.apply(inoutHttpChannel));
                }
            }
        }, (Consumer)new Consumer<Throwable>(){

            public void accept(Throwable throwable) {
                p.onError(throwable);
            }
        });
        return p;
    }

    @Override
    public Promise<Boolean> open() {
        return this.client.open();
    }

    @Override
    public Stream<Boolean> open(Reconnect reconnect) {
        return this.client.open(reconnect);
    }

    @Override
    public Promise<Boolean> close() {
        return this.client.close();
    }

    protected NettyHttpChannel<IN, OUT> createClientRequest(NettyChannelStream<IN, OUT> tcpStream, HttpRequest request) {
        NettyHttpClientChannel httpChannel = new NettyHttpClientChannel(tcpStream, request);
        this.notifyNewChannel(httpChannel);
        this.mergeWrite(httpChannel);
        return httpChannel;
    }

    @Override
    protected HttpChannel<IN, OUT> bindChannel(Object nativeChannel, long prefetch) {
        SocketChannel ch = (SocketChannel)nativeChannel;
        int backlog = 128;
        NettyChannelStream netChannel = new NettyChannelStream(this.getEnvironment(), this.getDefaultCodec(), prefetch == -1L ? this.getPrefetchSize() : prefetch, this.client, (Dispatcher)new NettyEventLoopDispatcher(ch.eventLoop(), backlog), this.getDispatcher(), (Channel)ch);
        ChannelPipeline pipeline = ch.pipeline();
        if (this.log.isDebugEnabled()) {
            pipeline.addLast(new ChannelHandler[]{new LoggingHandler(NettyHttpClient.class)});
        }
        pipeline.addLast(new ChannelHandler[]{new HttpClientCodec()}).addLast(new ChannelHandler[]{new NettyHttpClientHandler(netChannel, this)});
        return null;
    }

    private class NettyHttpClientChannel
    extends NettyHttpChannel<IN, OUT> {
        final Buffer body;
        private final NettyChannelStream<IN, OUT> tcpStream;
        private final HttpRequest request;
        private final Promise<Object> promise;

        public NettyHttpClientChannel(NettyChannelStream<IN, OUT> tcpStream, HttpRequest request) {
            super(tcpStream, NettyHttpClient.this.client, request, NettyHttpClient.this.getDefaultCodec());
            this.tcpStream = tcpStream;
            this.request = request;
            this.body = new Buffer();
            this.promise = Promises.ready((Environment)this.getEnvironment(), (Dispatcher)this.getDispatcher());
        }

        @Override
        protected void write(ByteBuffer data, Subscriber<?> onComplete, boolean flush) {
            this.body.append(new ByteBuffer[]{data});
            if (flush) {
                this.write(1, null, true);
            }
        }

        @Override
        protected void write(Object data, Subscriber<?> onComplete, boolean flush) {
            if (HEADERS_SENT.compareAndSet(this, 0, 1)) {
                DefaultFullHttpRequest req = new DefaultFullHttpRequest(this.request.getProtocolVersion(), this.request.getMethod(), this.request.getUri(), Unpooled.wrappedBuffer((ByteBuffer)this.body.flip().byteBuffer()));
                HttpHeaders.setContentLength((HttpMessage)req, (long)this.body.limit());
                HttpHeaders.setHeader((HttpMessage)req, (String)"Content-Type", (Object)HttpHeaders.getHeader((HttpMessage)this.request, (String)"Content-Type"));
                this.tcpStream.write(req, (Subscriber<?>)this.promise, true);
            }
        }
    }
}

