/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.example.java.record.triangles;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.example.java.record.triangles.io.EdgeInputFormat;
import eu.stratosphere.example.java.record.triangles.io.EdgeWithDegreesOutputFormat;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;

public class ComputeEdgeDegrees
implements Program,
ProgramDescription {
    public Plan getPlan(String ... args) {
        int numSubTasks = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        String edgeInput = args.length > 1 ? args[1] : "";
        String output = args.length > 2 ? args[2] : "";
        int delimiter = args.length > 3 ? (int)Integer.parseInt(args[3]) : 44;
        FileDataSource edges = new FileDataSource((FileInputFormat)new EdgeInputFormat(), edgeInput, "Input Edges");
        edges.setParameter("edgeinput.delimiter", delimiter);
        MapOperator projectEdge = MapOperator.builder((MapFunction)new ProjectEdge()).input(new Operator[]{edges}).name("Project Edge").build();
        ReduceOperator edgeCounter = ReduceOperator.builder((ReduceFunction)new CountEdges(), IntValue.class, (int)0).input(new Operator[]{projectEdge}).name("Count Edges for Vertex").build();
        ReduceOperator countJoiner = ReduceOperator.builder((ReduceFunction)new JoinCountsAndUniquify()).keyField(IntValue.class, 0).keyField(IntValue.class, 1).input(new Operator[]{edgeCounter}).name("Join Counts").build();
        FileDataSink triangles = new FileDataSink((FileOutputFormat)new EdgeWithDegreesOutputFormat(), output, (Operator)countJoiner, "Unique Edges With Degrees");
        Plan p = new Plan((GenericDataSink)triangles, "Normalize Edges and compute Vertex Degrees");
        p.setDefaultParallelism(numSubTasks);
        return p;
    }

    public String getDescription() {
        return "Parameters: [noSubStasks] [input file] [output file] [vertex delimiter]";
    }

    public static final class JoinCountsAndUniquify
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final IntValue count1 = new IntValue();
        private final IntValue count2 = new IntValue();

        public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
            Record rec = null;
            int c1 = 0;
            int c2 = 0;
            int numValues = 0;
            while (records.hasNext()) {
                rec = records.next();
                int f1 = ((IntValue)rec.getField(2, IntValue.class)).getValue();
                int f2 = ((IntValue)rec.getField(3, IntValue.class)).getValue();
                c1 += f1;
                c2 += f2;
                ++numValues;
            }
            if (numValues != 2 || c1 == 0 || c2 == 0) {
                throw new RuntimeException("JoinCountsAndUniquify Problem: key1=" + ((IntValue)rec.getField(0, IntValue.class)).getValue() + ", key2=" + ((IntValue)rec.getField(1, IntValue.class)).getValue() + "values=" + numValues + ", c1=" + c1 + ", c2=" + c2);
            }
            this.count1.setValue(c1);
            this.count2.setValue(c2);
            rec.setField(2, (Value)this.count1);
            rec.setField(3, (Value)this.count2);
            out.collect((Object)rec);
        }
    }

    public static final class CountEdges
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Record result = new Record();
        private final IntValue firstVertex = new IntValue();
        private final IntValue secondVertex = new IntValue();
        private final IntValue firstCount = new IntValue();
        private final IntValue secondCount = new IntValue();
        private int[] vals = new int[1024];

        public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
            int[] vals = this.vals;
            int len = 0;
            int key = -1;
            while (records.hasNext()) {
                Record rec = records.next();
                int id = ((IntValue)rec.getField(1, IntValue.class)).getValue();
                if (key == -1) {
                    key = ((IntValue)rec.getField(0, IntValue.class)).getValue();
                }
                if (len >= vals.length) {
                    vals = new int[vals.length * 2];
                    System.arraycopy(this.vals, 0, vals, 0, this.vals.length);
                    this.vals = vals;
                }
                vals[len++] = id;
            }
            Arrays.sort(vals, 0, len);
            int k = 0;
            int curr = -1;
            for (int i = 0; i < len; ++i) {
                int val = vals[i];
                if (val != curr) {
                    curr = val;
                    vals[k] = vals[i];
                    ++k;
                    continue;
                }
                vals[k] = vals[i];
            }
            len = k;
            for (int i = 0; i < len; ++i) {
                int e2 = vals[i];
                if (key <= e2) {
                    this.firstVertex.setValue(key);
                    this.secondVertex.setValue(e2);
                    this.firstCount.setValue(len);
                    this.secondCount.setValue(0);
                } else {
                    this.firstVertex.setValue(e2);
                    this.secondVertex.setValue(key);
                    this.firstCount.setValue(0);
                    this.secondCount.setValue(len);
                }
                this.result.setField(0, (Value)this.firstVertex);
                this.result.setField(1, (Value)this.secondVertex);
                this.result.setField(2, (Value)this.firstCount);
                this.result.setField(3, (Value)this.secondCount);
                out.collect((Object)this.result);
            }
        }
    }

    public static final class ProjectEdge
    extends MapFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Record copy = new Record();

        public void map(Record record, Collector<Record> out) throws Exception {
            this.copy.setField(0, record.getField(1, IntValue.class));
            this.copy.setField(1, record.getField(0, IntValue.class));
            out.collect((Object)this.copy);
            out.collect((Object)record);
        }
    }
}

