/*
 * Decompiled with CFR 0.152.
 */
package reactor.io.net.impl.netty.http;

import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import java.net.InetSocketAddress;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.support.Assert;
import reactor.io.net.ReactorChannel;
import reactor.io.net.http.HttpChannel;
import reactor.io.net.http.model.HttpHeaders;
import reactor.io.net.http.model.Method;
import reactor.io.net.http.model.Protocol;
import reactor.io.net.http.model.ResponseHeaders;
import reactor.io.net.http.model.Status;
import reactor.io.net.http.model.Transfer;
import reactor.io.net.impl.netty.NettyChannelStream;
import reactor.io.net.impl.netty.http.NettyHttpHeaders;
import reactor.io.net.impl.netty.http.NettyHttpResponseHeaders;

public class NettyHttpChannel<IN, OUT>
extends HttpChannel<IN, OUT> {
    private final NettyChannelStream<IN, OUT> tcpStream;
    private final HttpRequest nettyRequest;
    private final NettyHttpHeaders headers;
    private HttpResponse nettyResponse;
    private NettyHttpResponseHeaders responseHeaders;

    public NettyHttpChannel(NettyChannelStream<IN, OUT> tcpStream, HttpRequest request) {
        super(tcpStream.getEnvironment(), tcpStream.getCapacity(), tcpStream.getDispatcher());
        this.tcpStream = tcpStream;
        this.nettyRequest = request;
        this.nettyResponse = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);
        this.headers = new NettyHttpHeaders(request);
        this.responseHeaders = new NettyHttpResponseHeaders(this.nettyResponse);
        this.responseHeader("Transfer-Encoding", "chunked");
    }

    @Override
    protected void doSubscribeWriter(Publisher<? extends OUT> writer, Subscriber<? super Void> postWriter) {
        this.tcpStream.doSubscribeWriter(writer, postWriter);
    }

    @Override
    protected void doDecoded(IN in) {
        this.tcpStream.doDecoded(in);
    }

    public void subscribe(Subscriber<? super IN> subscriber) {
        this.tcpStream.subscribe(subscriber);
    }

    @Override
    public Protocol protocol() {
        HttpVersion version = this.nettyRequest.getProtocolVersion();
        if (version.equals((Object)HttpVersion.HTTP_1_0)) {
            return Protocol.HTTP_1_0;
        }
        if (version.equals((Object)HttpVersion.HTTP_1_1)) {
            return Protocol.HTTP_1_1;
        }
        throw new IllegalStateException(version.protocolName() + " not supported");
    }

    @Override
    protected void doHeader(String name, String value) {
        this.headers.set(name, value);
    }

    @Override
    protected void doAddHeader(String name, String value) {
        this.headers.add(name, value);
    }

    @Override
    public String uri() {
        return this.nettyRequest.getUri();
    }

    @Override
    public Method method() {
        return new Method(this.nettyRequest.getMethod().name());
    }

    @Override
    public HttpHeaders headers() {
        return this.headers;
    }

    public HttpRequest getNettyRequest() {
        return this.nettyRequest;
    }

    @Override
    public Status responseStatus() {
        return Status.valueOf(this.nettyResponse.getStatus().code());
    }

    @Override
    public void doResponseStatus(Status status) {
        this.nettyResponse.setStatus(HttpResponseStatus.valueOf((int)status.getCode()));
    }

    @Override
    public Transfer transfer() {
        if ("chunked".equals(this.headers.get("Transfer-Encoding"))) {
            Assert.isTrue((boolean)Protocol.HTTP_1_1.equals((Object)this.protocol()));
            return Transfer.CHUNKED;
        }
        if (this.headers.get("Transfer-Encoding") == null) {
            return Transfer.NON_CHUNKED;
        }
        throw new IllegalStateException("Can't determine a valide transfer based on headers and protocol");
    }

    @Override
    public HttpChannel<IN, OUT> transfer(Transfer transfer) {
        switch (transfer) {
            case EVENT_STREAM: {
                this.responseHeader("Content-Type", "text/event-stream");
            }
            case CHUNKED: {
                Assert.isTrue((boolean)Protocol.HTTP_1_1.equals((Object)this.protocol()));
                this.responseHeader("Transfer-Encoding", "chunked");
                break;
            }
            case NON_CHUNKED: {
                this.responseHeaders().remove("Transfer-Encoding");
            }
        }
        return this;
    }

    @Override
    public ResponseHeaders responseHeaders() {
        return this.responseHeaders;
    }

    @Override
    protected void doResponseHeader(String name, String value) {
        this.responseHeaders.set(name, value);
    }

    @Override
    protected void doAddResponseHeader(String name, String value) {
        this.responseHeaders.add(name, value);
    }

    public HttpResponse getNettyResponse() {
        return this.nettyResponse;
    }

    public SocketChannel delegate() {
        return (SocketChannel)this.tcpStream.delegate();
    }

    @Override
    public InetSocketAddress remoteAddress() {
        return this.tcpStream.remoteAddress();
    }

    @Override
    public ReactorChannel.ConsumerSpec on() {
        return this.tcpStream.on();
    }

    void setNettyResponse(HttpResponse nettyResponse) {
        this.nettyResponse = nettyResponse;
    }

    boolean checkHeader() {
        return HEADERS_SENT.compareAndSet(this, 0, 1);
    }

    @Override
    public boolean isKeepAlive() {
        return this.headers.isKeepAlive();
    }

    @Override
    public HttpChannel<IN, OUT> keepAlive(boolean keepAlive) {
        this.responseHeaders.keepAlive(keepAlive);
        return this;
    }
}

