/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.util.testjar;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.IterativeDataSet;
import eu.stratosphere.api.java.RemoteEnvironment;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.functions.ReduceFunction;
import eu.stratosphere.api.java.operators.MapOperator;
import eu.stratosphere.api.java.operators.SingleInputUdfOperator;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.configuration.Configuration;
import java.io.Serializable;
import java.util.Collection;

public class KMeansForTest
implements Program {
    private static boolean fileOutput = false;
    private static String pointsPath = null;
    private static String centersPath = null;
    private static String outputPath = null;
    private static int numIterations = 10;

    public Plan getPlan(String ... args) {
        if (!KMeansForTest.parseParameters(args)) {
            throw new RuntimeException("Unable to parse the arguments");
        }
        RemoteEnvironment env = new RemoteEnvironment("localhost", 1, null);
        DataSet<Point> points = KMeansForTest.getPointDataSet((ExecutionEnvironment)env);
        DataSet<Centroid> centroids = KMeansForTest.getCentroidDataSet((ExecutionEnvironment)env);
        IterativeDataSet loop = centroids.iterate(numIterations);
        MapOperator newCentroids = ((MapOperator)points.map((MapFunction)new SelectNearestCenter()).withBroadcastSet((DataSet)loop, "centroids")).map((MapFunction)new CountAppender()).groupBy(new int[]{0}).reduce((ReduceFunction)new CentroidAccumulator()).map((MapFunction)new CentroidAverager());
        DataSet finalCentroids = loop.closeWith((DataSet)newCentroids);
        SingleInputUdfOperator clusteredPoints = points.map((MapFunction)new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
        if (fileOutput) {
            clusteredPoints.writeAsCsv(outputPath, "\n", " ");
        } else {
            clusteredPoints.print();
        }
        return env.createProgramPlan();
    }

    /*
     * Enabled aggressive block sorting
     */
    private static boolean parseParameters(String[] programArguments) {
        if (programArguments.length <= 0) {
            System.out.println("Executing K-Means example with default parameters and built-in default data.");
            System.out.println("  Provide parameters to read input data from files.");
            System.out.println("  See the documentation for the correct format of input files.");
            System.out.println("  We provide a data generator to create synthetic input files for this program.");
            System.out.println("  Usage: KMeans <points path> <centers path> <result path> <num iterations>");
            return true;
        }
        fileOutput = true;
        if (programArguments.length == 4) {
            pointsPath = programArguments[0];
            centersPath = programArguments[1];
            outputPath = programArguments[2];
            numIterations = Integer.parseInt(programArguments[3]);
            return true;
        }
        System.err.println("Usage: KMeans <points path> <centers path> <result path> <num iterations>");
        return false;
    }

    private static DataSet<Point> getPointDataSet(ExecutionEnvironment env) {
        if (fileOutput) {
            return env.readCsvFile(pointsPath).fieldDelimiter('|').includeFields(new boolean[]{true, true}).types(Double.class, Double.class).map((MapFunction)new TuplePointConverter());
        }
        throw new UnsupportedOperationException("Use file output");
    }

    private static DataSet<Centroid> getCentroidDataSet(ExecutionEnvironment env) {
        if (fileOutput) {
            return env.readCsvFile(centersPath).fieldDelimiter('|').includeFields(new boolean[]{true, true, true}).types(Integer.class, Double.class, Double.class).map((MapFunction)new TupleCentroidConverter());
        }
        throw new UnsupportedOperationException("Use file output");
    }

    public static final class CentroidAverager
    extends MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
        public Centroid map(Tuple3<Integer, Point, Long> value) {
            return new Centroid((Integer)value.f0, ((Point)value.f1).div((Long)value.f2));
        }
    }

    public static final class CentroidAccumulator
    extends ReduceFunction<Tuple3<Integer, Point, Long>> {
        public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
            return new Tuple3(val1.f0, (Object)((Point)val1.f1).add((Point)val2.f1), (Object)((Long)val1.f2 + (Long)val2.f2));
        }
    }

    public static final class CountAppender
    extends MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
        public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t) {
            return new Tuple3(t.f0, t.f1, (Object)1L);
        }
    }

    public static final class SelectNearestCenter
    extends MapFunction<Point, Tuple2<Integer, Point>> {
        private Collection<Centroid> centroids;

        public void open(Configuration parameters) throws Exception {
            this.centroids = this.getRuntimeContext().getBroadcastVariable("centroids");
        }

        public Tuple2<Integer, Point> map(Point p) throws Exception {
            double minDistance = Double.MAX_VALUE;
            int closestCentroidId = -1;
            for (Centroid centroid : this.centroids) {
                double distance = p.euclideanDistance(centroid);
                if (!(distance < minDistance)) continue;
                minDistance = distance;
                closestCentroidId = centroid.id;
            }
            return new Tuple2((Object)closestCentroidId, (Object)p);
        }
    }

    public static final class TupleCentroidConverter
    extends MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
        public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception {
            return new Centroid((Integer)t.f0, (Double)t.f1, (Double)t.f2);
        }
    }

    public static final class TuplePointConverter
    extends MapFunction<Tuple2<Double, Double>, Point> {
        public Point map(Tuple2<Double, Double> t) throws Exception {
            return new Point((Double)t.f0, (Double)t.f1);
        }
    }

    public static class Centroid
    extends Point {
        public int id;

        public Centroid() {
        }

        public Centroid(int id, double x, double y) {
            super(x, y);
            this.id = id;
        }

        public Centroid(int id, Point p) {
            super(p.x, p.y);
            this.id = id;
        }

        @Override
        public String toString() {
            return this.id + " " + super.toString();
        }
    }

    public static class Point
    implements Serializable {
        public double x;
        public double y;

        public Point() {
        }

        public Point(double x, double y) {
            this.x = x;
            this.y = y;
        }

        public Point add(Point other) {
            this.x += other.x;
            this.y += other.y;
            return this;
        }

        public Point div(long val) {
            this.x /= (double)val;
            this.y /= (double)val;
            return this;
        }

        public double euclideanDistance(Point other) {
            return Math.sqrt((this.x - other.x) * (this.x - other.x) + (this.y - other.y) * (this.y - other.y));
        }

        public void clear() {
            this.y = 0.0;
            this.x = 0.0;
        }

        public String toString() {
            return this.x + " " + this.y;
        }
    }
}

