/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.webserver.websocket;

import io.helidon.common.http.Parameters;
import io.helidon.common.http.UriComponent;
import io.helidon.common.reactive.BufferedEmittingPublisher;
import io.helidon.common.reactive.Multi;
import io.helidon.webserver.websocket.WebSocketRoute;
import io.helidon.webserver.websocket.WebSocketRouting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.concurrent.GenericFutureListener;
import jakarta.websocket.CloseReason;
import jakarta.websocket.DeploymentException;
import jakarta.websocket.Extension;
import jakarta.websocket.WebSocketContainer;
import jakarta.websocket.server.ServerEndpointConfig;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.glassfish.tyrus.core.RequestContext;
import org.glassfish.tyrus.core.TyrusUpgradeResponse;
import org.glassfish.tyrus.core.TyrusWebSocketEngine;
import org.glassfish.tyrus.server.TyrusServerContainer;
import org.glassfish.tyrus.spi.CompletionHandler;
import org.glassfish.tyrus.spi.Connection;
import org.glassfish.tyrus.spi.UpgradeRequest;
import org.glassfish.tyrus.spi.UpgradeResponse;
import org.glassfish.tyrus.spi.WebSocketEngine;
import org.glassfish.tyrus.spi.Writer;

class WebSocketHandler
extends SimpleChannelInboundHandler<Object> {
    private static final Logger LOGGER = Logger.getLogger(WebSocketHandler.class.getName());
    private static final int MAX_RETRIES = 5;
    private final WebSocketEngine engine;
    private final String path;
    private final String queryString;
    private final FullHttpRequest upgradeRequest;
    private final HttpHeaders upgradeResponseHeaders;
    private final WebSocketRouting webSocketRouting;
    private final TyrusServerContainer tyrusServerContainer;
    private volatile Connection connection;
    private final WebSocketEngine.UpgradeInfo upgradeInfo;
    private final BufferedEmittingPublisher<ByteBuf> emitter;
    private final TyrusUpgradeResponse upgradeResponse = new TyrusUpgradeResponse();

    WebSocketHandler(ChannelHandlerContext ctx, String path, FullHttpRequest upgradeRequest, HttpHeaders upgradeResponseHeaders, final WebSocketRouting webSocketRouting) {
        int k = path.indexOf(63);
        if (k > 0) {
            this.path = path.substring(0, k);
            this.queryString = path.substring(k + 1);
        } else {
            this.path = path;
            this.queryString = "";
        }
        this.upgradeRequest = upgradeRequest;
        this.upgradeResponseHeaders = upgradeResponseHeaders;
        this.webSocketRouting = webSocketRouting;
        this.emitter = BufferedEmittingPublisher.create();
        Set allEndpointClasses = webSocketRouting.getRoutes().stream().map(WebSocketRoute::endpointClass).collect(Collectors.toSet());
        this.tyrusServerContainer = new TyrusServerContainer(allEndpointClasses){
            private final WebSocketEngine engine;
            {
                super(arg0);
                this.engine = TyrusWebSocketEngine.builder((WebSocketContainer)this).build();
            }

            public void register(Class<?> endpointClass) {
                throw new UnsupportedOperationException("Use TyrusWebSocketEngine for registration");
            }

            public void register(ServerEndpointConfig serverEndpointConfig) {
                throw new UnsupportedOperationException("Use TyrusWebSocketEngine for registration");
            }

            public Set<Extension> getInstalledExtensions() {
                return webSocketRouting.getExtensions();
            }

            public WebSocketEngine getWebSocketEngine() {
                return this.engine;
            }
        };
        WebSocketEngine engine = this.tyrusServerContainer.getWebSocketEngine();
        webSocketRouting.getRoutes().forEach(wsRoute -> {
            try {
                if (wsRoute.serverEndpointConfig() != null) {
                    LOGGER.log(Level.FINE, () -> "Registering ws endpoint " + wsRoute.path() + wsRoute.serverEndpointConfig().getPath());
                    engine.register(wsRoute.serverEndpointConfig(), wsRoute.path());
                } else {
                    LOGGER.log(Level.FINE, () -> "Registering annotated ws endpoint " + wsRoute.path());
                    engine.register(wsRoute.endpointClass(), wsRoute.path());
                }
            }
            catch (DeploymentException e) {
                throw new RuntimeException(e);
            }
        });
        this.engine = this.tyrusServerContainer.getWebSocketEngine();
        this.upgradeInfo = this.upgrade(ctx);
    }

    TyrusUpgradeResponse upgradeResponse() {
        return this.upgradeResponse;
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        LOGGER.log(Level.SEVERE, "WS handler ERROR ", cause);
    }

    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        if (this.connection != null) {
            this.connection.close(new CloseReason((CloseReason.CloseCode)CloseReason.CloseCodes.CLOSED_ABNORMALLY, "Client connection closed"));
        }
        this.tyrusServerContainer.shutdown();
        super.channelUnregistered(ctx);
    }

    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            ByteBuf byteBuf = (ByteBuf)msg;
            this.emitter.emit((Object)byteBuf.copy());
        }
    }

    private void sendBytesToTyrus(ChannelHandlerContext ctx, ByteBuf byteBuf) {
        ByteBuffer nioBuffer = byteBuf.nioBuffer();
        int retries = 5;
        while (nioBuffer.remaining() > 0 && retries-- > 0) {
            this.connection.getReadHandler().handle(nioBuffer);
        }
        byteBuf.release();
        if (retries == 0) {
            ctx.close();
            this.connection.close(new CloseReason((CloseReason.CloseCode)CloseReason.CloseCodes.UNEXPECTED_CONDITION, "Tyrus did not consume all data after 5 retries"));
        }
    }

    WebSocketEngine.UpgradeInfo upgrade(ChannelHandlerContext ctx) {
        LOGGER.fine("Initiating WebSocket handshake ...");
        HashMap paramsMap = new HashMap();
        Parameters params = UriComponent.decodeQuery((String)this.queryString, (boolean)true);
        params.toMap().forEach((key, value) -> paramsMap.put(key, value.toArray(new String[0])));
        RequestContext requestContext = RequestContext.Builder.create().requestURI(URI.create(this.path)).queryString(this.queryString).parameterMap(paramsMap).build();
        this.upgradeRequest.headers().forEach(e -> requestContext.getHeaders().put((String)e.getKey(), List.of((String)e.getValue())));
        WebSocketEngine.UpgradeInfo upgradeInfo = this.engine.upgrade((UpgradeRequest)requestContext, (UpgradeResponse)this.upgradeResponse);
        this.upgradeResponse.getHeaders().forEach((arg_0, arg_1) -> ((HttpHeaders)this.upgradeResponseHeaders).add(arg_0, arg_1));
        return upgradeInfo;
    }

    void open(final ChannelHandlerContext ctx) {
        Writer writer = new Writer(){

            public void close() throws IOException {
                ctx.write((Object)Unpooled.EMPTY_BUFFER).addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
            }

            public void write(ByteBuffer byteBuffer, CompletionHandler<ByteBuffer> completionHandler) {
                ctx.writeAndFlush((Object)Unpooled.wrappedBuffer((ByteBuffer)byteBuffer)).addListener(f -> {
                    if (f.isSuccess()) {
                        completionHandler.completed((Object)byteBuffer);
                    } else {
                        completionHandler.failed(f.cause());
                    }
                });
            }
        };
        if (this.webSocketRouting.getExecutorService() != null) {
            CompletableFuture.supplyAsync(() -> {
                this.connection = this.upgradeInfo.createConnection(writer, WebSocketHandler::close);
                return ctx;
            }, this.webSocketRouting.getExecutorService()).thenAccept(c -> Multi.create(this.emitter).observeOn((Executor)this.webSocketRouting.getExecutorService()).forEach(byteBuf -> this.sendBytesToTyrus((ChannelHandlerContext)c, (ByteBuf)byteBuf)).onError(this::logError));
        } else {
            this.connection = this.upgradeInfo.createConnection(writer, WebSocketHandler::close);
            Multi.create(this.emitter).forEach(byteBuf -> this.sendBytesToTyrus(ctx, (ByteBuf)byteBuf)).onError(this::logError);
        }
        ctx.channel().config().setAutoRead(true);
    }

    private void logError(Throwable throwable) {
        LOGGER.log(Level.SEVERE, "WS handler ERROR ", throwable);
    }

    private static void close(CloseReason closeReason) {
        LOGGER.fine(() -> "Connection closed: " + closeReason);
    }
}

