/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.mantis.examples.sinefunction;

import io.mantisrx.mantis.examples.sinefunction.core.Point;
import io.mantisrx.mantis.examples.sinefunction.stages.SinePointGeneratorStage;
import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.Job;
import io.mantisrx.runtime.MantisJob;
import io.mantisrx.runtime.MantisJobProvider;
import io.mantisrx.runtime.Metadata;
import io.mantisrx.runtime.ScalarToScalar;
import io.mantisrx.runtime.codec.JacksonCodecs;
import io.mantisrx.runtime.computation.ScalarComputation;
import io.mantisrx.runtime.executor.LocalJobExecutorNetworked;
import io.mantisrx.runtime.parameter.Parameter;
import io.mantisrx.runtime.parameter.type.BooleanParameter;
import io.mantisrx.runtime.parameter.type.DoubleParameter;
import io.mantisrx.runtime.parameter.type.IntParameter;
import io.mantisrx.runtime.parameter.validator.Validators;
import io.mantisrx.runtime.sink.SelfDocumentingSink;
import io.mantisrx.runtime.sink.ServerSentEventsSink;
import io.mantisrx.runtime.sink.predicate.Predicate;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.functions.Func1;
import rx.schedulers.Schedulers;

public class SineFunctionJob
extends MantisJobProvider<Point> {
    public static final String INTERVAL_SEC = "intervalSec";
    public static final String RANGE_MAX = "max";
    public static final String RANGE_MIN = "min";
    public static final String AMPLITUDE = "amplitude";
    public static final String FREQUENCY = "frequency";
    public static final String PHASE = "phase";
    public static final String RANDOM_RATE = "randomRate";
    public static final String USE_RANDOM_FLAG = "useRandom";
    public static final SelfDocumentingSink<Point> sseSink = new ServerSentEventsSink.Builder().withEncoder(point -> String.format("{\"x\": %f, \"y\": %f}", point.getX(), point.getY())).withPredicate(new Predicate("filter=even, returns even x parameters; filter=odd, returns odd x parameters.", parameters -> {
        Func1 filter = point -> true;
        if (parameters != null && parameters.containsKey("filter")) {
            String filterBy = (String)((List)parameters.get("filter")).get(0);
            filter = point -> {
                if ("even".equalsIgnoreCase(filterBy)) {
                    return point.getX() % 2.0 == 0.0;
                }
                if ("odd".equalsIgnoreCase(filterBy)) {
                    return point.getX() % 2.0 != 0.0;
                }
                return true;
            };
        }
        return filter;
    })).build();

    static ScalarToScalar.Config<Integer, Point> stageConfig() {
        return new ScalarToScalar.Config().codec(JacksonCodecs.pojo(Point.class));
    }

    public static void main(String[] args) {
        LocalJobExecutorNetworked.execute(new SineFunctionJob().getJobInstance(), (Parameter[])new Parameter[]{new Parameter(USE_RANDOM_FLAG, "false")});
    }

    public Job<Point> getJobInstance() {
        return MantisJob.source((Source)new TimerSource()).stage((ScalarComputation)new SinePointGeneratorStage(), SineFunctionJob.stageConfig()).sink(sseSink).parameterDefinition(new BooleanParameter().name(USE_RANDOM_FLAG).required().description("If true, produce a random sequence of integers.  If false, produce a sequence of integers starting at 0 and increasing by 1.").build()).parameterDefinition(new DoubleParameter().name(RANDOM_RATE).defaultValue((Object)1.0).description("The chance a random integer is generated, for the given period").validator(Validators.range((Number)0, (Number)1)).build()).parameterDefinition(new IntParameter().name(INTERVAL_SEC).defaultValue((Object)1).description("Period at which to generate a random integer value to send to sine function").validator(Validators.range((Number)1, (Number)60)).build()).parameterDefinition(new IntParameter().name(RANGE_MIN).defaultValue((Object)0).description("Minimun of random integer value").validator(Validators.range((Number)0, (Number)100)).build()).parameterDefinition(new IntParameter().name(RANGE_MAX).defaultValue((Object)100).description("Maximum of random integer value").validator(Validators.range((Number)1, (Number)100)).build()).parameterDefinition(new DoubleParameter().name(AMPLITUDE).defaultValue((Object)10.0).description("Amplitude for sine function").validator(Validators.range((Number)1, (Number)100)).build()).parameterDefinition(new DoubleParameter().name(FREQUENCY).defaultValue((Object)1.0).description("Frequency for sine function").validator(Validators.range((Number)1, (Number)100)).build()).parameterDefinition(new DoubleParameter().name(PHASE).defaultValue((Object)0.0).description("Phase for sine function").validator(Validators.range((Number)0, (Number)100)).build()).metadata(new Metadata.Builder().name("Sine function").description("Produces an infinite stream of points, along the sine function, using the following function definition: f(x) = amplitude * sin(frequency * x + phase). The input to the function is either random between [min, max], or an integer sequence starting  at 0.  The output is served via HTTP server using SSE protocol.").build()).create();
    }

    static class TimerSource
    implements Source<Integer> {
        TimerSource() {
        }

        public Observable<Observable<Integer>> call(Context context, Index index) {
            index.getTotalNumWorkersObservable().subscribeOn(Schedulers.io()).subscribe(workerCount -> System.out.println("Total worker count changed to -> " + workerCount));
            int period = (Integer)context.getParameters().get(SineFunctionJob.INTERVAL_SEC);
            int max = (Integer)context.getParameters().get(SineFunctionJob.RANGE_MAX);
            int min = (Integer)context.getParameters().get(SineFunctionJob.RANGE_MIN);
            double randomRate = (Double)context.getParameters().get(SineFunctionJob.RANDOM_RATE);
            boolean useRandom = (Boolean)context.getParameters().get(SineFunctionJob.USE_RANDOM_FLAG);
            Random randomNumGenerator = new Random();
            Random randomRateVariable = new Random();
            return Observable.just((Object)Observable.interval((long)0L, (long)period, (TimeUnit)TimeUnit.SECONDS).map(time -> {
                System.out.println("total worker num: " + index.getTotalNumWorkers());
                if (useRandom) {
                    return randomNumGenerator.nextInt(max - min + 1) + min;
                }
                return (int)time.longValue();
            }).filter(x -> {
                double value = randomRateVariable.nextDouble();
                return value <= randomRate;
            }));
        }

        public void close() throws IOException {
        }
    }
}

