package org.kinotic.continuum.gateway.internal.endpoints.rest;

import io.vertx.core.AbstractVerticle;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServer;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BasicAuthHandler;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.StaticHandler;
import java.util.HashMap;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Consumer;
import org.kinotic.continuum.core.api.event.Event;
import org.kinotic.continuum.core.api.event.EventBusService;
import org.kinotic.continuum.gateway.api.config.ContinuumGatewayProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Component
/* loaded from: input_file:org/kinotic/continuum/gateway/internal/endpoints/rest/RestServerVerticle.class */
public class RestServerVerticle extends AbstractVerticle {
    private static final Logger log = LoggerFactory.getLogger(RestServerVerticle.class);
    private final HashMap<String, RoutingContext> responseCorrelationMap = new HashMap<>();
    private final ContinuumGatewayProperties gatewayProperties;
    private final EventBusService eventService;
    private final ContinuumGatewayRestAuthProvider authProvider;
    private Scheduler scheduler;
    private Disposable disposable;
    private HttpServer httpServer;

    public RestServerVerticle(ContinuumGatewayProperties continuumGatewayProperties, EventBusService eventBusService, ContinuumGatewayRestAuthProvider continuumGatewayRestAuthProvider) {
        this.gatewayProperties = continuumGatewayProperties;
        this.eventService = eventBusService;
        this.authProvider = continuumGatewayRestAuthProvider;
    }

    public void start(Future<Void> future) {
        String str = "srv://" + UUID.randomUUID() + "@continuum.java.rest.EventBus";
        String restPath = this.gatewayProperties.getRest().getRestPath();
        Context orCreateContext = this.vertx.getOrCreateContext();
        this.scheduler = Schedulers.fromExecutor(runnable -> {
            orCreateContext.runOnContext(r3 -> {
                runnable.run();
            });
        });
        this.httpServer = this.vertx.createHttpServer();
        Router router = Router.router(this.vertx);
        router.route("/*").handler(StaticHandler.create());
        Route route = router.route(HttpMethod.POST, restPath + "/*");
        route.handler(routingContext -> {
            routingContext.request().pause();
            routingContext.next();
        });
        route.handler(BasicAuthHandler.create(this.authProvider));
        route.handler(routingContext2 -> {
            routingContext2.request().resume();
            routingContext2.next();
        });
        route.handler(BodyHandler.create().setBodyLimit(this.gatewayProperties.getRest().getBodyLimitSize()));
        route.handler(routingContext3 -> {
            String uuid = UUID.randomUUID().toString();
            try {
                RoutingContextEventAdapter routingContextEventAdapter = new RoutingContextEventAdapter(restPath, routingContext3);
                routingContextEventAdapter.metadata().put("reply-to", str + "/replyHandler");
                routingContextEventAdapter.metadata().put("__correlation-id", uuid);
                this.responseCorrelationMap.put(uuid, routingContext3);
                this.eventService.sendWithAck(routingContextEventAdapter).publishOn(this.scheduler).subscribe((Consumer) null, th -> {
                    routingContext3.response().setStatusCode(500);
                    routingContext3.response().setStatusMessage(th.getMessage());
                    routingContext3.response().end();
                });
            } catch (Exception e) {
                this.responseCorrelationMap.remove(uuid);
                routingContext3.response().setStatusCode(500);
                routingContext3.response().setStatusMessage(e.getMessage());
                routingContext3.response().end();
            }
        });
        Mono publishOn = this.eventService.listenWithAck(str).publishOn(this.scheduler);
        Consumer consumer = flux -> {
            this.disposable = flux.publishOn(this.scheduler).subscribe(this::processResponseEvent, th -> {
                log.error("Event listener error", th);
            }, () -> {
                log.error("Should not happen! Event listener stopped for some reason!!");
            });
            this.httpServer.requestHandler(router).listen(this.gatewayProperties.getRest().getPort(), asyncResult -> {
                if (asyncResult.succeeded()) {
                    log.info("REST Server Listening on port " + ((HttpServer) asyncResult.result()).actualPort());
                    future.complete();
                } else {
                    log.error("Error starting REST Server", asyncResult.cause());
                    future.fail(asyncResult.cause());
                }
            });
        };
        Objects.requireNonNull(future);
        publishOn.subscribe(consumer, future::fail);
    }

    private void processResponseEvent(Event<byte[]> event) {
        String str = event.metadata().get("__correlation-id");
        if (str == null) {
            log.error("Received RPC response that does not contain a __correlation-id header");
            return;
        }
        RoutingContext routingContext = this.responseCorrelationMap.get(str);
        if (routingContext == null) {
            log.error("Received RPC response for __correlation-id: " + str + " but no context is set");
            return;
        }
        String str2 = event.metadata().get("error");
        if (str2 == null) {
            routingContext.response().setStatusCode(200);
        } else {
            routingContext.response().setStatusCode(500);
        }
        if (event.data() != null) {
            routingContext.response().putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(((byte[]) event.data()).length));
            if (event.metadata().contains("content-type")) {
                routingContext.response().putHeader(HttpHeaders.CONTENT_TYPE, event.metadata().get("content-type"));
            }
            routingContext.response().write(Buffer.buffer((byte[]) event.data()));
        } else if (str2 != null) {
            routingContext.response().putHeader(HttpHeaders.CONTENT_LENGTH, String.valueOf(str2.length()));
            routingContext.response().write(Buffer.buffer(str2));
        }
        routingContext.response().end();
    }

    public void stop() throws Exception {
        this.httpServer.close();
        this.disposable.dispose();
    }
}
