package io.micronaut.configuration.graphql.ws;

import graphql.ExecutionResult;
import io.micronaut.configuration.graphql.GraphQLInvocation;
import io.micronaut.configuration.graphql.GraphQLInvocationData;
import io.micronaut.context.annotation.Requires;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.codec.CodecException;
import io.micronaut.scheduling.ScheduledExecutorTaskScheduler;
import io.micronaut.websocket.CloseReason;
import io.micronaut.websocket.WebSocketSession;
import io.micronaut.websocket.annotation.OnClose;
import io.micronaut.websocket.annotation.OnError;
import io.micronaut.websocket.annotation.OnMessage;
import io.micronaut.websocket.annotation.OnOpen;
import io.micronaut.websocket.annotation.ServerWebSocket;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Requires(property = GraphQLWsConfiguration.ENABLED_CONFIG, value = "true", defaultValue = "false")
@ServerWebSocket(value = "${graphql.graphql-ws.path:/graphql-ws}", subprotocols = "graphql-transport-ws")
/* loaded from: input_file:io/micronaut/configuration/graphql/ws/GraphQLWsHandler.class */
public class GraphQLWsHandler {
    static final String HTTP_REQUEST_KEY = "httpRequest";
    private static final Logger LOG = LoggerFactory.getLogger(GraphQLWsHandler.class);
    private final ScheduledExecutorTaskScheduler scheduler;
    private final GraphQLInvocation graphQLInvocation;
    private final GraphQLWsConfiguration configuration;
    private final ConcurrentSkipListSet<String> connections = new ConcurrentSkipListSet<>();
    private final ConcurrentMap<String, Publisher<? extends Message>> subscriptions = new ConcurrentHashMap();

    public GraphQLWsHandler(ScheduledExecutorTaskScheduler scheduledExecutorTaskScheduler, GraphQLInvocation graphQLInvocation, GraphQLWsConfiguration graphQLWsConfiguration) {
        this.scheduler = scheduledExecutorTaskScheduler;
        this.graphQLInvocation = graphQLInvocation;
        this.configuration = graphQLWsConfiguration;
    }

    @OnOpen
    public void onOpen(WebSocketSession webSocketSession, HttpRequest httpRequest) {
        webSocketSession.put(HTTP_REQUEST_KEY, httpRequest);
        this.scheduler.schedule(this.configuration.getConnectionInitWaitTimeout(), () -> {
            if (this.connections.contains(webSocketSession.getId())) {
                return;
            }
            webSocketSession.close(new CloseReason(4408, "Connection initialisation timeout."));
        });
        if (LOG.isTraceEnabled()) {
            LOG.trace("Opened websocket connection with id {}", webSocketSession.getId());
        }
    }

