package io.micronaut.configuration.graphql.ws;

import graphql.ExecutionResult;
import io.micronaut.configuration.graphql.GraphQLJsonSerializer;
import io.micronaut.configuration.graphql.GraphQLResponseBody;
import io.micronaut.configuration.graphql.ws.GraphQLWsResponse;
import io.micronaut.core.async.subscriber.CompletionAwareSubscriber;
import io.micronaut.websocket.WebSocketSession;
import jakarta.inject.Singleton;
import java.util.Collection;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

@Singleton
/* loaded from: input_file:io/micronaut/configuration/graphql/ws/GraphQLWsSender.class */
public class GraphQLWsSender {
    private static final Logger LOG = LoggerFactory.getLogger(GraphQLWsSender.class);
    private final GraphQLWsState state;
    private final GraphQLJsonSerializer graphQLJsonSerializer;

    /* loaded from: input_file:io/micronaut/configuration/graphql/ws/GraphQLWsSender$SendSubscriber.class */
    private final class SendSubscriber extends CompletionAwareSubscriber<ExecutionResult> {
        private final String operationId;
        private final WebSocketSession session;

        private SendSubscriber(String str, WebSocketSession webSocketSession) {
            this.operationId = str;
            this.session = webSocketSession;
        }

        Subscription getSubscription() {
            return this.subscription;
        }

        protected void doOnSubscribe(Subscription subscription) {
            GraphQLWsSender.LOG.info("Subscribed to results for to operation {} in session {}", this.operationId, this.session.getId());
            subscription.request(1L);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void doOnNext(ExecutionResult executionResult) {
            convertAndSend(executionResult);
            this.subscription.request(1L);
        }

        protected void doOnError(Throwable th) {
            GraphQLWsSender.LOG.warn("Error in SendSubscriber", th);
            send(new GraphQLWsResponse(GraphQLWsResponse.ServerType.GQL_ERROR, this.operationId));
        }

        protected void doOnComplete() {
            GraphQLWsSender.LOG.info("Completed results for operation {} in session {}", this.operationId, this.session.getId());
            if (GraphQLWsSender.this.state.removeCompleted(this.operationId, this.session)) {
                send(new GraphQLWsResponse(GraphQLWsResponse.ServerType.GQL_COMPLETE, this.operationId));
            }
        }

        private void convertAndSend(ExecutionResult executionResult) {
            send(GraphQLWsSender.this.toGraphQLWsResponse(this.operationId, new GraphQLResponseBody(executionResult.toSpecification())));
        }

        private void send(GraphQLWsResponse graphQLWsResponse) {
            if (this.session.isOpen()) {
                this.session.sendSync(GraphQLWsSender.this.graphQLJsonSerializer.serialize(graphQLWsResponse));
            }
        }
    }

    public GraphQLWsSender(GraphQLWsState graphQLWsState, GraphQLJsonSerializer graphQLJsonSerializer) {
        this.state = graphQLWsState;
        this.graphQLJsonSerializer = graphQLJsonSerializer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Publisher<GraphQLWsResponse> send(String str, GraphQLResponseBody graphQLResponseBody, WebSocketSession webSocketSession) {
        Object obj = graphQLResponseBody.getSpecification().get("data");
        if (!(obj instanceof Publisher)) {
            return Flux.just(new GraphQLWsResponse[]{toGraphQLWsResponse(str, graphQLResponseBody), new GraphQLWsResponse(GraphQLWsResponse.ServerType.GQL_COMPLETE, str)});
        }
        startSubscription(str, (Publisher) obj, webSocketSession);
        return Flux.empty();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public GraphQLWsResponse toGraphQLWsResponse(String str, GraphQLResponseBody graphQLResponseBody) {
        return hasErrors(graphQLResponseBody) ? new GraphQLWsResponse(GraphQLWsResponse.ServerType.GQL_ERROR, str, graphQLResponseBody) : new GraphQLWsResponse(GraphQLWsResponse.ServerType.GQL_DATA, str, graphQLResponseBody);
    }

    private boolean hasErrors(GraphQLResponseBody graphQLResponseBody) {
        Object obj = graphQLResponseBody.getSpecification().get("errors");
        return (obj instanceof Collection) && !((Collection) obj).isEmpty();
    }

    private Function<String, Subscription> starter(Publisher<ExecutionResult> publisher, WebSocketSession webSocketSession) {
        return str -> {
            SendSubscriber sendSubscriber = new SendSubscriber(str, webSocketSession);
            publisher.subscribe(sendSubscriber);
            return sendSubscriber.getSubscription();
        };
    }

    private void startSubscription(String str, Publisher<ExecutionResult> publisher, WebSocketSession webSocketSession) {
        this.state.saveOperation(str, webSocketSession, starter(publisher, webSocketSession));
    }
}
