/*
 * Decompiled with CFR 0.152.
 */
package io.inverno.mod.http.server.internal.http1x.ws;

import io.inverno.mod.http.base.ws.WebSocketStatus;
import io.inverno.mod.http.server.Exchange;
import io.inverno.mod.http.server.ExchangeContext;
import io.inverno.mod.http.server.Request;
import io.inverno.mod.http.server.internal.http1x.ws.GenericWebSocketFrame;
import io.inverno.mod.http.server.internal.http1x.ws.GenericWebSocketMessage;
import io.inverno.mod.http.server.ws.WebSocketException;
import io.inverno.mod.http.server.ws.WebSocketExchange;
import io.inverno.mod.http.server.ws.WebSocketExchangeHandler;
import io.inverno.mod.http.server.ws.WebSocketFrame;
import io.inverno.mod.http.server.ws.WebSocketMessage;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.GenericFutureListener;
import java.util.Optional;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;

public class GenericWebSocketExchange
extends BaseSubscriber<WebSocketFrame>
implements WebSocketExchange<ExchangeContext> {
    // Logger; note it is obtained for the WebSocketExchange interface, not for this class.
    private static final Logger LOGGER = LogManager.getLogger(WebSocketExchange.class);
    // Channel handler context used to write frames, create promises and obtain the executor.
    private final ChannelHandlerContext context;
    // Originating exchange; request() and context() delegate to it.
    private final Exchange<ExchangeContext> exchange;
    // Value returned by getSubProtocol().
    private final String subProtocol;
    // Handler whose defer(this) result is awaited by start() before outbound frames are sent.
    private final WebSocketExchangeHandler<ExchangeContext, WebSocketExchange<ExchangeContext>> handler;
    // Converts outbound frames to the underlying frame type; also handed to Outbound.frames() callbacks.
    private final GenericWebSocketFrame.GenericFactory frameFactory;
    // Handed to Outbound.messages() callbacks.
    private final GenericWebSocketMessage.GenericFactory messageFactory;
    // Executor of the channel handler context, captured at construction.
    private final EventExecutor contextExecutor;
    // Sink feeding the inbound frames; empty until inbound() is first called and again after dispose().
    private Optional<Sinks.Many<WebSocketFrame>> inboundFrames;
    // Lazily created by inbound().
    private GenericInbound inbound;
    // Lazily created by outbound().
    private GenericOutbound outbound;
    // Created by start() when no outbound publisher was set beforehand; receives the publisher set later.
    private Sinks.One<Publisher<WebSocketFrame>> outboundFramesSinks;
    // Publisher of outbound frames subscribed by start().
    private Publisher<WebSocketFrame> outboundFrames;
    // Set to true at the end of a successful start().
    private boolean started;
    // Set to true by setOutboundFrames().
    private boolean outboundFramesSet;
    // True once the inbound frames Flux has been subscribed, or once dispose() subscribed to it to release frames.
    private boolean inboundSubscribed;
    // Optional Mono subscribed by finalizeExchange() when the final promise completes; may be null.
    private Mono<Void> finalizer;
    // Set to true by close() and setClosed().
    private boolean closed;

    /**
     * Creates a WebSocket exchange bound to the specified channel handler context.
     *
     * @param context        the channel handler context; its executor is captured for event-loop execution
     * @param exchange       the originating exchange to which {@link #request()} and {@link #context()} delegate
     * @param subProtocol    the value returned by {@link #getSubProtocol()}
     * @param handler        the handler deferred by {@link #start()}
     * @param frameFactory   the factory used to convert and create frames
     * @param messageFactory the factory handed to outbound message producers
     */
    public GenericWebSocketExchange(ChannelHandlerContext context, Exchange<ExchangeContext> exchange, String subProtocol, WebSocketExchangeHandler<ExchangeContext, WebSocketExchange<ExchangeContext>> handler, GenericWebSocketFrame.GenericFactory frameFactory, GenericWebSocketMessage.GenericFactory messageFactory) {
        // No inbound sink exists until inbound() is requested.
        this.inboundFrames = Optional.empty();

        this.context = context;
        this.contextExecutor = context.executor();

        this.exchange = exchange;
        this.subProtocol = subProtocol;
        this.handler = handler;

        this.frameFactory = frameFactory;
        this.messageFactory = messageFactory;
    }

    /**
     * Starts the exchange.
     *
     * <p>The handler is deferred with this exchange; once the resulting {@code Mono} completes, this
     * subscriber is subscribed to the outbound frames publisher. When no outbound publisher has been
     * set yet, a placeholder is installed that switches to the publisher emitted later through
     * {@code outboundFramesSinks} (see {@link #setOutboundFrames(Publisher)}).</p>
     *
     * <p>If {@code handler.defer(this)} throws, the error is passed to {@link #hookOnError(Throwable)}
     * and the exchange is left not started.</p>
     *
     * @throws IllegalStateException if the exchange was already started
     */
    public void start() {
        if (this.started) {
            throw new IllegalStateException("WebSocket Exchange already started");
        }
        if (this.outboundFrames == null) {
            // Typed call: no raw Publisher cast is needed to switch onto the publisher emitted by the sink.
            this.outboundFramesSinks = Sinks.one();
            this.outboundFrames = Flux.switchOnNext(this.outboundFramesSinks.asMono());
        }

        Mono<Void> deferHandle;
        try {
            deferHandle = this.handler.defer(this);
        }
        catch (Throwable throwable) {
            this.hookOnError(throwable);
            return;
        }

        deferHandle
            .thenMany(this.outboundFrames)
            // Frames discarded by the pipeline are released.
            .doOnDiscard(GenericWebSocketFrame.class, frame -> frame.release())
            .subscribe((CoreSubscriber<WebSocketFrame>) this);
        this.started = true;
    }

    /**
     * Executes the specified task in the event loop and then requests one more element from upstream.
     *
     * @param runnable the task to execute
     */
    protected void executeInEventLoop(Runnable runnable) {
        final int singleRequest = 1;
        executeInEventLoop(runnable, singleRequest);
    }

    /**
     * Executes the specified task in the event loop and then requests {@code request} elements from upstream.
     *
     * <p>When the caller is not in the event loop, the task is submitted to the context executor; any
     * {@code Throwable} raised there cancels the subscription and is passed to
     * {@link #hookOnError(Throwable)}. When the caller is already in the event loop, the task runs
     * inline and exceptions propagate to the caller.</p>
     *
     * @param runnable the task to execute
     * @param request  the number of elements to request once the task has run
     */
    protected void executeInEventLoop(Runnable runnable, int request) {
        if (!this.contextExecutor.inEventLoop()) {
            this.contextExecutor.execute(() -> {
                try {
                    runnable.run();
                    this.request(request);
                }
                catch (Throwable failure) {
                    this.cancel();
                    this.hookOnError(failure);
                }
            });
            return;
        }
        // Already in the event loop: run inline.
        runnable.run();
        this.request(request);
    }

    /**
     * Delegates the subscription to {@link #onStart(Subscription)}.
     *
     * @param subscription the upstream subscription
     */
    protected final void hookOnSubscribe(Subscription subscription) {
        onStart(subscription);
    }

    /**
     * Requests an unbounded number of elements ({@code Long.MAX_VALUE}) from the subscription and
     * logs that the exchange started.
     *
     * @param subscription the upstream subscription
     */
    protected void onStart(Subscription subscription) {
        final long unbounded = Long.MAX_VALUE;
        subscription.request(unbounded);
        LOGGER.debug("WebSocket exchange started");
    }

    /**
     * Writes an outbound frame to the channel from the event loop.
     *
     * @param value the outbound frame
     *
     * @throws WebSocketException if the frame is a CLOSE frame; {@code close()} must be used instead
     */
    protected void hookOnNext(WebSocketFrame value) {
        if (value.getKind() != WebSocketFrame.Kind.CLOSE) {
            this.executeInEventLoop(() -> {
                LOGGER.trace("Write {} frame (size={}, final={})", value.getKind(), value.getBinaryData().readableBytes(), value.isFinal());
                // Convert to the underlying frame type before writing.
                Object underlyingFrame = this.frameFactory.toUnderlyingWebSocketFrame(value);
                this.context.writeAndFlush(underlyingFrame);
            });
            return;
        }
        throw new WebSocketException("Invalid outbound frame type " + value.getKind() + ", use close() to close the WebSocket");
    }

    /**
     * Closes the WebSocket with status {@code ENDPOINT_UNAVAILABLE} when the subscription is cancelled.
     */
    protected void hookOnCancel() {
        close(WebSocketStatus.ENDPOINT_UNAVAILABLE);
    }

    /**
     * Closes the WebSocket with status {@code NORMAL_CLOSURE} when the outbound publisher completes.
     */
    protected void hookOnComplete() {
        close(WebSocketStatus.NORMAL_CLOSURE);
    }

    /**
     * Logs the error and closes the WebSocket with status {@code INTERNAL_SERVER_ERROR}, using the
     * error message (possibly {@code null}) as the close reason.
     *
     * @param throwable the processing error
     */
    protected void hookOnError(Throwable throwable) {
        LOGGER.error("WebSocketExchange processing error", throwable);
        String closeReason = throwable.getMessage();
        close(WebSocketStatus.INTERNAL_SERVER_ERROR, closeReason);
    }

    /**
     * Delegates to the parent implementation; no additional processing is performed here.
     *
     * @param type the terminating signal type
     */
    protected void hookFinally(SignalType type) {
        // Pure delegation, kept as an explicit extension point.
        super.hookFinally(type);
    }

    /**
     * Disposes the outbound subscription and terminates the inbound frames sink, if any.
     *
     * <p>The inbound sink is completed; when nobody subscribed to the inbound frames, a subscriber
     * that releases every frame is attached. The sink reference is cleared afterwards.</p>
     */
    public void dispose() {
        super.dispose();

        Sinks.Many<WebSocketFrame> frameSink = this.inboundFrames.orElse(null);
        if (frameSink != null) {
            frameSink.tryEmitComplete();
            if (!this.inboundSubscribed) {
                this.inboundSubscribed = true;
                // Errors are deliberately ignored while releasing frames.
                frameSink.asFlux().subscribe(
                    frame -> ((GenericWebSocketFrame) frame).release(),
                    ignored -> {}
                );
            }
        }
        this.inboundFrames = Optional.empty();
    }

    /**
     * Returns the inbound frames sink.
     *
     * @return an optional holding the sink, or an empty optional when {@link #inbound()} was never
     *         called or the exchange was disposed
     */
    public Optional<Sinks.Many<WebSocketFrame>> inboundFrames() {
        Optional<Sinks.Many<WebSocketFrame>> currentSink = this.inboundFrames;
        return currentSink;
    }

    /**
     * Sets the outbound frames publisher.
     *
     * <p>When the placeholder sink created by {@link #start()} exists, the publisher is emitted through
     * it (the emit result is not checked); otherwise it is stored for {@link #start()} to subscribe to.</p>
     *
     * @param frames the outbound frames publisher
     *
     * @throws IllegalStateException if the exchange is started and outbound frames were already set
     */
    protected void setOutboundFrames(Publisher<WebSocketFrame> frames) {
        if (this.outboundFramesSet && this.started) {
            throw new IllegalStateException("Outbound frames already set");
        }

        if (this.outboundFramesSinks == null) {
            this.outboundFrames = frames;
        } else {
            this.outboundFramesSinks.tryEmitValue(frames);
        }

        this.outboundFramesSet = true;
    }

    /**
     * Registers the finalization step on the specified promise.
     *
     * <p>When the promise completes, the inbound frames sink (if any) is completed and the finalizer
     * (if one was set) is subscribed.</p>
     *
     * @param finalPromise the promise whose completion triggers finalization
     *
     * @return the specified promise
     */
    protected ChannelFuture finalizeExchange(ChannelPromise finalPromise) {
        finalPromise.addListener(future -> {
            this.inboundFrames.ifPresent(sink -> sink.tryEmitComplete());
            if (this.finalizer == null) {
                return;
            }
            this.finalizer.subscribe();
        });
        return finalPromise;
    }

    /**
     * Returns the request of the originating exchange.
     *
     * @return the request
     */
    @Override
    public Request request() {
        Request originalRequest = exchange.request();
        return originalRequest;
    }

    /**
     * Returns the context of the originating exchange.
     *
     * @return the exchange context
     */
    @Override
    public ExchangeContext context() {
        ExchangeContext exchangeContext = exchange.context();
        return exchangeContext;
    }

    /**
     * Returns the sub-protocol this exchange was created with.
     *
     * @return the sub-protocol, as passed to the constructor
     */
    @Override
    public String getSubProtocol() {
        return subProtocol;
    }

    /**
     * Returns the inbound part of the exchange, creating it on first call.
     *
     * <p>On creation a unicast sink with a backpressure buffer is set up. The exposed Flux records
     * that it has been subscribed (so {@link #dispose()} does not attach its own releasing subscriber)
     * and releases discarded frames.</p>
     *
     * @return the inbound part
     */
    @Override
    public WebSocketExchange.Inbound inbound() {
        if (this.inbound == null) {
            // The sink must be parameterized: with a raw Sinks.Many the downstream operators are raw
            // as well and the discard hook cannot call release() on its argument.
            Sinks.Many<WebSocketFrame> inboundFrameSink = Sinks.many().unicast().onBackpressureBuffer();
            Flux<WebSocketFrame> inboundFlux = inboundFrameSink.asFlux()
                .doOnSubscribe(ign -> {
                    this.inboundSubscribed = true;
                })
                .doOnDiscard(GenericWebSocketFrame.class, frame -> frame.release());
            this.inbound = new GenericInbound(inboundFlux);
            this.inboundFrames = Optional.of(inboundFrameSink);
        }
        return this.inbound;
    }

    /**
     * Returns the outbound part of the exchange, creating it on first call.
     *
     * @return the outbound part
     */
    @Override
    public WebSocketExchange.Outbound outbound() {
        if (this.outbound != null) {
            return this.outbound;
        }
        this.outbound = new GenericOutbound();
        return this.outbound;
    }

    /** Maximum size in bytes of a close reason: 125 bytes of control frame payload minus 2 bytes of status code. */
    private static final int MAX_CLOSE_REASON_BYTES = 123;

    /** Suffix appended to a truncated close reason (3 bytes in UTF-8). */
    private static final String CLOSE_REASON_ELLIPSIS = "...";

    /**
     * Closes the WebSocket with the specified status code and reason.
     *
     * <p>The exchange is flagged closed and disposed on every call. Only the first call writes a
     * close frame from the event loop, closes the channel once the write completes and triggers
     * {@link #finalizeExchange(ChannelPromise)}.</p>
     *
     * <p>The reason is truncated so that its UTF-8 encoding fits in a close frame; the untruncated
     * reason is the one that is logged.</p>
     *
     * @param code   the close status code
     * @param reason the close reason, may be {@code null}
     */
    @Override
    public void close(short code, String reason) {
        // The flag is set before dispose(): dispose() cancels the subscription, which re-enters close() through hookOnCancel().
        boolean mustClose = !this.closed;
        this.closed = true;
        this.dispose();
        if (mustClose) {
            this.executeInEventLoop(() -> {
                ChannelPromise closePromise = this.context.newPromise();
                String cleanReason = truncateCloseReason(reason);
                this.context.writeAndFlush(new CloseWebSocketFrame((int) code, cleanReason), closePromise);
                closePromise.addListener(ChannelFutureListener.CLOSE);
                closePromise.addListener(ign -> LOGGER.debug("WebSocket closed ({}): {}", (Object) code, (Object) reason));
                this.finalizeExchange(closePromise);
            });
        }
    }

    /**
     * Truncates a close reason so that its UTF-8 encoding does not exceed {@link #MAX_CLOSE_REASON_BYTES}.
     *
     * <p>A reason that fits is returned unchanged. Otherwise it is cut on a code point boundary and
     * suffixed with an ellipsis, the result being at most {@link #MAX_CLOSE_REASON_BYTES} bytes long.</p>
     *
     * @param reason the reason to truncate, may be {@code null}
     *
     * @return the reason, possibly truncated, or {@code null} if the reason was {@code null}
     */
    private static String truncateCloseReason(String reason) {
        if (reason == null) {
            return null;
        }
        int truncatedBudget = MAX_CLOSE_REASON_BYTES - CLOSE_REASON_ELLIPSIS.length();
        int encodedLength = 0;
        int cutIndex = -1;
        int index = 0;
        while (index < reason.length()) {
            int codePoint = reason.codePointAt(index);
            int nextLength = encodedLength + utf8Length(codePoint);
            if (cutIndex < 0 && nextLength > truncatedBudget) {
                // First code point that no longer fits next to the ellipsis.
                cutIndex = index;
            }
            if (nextLength > MAX_CLOSE_REASON_BYTES) {
                return reason.substring(0, cutIndex) + CLOSE_REASON_ELLIPSIS;
            }
            encodedLength = nextLength;
            index += Character.charCount(codePoint);
        }
        return reason;
    }

    /**
     * Returns the number of bytes used to encode the specified code point in UTF-8.
     *
     * @param codePoint a Unicode code point
     *
     * @return a length between 1 and 4
     */
    private static int utf8Length(int codePoint) {
        if (codePoint < 0x80) {
            return 1;
        }
        if (codePoint < 0x800) {
            return 2;
        }
        if (codePoint < 0x10000) {
            return 3;
        }
        return 4;
    }

    /**
     * Flags the exchange as closed without writing a close frame; the closure is logged only when
     * the exchange was not already flagged closed.
     *
     * @param code   the close status code, used for logging
     * @param reason the close reason, used for logging
     */
    void setClosed(short code, String reason) {
        boolean alreadyClosed = this.closed;
        this.closed = true;
        if (alreadyClosed) {
            return;
        }
        LOGGER.debug("WebSocket closed ({}): {}", (Object) code, (Object) reason);
    }

    /**
     * Sets the finalizer subscribed when the exchange is finalized, replacing any previous one.
     *
     * @param finalizer the finalizer
     *
     * @return this exchange
     */
    @Override
    public WebSocketExchange<ExchangeContext> finalizer(Mono<Void> finalizer) {
        WebSocketExchange<ExchangeContext> self = this;
        this.finalizer = finalizer;
        return self;
    }

    /**
     * Inbound part of the exchange, exposing received frames either raw or grouped into messages.
     *
     * <p>Message grouping relies on {@code currentFrameKind}, a single mutable field shared by
     * {@link #messages()}, {@link #textMessages()} and {@link #binaryMessages()}.</p>
     */
    protected class GenericInbound
    implements WebSocketExchange.Inbound {
        private final Flux<WebSocketFrame> frames;
        // Kind of the first frame of the message being windowed; null between messages.
        private WebSocketFrame.Kind currentFrameKind;

        /**
         * Creates the inbound part.
         *
         * @param frames the inbound frames
         */
        public GenericInbound(Flux<WebSocketFrame> frames) {
            this.frames = frames;
        }

        @Override
        public Publisher<WebSocketFrame> frames() {
            return this.frames;
        }

        /**
         * Returns TEXT and BINARY messages built from TEXT, BINARY and CONTINUATION frames.
         *
         * @return a publisher of messages
         */
        @Override
        public Publisher<WebSocketMessage> messages() {
            return this.frames
                .filter(frame -> {
                    WebSocketFrame.Kind kind = frame.getKind();
                    return kind == WebSocketFrame.Kind.TEXT || kind == WebSocketFrame.Kind.BINARY || kind == WebSocketFrame.Kind.CONTINUATION;
                })
                .windowUntil(this::completesMessage)
                .map(messageFrames -> {
                    WebSocketMessage.Kind messageKind;
                    // Unqualified case labels: qualified enum constants are only accepted by recent Java versions.
                    switch (this.requireCurrentFrameKind()) {
                        case TEXT:
                            messageKind = WebSocketMessage.Kind.TEXT;
                            break;
                        case BINARY:
                            messageKind = WebSocketMessage.Kind.BINARY;
                            break;
                        default:
                            throw new IllegalStateException();
                    }
                    return this.newMessage(messageKind, messageFrames);
                });
        }

        /**
         * Returns TEXT messages built from TEXT and CONTINUATION frames.
         *
         * @return a publisher of text messages
         */
        @Override
        public Publisher<WebSocketMessage> textMessages() {
            return this.frames
                .filter(frame -> {
                    WebSocketFrame.Kind kind = frame.getKind();
                    return kind == WebSocketFrame.Kind.TEXT || kind == WebSocketFrame.Kind.CONTINUATION;
                })
                .windowUntil(this::completesMessage)
                .map(messageFrames -> {
                    this.requireCurrentFrameKind();
                    return this.newMessage(WebSocketMessage.Kind.TEXT, messageFrames);
                });
        }

        /**
         * Returns BINARY messages built from BINARY and CONTINUATION frames.
         *
         * @return a publisher of binary messages
         */
        @Override
        public Publisher<WebSocketMessage> binaryMessages() {
            return this.frames
                .filter(frame -> {
                    WebSocketFrame.Kind kind = frame.getKind();
                    return kind == WebSocketFrame.Kind.BINARY || kind == WebSocketFrame.Kind.CONTINUATION;
                })
                .windowUntil(this::completesMessage)
                .map(messageFrames -> {
                    this.requireCurrentFrameKind();
                    return this.newMessage(WebSocketMessage.Kind.BINARY, messageFrames);
                });
        }

        /**
         * Window boundary predicate: records the kind of the first frame of a message and reports
         * whether the frame is final.
         *
         * <p>When no message is in progress and the frame is a CONTINUATION frame, the exchange is
         * closed with status {@code PROTOCOL_ERROR}; the frame is still evaluated normally.</p>
         *
         * @param frame the frame to test
         *
         * @return {@code true} if the frame is final
         */
        private boolean completesMessage(WebSocketFrame frame) {
            // NOTE(review): the kind is reset as soon as a window is mapped to a message; if the
            // window is emitted before the remaining frames of that message are tested, a following
            // CONTINUATION frame is treated as a message start - confirm against windowUntil emission order.
            if (this.currentFrameKind == null) {
                this.currentFrameKind = frame.getKind();
                if (this.currentFrameKind == WebSocketFrame.Kind.CONTINUATION) {
                    GenericWebSocketExchange.this.close(WebSocketStatus.PROTOCOL_ERROR);
                }
            }
            return frame.isFinal();
        }

        /**
         * Returns the kind of the message in progress.
         *
         * @return the current frame kind
         *
         * @throws IllegalStateException if no message is in progress
         */
        private WebSocketFrame.Kind requireCurrentFrameKind() {
            if (this.currentFrameKind == null) {
                throw new IllegalStateException();
            }
            return this.currentFrameKind;
        }

        /**
         * Creates a message from a window of frames and marks that no message is in progress anymore.
         *
         * @param kind          the message kind
         * @param messageFrames the frames composing the message
         *
         * @return a new message
         */
        private GenericWebSocketMessage newMessage(WebSocketMessage.Kind kind, Publisher<WebSocketFrame> messageFrames) {
            GenericWebSocketMessage message = new GenericWebSocketMessage(kind, messageFrames);
            this.currentFrameKind = null;
            return message;
        }
    }

    /**
     * Outbound part of the exchange; both methods end up setting the outbound frames publisher of
     * the enclosing exchange.
     */
    protected class GenericOutbound
    implements WebSocketExchange.Outbound {
        protected GenericOutbound() {
        }

        /**
         * Sets the outbound frames to the publisher produced by the function from the frame factory.
         *
         * @param frames a function producing the outbound frames publisher
         */
        @Override
        public void frames(Function<WebSocketFrame.Factory, Publisher<WebSocketFrame>> frames) {
            Publisher<WebSocketFrame> framePublisher = frames.apply(GenericWebSocketExchange.this.frameFactory);
            GenericWebSocketExchange.this.setOutboundFrames(framePublisher);
        }

        /**
         * Sets the outbound frames to the frames of the messages produced by the function from the
         * message factory.
         *
         * @param messages a function producing the outbound messages publisher
         */
        @Override
        public void messages(Function<WebSocketMessage.Factory, Publisher<WebSocketMessage>> messages) {
            Publisher<WebSocketMessage> messagePublisher = messages.apply(GenericWebSocketExchange.this.messageFactory);
            // Each message is expanded into its frames.
            Publisher<WebSocketFrame> framePublisher = Flux.from(messagePublisher).flatMap(WebSocketMessage::frames);
            GenericWebSocketExchange.this.setOutboundFrames(framePublisher);
        }
    }
}