    @OnMessage
    public Publisher<Message> onMessage(Message message, WebSocketSession webSocketSession) {
        if (message instanceof ConnectionInitMessage) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received connection initialisation request for session id {}", webSocketSession.getId());
            }
            return this.connections.add(webSocketSession.getId()) ? webSocketSession.send(new ConnectionAckMessage()) : tooManyInitialisationRequests(webSocketSession);
        }
        if (message instanceof PingMessage) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Received a ping message for session id {}", webSocketSession.getId());
            }
            return webSocketSession.send(new PongMessage());
        }
        if (!(message instanceof SubscribeMessage)) {
            if (message instanceof CompleteMessage) {
                CompleteMessage completeMessage = (CompleteMessage) message;
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Received complete message for session id {}", webSocketSession.getId());
                }
                this.subscriptions.remove(completeMessage.getId());
            }
            return Mono.empty();
        }
        SubscribeMessage subscribeMessage = (SubscribeMessage) message;
        if (LOG.isTraceEnabled()) {
            LOG.trace("Received subscription message for session id {}", webSocketSession.getId());
        }
        if (!this.connections.contains(webSocketSession.getId())) {
            return unauthorized(webSocketSession);
        }
        if (this.subscriptions.containsKey(subscribeMessage.getId())) {
            return subscriberAlreadyExists(subscribeMessage.getId(), webSocketSession);
        }
        Mono doFinally = executeSubscribe(subscribeMessage, webSocketSession).doFinally(signalType -> {
            this.subscriptions.remove(subscribeMessage.getId());
        });
        this.subscriptions.put(subscribeMessage.getId(), doFinally);
        return doFinally;
    }

    private Mono<Message> executeSubscribe(SubscribeMessage subscribeMessage, WebSocketSession webSocketSession) {
        GraphQLInvocationData graphQLInvocationData = new GraphQLInvocationData(subscribeMessage.getSubscribePayload().getQuery(), subscribeMessage.getSubscribePayload().getOperationName(), subscribeMessage.getSubscribePayload().getVariables());
        Optional optional = webSocketSession.get(HTTP_REQUEST_KEY, HttpRequest.class);
        if (optional.isEmpty()) {
            return Mono.error(new IllegalStateException("The HTTP request from the original WebSocket connection could not be retrieved."));
        }
        Mono last = Flux.from(this.graphQLInvocation.invoke(graphQLInvocationData, (HttpRequest) optional.get(), null)).flatMap(executionResult -> {
            if (executionResult.isDataPresent() && executionResult.getData() != null) {
                Object data = executionResult.getData();
                if (data instanceof Publisher) {
                    return handleExecutionResultPublisher((Publisher) data);
                }
            }
            return Flux.just(executionResult);
        }).takeUntil(executionResult2 -> {
            return !this.subscriptions.containsKey(subscribeMessage.getId());
        }).flatMap(executionResult3 -> {
            return handleExecutionResult(subscribeMessage, webSocketSession, executionResult3);
        }).last();
        Class<NextMessage> cls = NextMessage.class;
        Objects.requireNonNull(NextMessage.class);
        return last.filter((v1) -> {
            return r1.isInstance(v1);
        }).flatMap(message -> {
            return completeSubscription(subscribeMessage, webSocketSession);
        });
    }

    private Flux<ExecutionResult> handleExecutionResultPublisher(Publisher<?> publisher) {
        return Flux.from(publisher).map(obj -> {
            if (obj instanceof ExecutionResult) {
                return (ExecutionResult) obj;
            }
            throw new IllegalArgumentException("Subscription data is an invalid type " + obj.getClass().getName() + "- expected to be an ExecutionResult");
        });
    }

    private Publisher<Message> handleExecutionResult(SubscribeMessage subscribeMessage, WebSocketSession webSocketSession, ExecutionResult executionResult) {
        return (webSocketSession.isOpen() || !this.subscriptions.containsKey(subscribeMessage.getId())) ? executionResult.getErrors().isEmpty() ? webSocketSession.send(new NextMessage(subscribeMessage.getId(), executionResult)) : webSocketSession.send(ErrorMessage.of(subscribeMessage.getId(), executionResult.getErrors())) : Mono.empty();
    }

    private Mono<CompleteMessage> completeSubscription(SubscribeMessage subscribeMessage, WebSocketSession webSocketSession) {
        return Mono.from((webSocketSession.isOpen() && this.subscriptions.containsKey(subscribeMessage.getId())) ? webSocketSession.send(new CompleteMessage(subscribeMessage.getId())) : Mono.empty());
    }

    private Publisher<Message> unauthorized(WebSocketSession webSocketSession) {
        webSocketSession.close(new CloseReason(4401, "Unauthorized."));
        return Mono.empty();
    }

    private Publisher<Message> tooManyInitialisationRequests(WebSocketSession webSocketSession) {
        webSocketSession.close(new CloseReason(4403, "Too many initialisation requests."));
        return Mono.empty();
    }

    private Publisher<Message> subscriberAlreadyExists(String str, WebSocketSession webSocketSession) {
        webSocketSession.close(new CloseReason(4409, "Subscriber for " + str + " already exists."));
        return Mono.empty();
    }

    @OnClose
    public void onClose(WebSocketSession webSocketSession, CloseReason closeReason) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Closed websocket connection with id {} with reason {}", webSocketSession.getId(), closeReason);
        }
    }

    @OnError
    public void onError(WebSocketSession webSocketSession, Throwable th) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Error websocket connection with id {} with error {}", webSocketSession.getId(), th.getMessage());
        }
        if ((th instanceof CodecException) || (th instanceof InstantiationError)) {
            webSocketSession.close(new CloseReason(4400, "Invalid message."));
        }
    }
}
