/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.recordJobs.kmeans;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.BulkIteration;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.CrossFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.operators.CrossOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.client.LocalExecutor;
import eu.stratosphere.test.recordJobs.kmeans.udfs.ComputeDistance;
import eu.stratosphere.test.recordJobs.kmeans.udfs.FindNearestCenter;
import eu.stratosphere.test.recordJobs.kmeans.udfs.PointInFormat;
import eu.stratosphere.test.recordJobs.kmeans.udfs.PointOutFormat;
import eu.stratosphere.test.recordJobs.kmeans.udfs.RecomputeClusterCenter;
import eu.stratosphere.types.IntValue;
import java.util.ArrayList;

public class KMeansCross
implements Program,
ProgramDescription {
    private static final long serialVersionUID = 1L;

    public Plan getPlan(String ... args) {
        int numSubTasks = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        String dataPointInput = args.length > 1 ? args[1] : "";
        String clusterInput = args.length > 2 ? args[2] : "";
        String output = args.length > 3 ? args[3] : "";
        int numIterations = args.length > 4 ? Integer.parseInt(args[4]) : 1;
        FileDataSource initialClusterPoints = new FileDataSource((FileInputFormat)new PointInFormat(), clusterInput, "Centers");
        initialClusterPoints.setDegreeOfParallelism(1);
        BulkIteration iteration = new BulkIteration("K-Means Loop");
        iteration.setInput((Operator)initialClusterPoints);
        iteration.setMaximumNumberOfIterations(numIterations);
        FileDataSource dataPoints = new FileDataSource((FileInputFormat)new PointInFormat(), dataPointInput, "Data Points");
        CrossOperator computeDistance = CrossOperator.builder((CrossFunction)new ComputeDistance()).input1(new Operator[]{dataPoints}).input2(new Operator[]{iteration.getPartialSolution()}).name("Compute Distances").build();
        ReduceOperator findNearestClusterCenters = ReduceOperator.builder((ReduceFunction)new FindNearestCenter(), IntValue.class, (int)0).input(new Operator[]{computeDistance}).name("Find Nearest Centers").build();
        ReduceOperator recomputeClusterCenter = ReduceOperator.builder((ReduceFunction)new RecomputeClusterCenter(), IntValue.class, (int)0).input(new Operator[]{findNearestClusterCenters}).name("Recompute Center Positions").build();
        iteration.setNextPartialSolution((Operator)recomputeClusterCenter);
        FileDataSource dataPoints2 = new FileDataSource((FileInputFormat)new PointInFormat(), dataPointInput, "Data Points 2");
        CrossOperator computeFinalDistance = CrossOperator.builder((CrossFunction)new ComputeDistance()).input1(new Operator[]{dataPoints2}).input2(new Operator[]{iteration}).name("Compute Final Distances").build();
        ReduceOperator findNearestFinalCluster = ReduceOperator.builder((ReduceFunction)new FindNearestCenter(), IntValue.class, (int)0).input(new Operator[]{computeFinalDistance}).name("Find Nearest Final Centers").build();
        FileDataSink finalClusters = new FileDataSink((FileOutputFormat)new PointOutFormat(), output + "/centers", (Operator)iteration, "Cluster Positions");
        FileDataSink clusterAssignments = new FileDataSink((FileOutputFormat)new PointOutFormat(), output + "/points", (Operator)findNearestFinalCluster, "Cluster Assignments");
        ArrayList<FileDataSink> sinks = new ArrayList<FileDataSink>();
        sinks.add(finalClusters);
        sinks.add(clusterAssignments);
        Plan plan = new Plan(sinks, "Iterative KMeans");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    public String getDescription() {
        return "Parameters: <numSubStasks> <dataPoints> <clusterCenters> <output> <numIterations>";
    }

    public static void main(String[] args) throws Exception {
        KMeansCross kmi = new KMeansCross();
        if (args.length < 5) {
            System.err.println(kmi.getDescription());
            System.exit(1);
        }
        Plan plan = kmi.getPlan(args);
        LocalExecutor.execute((Plan)plan);
    }
}

