/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.mantis.samples.stage;

import com.netflix.mantis.samples.proto.RequestAggregation;
import com.netflix.mantis.samples.proto.RequestEvent;
import io.mantisrx.common.MantisGroup;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.GroupToScalar;
import io.mantisrx.runtime.computation.GroupToScalarComputation;
import io.mantisrx.runtime.parameter.ParameterDefinition;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.observables.GroupedObservable;

public class AggregationStage
implements GroupToScalarComputation<String, RequestEvent, RequestAggregation> {
    private static final Logger log = LoggerFactory.getLogger(AggregationStage.class);
    public static final String AGGREGATION_DURATION_MSEC_PARAM = "AggregationDurationMsec";
    int aggregationDurationMsec;

    private Observable<? extends RequestAggregation> aggregate(GroupedObservable<String, MantisGroup<String, RequestEvent>> go) {
        return go.reduce((Object)RequestAggregation.builder().build(), (accumulator, value) -> {
            accumulator.setCount(accumulator.getCount() + ((RequestEvent)value.getValue()).getLatency());
            accumulator.setPath((String)go.getKey());
            return accumulator;
        }).doOnNext(aggregate -> log.debug("Generated aggregate {}", aggregate));
    }

    public Observable<RequestAggregation> call(Context context, Observable<MantisGroup<String, RequestEvent>> mantisGroupO) {
        return mantisGroupO.window((long)this.aggregationDurationMsec, TimeUnit.MILLISECONDS).flatMap(omg -> omg.groupBy(MantisGroup::getKeyValue).flatMap(this::aggregate));
    }

    public void init(Context context) {
        this.aggregationDurationMsec = (Integer)context.getParameters().get(AGGREGATION_DURATION_MSEC_PARAM, (Object)1000);
    }

    public static GroupToScalar.Config<String, RequestEvent, RequestAggregation> config() {
        return new GroupToScalar.Config().description("sum events for a path").codec(RequestAggregation.requestAggregationCodec()).withParameters(AggregationStage.getParameters());
    }

    public static List<ParameterDefinition<?>> getParameters() {
        ArrayList params = new ArrayList();
        params.add(new IntParameter().name(AGGREGATION_DURATION_MSEC_PARAM).description("window size for aggregation").validator(Validators.range((Number)100, (Number)10000)).defaultValue((Object)5000).build());
        return params;
    }
}

