/*
 * Decompiled with CFR 0.152.
 */
package net.redpipe.example.kafka;

import io.vertx.core.json.JsonObject;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.Context;
import net.redpipe.engine.core.AppGlobals;
import rx.Observable;

/**
 * JAX-RS resource mounted at {@code /sse} that streams aggregated metrics
 * as {@code text/event-stream}.
 */
@Path("/sse")
public class KafkaResource {
    /**
     * Builds the per-request metrics stream.
     *
     * <p>Looks up the observable registered under the {@code "consumer"} global,
     * groups its emissions with {@code buffer(1, SECONDS)} and merges every
     * {@link JsonObject} of a batch, in order, into a single "dashboard" object
     * that is emitted downstream.
     *
     * <p>A random {@link UUID} is generated per call and used only to correlate
     * the diagnostic lines written to {@code System.err}.
     *
     * @param globals application globals; must hold an observable under the
     *                key {@code "consumer"}
     * @return an observable emitting one merged {@link JsonObject} per batch
     * @throws NullPointerException if no {@code "consumer"} global is registered
     */
    @GET
    @Produces("text/event-stream")
    public Observable<JsonObject> index(@Context AppGlobals globals) {
        UUID uuid = UUID.randomUUID();

        // The global is stored untyped; the element type is assumed to be
        // JsonObject, as required by the merge loop below.
        @SuppressWarnings("unchecked")
        Observable<JsonObject> consumer = (Observable<JsonObject>) globals.getGlobal("consumer");
        if (consumer == null) {
            // Fail with an explicit message instead of an anonymous NPE on buffer().
            throw new NullPointerException("No \"consumer\" global registered in AppGlobals");
        }

        return consumer
                .buffer(1L, TimeUnit.SECONDS)
                .map(metrics -> {
                    System.err.println("Metrics for " + uuid);
                    JsonObject dashboard = new JsonObject();
                    for (JsonObject metric : metrics) {
                        dashboard.mergeIn(metric);
                    }
                    return dashboard;
                })
                .doOnUnsubscribe(() -> System.err.println("Unsub for " + uuid));
    }
}

