/*
 * Decompiled with CFR 0.152.
 */
package io.inverno.mod.http.server.internal.http1x;

import io.inverno.mod.http.base.ExchangeContext;
import io.inverno.mod.http.base.Status;
import io.inverno.mod.http.base.ws.WebSocketStatus;
import io.inverno.mod.http.server.HttpServerConfiguration;
import io.inverno.mod.http.server.internal.http1x.Http1xConnection;
import io.inverno.mod.http.server.internal.http1x.Http1xExchange;
import io.inverno.mod.http.server.internal.http1x.ws.GenericWebSocketExchange;
import io.inverno.mod.http.server.ws.WebSocket;
import io.inverno.mod.http.server.ws.WebSocketExchange;
import io.inverno.mod.http.server.ws.WebSocketExchangeHandler;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Mono;

class Http1xWebSocket
implements WebSocket<ExchangeContext, WebSocketExchange<ExchangeContext>> {
    private final Http1xConnection connection;
    private final Http1xExchange exchange;
    private final String[] subProtocols;
    private WebSocketExchangeHandler<? super ExchangeContext, WebSocketExchange<ExchangeContext>> handler;
    private Mono<Void> fallback;
    private Disposable disposable;
    private GenericWebSocketExchange webSocketExchange;

    public Http1xWebSocket(HttpServerConfiguration configuration, Http1xConnection connection, Http1xExchange exchange, String[] subProtocols) {
        this.connection = connection;
        this.exchange = exchange;
        this.subProtocols = subProtocols;
    }

    void connect() {
        if (this.connection.executor().inEventLoop()) {
            this.connection.writeWebSocketHandshake(this.subProtocols).subscribe((CoreSubscriber)new WebSocketHandshakeSubscriber());
        } else {
            this.connection.executor().execute(this::connect);
        }
    }

    void dispose(Throwable cause) {
        if (this.disposable != null) {
            this.disposable.dispose();
        }
        if (this.webSocketExchange != null) {
            if (cause == null) {
                this.webSocketExchange.close();
            } else {
                this.webSocketExchange.close(WebSocketStatus.INTERNAL_SERVER_ERROR, cause.getMessage());
            }
        }
    }

    public Mono<Void> getFallback() {
        return this.fallback;
    }

    @Override
    public WebSocket<ExchangeContext, WebSocketExchange<ExchangeContext>> handler(WebSocketExchangeHandler<? super ExchangeContext, WebSocketExchange<ExchangeContext>> handler) {
        this.handler = handler;
        return this;
    }

    @Override
    public WebSocket<ExchangeContext, WebSocketExchange<ExchangeContext>> or(Mono<Void> fallback) {
        this.fallback = fallback;
        return this;
    }

    private class WebSocketHandshakeSubscriber
    extends BaseSubscriber<GenericWebSocketExchange> {
        private WebSocketHandshakeSubscriber() {
        }

        protected void hookOnSubscribe(Subscription subscription) {
            Http1xWebSocket.this.disposable = this;
            subscription.request(1L);
        }

        protected void hookOnNext(GenericWebSocketExchange value) {
            Http1xWebSocket.this.webSocketExchange = value;
        }

        protected void hookOnComplete() {
            Http1xWebSocket.this.exchange.response().headers(headers -> headers.status(Status.SWITCHING_PROTOCOLS));
            Http1xWebSocket.this.disposable = null;
            Http1xWebSocket.this.exchange.request().dispose(null);
            Http1xWebSocket.this.exchange.response().dispose(null);
            Http1xWebSocket.this.webSocketExchange.start(Http1xWebSocket.this.handler);
        }

        protected void hookOnError(Throwable throwable) {
            Http1xWebSocket.this.exchange.handleWebSocketHandshakeError(throwable, Http1xWebSocket.this.fallback);
        }
    }
}

