/*
 * Decompiled with CFR 0.152.
 */
package io.inverno.mod.http.server.internal;

import io.inverno.mod.http.server.ErrorExchange;
import io.inverno.mod.http.server.Exchange;
import io.inverno.mod.http.server.ExchangeHandler;
import io.inverno.mod.http.server.internal.AbstractRequest;
import io.inverno.mod.http.server.internal.AbstractResponse;
import io.inverno.mod.http.server.internal.GenericErrorHandler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.EventExecutor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.SignalType;

/**
 * Base {@link Exchange} implementation that subscribes itself to the response
 * payload publisher and forwards the received chunks and the terminal signal
 * to the callbacks implemented by subclasses ({@link #onNextMany(ByteBuf)},
 * {@link #onCompleteEmpty()}, {@link #onCompleteSingle(ByteBuf)},
 * {@link #onCompleteMany()} and {@link #onCompleteWithError(Throwable)}).
 *
 * <p>
 * Callbacks are dispatched through {@link #executeInEventLoop(Runnable)}, which
 * runs them on the executor of the channel handler context.
 * </p>
 */
public abstract class AbstractExchange extends BaseSubscriber<ByteBuf> implements Exchange {

    /**
     * Error handler used when the configured error handler itself throws.
     */
    protected static final ExchangeHandler<ErrorExchange<Throwable>> LAST_RESORT_ERROR_HANDLER = new GenericErrorHandler();

    protected final ChannelHandlerContext context;
    /** Executor of {@link #context}, captured once at construction time. */
    protected final EventExecutor contextExecutor;
    protected final ExchangeHandler<Exchange> rootHandler;
    protected final ExchangeHandler<ErrorExchange<Throwable>> errorHandler;
    protected final AbstractRequest request;
    /** Current response; replaced by the error exchange response when an error is handled. */
    protected AbstractResponse response;
    /** Handler supplied to {@link #start(Handler)}; {@code null} until the exchange is started. */
    protected Handler handler;
    /** Value of {@code response.isSingle()} for the response currently being sent. */
    protected boolean single;

    /** Sum of the readable bytes of the chunks received so far; kept as a long so it cannot wrap past 2 GiB. */
    private long transferedLength;
    /** First chunk received while {@link #single} is true; handed to {@link #onCompleteSingle(ByteBuf)} on completion. */
    private ByteBuf singleChunk;
    /** Subscriber of the error response payload; {@code null} as long as no error response has been subscribed. */
    private ErrorSubscriber errorSubscriber;

    /**
     * Creates an exchange.
     *
     * @param context      the channel handler context, whose executor is used to run the callbacks
     * @param rootHandler  the handler invoked by {@link #start(Handler)}
     * @param errorHandler the handler invoked with an error exchange when an error must be handled
     * @param request      the request
     * @param response     the initial response
     */
    public AbstractExchange(ChannelHandlerContext context, ExchangeHandler<Exchange> rootHandler, ExchangeHandler<ErrorExchange<Throwable>> errorHandler, AbstractRequest request, AbstractResponse response) {
        this.context = context;
        this.contextExecutor = this.context.executor();
        this.rootHandler = rootHandler;
        this.errorHandler = errorHandler;
        this.request = request;
        this.response = response;
    }

    @Override
    public AbstractRequest request() {
        return this.request;
    }

    @Override
    public AbstractResponse response() {
        return this.response;
    }

    /**
     * @return the root handler supplied at construction time
     */
    public ExchangeHandler<Exchange> getRootHandler() {
        return this.rootHandler;
    }

    /**
     * @return the error handler supplied at construction time
     */
    public ExchangeHandler<ErrorExchange<Throwable>> getErrorHandler() {
        return this.errorHandler;
    }

    /**
     * Returns the number of readable bytes received from the response payload
     * publisher. The counter is reset to zero when the response is replaced by
     * an error response.
     *
     * @return the number of bytes received so far
     */
    public long getTransferedLength() {
        return this.transferedLength;
    }

    /**
     * Disposes the active subscription (the error response subscription when
     * one exists, this subscriber otherwise) and then disposes the request.
     */
    public void dispose() {
        if (this.errorSubscriber == null) {
            super.dispose();
        } else {
            this.errorSubscriber.dispose();
        }
        this.request.dispose();
    }

    /**
     * @return the disposed state of the error response subscriber when one
     *         exists, the disposed state of this subscriber otherwise
     */
    public boolean isDisposed() {
        if (this.errorSubscriber != null) {
            return this.errorSubscriber.isDisposed();
        }
        return super.isDisposed();
    }

