/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.spargel.java.examples;

import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.FlatMapFunction;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.operators.FlatMapOperator;
import eu.stratosphere.api.java.operators.MapOperator;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.spargel.java.MessageIterator;
import eu.stratosphere.spargel.java.MessagingFunction;
import eu.stratosphere.spargel.java.OutgoingEdge;
import eu.stratosphere.spargel.java.VertexCentricIteration;
import eu.stratosphere.spargel.java.VertexUpdateFunction;
import eu.stratosphere.util.Collector;

public class SpargelPageRank {
    private static final double BETA = 0.85;

    public static void main(String[] args) throws Exception {
        int numVertices = 100;
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MapOperator intialRanks = env.generateSequence(1L, 100L).map((MapFunction)new MapFunction<Long, Tuple2<Long, Double>>(){

            public Tuple2<Long, Double> map(Long value) {
                return new Tuple2((Object)value, (Object)0.01);
            }
        });
        FlatMapOperator edgesWithProbability = env.generateSequence(1L, 100L).flatMap((FlatMapFunction)new FlatMapFunction<Long, Tuple3<Long, Long, Double>>(){

            public void flatMap(Long value, Collector<Tuple3<Long, Long, Double>> out) {
                int numOutEdges = (int)(Math.random() * 50.0);
                for (int i = 0; i < numOutEdges; ++i) {
                    long target = (long)(Math.random() * 100.0) + 1L;
                    out.collect((Object)new Tuple3((Object)value, (Object)target, (Object)(1.0 / (double)numOutEdges)));
                }
            }
        });
        DataSet result = intialRanks.runOperation(VertexCentricIteration.withValuedEdges(edgesWithProbability, new VertexRankUpdater(100L, 0.85), new RankMessenger(), 20));
        result.print();
        env.execute("Spargel PageRank");
    }

    public static final class RankMessenger
    extends MessagingFunction<Long, Double, Double, Double> {
        @Override
        public void sendMessages(Long vertexId, Double newRank) {
            for (OutgoingEdge edge : this.getOutgoingEdges()) {
                this.sendMessageTo(edge.target(), newRank * (Double)edge.edgeValue());
            }
        }
    }

    public static final class VertexRankUpdater
    extends VertexUpdateFunction<Long, Double, Double> {
        private final long numVertices;
        private final double beta;

        public VertexRankUpdater(long numVertices, double beta) {
            this.numVertices = numVertices;
            this.beta = beta;
        }

        @Override
        public void updateVertex(Long vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
            double rankSum = 0.0;
            for (double msg : inMessages) {
                rankSum += msg;
            }
            double newRank = this.beta * rankSum + 0.15000000000000002 / (double)this.numVertices;
            this.setNewVertexValue(newRank);
        }
    }
}

