/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.recordJobs.graph;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.BulkIteration;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.test.recordJobs.graph.pageRankUtil.DanglingPageRankInputFormat;
import eu.stratosphere.test.recordJobs.graph.pageRankUtil.ImprovedAdjacencyListInputFormat;
import eu.stratosphere.test.recordJobs.graph.pageRankUtil.LongArrayView;
import eu.stratosphere.test.recordJobs.graph.pageRankUtil.PageWithRankOutFormat;
import eu.stratosphere.types.DoubleValue;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import java.io.Serializable;
import java.util.Iterator;

public class SimplePageRank
implements Program,
ProgramDescription {
    private static final long serialVersionUID = 1L;
    private static final String NUM_VERTICES_CONFIG_PARAM = "pageRank.numVertices";

    public Plan getPlan(String ... args) {
        int dop = 1;
        String pageWithRankInputPath = "";
        String adjacencyListInputPath = "";
        String outputPath = "";
        int numIterations = 25;
        long numVertices = 5L;
        if (args.length >= 6) {
            dop = Integer.parseInt(args[0]);
            pageWithRankInputPath = args[1];
            adjacencyListInputPath = args[2];
            outputPath = args[3];
            numIterations = Integer.parseInt(args[4]);
            numVertices = Long.parseLong(args[5]);
        }
        FileDataSource pageWithRankInput = new FileDataSource((FileInputFormat)new DanglingPageRankInputFormat(), pageWithRankInputPath, "PageWithRank Input");
        pageWithRankInput.getParameters().setLong(NUM_VERTICES_CONFIG_PARAM, numVertices);
        BulkIteration iteration = new BulkIteration("Page Rank Loop");
        iteration.setInput((Operator)pageWithRankInput);
        FileDataSource adjacencyListInput = new FileDataSource((FileInputFormat)new ImprovedAdjacencyListInputFormat(), adjacencyListInputPath, "AdjancencyListInput");
        JoinOperator join = JoinOperator.builder((JoinFunction)new JoinVerexWithEdgesMatch(), LongValue.class, (int)0, (int)0).input1(new Operator[]{iteration.getPartialSolution()}).input2(new Operator[]{adjacencyListInput}).name("Join with Edges").build();
        ReduceOperator rankAggregation = ReduceOperator.builder((ReduceFunction)new AggregatingReduce(), LongValue.class, (int)0).input(new Operator[]{join}).name("Rank Aggregation").build();
        iteration.setNextPartialSolution((Operator)rankAggregation);
        iteration.setMaximumNumberOfIterations(numIterations);
        JoinOperator termination = JoinOperator.builder((JoinFunction)new JoinOldAndNew(), LongValue.class, (int)0, (int)0).input1(new Operator[]{iteration.getPartialSolution()}).input2(new Operator[]{rankAggregation}).name("Join Old and New").build();
        iteration.setTerminationCriterion((Operator)termination);
        FileDataSink out = new FileDataSink((FileOutputFormat)new PageWithRankOutFormat(), outputPath, (Operator)iteration, "Final Ranks");
        Plan p = new Plan((GenericDataSink)out, "Simple PageRank");
        p.setDefaultParallelism(dop);
        return p;
    }

    public String getDescription() {
        return "Parameters: <degree-of-parallelism> <pages-input-path> <edges-input-path> <output-path> <max-iterations> <num-vertices> <num-dangling-vertices>";
    }

    public static final class JoinOldAndNew
    extends JoinFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private Record record = new Record();
        private LongValue vertexID = new LongValue();
        private DoubleValue newRank = new DoubleValue();
        private DoubleValue rank = new DoubleValue();

        public void join(Record pageWithRank, Record newPageWithRank, Collector<Record> out) throws Exception {
            this.rank = (DoubleValue)pageWithRank.getField(1, (Value)this.rank);
            this.newRank = (DoubleValue)newPageWithRank.getField(1, (Value)this.newRank);
            this.vertexID = (LongValue)pageWithRank.getField(0, (Value)this.vertexID);
            double epsilon = 0.05;
            double criterion = this.rank.getValue() - this.newRank.getValue();
            if (Math.abs(criterion) > epsilon) {
                this.record.setField(0, (Value)new IntValue(1));
                out.collect((Object)this.record);
            }
        }
    }

    @ReduceOperator.Combinable
    @FunctionAnnotation.ConstantFields(value={0})
    public static final class AggregatingReduce
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final DoubleValue sum = new DoubleValue();

        public void reduce(Iterator<Record> pageWithPartialRank, Collector<Record> out) throws Exception {
            Record rec = null;
            double rankSum = 0.0;
            while (pageWithPartialRank.hasNext()) {
                rec = pageWithPartialRank.next();
                rankSum += ((DoubleValue)rec.getField(1, DoubleValue.class)).getValue();
            }
            this.sum.setValue(rankSum);
            rec.setField(1, (Value)this.sum);
            out.collect((Object)rec);
        }
    }

    public static final class JoinVerexWithEdgesMatch
    extends JoinFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private Record record = new Record();
        private LongValue vertexID = new LongValue();
        private DoubleValue partialRank = new DoubleValue();
        private DoubleValue rank = new DoubleValue();
        private LongArrayView adjacentNeighbors = new LongArrayView();

        public void join(Record pageWithRank, Record edges, Collector<Record> out) throws Exception {
            this.rank = (DoubleValue)pageWithRank.getField(1, (Value)this.rank);
            this.adjacentNeighbors = (LongArrayView)edges.getField(1, (Value)this.adjacentNeighbors);
            int numNeighbors = this.adjacentNeighbors.size();
            double rankToDistribute = this.rank.getValue() / (double)numNeighbors;
            this.partialRank.setValue(rankToDistribute);
            this.record.setField(1, (Value)this.partialRank);
            for (int n = 0; n < numNeighbors; ++n) {
                this.vertexID.setValue(this.adjacentNeighbors.getQuick(n));
                this.record.setField(0, (Value)this.vertexID);
                out.collect((Object)this.record);
            }
        }
    }
}