    /**
     * Starts the exchange: notifies the handler, invokes the root handler and
     * subscribes this exchange to the payload publisher of the resulting
     * response.
     *
     * <p>
     * When the root handler throws, the error is routed to the error handlers
     * and the error exchange response becomes the response whose payload is
     * subscribed.
     * </p>
     *
     * @param handler the handler notified of the exchange start
     *
     * @throws IllegalStateException if the exchange has already been started
     */
    public void start(Handler handler) {
        if (this.handler != null) {
            throw new IllegalStateException("Exchange already started");
        }
        this.handler = handler;
        this.handler.exchangeStart(this.context, this);
        try {
            this.rootHandler.handle(this);
        }
        catch (Throwable throwable) {
            this.dispatchToErrorHandlers(throwable);
        }
        // Read after any response swap so that the flag describes the response actually sent.
        this.single = this.response.isSingle();
        this.response.data().subscribe((Subscriber)this);
    }

    /**
     * Routes an error to the configured error handler and falls back to
     * {@link #LAST_RESORT_ERROR_HANDLER}, with a fresh error exchange created
     * for the same original error, when the configured handler throws.
     *
     * <p>
     * Whatever the outcome, the response of the last created error exchange
     * becomes the current response. An exception thrown by the last resort
     * handler propagates to the caller.
     * </p>
     *
     * @param throwable the original error
     */
    private void dispatchToErrorHandlers(Throwable throwable) {
        ErrorExchange<Throwable> errorExchange = this.createErrorExchange(throwable);
        try {
            this.errorHandler.handle(errorExchange);
        }
        catch (Throwable t) {
            // Keep track of the error handler failure instead of dropping it;
            // self-suppression is rejected by Throwable#addSuppressed.
            if (t != throwable) {
                throwable.addSuppressed(t);
            }
            errorExchange = this.createErrorExchange(throwable);
            LAST_RESORT_ERROR_HANDLER.handle(errorExchange);
        }
        finally {
            this.response = (AbstractResponse)errorExchange.response();
        }
    }

    /**
     * Runs the task on the context executor and then requests one more element
     * from the upstream subscription.
     *
     * @param runnable the task to run
     */
    protected void executeInEventLoop(Runnable runnable) {
        this.executeInEventLoop(runnable, 1);
    }

    /**
     * Runs the task on the context executor and then requests {@code request}
     * more elements from the upstream subscription.
     *
     * <p>
     * When the calling thread is already in the event loop the task runs
     * inline and any exception it throws propagates to the caller. Otherwise
     * the task is submitted to the executor; an exception thrown there cancels
     * the subscription and is passed to {@link #hookOnError(Throwable)}.
     * </p>
     *
     * @param runnable the task to run
     * @param request  the number of elements to request once the task has run
     */
    protected void executeInEventLoop(Runnable runnable, int request) {
        if (this.contextExecutor.inEventLoop()) {
            runnable.run();
            this.request(request);
            return;
        }
        this.contextExecutor.execute(() -> {
            try {
                runnable.run();
                this.request(request);
            }
            catch (Throwable throwable) {
                this.cancel();
                this.hookOnError(throwable);
            }
        });
    }

    /**
     * Creates the error exchange handed to the error handlers.
     *
     * @param var1 the error to expose in the error exchange
     *
     * @return a new error exchange
     */
    protected abstract ErrorExchange<Throwable> createErrorExchange(Throwable var1);

    /**
     * Delegates to {@link #onStart(Subscription)}.
     */
    protected final void hookOnSubscribe(Subscription subscription) {
        this.onStart(subscription);
    }

    /**
     * Invoked when the response payload publisher has been subscribed; the
     * default implementation requests an unbounded number of elements.
     *
     * @param subscription the upstream subscription
     */
    protected void onStart(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
    }

    /**
     * Accounts for the chunk and either buffers it (first chunk of a single
     * response) or forwards it to {@link #onNextMany(ByteBuf)}.
     *
     * <p>
     * When the first chunk of a single response is buffered and no
     * {@code content-length} header is set, the header is set to the number of
     * bytes received so far.
     * </p>
     */
    protected final void hookOnNext(ByteBuf value) {
        this.transferedLength += value.readableBytes();
        if (this.single && this.singleChunk == null) {
            if (this.response.headers().getCharSequence("content-length") == null) {
                this.response.headers().contentLength(this.transferedLength);
            }
            this.singleChunk = value;
        } else {
            this.executeInEventLoop(() -> this.onNextMany(value));
        }
    }

    /**
     * Invoked for each chunk that is not buffered as the single chunk.
     *
     * @param var1 the chunk
     */
    protected abstract void onNextMany(ByteBuf var1);

