/*
 * Decompiled with CFR 0.152.
 */
package io.inverno.mod.http.server.internal;

import io.inverno.mod.http.base.ExchangeContext;
import io.inverno.mod.http.base.HttpException;
import io.inverno.mod.http.base.Method;
import io.inverno.mod.http.server.ErrorExchange;
import io.inverno.mod.http.server.Exchange;
import io.inverno.mod.http.server.ServerController;
import io.inverno.mod.http.server.internal.AbstractRequest;
import io.inverno.mod.http.server.internal.AbstractResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import java.net.InetSocketAddress;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.MarkerManager;
import org.apache.logging.log4j.message.MultiformatMessage;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

public abstract class AbstractExchange
extends BaseSubscriber<ByteBuf>
implements Exchange<ExchangeContext> {
    // Logger used for exchange lifecycle, error and access logging; it is obtained for the Exchange interface, not this class.
    private static final Logger LOGGER = LogManager.getLogger(Exchange.class);
    // Marker attached by logError when the logged throwable is an HttpException.
    private static final Marker MARKER_ERROR = MarkerManager.getMarker((String)"HTTP_ERROR");
    // Marker attached by logAccess to access log entries.
    private static final Marker MARKER_ACCESS = MarkerManager.getMarker((String)"HTTP_ACCESS");
    // Netty channel handler context supplied at construction time.
    protected final ChannelHandlerContext context;
    // Executor obtained from context.executor(); executeInEventLoop runs tasks on it.
    protected final EventExecutor contextExecutor;
    // Controller that handles the exchange, handles error exchanges and creates the exchange context.
    protected final ServerController<ExchangeContext, Exchange<ExchangeContext>, ErrorExchange<ExchangeContext>> controller;
    // Request of this exchange; disposed in dispose() and hookFinally().
    protected final AbstractRequest request;
    // Current response; not final because it is replaced by the error exchange's response when an error is handled.
    protected AbstractResponse response;
    // Context created by controller.createContext(); may be null (the constructor null-checks it).
    protected final ExchangeContext exchangeContext;
    // Finalizer composed by finalizer(Mono); null until one is registered.
    protected Mono<Void> finalizer;
    // Handler supplied to start(); a non-null value means the exchange has already been started.
    protected Handler handler;
    // Running total of readable bytes of the response chunks received in hookOnNext; reset to 0 when switching to an error exchange.
    protected int transferedLength;
    // Copied from response.isSingle() right before subscribing to the response data.
    protected boolean single;
    // Set to true by hookOnNext once chunks are forwarded to onNextMany.
    protected boolean many;
    // Chunk held back by hookOnNext until either another chunk arrives or the data publisher completes.
    private ByteBuf singleChunk;
    // Currently active subscription: the controller subscriber, this exchange itself, or an error subscriber.
    protected Disposable disposable;
    // No-op controller used as a fallback when the configured controller fails while handling an error exchange.
    protected static final ServerController<ExchangeContext, Exchange<ExchangeContext>, ErrorExchange<ExchangeContext>> LAST_RESORT_ERROR_CONTROLLER = exchange -> {};

    public AbstractExchange(ChannelHandlerContext context, ServerController<ExchangeContext, Exchange<ExchangeContext>, ErrorExchange<ExchangeContext>> controller, AbstractRequest request, AbstractResponse response) {
        this.context = context;
        this.contextExecutor = this.context.executor();
        this.controller = controller;
        this.request = request;
        this.response = response;
        this.exchangeContext = controller.createContext();
        if (this.exchangeContext != null) {
            this.exchangeContext.init();
        }
    }

    /**
     * Returns the request of this exchange.
     *
     * @return the request supplied at construction time
     */
    @Override
    public AbstractRequest request() {
        return request;
    }

    /**
     * Returns the current response of this exchange.
     *
     * <p>The returned instance changes once an error is handled: {@code start} and {@code hookOnError}
     * replace it with the response of the error exchange.</p>
     *
     * @return the current response
     */
    @Override
    public AbstractResponse response() {
        return response;
    }

    /**
     * Returns the exchange context created by the controller at construction time.
     *
     * @return the exchange context, or {@code null} if the controller created none
     */
    public ExchangeContext context() {
        return exchangeContext;
    }

    /**
     * Registers a finalizer for this exchange.
     *
     * <p>The first finalizer is stored as is; each subsequent one is chained after the previously
     * registered finalizers with {@code then}.</p>
     *
     * @param finalizer the finalizer to register
     */
    @Override
    public void finalizer(Mono<Void> finalizer) {
        if (this.finalizer == null) {
            this.finalizer = finalizer;
        } else {
            this.finalizer = this.finalizer.then(finalizer);
        }
    }

    /**
     * Arranges for the registered finalizer, if any, to run once the given promise completes.
     *
     * <p>When a finalizer is registered, a listener is added to {@code finalPromise}; when notified it
     * subscribes to the finalizer, and {@code postFinalize} (if not {@code null}) runs when the finalizer
     * terminates. When no finalizer is registered, {@code postFinalize} (if not {@code null}) is run
     * immediately, before this method returns.</p>
     *
     * @param finalPromise the promise whose completion triggers the finalizer
     * @param postFinalize an optional action to run after finalization, may be {@code null}
     *
     * @return {@code finalPromise}
     */
    public ChannelFuture finalizeExchange(ChannelPromise finalPromise, Runnable postFinalize) {
        if (this.finalizer != null) {
            finalPromise.addListener(future -> {
                Mono<Void> actualFinalizer = this.finalizer;
                if (postFinalize != null) {
                    // Reactor operators return a new Mono instead of mutating the receiver: the decorated
                    // instance must be kept, otherwise postFinalize is silently never invoked.
                    actualFinalizer = actualFinalizer.doOnTerminate(postFinalize);
                }
                actualFinalizer.doOnSuccess(ign -> LOGGER.trace(() -> "Exchange finalized")).subscribe();
            });
        } else if (postFinalize != null) {
            postFinalize.run();
        }
        return finalPromise;
    }

    /**
     * Returns the controller handling this exchange.
     *
     * @return the controller supplied at construction time
     */
    public ServerController<ExchangeContext, Exchange<ExchangeContext>, ErrorExchange<ExchangeContext>> getController() {
        return controller;
    }

    /**
     * Returns the number of response bytes accounted for so far.
     *
     * <p>The counter is held as an {@code int} and widened to {@code long} on return.</p>
     *
     * @return the transferred length in bytes
     */
    public long getTransferedLength() {
        return (long) transferedLength;
    }

    /**
     * Disposes the currently active subscription, if any, and then the request.
     *
     * <p>When the active subscription is this exchange itself, the inherited {@code dispose()} is invoked
     * directly rather than going through the {@code disposable} reference.</p>
     */
    public void dispose() {
        Disposable active = this.disposable;
        if (active != null) {
            if (active == this) {
                super.dispose();
            } else {
                active.dispose();
            }
        }
        this.request.dispose();
    }

    /**
     * Indicates whether the currently active subscription is disposed.
     *
     * <p>An exchange with no active subscription is reported as disposed. When the active subscription
     * is this exchange itself (which is the case while it is subscribed to the response data), the
     * inherited {@code isDisposed()} is consulted: delegating to {@code this.disposable} in that case
     * would call this very method again and recurse until the stack overflows. This mirrors the
     * special case already present in {@code dispose()}.</p>
     *
     * @return {@code true} if there is no active subscription or if it is disposed
     */
    public boolean isDisposed() {
        Disposable active = this.disposable;
        if (active == null) {
            return true;
        }
        if (active == this) {
            return super.isDisposed();
        }
        return active.isDisposed();
    }

    /**
     * Starts the processing of this exchange.
     *
     * <p>The handler is notified through {@code exchangeStart}, then the controller is asked to defer
     * the handling of this exchange. If that call throws, the error is logged and an error exchange is
     * created and deferred to the controller instead; if this throws as well, a fresh error exchange is
     * deferred to {@code LAST_RESORT_ERROR_CONTROLLER}. In both error cases the current response is
     * replaced by the error exchange's response. Finally a {@code ServerControllerSubscriber} is
     * recorded as the active subscription and subscribed to the resulting {@code Mono}.</p>
     *
     * @param handler the handler to notify of the exchange start
     *
     * @throws IllegalStateException if the exchange has already been started
     */
    public void start(Handler handler) {
        if (this.handler != null) {
            throw new IllegalStateException("Exchange already started");
        }
        this.handler = handler;
        this.handler.exchangeStart(this.context, this);

        Mono<Void> deferHandle;
        try {
            // Hand the exchange itself to the controller. The previous code cast this instance to
            // ErrorExchange (most likely a decompilation artifact), which raises a ClassCastException
            // for any exchange that is not also an ErrorExchange and therefore always ended up in the
            // error path below.
            deferHandle = this.controller.defer(this);
        }
        catch (Throwable throwable) {
            this.logError("Exchange handler error", throwable);
            ErrorExchange<ExchangeContext> errorExchange = this.createErrorExchange(throwable);
            this.response = (AbstractResponse)errorExchange.response();
            try {
                deferHandle = this.controller.defer(errorExchange);
            }
            catch (Throwable t) {
                this.logError("ErrorExchange handler error", t);
                // Start over with a fresh error exchange built from the original error.
                errorExchange = this.createErrorExchange(throwable);
                this.response = (AbstractResponse)errorExchange.response();
                deferHandle = LAST_RESORT_ERROR_CONTROLLER.defer(errorExchange);
            }
        }
        ServerControllerSubscriber subscriber = this.createServerControllerSubscriber();
        this.disposable = subscriber;
        deferHandle.subscribe(subscriber);
    }

    /**
     * Creates the subscriber that {@code start} subscribes to the controller's deferred handling.
     *
     * @return a new {@code ServerControllerSubscriber}
     */
    protected ServerControllerSubscriber createServerControllerSubscriber() {
        ServerControllerSubscriber controllerSubscriber = new ServerControllerSubscriber();
        return controllerSubscriber;
    }

    /**
     * Runs the given task in the event loop and then requests one more element.
     *
     * <p>Shorthand for {@code executeInEventLoop(runnable, 1)}.</p>
     *
     * @param runnable the task to run
     */
    protected void executeInEventLoop(Runnable runnable) {
        final int defaultRequest = 1;
        this.executeInEventLoop(runnable, defaultRequest);
    }

    /**
     * Runs the given task on the channel's event executor, then requests {@code request} more elements
     * through the inherited {@code request(long)}.
     *
     * <p>When the calling thread is already in the event loop the task runs inline and any exception
     * propagates to the caller. Otherwise the task is submitted to the executor; a failure there
     * cancels this subscriber and is routed to {@code hookOnError}.</p>
     *
     * @param runnable the task to run
     * @param request  the number of elements to request once the task has run
     */
    protected void executeInEventLoop(Runnable runnable, int request) {
        if (!this.contextExecutor.inEventLoop()) {
            this.contextExecutor.execute(() -> {
                try {
                    runnable.run();
                    this.request(request);
                }
                catch (Throwable failure) {
                    this.cancel();
                    this.hookOnError(failure);
                }
            });
            return;
        }
        runnable.run();
        this.request(request);
    }

    /**
     * Creates an error exchange for the given error.
     *
     * <p>Called by {@code start} and {@code hookOnError}; the response of the returned exchange replaces
     * the current response of this exchange.</p>
     *
     * @param var1 the error to build the error exchange from
     *
     * @return the error exchange
     */
    protected abstract ErrorExchange<ExchangeContext> createErrorExchange(Throwable var1);

    /**
     * Subscription hook: delegates to the overridable {@code onStart}.
     *
     * @param subscription the subscription to the response data
     */
    protected final void hookOnSubscribe(Subscription subscription) {
        onStart(subscription);
    }

    /**
     * Invoked when this exchange is subscribed to the response data.
     *
     * <p>This implementation requests an unbounded number of elements and logs the start at debug
     * level.</p>
     *
     * @param subscription the subscription to the response data
     */
    protected void onStart(Subscription subscription) {
        final long unbounded = Long.MAX_VALUE;
        subscription.request(unbounded);
        LOGGER.debug(() -> "Exchange started");
    }

    /**
     * Receives a chunk of response data.
     *
     * <p>The chunk's readable bytes are added to the transferred length. While no chunk is held back
     * and the exchange is either flagged {@code single} or has not switched to {@code many} yet, the
     * chunk is held back instead of being forwarded. Otherwise the exchange is flagged {@code many} and
     * the held-back chunk (if any) followed by the current one are passed to {@code onNextMany} in the
     * event loop.</p>
     *
     * @param value the received chunk
     */
    protected final void hookOnNext(ByteBuf value) {
        this.transferedLength += value.readableBytes();

        boolean holdBack = this.singleChunk == null && (this.single || !this.many);
        if (holdBack) {
            this.singleChunk = value;
            return;
        }

        this.many = true;
        final ByteBuf pending = this.singleChunk;
        this.singleChunk = null;
        this.executeInEventLoop(() -> {
            if (pending != null) {
                this.onNextMany(pending);
            }
            this.onNextMany(value);
        });
    }

    /**
     * Handles one chunk of a response made of several chunks.
     *
     * <p>Invoked by {@code hookOnNext} through {@code executeInEventLoop}, first for the chunk that was
     * held back (if any) and then for the chunk just received.</p>
     *
     * @param var1 the chunk to handle
     */
    protected abstract void onNextMany(ByteBuf var1);

    /**
     * Handles an error raised while processing the exchange.
     *
     * <p>If the response headers have already been written, the exchange is completed with the error
     * and the error is logged, both in the event loop. Otherwise the transferred length is reset, an
     * error exchange is created, its response becomes the current response and its handling is
     * deferred to the controller. If that fails, the failure is logged and a fresh error exchange is
     * deferred to {@code LAST_RESORT_ERROR_CONTROLLER} instead.</p>
     *
     * @param throwable the error to handle
     */
    protected final void hookOnError(Throwable throwable) {
        if (this.response.isHeadersWritten()) {
            this.executeInEventLoop(() -> {
                this.onCompleteWithError(throwable);
                this.logError("Exchange processing error", throwable);
            });
            return;
        }

        this.transferedLength = 0;
        ErrorExchange<ExchangeContext> errorExchange = this.createErrorExchange(throwable);
        this.response = (AbstractResponse)errorExchange.response();
        try {
            Mono<Void> errorHandling = this.controller.defer(errorExchange);
            this.subscribeErrorHandler(errorHandling, throwable);
        }
        catch (Throwable t) {
            this.logError("ErrorExchange handler error", t);
            errorExchange = this.createErrorExchange(throwable);
            this.response = (AbstractResponse)errorExchange.response();
            Mono<Void> fallbackHandling = LAST_RESORT_ERROR_CONTROLLER.defer(errorExchange);
            this.subscribeErrorHandler(fallbackHandling, throwable);
        }
    }

    /**
     * Subscribes, in the event loop, a new {@code ErrorHandlerSubscriber} to the given deferred error
     * handling and records it as the active subscription.
     *
     * @param deferHandle   the deferred error handling
     * @param originalError the error that triggered the error handling
     */
    private void subscribeErrorHandler(Mono<Void> deferHandle, Throwable originalError) {
        this.executeInEventLoop(() -> {
            ErrorHandlerSubscriber errorHandlerSubscriber = new ErrorHandlerSubscriber(originalError);
            this.disposable = errorHandlerSubscriber;
            deferHandle.subscribe(errorHandlerSubscriber);
        });
    }

    /**
     * Completes the exchange after an error.
     *
     * <p>Invoked by {@code hookOnError} when the response headers have already been written, and by the
     * error data subscriber when the error response's data publisher itself fails.</p>
     *
     * @param var1 the error that terminated the exchange
     */
    protected abstract void onCompleteWithError(Throwable var1);

    /**
     * Completes the exchange once the response data publisher has completed.
     *
     * <p>Three cases are distinguished:</p>
     * <ul>
     * <li>nothing was transferred: content-length defaults to 0 and the exchange completes empty;</li>
     * <li>a chunk is still held back: content-length defaults to the transferred length and the
     * exchange completes with that single chunk, or empty when the request method is HEAD;</li>
     * <li>otherwise the exchange completes as a multi-chunk response.</li>
     * </ul>
     *
     * <p>The content-length header is only set when absent. The completion callback and the access log
     * are run in the event loop.</p>
     */
    protected final void hookOnComplete() {
        final Runnable completion;
        if (this.transferedLength == 0) {
            if (this.response.headers().getCharSequence("content-length") == null) {
                this.response.headers().contentLength(0L);
            }
            completion = this::onCompleteEmpty;
        } else if (this.singleChunk != null) {
            if (this.response.headers().getCharSequence("content-length") == null) {
                this.response.headers().contentLength(this.transferedLength);
            }
            if (this.request.getMethod().equals((Object)Method.HEAD)) {
                // NOTE(review): the held-back chunk is not passed on in this branch; confirm whether it
                // needs to be released here.
                completion = this::onCompleteEmpty;
            } else {
                // The held-back chunk is read when the task runs, not when it is created.
                completion = () -> this.onCompleteSingle(this.singleChunk);
            }
        } else {
            completion = this::onCompleteMany;
        }

        this.executeInEventLoop(() -> {
            completion.run();
            this.logAccess();
        });
        LOGGER.debug(() -> "Exchange completed");
    }

    /**
     * Logs an access log entry for this exchange at info level with the {@code HTTP_ACCESS} marker.
     *
     * <p>The message is supplied lazily, so the {@code AccessLogMessage} is only created if the logger
     * asks for it.</p>
     */
    protected void logAccess() {
        LOGGER.info(MARKER_ACCESS, () -> new AccessLogMessage());
    }

    /**
     * Logs an error, attaching the {@code HTTP_ERROR} marker when the throwable is an
     * {@code HttpException}.
     *
     * @param message   the log message
     * @param throwable the error to log
     */
    private void logError(String message, Throwable throwable) {
        if (!(throwable instanceof HttpException)) {
            LOGGER.error(message, throwable);
            return;
        }
        LOGGER.error(MARKER_ERROR, message, throwable);
    }

    /**
     * Completes the exchange without sending any response data.
     *
     * <p>Invoked in the event loop when no bytes were transferred or when the request method is
     * HEAD.</p>
     */
    protected abstract void onCompleteEmpty();

    /**
     * Completes the exchange with a single chunk of response data.
     *
     * <p>Invoked in the event loop by {@code hookOnComplete} when the data publisher completed while a
     * chunk was still held back and the request method is not HEAD.</p>
     *
     * @param var1 the held-back chunk
     */
    protected abstract void onCompleteSingle(ByteBuf var1);

    /**
     * Completes an exchange whose response data were forwarded through {@code onNextMany}.
     *
     * <p>Invoked in the event loop by {@code hookOnComplete} when bytes were transferred and no chunk is
     * held back.</p>
     */
    protected abstract void onCompleteMany();

    /**
     * Termination hook: disposes the request whatever the terminating signal is.
     *
     * @param type the terminating signal (not inspected)
     */
    protected void hookFinally(SignalType type) {
        request.dispose();
    }

    /**
     * Callback interface notified of exchange events.
     *
     * <p>Every method has a default implementation: all are no-ops except {@code exchangeError}, which
     * delegates to {@code exchangeComplete}.</p>
     */
    public interface Handler {

        /** A handler that relies exclusively on the default implementations. */
        Handler DEFAULT = new Handler() {};

        /**
         * Notifies that an exchange has started.
         *
         * @param ctx      the channel handler context
         * @param exchange the started exchange
         */
        default void exchangeStart(ChannelHandlerContext ctx, AbstractExchange exchange) {
        }

        /**
         * Notifies of a chunk of exchange data.
         *
         * @param ctx the channel handler context
         * @param t   the chunk
         */
        default void exchangeNext(ChannelHandlerContext ctx, ByteBuf t) {
        }

        /**
         * Notifies that the exchange failed; by default this is treated as a completion.
         *
         * @param ctx the channel handler context
         * @param t   the error
         */
        default void exchangeError(ChannelHandlerContext ctx, Throwable t) {
            exchangeComplete(ctx);
        }

        /**
         * Notifies that the exchange has completed.
         *
         * @param ctx the channel handler context
         */
        default void exchangeComplete(ChannelHandlerContext ctx) {
        }
    }

    protected class ServerControllerSubscriber
    extends BaseSubscriber<Void> {
        protected ServerControllerSubscriber() {
        }

        protected void hookOnError(Throwable t) {
            AbstractExchange.this.hookOnError(t);
        }

        protected void hookOnComplete() {
            if (AbstractExchange.this.request.getMethod().equals((Object)Method.HEAD)) {
                AbstractExchange.this.executeInEventLoop(AbstractExchange.this::onCompleteEmpty);
                AbstractExchange.this.dispose();
            } else {
                AbstractExchange.this.single = AbstractExchange.this.response.isSingle();
                AbstractExchange.this.disposable = AbstractExchange.this;
                AbstractExchange.this.response.dataSubscribe((Subscriber<ByteBuf>)AbstractExchange.this);
            }
        }
    }

    private class AccessLogMessage
    implements MultiformatMessage {
        private static final long serialVersionUID = -8367544116216876788L;
        private static final String JSON_FORMAT = "JSON";

        private AccessLogMessage() {
        }

        public String getFormat() {
            return "";
        }

        public Object[] getParameters() {
            return new Object[]{this.getRemoteAddress(), this.getRequest(), this.getStatus(), this.getTransferedBytes(), this.getReferer(), this.getUserAgent()};
        }

        public Throwable getThrowable() {
            return null;
        }

        public String getFormattedMessage() {
            return this.asString();
        }

        public String getFormattedMessage(String[] formats) {
            for (String format : formats) {
                if (!format.equalsIgnoreCase(JSON_FORMAT)) continue;
                return this.asJson();
            }
            return this.asString();
        }

        public String[] getFormats() {
            return new String[]{JSON_FORMAT};
        }

        private String getRemoteAddress() {
            return ((InetSocketAddress)AbstractExchange.this.request.getRemoteAddress()).getAddress().getHostAddress();
        }

        private String getRequest() {
            return AbstractExchange.this.request.getMethod().name() + " " + AbstractExchange.this.request.getPath();
        }

        private int getStatus() {
            return AbstractExchange.this.response.headers().getStatusCode();
        }

        private int getTransferedBytes() {
            return AbstractExchange.this.transferedLength;
        }

        private String getReferer() {
            return AbstractExchange.this.request.headers().get((CharSequence)"referer").orElse("");
        }

        private String getUserAgent() {
            return AbstractExchange.this.request.headers().get((CharSequence)"user-agent").orElse("");
        }

        private String asString() {
            StringBuilder message = new StringBuilder();
            message.append(((InetSocketAddress)AbstractExchange.this.request.getRemoteAddress()).getAddress().getHostName()).append(" ");
            message.append("\"").append(AbstractExchange.this.request.getMethod().name()).append(" ").append(AbstractExchange.this.request.getPath()).append("\" ");
            message.append(AbstractExchange.this.response.headers().getStatusCode()).append(" ");
            message.append(AbstractExchange.this.transferedLength).append(" ");
            message.append("\"").append(AbstractExchange.this.request.headers().get((CharSequence)"referer").orElse("")).append("\" ");
            message.append("\"").append(AbstractExchange.this.request.headers().get((CharSequence)"user-agent").orElse("")).append("\" ");
            return message.toString();
        }

        private String asJson() {
            StringBuilder message = new StringBuilder();
            message.append("{");
            message.append("\"remoteAddress\":\"").append(this.getRemoteAddress()).append("\",");
            message.append("\"request\":\"").append(StringEscapeUtils.escapeJson((String)this.getRequest())).append("\",");
            message.append("\"status\":").append(this.getStatus()).append(",");
            message.append("\"bytes\":").append(this.getTransferedBytes()).append(",");
            message.append("\"referer\":\"").append(StringEscapeUtils.escapeJson((String)this.getReferer())).append("\",");
            message.append("\"userAgent\":\"").append(StringEscapeUtils.escapeJson((String)this.getUserAgent())).append("\"");
            message.append("}");
            return message.toString();
        }
    }

    private final class ErrorHandlerSubscriber
    extends BaseSubscriber<Void> {
        private final Throwable originalError;

        public ErrorHandlerSubscriber(Throwable originalError) {
            this.originalError = originalError;
        }

        protected void hookOnError(Throwable t) {
            AbstractExchange.this.hookOnError(t);
        }

        protected void hookOnComplete() {
            AbstractExchange.this.single = AbstractExchange.this.response.isSingle();
            ErrorSubscriber subscriber = new ErrorSubscriber(this.originalError);
            AbstractExchange.this.disposable = subscriber;
            AbstractExchange.this.response.dataSubscribe((Subscriber<ByteBuf>)subscriber);
        }
    }

    private final class ErrorSubscriber
    extends BaseSubscriber<ByteBuf> {
        private final Throwable originalError;

        public ErrorSubscriber(Throwable originalError) {
            this.originalError = originalError;
        }

        protected void hookOnNext(ByteBuf value) {
            AbstractExchange.this.hookOnNext(value);
        }

        protected void hookOnError(Throwable throwable) {
            AbstractExchange.this.onCompleteWithError(this.originalError);
            AbstractExchange.this.logError("Exchange processing error", this.originalError);
            AbstractExchange.this.logError("ErrorExchange processing error", throwable);
        }

        protected void hookOnComplete() {
            AbstractExchange.this.hookOnComplete();
            AbstractExchange.this.logError("Exchange processing error", this.originalError);
            AbstractExchange.this.logAccess();
        }
    }
}

