/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.vertx.web.runtime;

import io.quarkus.vertx.web.ReactiveRoutes;
import io.quarkus.vertx.web.runtime.SSEMulti;
import io.smallrye.mutiny.Multi;
import io.vertx.core.AsyncResult;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.Json;
import io.vertx.ext.web.RoutingContext;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class MultiSseSupport {
    private MultiSseSupport() {
    }

    public static void subscribeString(Multi<String> multi, RoutingContext rc) {
        MultiSseSupport.subscribeBuffer(multi.map(new Function<String, Buffer>(){

            @Override
            public Buffer apply(String s) {
                return Buffer.buffer(s);
            }
        }), rc);
    }

    private static void initialize(HttpServerResponse response) {
        if (response.bytesWritten() == 0L) {
            MultiMap headers = response.headers();
            if (headers.get("content-type") == null) {
                headers.set("content-type", "text/event-stream");
            }
            response.setChunked(true);
        }
    }

    private static void onWriteDone(Subscription subscription, AsyncResult<Void> ar, RoutingContext rc) {
        if (ar.failed()) {
            rc.fail(ar.cause());
        } else {
            subscription.request(1L);
        }
    }

    public static void write(Multi<Buffer> multi, final RoutingContext rc) {
        final HttpServerResponse response = rc.response();
        multi.subscribe().withSubscriber(new Subscriber<Buffer>(){
            Subscription upstream;

            @Override
            public void onSubscribe(Subscription subscription) {
                this.upstream = subscription;
                this.upstream.request(1L);
            }

            @Override
            public void onNext(Buffer item) {
                MultiSseSupport.initialize(response);
                response.write(item, new Handler<AsyncResult<Void>>(){

                    @Override
                    public void handle(AsyncResult<Void> ar) {
                        MultiSseSupport.onWriteDone(upstream, ar, rc);
                    }
                });
            }

            @Override
            public void onError(Throwable throwable) {
                rc.fail(throwable);
            }

            @Override
            public void onComplete() {
                MultiSseSupport.endOfStream(response);
            }
        });
    }

    public static void subscribeBuffer(Multi<Buffer> multi, final RoutingContext rc) {
        final HttpServerResponse response = rc.response();
        multi.subscribe().withSubscriber(new Subscriber<Buffer>(){
            Subscription upstream;
            final AtomicLong count = new AtomicLong();

            @Override
            public void onSubscribe(Subscription subscription) {
                this.upstream = subscription;
                this.upstream.request(1L);
            }

            @Override
            public void onNext(Buffer item) {
                MultiSseSupport.initialize(response);
                Buffer buffer = Buffer.buffer("data: ").appendBuffer(item).appendString("\n").appendString("id: " + this.count.getAndIncrement()).appendString("\n\n");
                response.write(buffer, new Handler<AsyncResult<Void>>(){

                    @Override
                    public void handle(AsyncResult<Void> ar) {
                        MultiSseSupport.onWriteDone(upstream, ar, rc);
                    }
                });
            }

            @Override
            public void onError(Throwable throwable) {
                rc.fail(throwable);
            }

            @Override
            public void onComplete() {
                MultiSseSupport.endOfStream(response);
            }
        });
    }

    public static void subscribeMutinyBuffer(Multi<io.vertx.mutiny.core.buffer.Buffer> multi, RoutingContext rc) {
        MultiSseSupport.subscribeBuffer(multi.map(new Function<io.vertx.mutiny.core.buffer.Buffer, Buffer>(){

            @Override
            public Buffer apply(io.vertx.mutiny.core.buffer.Buffer b) {
                return b.getDelegate();
            }
        }), rc);
    }

    public static void subscribeRxBuffer(Multi<io.vertx.reactivex.core.buffer.Buffer> multi, RoutingContext rc) {
        MultiSseSupport.subscribeBuffer(multi.map(new Function<io.vertx.reactivex.core.buffer.Buffer, Buffer>(){

            @Override
            public Buffer apply(io.vertx.reactivex.core.buffer.Buffer b) {
                return b.getDelegate();
            }
        }), rc);
    }

    public static void subscribeObject(Multi<Object> multi, RoutingContext rc) {
        final AtomicLong count = new AtomicLong();
        MultiSseSupport.write(multi.map(new Function<Object, Buffer>(){

            @Override
            public Buffer apply(Object o) {
                if (o instanceof ReactiveRoutes.ServerSentEvent) {
                    ReactiveRoutes.ServerSentEvent ev = (ReactiveRoutes.ServerSentEvent)o;
                    long id = ev.id() != -1L ? ev.id() : count.getAndIncrement();
                    String e = ev.event() == null ? "" : "event: " + ev.event() + "\n";
                    return Buffer.buffer(e + "data: " + Json.encodeToBuffer(ev.data()) + "\nid: " + id + "\n\n");
                }
                return Buffer.buffer("data: " + Json.encodeToBuffer(o) + "\nid: " + count.getAndIncrement() + "\n\n");
            }
        }), rc);
    }

    private static void endOfStream(HttpServerResponse response) {
        MultiMap headers;
        if (response.bytesWritten() == 0L && (headers = response.headers()).get("content-type") == null) {
            headers.set("content-type", "text/event-stream");
        }
        response.end();
    }

    public static boolean isSSE(Multi<?> multi) {
        return multi instanceof SSEMulti;
    }
}

