/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.recordJobs.kmeans;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.FileOutputFormat;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.types.DoubleValue;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

public class KMeansSingleStep
implements Program,
ProgramDescription {
    private static final long serialVersionUID = 1L;

    /**
     * Assembles the data flow of a single K-Means step.
     *
     * <p>Two pipe-delimited CSV sources (data points and cluster centers) are each
     * converted by a {@link PointBuilder}. The centers are handed to
     * {@link SelectNearestCenter} as the broadcast variable {@code "centers"}, the
     * resulting records are reduced on field 0 by {@link RecomputeClusterCenter},
     * and the outcome is written through a {@link PointOutFormat} sink.
     *
     * @param args optional positional arguments: {@code [0]} default parallelism
     *             (parsed as an int, 1 when absent), {@code [1]} data point input path,
     *             {@code [2]} cluster center input path, {@code [3]} output path;
     *             absent paths default to the empty string
     * @return the plan named "KMeans Iteration" with the requested default parallelism
     * @throws NumberFormatException if {@code args[0]} is present but not a valid int
     */
    public Plan getPlan(String ... args) {
        // Every argument is optional: start from the defaults and override what was supplied.
        int parallelism = 1;
        if (args.length > 0) {
            parallelism = Integer.parseInt(args[0]);
        }
        String pointsPath = "";
        if (args.length > 1) {
            pointsPath = args[1];
        }
        String centersPath = "";
        if (args.length > 2) {
            centersPath = args[2];
        }
        String resultPath = "";
        if (args.length > 3) {
            resultPath = args[3];
        }

        // Both inputs share the layout "id|x|y|z"; each source gets its own format instance.
        CsvInputFormat pointsFormat = new CsvInputFormat('|', new Class[]{IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class});
        FileDataSource pointsSource = new FileDataSource((FileInputFormat)pointsFormat, pointsPath, "Data Points");
        CsvInputFormat centersFormat = new CsvInputFormat('|', new Class[]{IntValue.class, DoubleValue.class, DoubleValue.class, DoubleValue.class});
        FileDataSource centersSource = new FileDataSource((FileInputFormat)centersFormat, centersPath, "Centers");

        // Turn the three coordinate fields of every input record into a single Point field.
        MapOperator pointsMapper = MapOperator.builder((MapFunction)new PointBuilder())
            .name("Build data points")
            .input(new Operator[]{pointsSource})
            .build();
        MapOperator centersMapper = MapOperator.builder((MapFunction)new PointBuilder())
            .name("Build cluster points")
            .input(new Operator[]{centersSource})
            .build();

        // Assign each data point to a center; the centers arrive as a broadcast variable.
        MapOperator nearestCenterMapper = MapOperator.builder((MapFunction)new SelectNearestCenter())
            .setBroadcastVariable("centers", (Operator)centersMapper)
            .input(new Operator[]{pointsMapper})
            .name("Find Nearest Centers")
            .build();

        // Group on the center id (field 0) and recompute each center.
        ReduceOperator centerReducer = ReduceOperator.builder((ReduceFunction)new RecomputeClusterCenter(), IntValue.class, 0)
            .input(new Operator[]{nearestCenterMapper})
            .name("Recompute Center Positions")
            .build();

        FileDataSink centersSink = new FileDataSink((eu.stratosphere.api.common.io.FileOutputFormat)new PointOutFormat(), resultPath, (Operator)centerReducer, "New Center Positions");

        Plan kMeansPlan = new Plan((GenericDataSink)centersSink, "KMeans Iteration");
        kMeansPlan.setDefaultParallelism(parallelism);
        return kMeansPlan;
    }

    /**
     * Describes the positional parameters read by {@link #getPlan(String...)}.
     *
     * @return a one-line usage string naming the four parameters in order
     */
    public String getDescription() {
        // The first placeholder previously read "numSubStasks", which was a typo.
        return "Parameters: <numSubTasks> <dataPoints> <clusterCenters> <output>";
    }

    /**
     * Output format that writes one text line per record in the form
     * {@code id|x|y|z|}, with each coordinate rounded to one decimal place.
     *
     * <p>Field 0 of the record is read as an {@link IntValue} id and field 1 as a
     * {@link Point}.
     */
    public static final class PointOutFormat
    extends FileOutputFormat {
        private static final long serialVersionUID = 1L;
        private static final String format = "%d|%.1f|%.1f|%.1f|\n";

        /**
         * Formats the record and writes the resulting bytes to the output stream.
         *
         * @param record record carrying the id in field 0 and the point in field 1
         * @throws IOException if writing to the underlying stream fails
         */
        public void writeRecord(Record record) throws IOException {
            int id = record.getField(0, IntValue.class).getValue();
            Point p = record.getField(1, Point.class);
            // Format with a fixed locale: the default locale may use ',' as the decimal
            // separator (or non-ASCII digits), which would corrupt the delimited output.
            String line = String.format(java.util.Locale.ROOT, format, id, p.x, p.y, p.z);
            // Encode with an explicit charset so the bytes do not depend on the platform default.
            this.stream.write(line.getBytes("UTF-8"));
        }
    }

    /**
     * Map function that collapses the three coordinate fields of a record into a
     * single {@link Point} stored in field 1.
     */
    public static final class PointBuilder
    extends MapFunction {
        private static final long serialVersionUID = 1L;

        /**
         * Reads fields 1, 2 and 3 as doubles, overwrites field 1 with the
         * corresponding {@link Point}, and emits the same record instance.
         * Fields 0, 2 and 3 are not modified.
         *
         * @param record input record; fields 1-3 must be readable as {@link DoubleValue}
         * @param out    collector receiving the updated record
         */
        public void map(Record record, Collector<Record> out) throws Exception {
            double x = record.getField(1, DoubleValue.class).getValue();
            double y = record.getField(2, DoubleValue.class).getValue();
            double z = record.getField(3, DoubleValue.class).getValue();
            record.setField(1, new Point(x, y, z));
            // Pass the record with its real type: an Object-typed argument is not
            // accepted by Collector<Record>.collect.
            out.collect(record);
        }
    }

    /**
     * Reduce function that averages the points of one group.
     *
     * <p>Input records carry a {@link Point} in field 1 and an {@link IntValue}
     * count in field 2. The combiner emits the coordinate sum and the summed
     * count; the reducer additionally divides the coordinate sum by the count.
     */
    @ReduceOperator.Combinable
    public static final class RecomputeClusterCenter
    extends ReduceFunction {
        private static final long serialVersionUID = 1L;
        // Reused accumulator; reset at the start of every sumPointsAndCount call.
        private final Point p = new Point();

        /**
         * Emits one record whose field 1 is the coordinate sum divided by the
         * total count. Emits nothing when {@code points} is empty.
         *
         * @param points records of one group
         * @param out    collector receiving the averaged record
         */
        public void reduce(Iterator<Record> points, Collector<Record> out) {
            Record sum = this.sumPointsAndCount(points);
            if (sum == null) {
                // Empty group: there is nothing to average.
                return;
            }
            Point total = sum.getField(1, Point.class);
            int count = sum.getField(2, IntValue.class).getValue();
            sum.setField(1, total.div(count));
            out.collect(sum);
        }

        /**
         * Emits one record holding the partial coordinate sum (field 1) and the
         * partial count (field 2). Emits nothing when {@code points} is empty.
         *
         * @param points records of one group (possibly a partial group)
         * @param out    collector receiving the partial aggregate
         */
        public void combine(Iterator<Record> points, Collector<Record> out) {
            Record partial = this.sumPointsAndCount(points);
            if (partial != null) {
                out.collect(partial);
            }
        }

        /**
         * Adds up all points and counts from the iterator.
         *
         * @param dataPoints records with a point in field 1 and a count in field 2
         * @return the last record read, with field 1 replaced by the shared
         *         accumulator point and field 2 by the summed count, or
         *         {@code null} if the iterator yielded no record
         */
        private final Record sumPointsAndCount(Iterator<Record> dataPoints) {
            Record next = null;
            this.p.clear();
            int count = 0;
            while (dataPoints.hasNext()) {
                next = dataPoints.next();
                this.p.add(next.getField(1, Point.class));
                count += next.getField(2, IntValue.class).getValue();
            }
            if (next == null) {
                // No input at all; previously this path failed with a NullPointerException.
                return null;
            }
            // The last record is reused as the carrier; its remaining fields stay as they were.
            next.setField(1, this.p);
            next.setField(2, new IntValue(count));
            return next;
        }
    }

    /**
     * Map function that labels every data point with the id of its closest center.
     *
     * <p>The centers are read from the broadcast variable {@code "centers"} when
     * the function is opened. Each output record holds the chosen center id in
     * field 0, the point in field 1 and the constant count 1 in field 2.
     */
    public static final class SelectNearestCenter
    extends MapFunction {
        private static final long serialVersionUID = 1L;
        private final IntValue one = new IntValue(1);
        // Output record instance reused across map calls.
        private final Record result = new Record(3);
        private List<PointWithId> centers = new ArrayList<PointWithId>();

        /**
         * Rebuilds the local list of centers from the broadcast variable
         * {@code "centers"}; field 0 of each broadcast record is taken as the id
         * and field 1 as the position.
         *
         * @param parameters configuration passed in by the caller (not used here)
         */
        public void open(Configuration parameters) throws Exception {
            // Typed as Collection<Record>: iterating a raw Collection yields Object,
            // which cannot be bound to a Record loop variable.
            Collection<Record> clusterCenters = this.getRuntimeContext().getBroadcastVariable("centers");
            this.centers.clear();
            for (Record r : clusterCenters) {
                int id = r.getField(0, IntValue.class).getValue();
                Point position = r.getField(1, Point.class);
                this.centers.add(new PointWithId(id, position));
            }
        }

        /**
         * Finds the center with the smallest Euclidean distance to the point in
         * field 1 and emits (center id, point, 1). On equal distances the center
         * seen first wins; if no center is known the emitted id is -1.
         *
         * @param dataPointRecord record with the data point in field 1
         * @param out             collector receiving the labelled record
         */
        public void map(Record dataPointRecord, Collector<Record> out) {
            Point p = dataPointRecord.getField(1, Point.class);
            double nearestDistance = Double.MAX_VALUE;
            int centerId = -1;
            for (PointWithId center : this.centers) {
                double distance = p.euclideanDistance(center.point);
                // Strict comparison keeps the earliest center on ties.
                if (distance < nearestDistance) {
                    nearestDistance = distance;
                    centerId = center.id;
                }
            }
            this.result.setField(0, new IntValue(centerId));
            this.result.setField(1, p);
            this.result.setField(2, this.one);
            // Pass the record with its real type: an Object-typed argument is not
            // accepted by Collector<Record>.collect.
            out.collect(this.result);
        }
    }

    /**
     * Plain holder that pairs a {@link Point} with an integer id.
     */
    public static final class PointWithId {
        /** Identifier associated with the point. */
        public int id;
        /** The point itself; the reference is stored as given, not copied. */
        public Point point;

        /**
         * Creates a holder for the given id and point.
         *
         * @param id identifier to store
         * @param p  point to store (kept by reference)
         */
        public PointWithId(int id, Point p) {
            this.point = p;
            this.id = id;
        }
    }

    /**
     * Mutable point with three double coordinates.
     *
     * <p>The arithmetic methods modify the instance in place. The binary form
     * produced by {@link #write(DataOutput)} is the three coordinates as doubles
     * in the order x, y, z.
     */
    public static final class Point
    implements Value {
        private static final long serialVersionUID = 1L;
        public double x;
        public double y;
        public double z;

        /** Creates a point with all coordinates at their default value of 0.0. */
        public Point() {
        }

        /**
         * Creates a point with the given coordinates.
         *
         * @param x first coordinate
         * @param y second coordinate
         * @param z third coordinate
         */
        public Point(double x, double y, double z) {
            this.x = x;
            this.y = y;
            this.z = z;
        }

        /**
         * Adds the coordinates of {@code other} to this point, in place.
         *
         * @param other point whose coordinates are added
         */
        public void add(Point other) {
            x = x + other.x;
            y = y + other.y;
            z = z + other.z;
        }

        /**
         * Divides every coordinate by {@code val}, in place, using floating-point division.
         *
         * @param val divisor
         * @return this instance, to allow chaining
         */
        public Point div(long val) {
            double divisor = (double)val;
            x = x / divisor;
            y = y / divisor;
            z = z / divisor;
            return this;
        }

        /**
         * Computes the Euclidean distance between this point and {@code other}.
         *
         * @param other the point to measure against
         * @return the square root of the summed squared coordinate differences
         */
        public double euclideanDistance(Point other) {
            double dx = x - other.x;
            double dy = y - other.y;
            double dz = z - other.z;
            return Math.sqrt(dx * dx + dy * dy + dz * dz);
        }

        /** Resets all three coordinates to 0.0. */
        public void clear() {
            x = 0.0;
            y = 0.0;
            z = 0.0;
        }

        /**
         * Writes x, y and z, in that order, as doubles.
         *
         * @param out target to write to
         * @throws IOException if the underlying output fails
         */
        public void write(DataOutput out) throws IOException {
            out.writeDouble(x);
            out.writeDouble(y);
            out.writeDouble(z);
        }

        /**
         * Reads x, y and z, in that order, as doubles, overwriting the current coordinates.
         *
         * @param in source to read from
         * @throws IOException if the underlying input fails
         */
        public void read(DataInput in) throws IOException {
            x = in.readDouble();
            y = in.readDouble();
            z = in.readDouble();
        }

        /**
         * Renders the point as {@code (x|y|z)}.
         *
         * @return the textual form of this point
         */
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("(").append(x);
            text.append("|").append(y);
            text.append("|").append(z);
            text.append(")");
            return text.toString();
        }
    }
}

