/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.mantis.samples;

import com.netflix.mantis.samples.proto.AggregationReport;
import com.netflix.mantis.samples.proto.RequestAggregation;
import com.netflix.mantis.samples.proto.RequestEvent;
import com.netflix.mantis.samples.source.RandomRequestSource;
import io.mantisrx.runtime.Config;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.core.MantisStream;
import io.mantisrx.runtime.core.WindowSpec;
import io.mantisrx.runtime.core.functions.ReduceFunction;
import io.mantisrx.runtime.core.sinks.ObservableSinkImpl;
import io.mantisrx.runtime.core.sinks.SinkFunction;
import io.mantisrx.runtime.core.sources.ObservableSourceImpl;
import io.mantisrx.runtime.core.sources.SourceFunction;
import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.sink.Sinks;
import io.mantisrx.runtime.source.Source;
import io.mantisrx.shaded.com.fasterxml.jackson.core.JsonProcessingException;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RequestAggregationDslJob
extends MantisJobProvider<String> {
    private static final Logger log = LoggerFactory.getLogger(RequestAggregationDslJob.class);
    private static final ObjectMapper mapper = new ObjectMapper();

    public Job<String> getJobInstance() {
        String groupByParam = "path";
        Config jobConfig = MantisStream.create(null).source((SourceFunction)new ObservableSourceImpl((Source)new RandomRequestSource())).keyBy(x -> {
            if ("path".equalsIgnoreCase(groupByParam)) {
                return x.getRequestPath();
            }
            return x.getIpAddress();
        }).window(WindowSpec.timed((Duration)Duration.ofSeconds(5L))).reduce((ReduceFunction)new ReduceFunction<RequestEvent, RequestAggregation>(){

            public RequestAggregation initialValue() {
                return RequestAggregation.builder().build();
            }

            public RequestAggregation reduce(RequestAggregation acc, RequestEvent requestEvent) {
                return RequestAggregation.builder().path(requestEvent.getRequestPath()).count(acc.getCount() + requestEvent.getLatency()).build();
            }
        }).materialize().keyBy(x -> "").window(WindowSpec.timed((Duration)Duration.ofSeconds(5L))).reduce((ReduceFunction)new ReduceFunction<RequestAggregation, AggregationReport>(){

            public AggregationReport initialValue() {
                return new AggregationReport(new ConcurrentHashMap<String, Integer>());
            }

            public AggregationReport reduce(AggregationReport acc, RequestAggregation item) {
                if (item != null && item.getPath() != null) {
                    acc.getPathToCountMap().put(item.getPath(), item.getCount());
                }
                return acc;
            }
        }).map(report -> {
            try {
                return mapper.writeValueAsString(report);
            }
            catch (JsonProcessingException e) {
                log.error(e.getMessage());
                return null;
            }
        }).filter(Objects::nonNull).sink((SinkFunction)new ObservableSinkImpl(Sinks.sysout()));
        return jobConfig.metadata(new Metadata.Builder().name("GroupByPath").description("Connects to a random data generator source and counts the number of requests for each uri within a window").build()).create();
    }

    public static void main(String[] args) {
        LocalJobExecutorNetworked.execute(new RequestAggregationDslJob().getJobInstance(), (Parameter[])new Parameter[0]);
    }
}

