/*
 * Decompiled with CFR 0.152.
 */
package org.apache.spark.examples.mllib;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.mllib.stat.test.BinarySample;
import org.apache.spark.mllib.stat.test.StreamingTest;
import org.apache.spark.mllib.stat.test.StreamingTestResult;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.util.Utils;

public class JavaStreamingTestExample {
    private static int timeoutCounter = 0;

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("Usage: JavaStreamingTestExample <dataDir> <batchDuration> <numBatchesTimeout>");
            System.exit(1);
        }
        String dataDir = args[0];
        Duration batchDuration = Seconds.apply((long)Long.parseLong(args[1]));
        int numBatchesTimeout = Integer.parseInt(args[2]);
        SparkConf conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, batchDuration);
        ssc.checkpoint(Utils.createTempDir((String)System.getProperty("java.io.tmpdir"), (String)"spark").toString());
        JavaDStream data = ssc.textFileStream(dataDir).map((Function)new Function<String, BinarySample>(){

            public BinarySample call(String line) {
                String[] ts = line.split(",");
                boolean label = Boolean.parseBoolean(ts[0]);
                double value = Double.parseDouble(ts[1]);
                return new BinarySample(label, value);
            }
        });
        StreamingTest streamingTest = new StreamingTest().setPeacePeriod(0).setWindowSize(0).setTestMethod("welch");
        JavaDStream out = streamingTest.registerStream(data);
        out.print();
        timeoutCounter = numBatchesTimeout;
        out.foreachRDD((VoidFunction)new VoidFunction<JavaRDD<StreamingTestResult>>(){

            public void call(JavaRDD<StreamingTestResult> rdd) {
                boolean anySignificant;
                timeoutCounter = timeoutCounter - 1;
                boolean bl = anySignificant = !rdd.filter((Function)new Function<StreamingTestResult, Boolean>(){

                    public Boolean call(StreamingTestResult v) {
                        return v.pValue() < 0.05;
                    }
                }).isEmpty();
                if (timeoutCounter <= 0 || anySignificant) {
                    rdd.context().stop();
                }
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
}

