package org.asyncflows.protocol.http.server.core;

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import org.asyncflows.core.CoreFlows;
import org.asyncflows.core.Promise;
import org.asyncflows.core.function.AResolver;
import org.asyncflows.core.function.ASupplier;
import org.asyncflows.core.function.AsyncFunctionUtil;
import org.asyncflows.core.function.FunctionExporter;
import org.asyncflows.core.util.CloseableInvalidatingBase;
import org.asyncflows.core.util.CoreFlowsAll;
import org.asyncflows.core.util.CoreFlowsResource;
import org.asyncflows.core.util.CoreFlowsSeq;
import org.asyncflows.core.util.NeedsExport;
import org.asyncflows.core.util.RequestQueue;
import org.asyncflows.io.AChannel;
import org.asyncflows.io.AInput;
import org.asyncflows.io.AOutput;
import org.asyncflows.io.util.SimpleChannel;
import org.asyncflows.protocol.http.HttpException;
import org.asyncflows.protocol.http.HttpStatusException;
import org.asyncflows.protocol.http.common.HttpMethodUtil;
import org.asyncflows.protocol.http.common.HttpRequestMessage;
import org.asyncflows.protocol.http.common.HttpResponseMessage;
import org.asyncflows.protocol.http.common.HttpScopeUtil;
import org.asyncflows.protocol.http.common.HttpStatusUtil;
import org.asyncflows.protocol.http.common.HttpURIUtil;
import org.asyncflows.protocol.http.common.HttpVersionUtil;
import org.asyncflows.protocol.http.common.Scope;
import org.asyncflows.protocol.http.common.content.ContentUtil;
import org.asyncflows.protocol.http.common.content.InputState;
import org.asyncflows.protocol.http.common.content.OutputState;
import org.asyncflows.protocol.http.common.content.StreamFinishedEvent;
import org.asyncflows.protocol.http.common.content.UnknownTransferEncodingException;
import org.asyncflows.protocol.http.common.headers.HttpHeaders;
import org.asyncflows.protocol.http.common.headers.HttpHeadersUtil;
import org.asyncflows.protocol.http.common.headers.TransferEncoding;
import org.asyncflows.protocol.http.server.AHttpHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/asyncflows/protocol/http/server/core/HttpExchangeAction.class */
public class HttpExchangeAction extends CloseableInvalidatingBase implements AHttpResponse, NeedsExport<AHttpResponse> {
    public static final Callable<StreamFinishedEvent> NOT_CREATED = () -> {
        long currentTimeMillis = System.currentTimeMillis();
        return new StreamFinishedEvent(currentTimeMillis, currentTimeMillis, 0L, null);
    };
    private static final Scope.Key<StreamFinishedEvent> CLIENT_TO_SERVER = new Scope.Key<>((Class<?>) HttpExchangeAction.class, "clientToServer", (Callable) NOT_CREATED);
    private static final Scope.Key<StreamFinishedEvent> SERVER_TO_CLIENT = new Scope.Key<>((Class<?>) HttpExchangeAction.class, "serverToClient", (Callable) NOT_CREATED);
    private static final Scope.Key<StreamFinishedEvent> CLIENT_TO_SERVER_SWITCHED = new Scope.Key<>(HttpExchangeAction.class, "clientToServerSwitched");
    private static final Scope.Key<StreamFinishedEvent> SERVER_TO_CLIENT_SWITCHED = new Scope.Key<>(HttpExchangeAction.class, "serverToClientSwitched");
    private static final Logger LOG = LoggerFactory.getLogger(HttpExchangeAction.class);
    private final HttpServerConnection connection;
    private final long exchangeId;
    private final HttpRequestMessage requestMessage = new HttpRequestMessage();
    private final HttpResponseMessage responseMessage = new HttpResponseMessage();
    private final Promise<HttpHeaders> inputTrailers = new Promise<>();
    private final RequestQueue responses = new RequestQueue();
    private boolean canContinue = true;
    private InputState inputState;
    private OutputState outputState;
    private boolean responseStarted;
    private ContentUtil.StreamInfo<AInput<ByteBuffer>> inputInfo;
    private ContentUtil.StreamInfo<AOutput<ByteBuffer>> outputInfo;
    private HttpExchangeContext exchangeContext;
    private boolean switchedProtocol;

