/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.example.java.record.kmeans;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.CrossFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.operators.CrossOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.example.java.record.kmeans.udfs.ComputeDistance;
import eu.stratosphere.example.java.record.kmeans.udfs.FindNearestCenter;
import eu.stratosphere.example.java.record.kmeans.udfs.PointInFormat;
import eu.stratosphere.example.java.record.kmeans.udfs.PointOutFormat;
import eu.stratosphere.example.java.record.kmeans.udfs.RecomputeClusterCenter;
import eu.stratosphere.types.IntValue;

public class KMeansSingleStep
implements Program,
ProgramDescription {
    public Plan getPlan(String ... args) {
        int numSubTasks = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        String dataPointInput = args.length > 1 ? args[1] : "";
        String clusterInput = args.length > 2 ? args[2] : "";
        String output = args.length > 3 ? args[3] : "";
        FileDataSource dataPoints = new FileDataSource((FileInputFormat)new PointInFormat(), dataPointInput, "Data Points");
        dataPoints.getCompilerHints().addUniqueField(0);
        FileDataSource clusterPoints = new FileDataSource((FileInputFormat)new PointInFormat(), clusterInput, "Centers");
        clusterPoints.setDegreeOfParallelism(1);
        clusterPoints.getCompilerHints().addUniqueField(0);
        CrossOperator computeDistance = CrossOperator.builder((CrossFunction)new ComputeDistance()).input1(new Operator[]{dataPoints}).input2(new Operator[]{clusterPoints}).name("Compute Distances").build();
        ReduceOperator findNearestClusterCenters = ReduceOperator.builder((ReduceFunction)new FindNearestCenter(), IntValue.class, (int)0).input(new Operator[]{computeDistance}).name("Find Nearest Centers").build();
        ReduceOperator recomputeClusterCenter = ReduceOperator.builder((ReduceFunction)new RecomputeClusterCenter(), IntValue.class, (int)0).input(new Operator[]{findNearestClusterCenters}).name("Recompute Center Positions").build();
        FileDataSink newClusterPoints = new FileDataSink((FileOutputFormat)new PointOutFormat(), output, (Operator)recomputeClusterCenter, "New Center Positions");
        Plan plan = new Plan((GenericDataSink)newClusterPoints, "KMeans Iteration");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    public String getDescription() {
        return "Parameters: [numSubStasks] [dataPoints] [clusterCenters] [output]";
    }
}

