package io.reactivex.netty.protocol.http.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.protocol.http.UnicastContentSubject;
import io.reactivex.netty.protocol.http.server.HttpServerMetricsEvent;
import io.reactivex.netty.server.ServerMetricsEvent;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/rxnetty-0.4.9.jar:io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter.class */
public class ServerRequestResponseConverter extends ChannelDuplexHandler {
    private static final Logger logger = LoggerFactory.getLogger(ServerRequestResponseConverter.class);
    public static final IOException CONN_CLOSE_BEFORE_REQUEST_COMPLETE = new IOException("Connection closed by peer before sending the entire request.");
    private final MetricEventsSubject<ServerMetricsEvent<?>> eventsSubject;
    private final long requestContentSubscriptionTimeoutMs;
    private RequestState currentRequestState = new RequestState();

    /* loaded from: input_file:WEB-INF/lib/rxnetty-0.4.9.jar:io/reactivex/netty/protocol/http/server/ServerRequestResponseConverter$RequestState.class */
    private final class RequestState {
        private HttpServerRequest rxRequest;
        private UnicastContentSubject contentSubject;
        private boolean isReadingRequest;

        private RequestState() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void createRxRequest(ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest) {
            this.contentSubject = UnicastContentSubject.create(ServerRequestResponseConverter.this.requestContentSubscriptionTimeoutMs, TimeUnit.MILLISECONDS);
            this.rxRequest = new HttpServerRequest(channelHandlerContext.channel(), httpRequest, this.contentSubject);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void onProcessingStart(long j) {
            this.rxRequest.onProcessingStart(j);
            this.isReadingRequest = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void onRequestComplete() {
            this.isReadingRequest = false;
            ServerRequestResponseConverter.this.eventsSubject.onEvent((MetricEventsSubject) HttpServerMetricsEvent.REQUEST_RECEIVE_COMPLETE, this.rxRequest.onProcessingEnd());
            this.contentSubject.onCompleted();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void onConnectionClose() {
            if (this.isReadingRequest) {
                this.contentSubject.onError(ServerRequestResponseConverter.CONN_CLOSE_BEFORE_REQUEST_COMPLETE);
            }
        }
    }

    public ServerRequestResponseConverter(MetricEventsSubject<ServerMetricsEvent<?>> metricEventsSubject, long j) {
        this.eventsSubject = metricEventsSubject;
        this.requestContentSubscriptionTimeoutMs = j;
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        Class<?> cls = obj.getClass();
        RequestState requestState = this.currentRequestState;
        boolean z = false;
        if (HttpRequest.class.isAssignableFrom(cls)) {
            z = true;
            HttpRequest httpRequest = (HttpRequest) obj;
            DecoderResult decoderResult = httpRequest.getDecoderResult();
            if (!decoderResult.isSuccess()) {
                logger.error("Invalid HTTP request recieved. Decoder error.", decoderResult.cause());
                DefaultFullHttpResponse defaultFullHttpResponse = new DefaultFullHttpResponse(httpRequest.getProtocolVersion(), HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE);
                defaultFullHttpResponse.headers().set("Connection", (Object) "close").set("Content-Length", (Object) 0);
                channelHandlerContext.writeAndFlush(defaultFullHttpResponse).addListener2(new ChannelFutureListener() { // from class: io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.1
                    @Override // io.netty.util.concurrent.GenericFutureListener
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess()) {
                            return;
                        }
                        ServerRequestResponseConverter.logger.error("Failed to write response for invalid HTTP request.", channelFuture.cause());
                    }
                });
                return;
            }
            this.eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HEADERS_RECEIVED);
            requestState.createRxRequest(channelHandlerContext, (HttpRequest) obj);
            requestState.onProcessingStart(Clock.newStartTimeMillis());
            super.channelRead(channelHandlerContext, requestState.rxRequest);
        }
        if (!HttpContent.class.isAssignableFrom(cls)) {
            if (z) {
                return;
            }
            invokeContentOnNext(obj, requestState.contentSubject);
            return;
        }
        ByteBuf content = ((ByteBufHolder) obj).content();
        this.eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_CONTENT_RECEIVED);
        invokeContentOnNext(content, requestState.contentSubject);
        if (LastHttpContent.class.isAssignableFrom(cls)) {
            requestState.onRequestComplete();
            this.currentRequestState = new RequestState();
        }
    }

    @Override // io.netty.channel.ChannelDuplexHandler, io.netty.channel.ChannelOutboundHandler
    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) throws Exception {
        Class<?> cls = obj.getClass();
        long newStartTimeMillis = Clock.newStartTimeMillis();
        if (HttpServerResponse.class.isAssignableFrom(cls)) {
            this.eventsSubject.onEvent(HttpServerMetricsEvent.RESPONSE_HEADERS_WRITE_START);
            addWriteCompleteEvents(channelPromise, newStartTimeMillis, HttpServerMetricsEvent.RESPONSE_HEADERS_WRITE_SUCCESS, HttpServerMetricsEvent.RESPONSE_HEADERS_WRITE_FAILED);
            super.write(channelHandlerContext, ((HttpServerResponse) obj).getNettyResponse(), channelPromise);
        } else {
            if (!ByteBuf.class.isAssignableFrom(cls)) {
                super.write(channelHandlerContext, obj, channelPromise);
                return;
            }
            this.eventsSubject.onEvent(HttpServerMetricsEvent.RESPONSE_CONTENT_WRITE_START);
            addWriteCompleteEvents(channelPromise, newStartTimeMillis, HttpServerMetricsEvent.RESPONSE_CONTENT_WRITE_SUCCESS, HttpServerMetricsEvent.RESPONSE_CONTENT_WRITE_FAILED);
            super.write(channelHandlerContext, new DefaultHttpContent((ByteBuf) obj), channelPromise);
        }
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelReadComplete(channelHandlerContext);
        channelHandlerContext.pipeline().flush();
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.currentRequestState.onConnectionClose();
        super.channelInactive(channelHandlerContext);
    }

    private void addWriteCompleteEvents(ChannelPromise channelPromise, final long j, final HttpServerMetricsEvent<HttpServerMetricsEvent.EventType> httpServerMetricsEvent, final HttpServerMetricsEvent<HttpServerMetricsEvent.EventType> httpServerMetricsEvent2) {
        channelPromise.addListener2((GenericFutureListener<? extends Future<? super Void>>) new ChannelFutureListener() { // from class: io.reactivex.netty.protocol.http.server.ServerRequestResponseConverter.2
            @Override // io.netty.util.concurrent.GenericFutureListener
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()) {
                    ServerRequestResponseConverter.this.eventsSubject.onEvent((MetricEventsSubject) httpServerMetricsEvent, Clock.onEndMillis(j));
                } else {
                    ServerRequestResponseConverter.this.eventsSubject.onEvent((MetricEventsSubject) httpServerMetricsEvent2, Clock.onEndMillis(j), channelFuture.cause());
                }
            }
        });
    }

    private static void invokeContentOnNext(Object obj, UnicastContentSubject unicastContentSubject) {
        try {
            unicastContentSubject.onNext(obj);
        } catch (ClassCastException e) {
            unicastContentSubject.onError(e);
        } finally {
            ReferenceCountUtil.release(obj);
        }
    }
}
