/*
 * Decompiled with CFR 0.152.
 */
package io.inverno.mod.http.server.internal;

import io.inverno.mod.http.base.ExchangeContext;
import io.inverno.mod.http.base.HttpException;
import io.inverno.mod.http.base.Method;
import io.inverno.mod.http.server.ErrorExchange;
import io.inverno.mod.http.server.Exchange;
import io.inverno.mod.http.server.ServerController;
import io.inverno.mod.http.server.internal.AbstractRequest;
import io.inverno.mod.http.server.internal.AbstractResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.Callable;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.MarkerManager;
import org.apache.logging.log4j.message.MultiformatMessage;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

public abstract class AbstractExchange
extends BaseSubscriber<ByteBuf>
implements Exchange<ExchangeContext> {
    // Logger is registered under Exchange.class (not AbstractExchange), so log configuration keys off the interface name.
    private static final Logger LOGGER = LogManager.getLogger(Exchange.class);
    // Marker attached to error logs whose throwable is an HttpException (see logError).
    private static final Marker MARKER_ERROR = MarkerManager.getMarker((String)"HTTP_ERROR");
    // Marker attached to access logs (see logAccess).
    private static final Marker MARKER_ACCESS = MarkerManager.getMarker((String)"HTTP_ACCESS");
    // Controller whose handler lambda does nothing; installed as currentController when the regular error handling itself fails.
    protected static final ServerController<ExchangeContext, Exchange<ExchangeContext>, ErrorExchange<ExchangeContext>> LAST_RESORT_ERROR_CONTROLLER = exchange -> {};
    // Netty channel handler context supplied at construction.
    protected final ChannelHandlerContext context;
    // Executor obtained from context.executor(); used by executeInEventLoop.
    protected final EventExecutor contextExecutor;
    // Controller supplied at construction; never replaced.
    protected final ServerController<ExchangeContext, Exchange<ExchangeContext>, ErrorExchange<ExchangeContext>> controller;
    // Result of controller.createContext(); may be null (the constructor null-checks it before calling init()).
    protected final ExchangeContext exchangeContext;
    protected final AbstractRequest request;
    // Controller actually invoked; starts as `controller` and is switched to LAST_RESORT_ERROR_CONTROLLER when error handling fails.
    private ServerController<ExchangeContext, Exchange<ExchangeContext>, ErrorExchange<ExchangeContext>> currentController;
    // Current response; replaced by the error exchange's response in hookOnError.
    protected AbstractResponse response;
    // Finalizer chain built up by finalizer(Mono); null until one is registered.
    protected Mono<Void> finalizer;
    // True between the start of finalizeExchange and the end of the finalization.
    private boolean finalizing;
    // True once finalization has terminated.
    private boolean finalized;
    // Handler passed to start(); non-null means the exchange has been started.
    protected Handler handler;
    // Sum of readableBytes() of the chunks seen by hookOnNext; set back to 0 when error handling restarts the response.
    protected int transferedLength;
    // Set once by reset(long); the controller/error-handler subscribers skip completion when it is true.
    private boolean reset;
    // Copied from response.isSingle() before subscribing to the response data.
    protected boolean single;
    // Set to true as soon as a chunk is forwarded through onNextMany.
    protected boolean many;
    // First chunk, held back by hookOnNext until a second chunk or completion arrives.
    private ByteBuf singleChunk;
    // Error passed to dispose(Throwable); exposed by getCancelCause().
    private Throwable disposeError;
    // Subscriber currently driving the exchange: the controller subscriber, this exchange itself, or an error subscriber.
    protected BaseSubscriber<?> subscriber;

    public AbstractExchange(ChannelHandlerContext context, ServerController<ExchangeContext, Exchange<ExchangeContext>, ErrorExchange<ExchangeContext>> controller, AbstractRequest request, AbstractResponse response) {
        this.context = context;
        this.contextExecutor = this.context.executor();
        this.controller = controller;
        this.currentController = this.controller;
        this.request = request;
        this.response = response;
        this.exchangeContext = controller.createContext();
        if (this.exchangeContext != null) {
            this.exchangeContext.init();
        }
    }

    /**
     * Returns the request of this exchange.
     *
     * @return the request supplied at construction
     */
    public AbstractRequest request() {
        return request;
    }

    /**
     * Returns the current response of this exchange.
     *
     * @return the current response; this is the error exchange's response once error handling has started
     */
    public AbstractResponse response() {
        return response;
    }

    /**
     * Returns the exchange context created by the controller.
     *
     * @return the exchange context, or {@code null} when the controller did not create one
     */
    public ExchangeContext context() {
        return exchangeContext;
    }

    /**
     * Registers a finalizer; successive registrations are chained with {@code then} in registration order.
     *
     * @param finalizer the finalizer to append
     */
    @Override
    public void finalizer(Mono<Void> finalizer) {
        if (this.finalizer == null) {
            this.finalizer = finalizer;
        } else {
            this.finalizer = this.finalizer.then(finalizer);
        }
    }

    /**
     * Finalizes the exchange and returns a future that reports the outcome of the finalization.
     *
     * <ul>
     * <li>When a finalizer has been registered, it is subscribed once {@code finalPromise} completes,
     * followed by {@code postFinalize} if provided (errors are delayed so that {@code postFinalize}
     * still runs when the finalizer fails).</li>
     * <li>When no finalizer has been registered, {@code postFinalize} (if any) is run immediately,
     * without waiting for {@code finalPromise}.</li>
     * </ul>
     *
     * <p>The returned future is always completed. Previously it was left pending forever, and the
     * exchange stuck in the "finalizing" state, when neither a finalizer nor {@code postFinalize}
     * was present.</p>
     *
     * @param finalPromise the promise whose completion triggers the registered finalizer
     * @param postFinalize an optional action to run after the finalizer, may be {@code null}
     *
     * @return a future completed when finalization is done, or a failed future (carrying an
     *         {@link IllegalStateException}) when finalization has already been started or completed
     */
    public ChannelFuture finalizeExchange(ChannelPromise finalPromise, Runnable postFinalize) {
        if (this.finalizing || this.finalized) {
            return this.context.newFailedFuture(new IllegalStateException("Exchange already finalized"));
        }
        this.finalizing = true;
        ChannelPromise promise = this.context.newPromise();
        if (this.finalizer != null) {
            finalPromise.addListener(future -> {
                Mono<Void> actualFinalizer = this.finalizer;
                if (postFinalize != null) {
                    actualFinalizer = Flux.concatDelayError(actualFinalizer, Mono.<Void>fromRunnable(postFinalize)).then();
                }
                actualFinalizer
                    .doOnError(error -> promise.tryFailure(error))
                    .doOnSuccess(ign -> promise.trySuccess())
                    .doOnCancel(() -> promise.tryFailure(new IllegalStateException("Finalizer was cancel")))
                    .doFinally(ign -> {
                        this.finalized = true;
                        this.finalizing = false;
                    })
                    .subscribe();
            });
        } else {
            // No finalizer registered: run the optional post action synchronously and always
            // complete the promise, including when there is nothing to run at all.
            try {
                if (postFinalize != null) {
                    postFinalize.run();
                }
                promise.trySuccess();
            }
            catch (Throwable t) {
                promise.tryFailure(t);
            }
            finally {
                this.finalized = true;
                this.finalizing = false;
            }
        }
        return promise;
    }

    /**
     * Returns the controller supplied at construction (not the possibly substituted current controller).
     *
     * @return the server controller
     */
    public ServerController<ExchangeContext, Exchange<ExchangeContext>, ErrorExchange<ExchangeContext>> getController() {
        return controller;
    }

    /**
     * Returns the number of response bytes accounted for so far.
     *
     * @return the transferred length (the {@code int} counter widened to {@code long})
     */
    public long getTransferedLength() {
        return transferedLength;
    }

    /**
     * Disposes the exchange without a cause; equivalent to {@code dispose(null)}.
     */
    public void dispose() {
        dispose(null);
    }

    /**
     * Disposes the exchange: records the cause, disposes the request with it, then disposes whichever
     * subscriber is currently driving the exchange (if any).
     *
     * @param error the cause of the disposal, may be {@code null}
     */
    public void dispose(Throwable error) {
        this.disposeError = error;
        this.request.dispose(error);

        BaseSubscriber<?> active = this.subscriber;
        if (active == null) {
            return;
        }
        if (active == this) {
            // The exchange is its own subscriber: go through the BaseSubscriber implementation
            // rather than re-entering this overridden method.
            super.dispose();
        } else {
            active.dispose();
        }
    }

    /**
     * Tells whether the exchange is disposed.
     *
     * @return {@code false} while the exchange has not been started; {@code true} when it has been
     *         started but has no subscriber; otherwise the disposed state of the current subscriber
     */
    public boolean isDisposed() {
        if (this.handler == null) {
            return false;
        }
        BaseSubscriber<?> active = this.subscriber;
        if (active == null) {
            return true;
        }
        return active == this ? super.isDisposed() : active.isDisposed();
    }

    /**
     * Starts the exchange: notifies the handler, defers the exchange to the current controller and
     * subscribes to the resulting publisher with a {@link ServerControllerSubscriber}.
     *
     * <p>Any error raised while deferring or subscribing is routed to {@code hookOnError}. Errors
     * raised by {@code handler.exchangeStart} are not caught here.</p>
     *
     * @param handler the exchange handler
     *
     * @throws IllegalStateException if the exchange has already been started
     */
    public void start(Handler handler) {
        if (this.handler != null) {
            throw new IllegalStateException("Exchange already started");
        }
        this.handler = handler;
        this.handler.exchangeStart(this.context, this);
        try {
            // Pass the exchange as a regular Exchange. The previous double cast through Object to
            // ErrorExchange selected the error-exchange overload and fails with ClassCastException
            // at runtime when this exchange is not an ErrorExchange.
            Mono<Void> deferHandle = this.currentController.defer(this);
            ServerControllerSubscriber exchangeSubscriber = this.createServerControllerSubscriber();
            // The subscriber must be published before subscribing so that signals delivered
            // synchronously during subscribe() observe it.
            this.subscriber = exchangeSubscriber;
            deferHandle.subscribe((CoreSubscriber<Void>)exchangeSubscriber);
        }
        catch (Throwable exchangeError) {
            this.hookOnError(exchangeError);
        }
    }

    /**
     * Creates the subscriber used by {@code start} to subscribe to the controller's deferred handling.
     * Subclasses may override this to supply a specialized subscriber.
     *
     * @return a new controller subscriber
     */
    protected ServerControllerSubscriber createServerControllerSubscriber() {
        ServerControllerSubscriber controllerSubscriber = new ServerControllerSubscriber();
        return controllerSubscriber;
    }

    /**
     * Runs the given task on the channel's executor.
     *
     * <p>When the caller is already in the event loop the task is run inline and its outcome,
     * including any thrown {@link Throwable}, is captured in the returned future; otherwise the
     * task is submitted to the executor.</p>
     *
     * @param runnable the task to run
     *
     * @return a future reporting the task's outcome
     */
    protected Future<?> executeInEventLoop(Runnable runnable) {
        if (!this.contextExecutor.inEventLoop()) {
            return this.contextExecutor.submit(runnable);
        }
        try {
            runnable.run();
            return this.contextExecutor.newSucceededFuture(null);
        }
        catch (Throwable e) {
            return this.contextExecutor.newFailedFuture(e);
        }
    }

    /**
     * Computes a value on the channel's executor.
     *
     * <p>When the caller is already in the event loop the callable is invoked inline and its result,
     * or any thrown {@link Throwable}, is captured in the returned future; otherwise the callable is
     * submitted to the executor.</p>
     *
     * @param <T>      the result type
     * @param callable the computation to run
     *
     * @return a future reporting the computation's outcome
     */
    protected <T> Future<T> executeInEventLoop(Callable<T> callable) {
        if (!this.contextExecutor.inEventLoop()) {
            return this.contextExecutor.submit(callable);
        }
        try {
            T result = callable.call();
            return this.contextExecutor.newSucceededFuture(result);
        }
        catch (Throwable t) {
            return this.contextExecutor.newFailedFuture(t);
        }
    }

    /**
     * Creates the error exchange handed to the controller when exchange processing fails before
     * the response headers were written. Its response replaces the current response of this exchange.
     *
     * @param var1 the error that interrupted the exchange
     *
     * @return an error exchange for the given error
     */
    protected abstract ErrorExchange<ExchangeContext> createErrorExchange(Throwable var1);

    /**
     * Delegates subscription handling to {@code onStart}; final so that subclasses customize
     * {@code onStart} instead.
     *
     * @param subscription the subscription to the response data
     */
    protected final void hookOnSubscribe(Subscription subscription) {
        onStart(subscription);
    }

    /**
     * Invoked when the exchange subscribes to the response data; requests the first chunk.
     *
     * @param subscription the subscription to the response data
     */
    protected void onStart(Subscription subscription) {
        final long firstDemand = 1L;
        subscription.request(firstDemand);
    }

    /**
     * Receives a response data chunk.
     *
     * <p>The first chunk is held back in {@code singleChunk} (and one more chunk is requested) so that a
     * response made of a single chunk can be completed in one go by {@code hookOnComplete}. As soon as
     * another chunk arrives, the exchange switches to "many" mode: the held-back chunk (if any) and the
     * new chunk are forwarded to {@code onNextMany} through the event loop, and the next chunk is only
     * requested once the write promise of the latest chunk succeeds.</p>
     *
     * @param value the data chunk
     */
    protected final void hookOnNext(ByteBuf value) {
        this.transferedLength += value.readableBytes();
        if ((this.single || !this.many) && this.singleChunk == null) {
            // Nothing buffered yet: keep the chunk and ask for one more to find out whether the response is single or many.
            this.singleChunk = value;
            this.subscriber.request(1L);
        } else {
            this.many = true;
            // Take ownership of the buffered chunk (may be null) before handing over to the event loop.
            ByteBuf firstValue = this.singleChunk;
            this.singleChunk = null;
            this.executeInEventLoop(() -> {
                // Demand is driven by the write outcome: request more on success, cancel and report on failure.
                ChannelPromise nextPromise = this.context.newPromise().addListener(future -> {
                    if (future.isSuccess()) {
                        this.subscriber.request(1L);
                    } else if (!this.subscriber.isDisposed()) {
                        this.subscriber.cancel();
                        this.hookOnError(future.cause());
                    }
                });
                if (firstValue != null) {
                    // The buffered chunk goes first; its outcome is not tracked (void promise).
                    this.onNextMany(firstValue, this.context.voidPromise());
                }
                this.onNextMany(value, nextPromise);
            }).addListener(future -> {
                // Failure of the task itself (e.g. onNextMany threw) as opposed to a failed write promise.
                if (!future.isSuccess() && !this.subscriber.isDisposed()) {
                    this.subscriber.cancel();
                    this.hookOnError(future.cause());
                }
            });
        }
    }

    /**
     * Handles one chunk of a response made of several chunks. Invoked through {@code executeInEventLoop}.
     *
     * @param var1 the data chunk
     * @param var2 the promise the caller listens to in order to request the next chunk or report a failure;
     *             presumably completed by the implementation when the chunk has been written (confirm in subclasses)
     */
    protected abstract void onNextMany(ByteBuf var1, ChannelPromise var2);

    /**
     * Handles an exchange processing error.
     *
     * <ul>
     * <li>When the response headers are already written, nothing can be recovered: the exchange is
     * completed with the error and the error is logged as fatal.</li>
     * <li>Otherwise the response is restarted: the transferred length is reset, any chunk buffered from
     * the failed response is released, an error exchange is created and deferred to the current
     * controller, and an {@code ErrorHandlerSubscriber} is subscribed in the event loop.</li>
     * </ul>
     *
     * <p>When deferring to the error handler itself fails, the handling is retried exactly once with
     * {@code LAST_RESORT_ERROR_CONTROLLER}; only if that also fails is the exchange completed with the
     * original error. Previously the exchange was completed with error even after a successful retry,
     * which completed it twice.</p>
     *
     * @param exchangeError the error that interrupted the exchange
     */
    protected final void hookOnError(Throwable exchangeError) {
        if (this.response.isHeadersWritten()) {
            this.executeInEventLoop(() -> {
                this.onCompleteWithError(exchangeError);
                this.logError("Fatal exchange processing error", exchangeError);
            });
            return;
        }

        this.transferedLength = 0;
        // A chunk buffered from the failed response must not leak, nor be written ahead of the
        // error response body by the next hookOnNext invocation.
        if (this.singleChunk != null) {
            ByteBuf staleChunk = this.singleChunk;
            this.singleChunk = null;
            staleChunk.release();
        }
        ErrorExchange<ExchangeContext> errorExchange = this.createErrorExchange(exchangeError);
        this.response = (AbstractResponse)errorExchange.response();
        try {
            Mono<Void> deferHandle = this.currentController.defer(errorExchange);
            this.executeInEventLoop(() -> {
                ErrorHandlerSubscriber errorSubscriber = new ErrorHandlerSubscriber(exchangeError);
                this.subscriber = errorSubscriber;
                deferHandle.subscribe((CoreSubscriber<Void>)errorSubscriber);
            });
        }
        catch (Throwable errorHandlerError) {
            this.logError("Error handler error", errorHandlerError);
            if (this.currentController != LAST_RESORT_ERROR_CONTROLLER) {
                // Retry once with the last resort controller; that invocation owns the completion.
                this.currentController = LAST_RESORT_ERROR_CONTROLLER;
                this.hookOnError(exchangeError);
            } else {
                this.executeInEventLoop(() -> {
                    this.onCompleteWithError(exchangeError);
                    this.logError("Fatal exchange processing error", exchangeError);
                });
            }
        }
    }

    /**
     * Completes the exchange after an error that could not be turned into an error response
     * (headers already written, or error handling itself failed). Invoked through {@code executeInEventLoop}.
     *
     * @param var1 the error that interrupted the exchange
     */
    protected abstract void onCompleteWithError(Throwable var1);

    /**
     * Completes the response once the response data publisher has completed.
     *
     * <ul>
     * <li>No byte transferred: {@code content-length} defaults to 0 and the exchange completes empty.</li>
     * <li>A single chunk is buffered: {@code content-length} defaults to the transferred length; the
     * exchange completes empty for a {@code HEAD} request and with the buffered chunk otherwise.</li>
     * <li>Otherwise the chunks were already forwarded and the exchange completes in "many" mode.</li>
     * </ul>
     *
     * <p>An access log is emitted after each completion. A buffered chunk that ends up not being
     * handed to {@code onCompleteSingle} (zero-length chunk or {@code HEAD} request) is released
     * instead of being leaked.</p>
     */
    protected final void hookOnComplete() {
        if (this.transferedLength == 0) {
            this.setContentLengthIfAbsent(0L);
            this.completeEmpty();
        } else if (this.singleChunk != null) {
            this.setContentLengthIfAbsent(this.transferedLength);
            if (this.request.getMethod().equals(Method.HEAD)) {
                this.completeEmpty();
            } else {
                this.executeInEventLoop(() -> {
                    // The field is read when the task runs, as before.
                    this.onCompleteSingle(this.singleChunk);
                    this.logAccess();
                });
            }
        } else {
            this.executeInEventLoop(() -> {
                this.onCompleteMany();
                this.logAccess();
            });
        }
    }

    /**
     * Sets the response {@code content-length} header unless one is already present.
     */
    private void setContentLengthIfAbsent(long length) {
        if (this.response.headers().getCharSequence("content-length") == null) {
            this.response.headers().contentLength(length);
        }
    }

    /**
     * Completes the exchange with an empty body in the event loop, then releases any buffered chunk
     * that will therefore never be written.
     */
    private void completeEmpty() {
        this.executeInEventLoop(() -> {
            try {
                this.onCompleteEmpty();
                this.logAccess();
            }
            finally {
                ByteBuf unwritten = this.singleChunk;
                if (unwritten != null) {
                    this.singleChunk = null;
                    unwritten.release();
                }
            }
        });
    }

    /**
     * Completes the exchange with no response body. Invoked through {@code executeInEventLoop} when no byte
     * was transferred or when the request method is {@code HEAD}.
     */
    protected abstract void onCompleteEmpty();

    /**
     * Completes the exchange with a response body made of a single chunk. Invoked through
     * {@code executeInEventLoop}.
     *
     * @param var1 the single buffered chunk; NOTE(review): ownership of the buffer presumably passes to the
     *             implementation, since this class does not release it afterwards — confirm in subclasses
     */
    protected abstract void onCompleteSingle(ByteBuf var1);

    /**
     * Completes the exchange after its chunks were forwarded one by one through {@code onNextMany}.
     * Invoked through {@code executeInEventLoop}.
     */
    protected abstract void onCompleteMany();

    /**
     * Resets the exchange with the given code. The work is performed through {@code executeInEventLoop}
     * and only the first invocation has an effect: it releases any buffered chunk, calls
     * {@code onReset} and emits an access log.
     *
     * @param code the reset code passed to {@code onReset}
     */
    public void reset(long code) {
        this.executeInEventLoop(() -> {
            if (this.reset) {
                return;
            }
            this.reset = true;
            ByteBuf buffered = this.singleChunk;
            if (buffered != null) {
                buffered.release();
                this.singleChunk = null;
            }
            this.onReset(code);
            this.logAccess();
        });
    }

    /**
     * Performs the protocol-specific part of a reset. Invoked at most once, through {@code executeInEventLoop},
     * after any buffered chunk has been released.
     *
     * @param var1 the reset code passed to {@code reset(long)}
     */
    protected abstract void onReset(long var1);

    /**
     * Returns the error the exchange was disposed with.
     *
     * @return the error passed to {@code dispose(Throwable)}, or an empty optional when there is none
     */
    public Optional<Throwable> getCancelCause() {
        Throwable cause = this.disposeError;
        return Optional.ofNullable(cause);
    }

    /**
     * Emits an access log entry at INFO level with the {@code HTTP_ACCESS} marker. The message is provided
     * through a supplier, so the {@code AccessLogMessage} is only created when the logger asks for it.
     */
    protected void logAccess() {
        LOGGER.info(MARKER_ACCESS, () -> new AccessLogMessage());
    }

    /**
     * Logs an error at ERROR level; errors that are {@link HttpException}s are tagged with the
     * {@code HTTP_ERROR} marker, all others are logged without a marker.
     *
     * @param message   the log message
     * @param throwable the error to log
     */
    private void logError(String message, Throwable throwable) {
        if (!(throwable instanceof HttpException)) {
            LOGGER.error(message, throwable);
            return;
        }
        LOGGER.error(MARKER_ERROR, message, throwable);
    }

    /**
     * Disposes the request once the response data subscription has terminated, whatever the
     * terminating signal.
     *
     * @param type the terminating signal (unused)
     */
    protected void hookFinally(SignalType type) {
        request.dispose();
    }

    /**
     * Callbacks notified of exchange lifecycle events. Every callback has a default implementation:
     * all do nothing, except {@code exchangeError} which delegates to {@code exchangeComplete}.
     */
    public interface Handler {

        /** A handler relying exclusively on the default implementations. */
        Handler DEFAULT = new Handler(){};

        /**
         * Notifies that an exchange is starting.
         *
         * @param ctx      the channel handler context
         * @param exchange the starting exchange
         */
        default void exchangeStart(ChannelHandlerContext ctx, AbstractExchange exchange) {
        }

        /**
         * Notifies of exchange data.
         *
         * @param ctx the channel handler context
         * @param t   the data
         */
        default void exchangeNext(ChannelHandlerContext ctx, ByteBuf t) {
        }

        /**
         * Notifies that an exchange terminated with an error; by default treated as a completion.
         *
         * @param ctx the channel handler context
         * @param t   the error
         */
        default void exchangeError(ChannelHandlerContext ctx, Throwable t) {
            exchangeComplete(ctx);
        }

        /**
         * Notifies that an exchange completed.
         *
         * @param ctx the channel handler context
         */
        default void exchangeComplete(ChannelHandlerContext ctx) {
        }

        /**
         * Notifies that an exchange was reset.
         *
         * @param ctx  the channel handler context
         * @param code the reset code
         */
        default void exchangeReset(ChannelHandlerContext ctx, long code) {
        }
    }

    /**
     * Subscriber to the controller's deferred handling of the exchange. Signals are ignored once the
     * exchange has been reset.
     */
    protected class ServerControllerSubscriber
    extends BaseSubscriber<Void> {

        protected ServerControllerSubscriber() {
        }

        /**
         * Forwards the handling error to the exchange's error processing.
         */
        protected void hookOnError(Throwable t) {
            AbstractExchange exchange = AbstractExchange.this;
            if (exchange.reset) {
                return;
            }
            exchange.hookOnError(t);
        }

        /**
         * Once the controller has handled the exchange: a {@code HEAD} request is completed empty
         * right away, any other request makes the exchange subscribe to the response data.
         */
        protected void hookOnComplete() {
            AbstractExchange exchange = AbstractExchange.this;
            if (exchange.reset) {
                return;
            }
            if (exchange.request.getMethod().equals(Method.HEAD)) {
                // NOTE(review): no access log is emitted on this path, unlike the exchange's own completion.
                exchange.executeInEventLoop(AbstractExchange.this::onCompleteEmpty);
                return;
            }
            exchange.single = exchange.response.isSingle();
            // From now on the exchange itself is the active subscriber.
            exchange.subscriber = exchange;
            exchange.response.dataSubscribe((Subscriber<ByteBuf>)exchange);
        }
    }

    /**
     * Access log message describing the enclosing exchange: remote address, request line, status,
     * transferred bytes, referer and user agent. Rendered as a plain text line by default and as a
     * JSON object when the {@code JSON} format is requested.
     */
    private class AccessLogMessage
    implements MultiformatMessage {
        private static final long serialVersionUID = -8367544116216876788L;
        private static final String JSON_FORMAT = "JSON";

        private AccessLogMessage() {
        }

        public String getFormat() {
            return "";
        }

        public Object[] getParameters() {
            return new Object[]{
                this.getRemoteAddress(),
                this.getRequest(),
                this.getStatus(),
                this.getTransferedBytes(),
                this.getReferer(),
                this.getUserAgent()
            };
        }

        public Throwable getThrowable() {
            return null;
        }

        public String getFormattedMessage() {
            return this.asString();
        }

        /**
         * Renders the message as JSON when one of the requested formats is {@code JSON}
         * (case-insensitive), as plain text otherwise.
         */
        public String getFormattedMessage(String[] formats) {
            for (String candidate : formats) {
                if (candidate.equalsIgnoreCase(JSON_FORMAT)) {
                    return this.asJson();
                }
            }
            return this.asString();
        }

        public String[] getFormats() {
            return new String[]{JSON_FORMAT};
        }

        /** Returns the textual IP address of the remote peer. */
        private String getRemoteAddress() {
            InetSocketAddress remote = (InetSocketAddress)AbstractExchange.this.request.getRemoteAddress();
            return remote.getAddress().getHostAddress();
        }

        /** Returns the request line as {@code METHOD path}. */
        private String getRequest() {
            return AbstractExchange.this.request.getMethod().name() + " " + AbstractExchange.this.request.getPath();
        }

        private int getStatus() {
            return AbstractExchange.this.response.headers().getStatusCode();
        }

        private int getTransferedBytes() {
            return AbstractExchange.this.transferedLength;
        }

        /** Returns the {@code referer} request header, or an empty string when absent. */
        private String getReferer() {
            return AbstractExchange.this.request.headers().get((CharSequence)"referer").orElse("");
        }

        /** Returns the {@code user-agent} request header, or an empty string when absent. */
        private String getUserAgent() {
            return AbstractExchange.this.request.headers().get((CharSequence)"user-agent").orElse("");
        }

        /**
         * Plain text rendering. Unlike the JSON rendering, the remote peer is reported by host name
         * rather than by IP address.
         */
        private String asString() {
            InetSocketAddress remote = (InetSocketAddress)AbstractExchange.this.request.getRemoteAddress();
            String remoteHost = remote.getAddress().getHostName();
            return remoteHost + " "
                + "\"" + this.getRequest() + "\" "
                + this.getStatus() + " "
                + this.getTransferedBytes() + " "
                + "\"" + this.getReferer() + "\" "
                + "\"" + this.getUserAgent() + "\" ";
        }

        /**
         * JSON rendering; request, referer and user agent are JSON-escaped.
         */
        private String asJson() {
            return new StringBuilder()
                .append("{")
                .append("\"remoteAddress\":\"").append(this.getRemoteAddress()).append("\",")
                .append("\"request\":\"").append(StringEscapeUtils.escapeJson(this.getRequest())).append("\",")
                .append("\"status\":").append(this.getStatus()).append(",")
                .append("\"bytes\":").append(this.getTransferedBytes()).append(",")
                .append("\"referer\":\"").append(StringEscapeUtils.escapeJson(this.getReferer())).append("\",")
                .append("\"userAgent\":\"").append(StringEscapeUtils.escapeJson(this.getUserAgent())).append("\"")
                .append("}")
                .toString();
        }
    }

    /**
     * Subscriber to the controller's deferred handling of an error exchange.
     */
    private final class ErrorHandlerSubscriber
    extends BaseSubscriber<Void> {
        /** The error that originally interrupted the exchange. */
        private final Throwable exchangeError;

        public ErrorHandlerSubscriber(Throwable exchangeError) {
            this.exchangeError = exchangeError;
        }

        /**
         * The error handler failed: retry once with the last resort controller, or complete the
         * exchange with the original error when the last resort controller is already in use.
         */
        protected void hookOnError(Throwable errorHandlerError) {
            AbstractExchange exchange = AbstractExchange.this;
            exchange.logError("Error handler error", errorHandlerError);
            if (exchange.currentController == LAST_RESORT_ERROR_CONTROLLER) {
                exchange.executeInEventLoop(() -> {
                    exchange.onCompleteWithError(this.exchangeError);
                    exchange.logError("Fatal exchange processing error", this.exchangeError);
                });
                return;
            }
            exchange.currentController = LAST_RESORT_ERROR_CONTROLLER;
            exchange.hookOnError(this.exchangeError);
        }

        /**
         * The error handler completed: subscribe to the error response data, unless the exchange
         * has been reset.
         */
        protected void hookOnComplete() {
            AbstractExchange exchange = AbstractExchange.this;
            if (exchange.reset) {
                return;
            }
            exchange.single = exchange.response.isSingle();
            ErrorSubscriber errorDataSubscriber = new ErrorSubscriber(this.exchangeError);
            exchange.subscriber = errorDataSubscriber;
            exchange.response.dataSubscribe((Subscriber<ByteBuf>)errorDataSubscriber);
        }
    }

    /**
     * Subscriber to the data of an error response; data and completion are delegated to the
     * enclosing exchange.
     */
    private final class ErrorSubscriber
    extends BaseSubscriber<ByteBuf> {
        /** The error that originally interrupted the exchange. */
        private final Throwable exchangeError;

        public ErrorSubscriber(Throwable exchangeError) {
            this.exchangeError = exchangeError;
        }

        protected void hookOnNext(ByteBuf value) {
            AbstractExchange.this.hookOnNext(value);
        }

        /**
         * The error response data failed: retry once with the last resort controller, or complete
         * the exchange with the original error when the last resort controller is already in use.
         */
        protected void hookOnError(Throwable errorHandlerError) {
            AbstractExchange exchange = AbstractExchange.this;
            exchange.logError("Error handler error", errorHandlerError);
            if (exchange.currentController == LAST_RESORT_ERROR_CONTROLLER) {
                exchange.executeInEventLoop(() -> {
                    exchange.onCompleteWithError(this.exchangeError);
                    exchange.logError("Fatal exchange processing error", this.exchangeError);
                });
                return;
            }
            exchange.currentController = LAST_RESORT_ERROR_CONTROLLER;
            exchange.hookOnError(this.exchangeError);
        }

        /**
         * Completes the error response through the exchange, then logs the original error.
         */
        protected void hookOnComplete() {
            AbstractExchange exchange = AbstractExchange.this;
            exchange.hookOnComplete();
            exchange.logError("Exchange processing error", this.exchangeError);
        }
    }
}