    /**
     * Handles an error raised while the response payload is being consumed.
     *
     * <p>
     * When the response headers are already written, the error is forwarded to
     * {@link #onCompleteWithError(Throwable)}. Otherwise the byte counter is
     * reset, the error is routed to the error handlers, and the payload of the
     * resulting error response is subscribed with a dedicated subscriber.
     * </p>
     */
    protected final void hookOnError(Throwable throwable) {
        if (this.response.isHeadersWritten()) {
            this.executeInEventLoop(() -> this.onCompleteWithError(throwable));
            return;
        }
        this.transferedLength = 0;
        this.dispatchToErrorHandlers(throwable);
        // The response has just been replaced: refresh the flag as start() does,
        // otherwise the chunks of the error response would be routed according
        // to the nature of the response that failed.
        this.single = this.response.isSingle();
        // NOTE(review): a chunk already buffered in singleChunk is neither
        // released nor cleared here - confirm whether this path is reachable
        // with a buffered chunk and who owns that buffer in that case.
        this.executeInEventLoop(() -> {
            this.errorSubscriber = new ErrorSubscriber(throwable);
            this.response.data().subscribe((Subscriber)this.errorSubscriber);
        });
    }

    /**
     * Invoked when the exchange terminates with an error that cannot be turned
     * into an error response.
     *
     * @param var1 the error
     */
    protected abstract void onCompleteWithError(Throwable var1);

    /**
     * Dispatches the completion signal to the callback matching what has been
     * received: nothing, a buffered single chunk, or many chunks.
     *
     * <p>
     * When nothing has been received and no {@code content-length} header is
     * set, the header is set to zero.
     * </p>
     */
    protected final void hookOnComplete() {
        if (this.transferedLength == 0) {
            if (this.response.headers().getCharSequence("content-length") == null) {
                this.response.headers().contentLength(0L);
            }
            this.executeInEventLoop(this::onCompleteEmpty);
            return;
        }
        if (this.singleChunk != null) {
            this.executeInEventLoop(() -> this.onCompleteSingle(this.singleChunk));
            return;
        }
        this.executeInEventLoop(this::onCompleteMany);
    }

    /**
     * Invoked on completion when no byte has been received.
     */
    protected abstract void onCompleteEmpty();

    /**
     * Invoked on completion when a single chunk has been buffered.
     *
     * @param var1 the buffered chunk
     */
    protected abstract void onCompleteSingle(ByteBuf var1);

    /**
     * Invoked on completion when chunks have been forwarded to
     * {@link #onNextMany(ByteBuf)}.
     */
    protected abstract void onCompleteMany();

    /**
     * Disposes the request once the subscription has terminated.
     */
    protected void hookFinally(SignalType type) {
        this.request.dispose();
    }

    /**
     * Subscriber of the error response payload. Chunks and completion are
     * delegated to the enclosing exchange; an error in the error response
     * payload terminates the exchange with the original error.
     */
    private final class ErrorSubscriber extends BaseSubscriber<ByteBuf> {

        /** The error that led to the error response being produced. */
        private final Throwable originalError;

        public ErrorSubscriber(Throwable originalError) {
            this.originalError = originalError;
        }

        protected void hookOnNext(ByteBuf value) {
            AbstractExchange.this.hookOnNext(value);
        }

        protected void hookOnError(Throwable throwable) {
            // The error raised by the error response payload is not reported;
            // the exchange terminates with the original error.
            // NOTE(review): unlike the other callbacks this one is not routed
            // through executeInEventLoop - confirm this is intended.
            AbstractExchange.this.onCompleteWithError(this.originalError);
        }

        protected void hookOnComplete() {
            AbstractExchange.this.hookOnComplete();
        }
    }

    /**
     * Callbacks notified of the exchange life cycle. All methods default to
     * doing nothing, except {@link #exchangeError(ChannelHandlerContext, Throwable)}
     * which delegates to {@link #exchangeComplete(ChannelHandlerContext)}.
     *
     * <p>
     * Within this class only {@code exchangeStart} is invoked (from
     * {@link AbstractExchange#start(Handler)}); the other callbacks are
     * presumably invoked by subclasses - to be confirmed against them.
     * </p>
     */
    public static interface Handler {

        /** Handler relying exclusively on the default implementations. */
        public static final Handler DEFAULT = new Handler(){};

        default public void exchangeStart(ChannelHandlerContext ctx, AbstractExchange exchange) {
        }

        default public void exchangeNext(ChannelHandlerContext ctx, ByteBuf t) {
        }

        default public void exchangeError(ChannelHandlerContext ctx, Throwable t) {
            this.exchangeComplete(ctx);
        }

        default public void exchangeComplete(ChannelHandlerContext ctx) {
        }
    }
}

