/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.example.java.graph;

import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.IterativeDataSet;
import eu.stratosphere.api.java.aggregation.Aggregations;
import eu.stratosphere.api.java.functions.FilterFunction;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.operators.MapOperator;
import eu.stratosphere.api.java.operators.ReduceGroupOperator;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.example.java.graph.util.PageRankData;
import eu.stratosphere.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;

public class PageRankBasic {
    private static final double DAMPENING_FACTOR = 0.85;
    private static final double EPSILON = 1.0E-4;
    private static boolean fileOutput = false;
    private static String pageWithRankInputPath = null;
    private static String edgeInputPath = null;
    private static String outputPath = null;
    private static int numVertices = 0;
    private static int maxIterations = 10;

    public static void main(String[] args) throws Exception {
        PageRankBasic.parseParameters(args);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Long, Double>> pageWithRankInput = PageRankBasic.getPageWithRankDataSet(env);
        DataSet<Tuple2<Long, Long>> edgeInput = PageRankBasic.getEdgeDataSet(env);
        ReduceGroupOperator adjacencyListInput = edgeInput.groupBy(new int[]{0}).reduceGroup((GroupReduceFunction)new BuildOutgoingEdgeList());
        IterativeDataSet iteration = pageWithRankInput.iterate(maxIterations);
        MapOperator newRanks = iteration.join((DataSet)adjacencyListInput).where(new int[]{0}).equalTo(new int[]{0}).flatMap((FlatMapFunction)new JoinVertexWithEdgesMatch()).groupBy(new int[]{0}).aggregate(Aggregations.SUM, 1).map((MapFunction)new Dampener(numVertices));
        DataSet finalPageRanks = iteration.closeWith((DataSet)newRanks, (DataSet)newRanks.join((DataSet)iteration).where(new int[]{0}).equalTo(new int[]{0}).filter((FilterFunction)new EpsilonFilter()));
        if (fileOutput) {
            finalPageRanks.writeAsCsv(outputPath, "\n", " ");
        } else {
            finalPageRanks.print();
        }
        env.execute("Basic Page Rank Example");
    }

    private static void parseParameters(String[] args) {
        if (args.length > 0) {
            if (args.length == 5) {
                fileOutput = true;
                pageWithRankInputPath = args[0];
                edgeInputPath = args[1];
                outputPath = args[2];
                numVertices = Integer.parseInt(args[3]);
                maxIterations = Integer.parseInt(args[4]);
            } else {
                System.err.println("Usage: PageRankBasic <vertex with initial rank input> <edges path> <output path> <num vertices> <num iterations>");
                System.exit(1);
            }
        } else {
            System.out.println("Executing PageRank Basic example with default parameters and built-in default data.");
            System.out.println("  Provide parameters to read input data from files.");
            System.out.println("  See the documentation for the correct format of input files.");
            System.out.println("  Usage: PageRankBasic <vertex with initial rank input> <edges path> <output path> <num vertices> <num iterations>");
            numVertices = PageRankData.getNumberOfPages();
        }
    }

    private static DataSet<Tuple2<Long, Double>> getPageWithRankDataSet(ExecutionEnvironment env) {
        if (fileOutput) {
            return env.readCsvFile(pageWithRankInputPath).fieldDelimiter(' ').lineDelimiter("\n").types(Long.class, Double.class);
        }
        return PageRankData.getDefaultPageWithRankDataSet(env);
    }

    private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) {
        if (fileOutput) {
            return env.readCsvFile(edgeInputPath).fieldDelimiter(' ').lineDelimiter("\n").types(Long.class, Long.class);
        }
        return PageRankData.getDefaultEdgeDataSet(env);
    }

    public static final class EpsilonFilter
    extends FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
        public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
            return Math.abs((Double)((Tuple2)value.f0).f1 - (Double)((Tuple2)value.f1).f1) > 1.0E-4;
        }
    }

    public static final class Dampener
    extends MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {
        private final double numVertices;

        public Dampener(double numVertices) {
            this.numVertices = numVertices;
        }

        public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
            value.f1 = 0.85 * (Double)value.f1 + 0.15000000000000002 / this.numVertices;
            return value;
        }
    }

    public static final class JoinVertexWithEdgesMatch
    extends FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> {
        public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out) {
            Long[] neigbors = (Long[])((Tuple2)value.f1).f1;
            double rank = (Double)((Tuple2)value.f0).f1;
            double rankToDistribute = rank / (double)neigbors.length;
            for (int i = 0; i < neigbors.length; ++i) {
                out.collect((Object)new Tuple2((Object)neigbors[i], (Object)rankToDistribute));
            }
        }
    }

    public static final class BuildOutgoingEdgeList
    extends GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
        private final ArrayList<Long> neighbors = new ArrayList();

        public void reduce(Iterator<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
            this.neighbors.clear();
            Long id = 0L;
            while (values.hasNext()) {
                Tuple2<Long, Long> n = values.next();
                id = (Long)n.f0;
                this.neighbors.add((Long)n.f1);
            }
            out.collect((Object)new Tuple2((Object)id, (Object)this.neighbors.toArray(new Long[this.neighbors.size()])));
        }
    }
}