    public HttpExchangeAction(HttpServerConnection httpServerConnection, long j) {
        this.connection = httpServerConnection;
        this.exchangeId = j;
        String host = HttpURIUtil.getHost(httpServerConnection.getLocalAddress());
        this.requestMessage.setProtocol(httpServerConnection.getProtocol());
        this.requestMessage.setServerAddress(host);
        this.responseMessage.setProtocol(httpServerConnection.getProtocol());
        this.responseMessage.setServerAddress(host);
    }

    public Promise<Boolean> handle() {
        return CoreFlowsSeq.aSeq(() -> {
            return HttpServerMessageUtil.parseRequestMessage(this.connection.getInput(), this.requestMessage);
        }).failed(th -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to parse request " + id() + ": " + this.requestMessage.getMethod() + " " + this.requestMessage.getRequestTarget() + " " + this.requestMessage.getVersion() + "\n" + this.requestMessage.getHeaders().toString(), th);
            }
            return CoreFlows.aFailure(th);
        }).map(bool -> {
            return bool.booleanValue() ? processRequest() : CoreFlows.aFalse();
        }).failed(th2 -> {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failed to process request " + id(), th2);
            }
            if (this.responseStarted) {
                return CoreFlows.aFalse();
            }
            if (this.exchangeContext == null) {
                this.exchangeContext = createContext(null);
            }
            this.exchangeContext.getExchangeScope().set(HttpServer.BAD_REQUEST_PROBLEM, th2);
            this.canContinue = false;
            return this.connection.getServer().getBadRequestHandler().handle(this.exchangeContext).thenValue(false);
        }).finallyDo(CoreFlowsResource.closeResourceAction(this));
    }

    private String id() {
        return this.connection.getConnectionId() + "." + this.exchangeId;
    }

    /* JADX WARN: Type inference failed for: r13v0, types: [java.lang.Throwable, org.asyncflows.protocol.http.HttpException] */
    /* JADX WARN: Type inference failed for: r13v1, types: [java.lang.Throwable, org.asyncflows.protocol.http.common.content.UnknownTransferEncodingException] */
    private Promise<Boolean> processRequest() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Starting processing " + id() + ": " + this.requestMessage.getMethod() + " " + this.requestMessage.getRequestTarget() + " " + this.requestMessage.getVersion() + "\n" + this.requestMessage.getHeaders().toString());
        }
        this.responseMessage.setVersion(this.requestMessage.getVersion());
        this.canContinue = this.canContinue && HttpHeadersUtil.isLastExchange(this.requestMessage.getVersion(), this.requestMessage.getHeaders());
        try {
            this.inputInfo = ContentUtil.getInput(this.requestMessage.getMethod(), null, this.connection.getInput(), inputStateTracker(), this.inputTrailers.resolver(), countTo(CLIENT_TO_SERVER), TransferEncoding.parse(this.requestMessage.getHeaders().getHeaders(HttpHeadersUtil.TRANSFER_ENCODING_HEADER)), HttpHeadersUtil.getContentLength(this.requestMessage.getHeaders()));
            this.canContinue = this.canContinue && !this.inputInfo.isRestOfTheStream();
            this.requestMessage.setContentLength(this.inputInfo.getContentLength());
            AInput<ByteBuffer> stream = this.inputInfo.getStream();
            AHttpHandler handler = this.connection.getServer().getHandler();
            this.exchangeContext = createContext(stream);
            return handler.handle(this.exchangeContext).thenFlatGet(() -> {
                if (this.outputInfo == null) {
                    throw new HttpStatusException(HttpStatusUtil.INTERNAL_SERVER_ERROR, "Handler did not started reply" + id());
                }
                return close().thenFlatGet(() -> {
                    if (this.inputState != InputState.CLOSED || this.outputState != OutputState.CLOSED) {
                        this.canContinue = false;
                    }
                    return CoreFlows.aValue(Boolean.valueOf(this.canContinue));
                });
            });
        } catch (UnknownTransferEncodingException e) {
            throw new HttpStatusException(HttpStatusUtil.NOT_IMPLEMENTED, e.getMessage(), e);
        } catch (HttpException e2) {
            throw new HttpStatusException(HttpStatusUtil.BAD_REQUEST, e2.getMessage(), e2);
        }
    }

    private Consumer<StreamFinishedEvent> countTo(Scope.Key<StreamFinishedEvent> key) {
        return FunctionExporter.exportConsumer(streamFinishedEvent -> {
            if (this.exchangeContext != null) {
                this.exchangeContext.getExchangeScope().set(key, streamFinishedEvent);
            }
        });
    }

    private HttpExchangeContext createContext(AInput<ByteBuffer> aInput) {
        HttpExchangeContext httpExchangeContext = new HttpExchangeContext(this.connection.getServer().getServerScope(), this.requestMessage.getMethod(), this.requestMessage.getHeaders(), aInput, FunctionExporter.exportSupplier(AsyncFunctionUtil.promiseSupplier(this.inputTrailers)), (AHttpResponse) export(), this.requestMessage.getEffectiveUri(), this.connection.getLocalAddress(), this.connection.getRemoteAddress(), this.requestMessage.getContentLength());
        if (this.connection.getSslSession() != null) {
            httpExchangeContext.getExchangeScope().set(HttpScopeUtil.SSL_SESSION, this.connection.getSslSession());
        }
        return httpExchangeContext;
    }

    private AResolver<InputState> inputStateTracker() {
        return outcome -> {
            if (outcome.isFailure()) {
                this.inputState = InputState.ERROR;
                invalidate(outcome.failure());
            } else {
                if (this.inputState == InputState.IDLE && this.requestMessage.expectsContinue()) {
                    intermediateResponse(100, null, new HttpHeaders()).listen(outcomeChecker());
                }
                this.inputState = (InputState) outcome.value();
            }
            if (this.switchedProtocol) {
                if (this.inputState == InputState.CLOSED || this.inputState == InputState.CLOSED_BEFORE_EOF || this.inputState == InputState.ERROR) {
                    this.connection.getInput().input().close();
                }
            }
        };
    }

    @Override // org.asyncflows.protocol.http.server.core.AHttpResponse
    public Promise<Void> intermediateResponse(int i, String str, HttpHeaders httpHeaders) {
        if (!HttpStatusUtil.isInformational(i)) {
            throw new IllegalArgumentException("Only 1xx statues are allowed " + id());
        }
        if (101 == i) {
            throw new HttpException("Switch protocol status should be used only with switchProtocol(...) method " + id());
        }
        return this.responses.run(() -> {
            if (this.responseStarted) {
                return CoreFlows.aVoid();
            }
            HttpResponseMessage httpResponseMessage = new HttpResponseMessage();
            initResponseMessage(httpResponseMessage, i, str, httpHeaders);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Intermediate response started " + id() + ": " + httpResponseMessage.getVersion() + httpResponseMessage.getStatusCode() + " " + httpResponseMessage.getStatusMessage() + "\n" + httpResponseMessage.getHeaders());
            }
            return HttpServerMessageUtil.writeResponseMessage(this.connection.getOutput(), httpResponseMessage);
        });
    }

    private void initResponseMessage(HttpResponseMessage httpResponseMessage, int i, String str, HttpHeaders httpHeaders) {
        httpResponseMessage.setProtocol(this.requestMessage.getProtocol());
        String version = this.requestMessage.getVersion();
        if (!HttpVersionUtil.isHttp10(version) && !HttpVersionUtil.isHttp11(version)) {
            throw new HttpException("Unsupported response version");
        }
        httpResponseMessage.setVersion(HttpVersionUtil.HTTP_VERSION_1_1);
        httpResponseMessage.setServerAddress(this.requestMessage.getServerAddress());
        httpResponseMessage.setHeaders(httpHeaders);
        httpResponseMessage.setStatusCode(Integer.valueOf(i));
        httpResponseMessage.setStatusMessage(HttpStatusUtil.getText(httpResponseMessage.getStatusCode().intValue(), str));
        enrichReplyHeaders(httpResponseMessage);
    }

    private void enrichReplyHeaders(HttpResponseMessage httpResponseMessage) {
        HttpHeaders headers = httpResponseMessage.getHeaders();
        headers.removeHeader(HttpHeadersUtil.CONTENT_LENGTH_HEADER);
        headers.removeHeader(HttpHeadersUtil.TRANSFER_ENCODING_HEADER);
        headers.setHeader(HttpHeadersUtil.SERVER_HEADER, this.connection.getServer().getServerDescription());
        headers.setHeader(HttpHeadersUtil.DATE_HEADER, HttpHeadersUtil.formatDate(new Date()));
    }

    @Override // org.asyncflows.protocol.http.server.core.AHttpResponse
    public Promise<AOutput<ByteBuffer>> respond(int i, String str, HttpHeaders httpHeaders, Long l) {
        return this.responses.run(() -> {
            if (HttpStatusUtil.isSwitchProtocol(this.requestMessage.getMethod(), i)) {
                throw new HttpException("User switchProtocol(...) for switching protocol");
            }
            if (HttpStatusUtil.isInformational(i)) {
                throw new HttpException("Use intermediateResponse(...) for informational responses");
            }
            ensureResponseNotStarted();
            initResponseMessage(this.responseMessage, i, str, httpHeaders);
            List<TransferEncoding> transferEncodings = ContentUtil.getTransferEncodings(this.responseMessage.getVersion(), l);
            this.outputInfo = ContentUtil.getOutput(this.requestMessage.getMethod(), Integer.valueOf(i), this.connection.getOutput(), outputTracker(), trailersProvider(transferEncodings), countTo(SERVER_TO_CLIENT), transferEncodings, l);
            if (this.outputInfo.isRestOfTheStream()) {
                this.canContinue = false;
            }
            HttpHeadersUtil.setMessageBodyHeaders(httpHeaders, this.outputInfo.getEncodingList(), this.outputInfo.getContentLength());
            this.responseStarted = true;
            this.canContinue &= !((Boolean) this.exchangeContext.getExchangeScope().get(HttpScopeUtil.LAST_EXCHANGE)).booleanValue();
            HttpHeadersUtil.setLastMessageHeader(httpHeaders, this.responseMessage.getVersion(), !this.canContinue);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Response started " + id() + ": " + this.responseMessage.getVersion() + " " + this.responseMessage.getStatusCode() + " " + this.responseMessage.getStatusMessage() + "\n" + this.responseMessage.getHeaders());
            }
            return HttpServerMessageUtil.writeResponseMessage(this.connection.getOutput(), this.responseMessage).thenFlatGet(() -> {
                return CoreFlows.aValue(this.outputInfo.getStream());
            });
        });
    }

    private ASupplier<HttpHeaders> trailersProvider(List<TransferEncoding> list) {
        boolean z = false;
        Iterator<TransferEncoding> it = TransferEncoding.parse(this.requestMessage.getHeaders().getHeaders(HttpHeadersUtil.TE_HEADER)).iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (HttpHeadersUtil.TE_TRAILERS_VALUE.equalsIgnoreCase(it.next().getName())) {
                z = true;
                break;
            }
        }
        if (!z) {
            return AsyncFunctionUtil.constantSupplier((Object) null);
        }
        ASupplier<HttpHeaders> trailersProvider = HttpScopeUtil.trailersProvider(this.exchangeContext.getExchangeScope(), this.responseMessage.getHeaders(), list);
        return FunctionExporter.exportSupplier(() -> {
            return trailersProvider.get().listen(outcome -> {
                if (!outcome.isFailure() && LOG.isDebugEnabled()) {
                    HttpHeaders httpHeaders = (HttpHeaders) outcome.value();
                    List<String> commaSeparatedValues = this.responseMessage.getHeaders().getCommaSeparatedValues(HttpHeadersUtil.TRAILER_HEADER);
                    LinkedHashSet linkedHashSet = new LinkedHashSet(httpHeaders.getNames());
                    Iterator<String> it2 = commaSeparatedValues.iterator();
                    while (it2.hasNext()) {
                        linkedHashSet.remove(HttpHeadersUtil.normalizeName(it2.next()));
                    }
                    if (linkedHashSet.isEmpty()) {
                        return;
                    }
                    LOG.debug("Undeclared trailers headers on exchange " + id() + ": " + linkedHashSet);
                }
            });
        });
    }

    private void ensureResponseNotStarted() {
        if (this.responseStarted) {
            throw new HttpException("The response has been already started on this request. " + id());
        }
    }

    private AResolver<OutputState> outputTracker() {
        return outcome -> {
            if (outcome.isFailure()) {
                this.outputState = OutputState.ERROR;
                invalidate(outcome.failure());
            } else {
                this.outputState = (OutputState) outcome.value();
            }
            if (this.switchedProtocol) {
                if (this.outputState == OutputState.CLOSED || this.outputState == OutputState.CLOSED_LAST || this.outputState == OutputState.ERROR) {
                    this.connection.getOutput().getOutput().close();
                }
            }
        };
    }

    @Override // org.asyncflows.protocol.http.server.core.AHttpResponse
    public Promise<AChannel<ByteBuffer>> switchProtocol(int i, String str, HttpHeaders httpHeaders) {
        return this.responses.run(() -> {
            if (101 != i && (!HttpMethodUtil.isConnect(this.requestMessage.getMethod()) || !HttpStatusUtil.isSuccess(i))) {
                throw new HttpException("Not a valid request/response for switching protocols: " + this.requestMessage.getMethod() + " -> " + i);
            }
            if (this.inputState != InputState.EOF_NO_TRAILERS && this.inputState != InputState.CLOSED && this.inputState != InputState.TRAILERS_READ) {
                throw new HttpException("Read all input before switching protocols : " + this.inputState);
            }
            ensureResponseNotStarted();
            this.responseStarted = true;
            this.canContinue = false;
            initResponseMessage(this.responseMessage, i, str, httpHeaders);
            return CoreFlowsSeq.aSeq(() -> {
                AInput<ByteBuffer> stream = this.inputInfo.getStream();
                boolean z = this.inputState == InputState.CLOSED;
                this.inputInfo = null;
                this.inputState = null;
                return z ? CoreFlows.aVoid() : stream.close();
            }).thenDo(() -> {
                if (((Boolean) this.exchangeContext.getExchangeScope().get(HttpServer.SWITCH_NO_REPLY)).booleanValue()) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Switching protocols " + id() + " without reply message");
                    }
                    return CoreFlows.aVoid();
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Switching protocols " + id() + ": " + this.responseMessage.getVersion() + " " + this.responseMessage.getStatusCode() + " " + this.responseMessage.getStatusMessage() + "\n" + this.responseMessage.getHeaders());
                }
                return HttpServerMessageUtil.writeResponseMessage(this.connection.getOutput(), this.responseMessage);
            }).thenDoLast(() -> {
                this.inputInfo = ContentUtil.getInput(HttpMethodUtil.GET, Integer.valueOf(HttpStatusUtil.OK), this.connection.getInput(), inputStateTracker(), null, countTo(CLIENT_TO_SERVER_SWITCHED), Collections.emptyList(), null);
                this.outputInfo = ContentUtil.getOutput(HttpMethodUtil.GET, Integer.valueOf(HttpStatusUtil.OK), this.connection.getOutput(), outputTracker(), AsyncFunctionUtil.constantSupplier((HttpHeaders) null), countTo(SERVER_TO_CLIENT_SWITCHED), Collections.emptyList(), null);
                this.switchedProtocol = true;
                return CoreFlows.aValue(new SimpleChannel(this.inputInfo.getStream(), this.outputInfo.getStream()).export());
            });
        });
    }

    public Promise<Void> closeAction() {
        return CoreFlowsSeq.aSeq(() -> {
            return CoreFlowsAll.aAll(() -> {
                return this.inputInfo != null ? this.inputInfo.getStream().close() : CoreFlows.aVoid();
            }).andLast(() -> {
                if (this.outputInfo != null) {
                    return this.outputInfo.getStream().close();
                }
                if (this.requestMessage.getMethod() != null) {
                    throw new HttpStatusException(HttpStatusUtil.INTERNAL_SERVER_ERROR, "Handler did not started reply");
                }
                return CoreFlows.aVoid();
            }).toVoid();
        }).thenDo(() -> {
            if (this.inputState != InputState.CLOSED || this.outputState != OutputState.CLOSED) {
                this.canContinue = false;
            }
            return CoreFlows.aVoid();
        }).finallyDo(() -> {
            if (this.exchangeContext != null) {
                this.connection.getServer().fireExchangeFinished(new ExchangeFinishedEvent(id(), this.exchangeContext.getRemoteAddress(), this.exchangeContext.getLocalAddress(), this.requestMessage.getMethod(), this.requestMessage.getEffectiveUri(), this.requestMessage.getVersion(), (StreamFinishedEvent) this.exchangeContext.getExchangeScope().get(CLIENT_TO_SERVER), (StreamFinishedEvent) this.exchangeContext.getExchangeScope().get(CLIENT_TO_SERVER_SWITCHED), this.responseMessage.getStatusCode(), this.responseMessage.getStatusMessage(), (StreamFinishedEvent) this.exchangeContext.getExchangeScope().get(SERVER_TO_CLIENT), (StreamFinishedEvent) this.exchangeContext.getExchangeScope().get(SERVER_TO_CLIENT_SWITCHED), (String) this.exchangeContext.getExchangeScope().get(ExchangeFinishedEvent.REMOTE), (StreamFinishedEvent) this.exchangeContext.getExchangeScope().get(ExchangeFinishedEvent.SERVER_TO_REMOTE), (StreamFinishedEvent) this.exchangeContext.getExchangeScope().get(ExchangeFinishedEvent.REMOTE_TO_SERVER)));
            }
            return CoreFlows.aVoid();
        });
    }
}
