/*
 * Decompiled with CFR 0.152.
 */
package io.inverno.mod.http.server.internal.http1x.ws;

import io.inverno.mod.http.base.ExchangeContext;
import io.inverno.mod.http.base.internal.ws.GenericWebSocketFrame;
import io.inverno.mod.http.base.internal.ws.GenericWebSocketMessage;
import io.inverno.mod.http.base.ws.BaseWebSocketExchange;
import io.inverno.mod.http.base.ws.WebSocketException;
import io.inverno.mod.http.base.ws.WebSocketFrame;
import io.inverno.mod.http.base.ws.WebSocketMessage;
import io.inverno.mod.http.base.ws.WebSocketStatus;
import io.inverno.mod.http.server.Exchange;
import io.inverno.mod.http.server.Request;
import io.inverno.mod.http.server.ws.WebSocketExchange;
import io.inverno.mod.http.server.ws.WebSocketExchangeHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class GenericWebSocketExchange
extends BaseSubscriber<WebSocketFrame>
implements WebSocketExchange<ExchangeContext> {
    private static final Logger LOGGER = LogManager.getLogger(WebSocketExchange.class);
    private final ChannelHandlerContext context;
    private final Exchange<ExchangeContext> exchange;
    private final String subProtocol;
    private final WebSocketExchangeHandler<ExchangeContext, WebSocketExchange<ExchangeContext>> handler;
    private final GenericWebSocketFrame.GenericFactory frameFactory;
    private final GenericWebSocketMessage.GenericFactory messageFactory;
    private final boolean closeOnOutboundComplete;
    private final long inboundCloseFrameTimeout;
    private final EventExecutor contextExecutor;
    private Optional<Sinks.Many<WebSocketFrame>> inboundFrames;
    private GenericInbound inbound;
    private GenericOutbound outbound;
    private Sinks.One<Publisher<WebSocketFrame>> outboundFramesSinks;
    private Publisher<WebSocketFrame> outboundFrames;
    private boolean started;
    private boolean outboundFramesSet;
    private boolean inboundSubscribed;
    private Mono<Void> finalizer;
    private boolean inClosed;
    private boolean outClosed;
    private ScheduledFuture<?> inboundCloseMessageTimeoutFuture;

    public GenericWebSocketExchange(ChannelHandlerContext context, Exchange<ExchangeContext> exchange, String subProtocol, WebSocketExchangeHandler<ExchangeContext, WebSocketExchange<ExchangeContext>> handler, GenericWebSocketFrame.GenericFactory frameFactory, GenericWebSocketMessage.GenericFactory messageFactory, boolean closeOnOutboundComplete, long inboundCloseFrameTimeout) {
        this.context = context;
        this.exchange = exchange;
        this.subProtocol = subProtocol;
        this.handler = handler;
        this.frameFactory = frameFactory;
        this.messageFactory = messageFactory;
        this.closeOnOutboundComplete = closeOnOutboundComplete;
        this.inboundCloseFrameTimeout = inboundCloseFrameTimeout;
        this.contextExecutor = this.context.executor();
        this.inboundFrames = Optional.empty();
    }

    public void start() {
        Mono<Void> deferHandle;
        if (this.started) {
            throw new IllegalStateException("WebSocket Exchange already started");
        }
        if (this.outboundFrames == null) {
            this.outboundFramesSinks = Sinks.one();
            this.outboundFrames = Flux.switchOnNext((Publisher)this.outboundFramesSinks.asMono());
        }
        try {
            deferHandle = this.handler.defer(this);
        }
        catch (Throwable throwable) {
            this.hookOnError(throwable);
            return;
        }
        deferHandle.thenMany(this.outboundFrames).doOnDiscard(GenericWebSocketFrame.class, frame -> frame.release()).subscribe((CoreSubscriber)this);
        this.started = true;
    }

    protected void executeInEventLoop(Runnable runnable) {
        this.executeInEventLoop(runnable, 1);
    }

    protected void executeInEventLoop(Runnable runnable, int request) {
        if (this.contextExecutor.inEventLoop()) {
            runnable.run();
            this.request(request);
        } else {
            this.contextExecutor.execute(() -> {
                try {
                    runnable.run();
                    this.request(request);
                }
                catch (Throwable throwable) {
                    this.cancel();
                    this.hookOnError(throwable);
                }
            });
        }
    }

    protected final void hookOnSubscribe(Subscription subscription) {
        this.onStart(subscription);
    }

    protected void onStart(Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
        LOGGER.debug("WebSocket exchange started");
    }

    protected void hookOnNext(WebSocketFrame value) {
        if (value.getKind() == WebSocketFrame.Kind.CLOSE) {
            throw new WebSocketException("Invalid outbound frame type " + String.valueOf(value.getKind()) + ", use close() to close the WebSocket");
        }
        this.executeInEventLoop(() -> {
            LOGGER.trace("Write {} frame (size={}, final={})", (Object)value.getKind(), (Object)value.getBinaryData().readableBytes(), (Object)value.isFinal());
            this.context.writeAndFlush((Object)this.frameFactory.toUnderlyingWebSocketFrame(value));
        });
    }

    protected void hookOnCancel() {
        this.close(WebSocketStatus.ENDPOINT_UNAVAILABLE);
    }

    protected void hookOnComplete() {
        if (this.outbound != null && this.outbound.closeOnComplete) {
            this.close();
        }
    }

    protected void hookOnError(Throwable throwable) {
        LOGGER.error("WebSocketExchange processing error", throwable);
        this.close(WebSocketStatus.INTERNAL_SERVER_ERROR, throwable.getMessage());
    }

    public void dispose() {
        this.dispose(null);
    }

    public void dispose(Throwable error) {
        super.dispose();
        this.inboundFrames.ifPresent(frameSink -> {
            if (error != null) {
                frameSink.tryEmitError(error);
            } else {
                frameSink.tryEmitComplete();
            }
            if (!this.inboundSubscribed) {
                this.inboundSubscribed = true;
                frameSink.asFlux().subscribe(frame -> ((GenericWebSocketFrame)frame).release(), ex -> {});
            }
        });
        this.inboundFrames = Optional.empty();
    }

    public Optional<Sinks.Many<WebSocketFrame>> inboundFrames() {
        return this.inboundFrames;
    }

    protected void setOutboundFrames(Publisher<WebSocketFrame> frames) {
        if (this.started && this.outboundFramesSet) {
            throw new IllegalStateException("Outbound frames already set");
        }
        if (this.outboundFramesSinks != null) {
            this.outboundFramesSinks.tryEmitValue(frames);
        } else {
            this.outboundFrames = frames;
        }
        this.outboundFramesSet = true;
    }

    public ChannelFuture finalizeExchange(ChannelPromise finalPromise) {
        finalPromise.addListener(future -> {
            this.inboundFrames.ifPresent(Sinks.Many::tryEmitComplete);
            if (this.finalizer != null) {
                this.finalizer.subscribe();
            }
        });
        return finalPromise;
    }

    @Override
    public Request request() {
        return this.exchange.request();
    }

    public ExchangeContext context() {
        return this.exchange.context();
    }

    public String getSubProtocol() {
        return this.subProtocol;
    }

    public BaseWebSocketExchange.Inbound inbound() {
        if (this.inbound == null) {
            Sinks.Many inboundFrameSink = Sinks.many().unicast().onBackpressureBuffer();
            this.inbound = new GenericInbound((Flux<WebSocketFrame>)inboundFrameSink.asFlux().doOnSubscribe(ign -> {
                this.inboundSubscribed = true;
            }).doOnDiscard(GenericWebSocketFrame.class, frame -> frame.release()).doOnTerminate(() -> {
                this.inbound = null;
                this.inboundFrames = Optional.empty();
            }));
            this.inboundFrames = Optional.of(inboundFrameSink);
        }
        return this.inbound;
    }

    public BaseWebSocketExchange.Outbound outbound() {
        if (this.outbound == null) {
            this.outbound = new GenericOutbound();
        }
        return this.outbound;
    }

    public void close(short code, String reason) {
        if (!this.outClosed) {
            this.executeInEventLoop(() -> {
                if (!this.outClosed) {
                    this.outClosed = true;
                    String cleanReason = reason;
                    if (cleanReason != null && cleanReason.length() >= 123) {
                        cleanReason = cleanReason.substring(0, 120) + "...";
                    }
                    this.context.writeAndFlush((Object)new CloseWebSocketFrame((int)code, cleanReason));
                    LOGGER.debug("WebSocket close frame sent ({}): {}", (Object)code, (Object)reason);
                    if (!this.inClosed) {
                        this.inboundCloseMessageTimeoutFuture = this.contextExecutor.schedule(() -> {
                            this.dispose((Throwable)new WebSocketException("Inbound close frame timeout"));
                            ChannelPromise closePromise = this.context.newPromise();
                            this.context.close(closePromise);
                            closePromise.addListener(ign -> LOGGER.debug("WebSocket closed ({}): {}", (Object)code, (Object)reason));
                            this.finalizeExchange(closePromise);
                        }, this.inboundCloseFrameTimeout, TimeUnit.MILLISECONDS);
                    }
                }
            });
        }
    }

    public void onCloseReceived(short code, String reason) {
        if (!this.inClosed) {
            LOGGER.debug("WebSocket close frame received ({}): {}", (Object)code, (Object)reason);
        }
        this.inClosed = true;
        if (this.inboundCloseMessageTimeoutFuture != null) {
            this.inboundCloseMessageTimeoutFuture.cancel(false);
            this.inboundCloseMessageTimeoutFuture = null;
        }
        this.dispose();
        this.close(code, reason);
        ChannelPromise closePromise = this.context.newPromise();
        this.context.close(closePromise);
        closePromise.addListener(ign -> LOGGER.debug("WebSocket closed ({}): {}", (Object)code, (Object)reason));
        this.finalizeExchange(closePromise);
    }

    public WebSocketExchange<ExchangeContext> finalizer(Mono<Void> finalizer) {
        this.finalizer = finalizer;
        return this;
    }

    protected class GenericOutbound
    implements BaseWebSocketExchange.Outbound {
        protected boolean closeOnComplete;

        protected GenericOutbound() {
            this.closeOnComplete = GenericWebSocketExchange.this.closeOnOutboundComplete;
        }

        public BaseWebSocketExchange.Outbound closeOnComplete(boolean closeOnComplete) {
            this.closeOnComplete = closeOnComplete;
            return this;
        }

        public void frames(Function<WebSocketFrame.Factory, Publisher<WebSocketFrame>> frames) {
            GenericWebSocketExchange.this.setOutboundFrames(frames.apply((WebSocketFrame.Factory)GenericWebSocketExchange.this.frameFactory));
        }

        public void messages(Function<WebSocketMessage.Factory, Publisher<WebSocketMessage>> messages) {
            GenericWebSocketExchange.this.setOutboundFrames((Publisher<WebSocketFrame>)Flux.from(messages.apply((WebSocketMessage.Factory)GenericWebSocketExchange.this.messageFactory)).flatMap(WebSocketMessage::frames));
        }
    }

    protected class GenericInbound
    implements BaseWebSocketExchange.Inbound {
        private final Flux<WebSocketFrame> frames;
        private WebSocketFrame.Kind currentFrameKind;

        public GenericInbound(Flux<WebSocketFrame> frames) {
            this.frames = frames;
        }

        public Publisher<WebSocketFrame> frames() {
            return this.frames;
        }

        public Publisher<WebSocketMessage> messages() {
            return this.frames.filter(frame -> {
                WebSocketFrame.Kind kind = frame.getKind();
                return kind == WebSocketFrame.Kind.TEXT || kind == WebSocketFrame.Kind.BINARY || kind == WebSocketFrame.Kind.CONTINUATION;
            }).windowUntil(frame -> {
                if (this.currentFrameKind == null && frame.getKind() == WebSocketFrame.Kind.CONTINUATION) {
                    GenericWebSocketExchange.this.close(WebSocketStatus.PROTOCOL_ERROR);
                }
                this.currentFrameKind = frame.isFinal() ? null : frame.getKind();
                return frame.isFinal();
            }).map(messageFrames -> {
                if (null == this.currentFrameKind) {
                    throw new IllegalStateException();
                }
                switch (this.currentFrameKind) {
                    case TEXT: {
                        return new GenericWebSocketMessage(WebSocketMessage.Kind.TEXT, (Publisher)messageFrames);
                    }
                    case BINARY: {
                        return new GenericWebSocketMessage(WebSocketMessage.Kind.BINARY, (Publisher)messageFrames);
                    }
                }
                throw new IllegalStateException();
            });
        }

        public Publisher<WebSocketMessage> textMessages() {
            return this.frames.filter(frame -> {
                WebSocketFrame.Kind kind = frame.getKind();
                return kind == WebSocketFrame.Kind.TEXT || kind == WebSocketFrame.Kind.CONTINUATION;
            }).windowUntil(frame -> {
                if (this.currentFrameKind == null && frame.getKind() == WebSocketFrame.Kind.CONTINUATION) {
                    GenericWebSocketExchange.this.close(WebSocketStatus.PROTOCOL_ERROR);
                }
                this.currentFrameKind = frame.isFinal() ? null : frame.getKind();
                return frame.isFinal();
            }).map(messageFrames -> new GenericWebSocketMessage(WebSocketMessage.Kind.TEXT, (Publisher)messageFrames));
        }

        public Publisher<WebSocketMessage> binaryMessages() {
            return this.frames.filter(frame -> {
                WebSocketFrame.Kind kind = frame.getKind();
                return kind == WebSocketFrame.Kind.BINARY || kind == WebSocketFrame.Kind.CONTINUATION;
            }).windowUntil(frame -> {
                if (this.currentFrameKind == null && frame.getKind() == WebSocketFrame.Kind.CONTINUATION) {
                    GenericWebSocketExchange.this.close(WebSocketStatus.PROTOCOL_ERROR);
                }
                this.currentFrameKind = frame.isFinal() ? null : frame.getKind();
                return frame.isFinal();
            }).map(messageFrames -> new GenericWebSocketMessage(WebSocketMessage.Kind.BINARY, (Publisher)messageFrames));
        }
    }
}

