/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.iterative.nephele.danglingpagerank;

import eu.stratosphere.api.java.record.functions.CoGroupFunction;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.iterative.nephele.ConfigUtils;
import eu.stratosphere.test.iterative.nephele.danglingpagerank.BooleanValue;
import eu.stratosphere.test.iterative.nephele.danglingpagerank.PageRankStats;
import eu.stratosphere.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;
import eu.stratosphere.types.DoubleValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import java.util.Iterator;
import java.util.Set;

/**
 * Co-group step of the dangling-vertex PageRank iteration.
 *
 * <p>For each key it combines the current rank record of a vertex (first input) with all
 * partial rank contributions for that vertex (second input), computes the new rank as
 * {@code BETA * summedRank + dampingFactor + danglingRankFactor}, reports per-vertex
 * statistics to the iteration aggregator registered under {@link #AGGREGATOR_NAME}, and
 * emits a record of the form {@code (vertexID, newRank, isDangling)}.
 *
 * <p>In {@link #close()} the aggregator is reset when the current superstep equals the
 * configured failing iteration and this subtask is one of the configured failing workers.
 * NOTE(review): this presumably simulates the loss of a worker's partial aggregate for the
 * compensation tests - confirm against the job setup.
 */
public class CompensatableDotProductCoGroup extends CoGroupFunction {
    private static final long serialVersionUID = 1L;

    /** Name under which the {@link PageRankStatsAggregator} is looked up in the iteration context. */
    public static final String AGGREGATOR_NAME = "pagerank.aggregator";

    /** Weight applied to the summed partial ranks; {@code 1 - BETA} is spread evenly over all vertices. */
    private static final double BETA = 0.85;

    // Reusable output record and value holders, so no objects are allocated per group.
    private final Record accumulator = new Record();
    private final DoubleValue newRank = new DoubleValue();
    private final LongValue vertexID = new LongValue();
    private final DoubleValue doubleInstance = new DoubleValue();
    // Not final: reassigned to whatever Record.getField returns for the dangling flag.
    private BooleanValue isDangling = new BooleanValue();

    private int workerIndex;
    private int currentIteration;
    private int failingIteration;
    private Set<Integer> failingWorkers;
    private PageRankStatsAggregator aggregator;
    private long numVertices;
    private long numDanglingVertices;
    private double dampingFactor;
    private double danglingRankFactor;

    /**
     * Reads the subtask index, the superstep number and the job parameters, and derives the
     * two additive rank terms used by {@link #coGroup}.
     *
     * <p>In superstep 1 the dangling rank factor is derived from the configured number of
     * dangling vertices; in later supersteps it is derived from the dangling rank stored in
     * the previous iteration's aggregate.
     *
     * @param parameters configuration holding the {@code compensation.*} and {@code pageRank.*} keys
     */
    public void open(Configuration parameters) {
        this.workerIndex = this.getRuntimeContext().getIndexOfThisSubtask();
        this.currentIteration = this.getIterationRuntimeContext().getSuperstepNumber();
        this.failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
        this.failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
        this.numVertices = ConfigUtils.asLong("pageRank.numVertices", parameters);
        this.numDanglingVertices = ConfigUtils.asLong("pageRank.numDanglingVertices", parameters);

        // (1 - BETA) is a compile-time constant and folds to the same double as before.
        this.dampingFactor = (1 - BETA) / (double) this.numVertices;

        this.aggregator = (PageRankStatsAggregator) this.getIterationRuntimeContext()
                .getIterationAggregator(AGGREGATOR_NAME);

        if (this.currentIteration == 1) {
            this.danglingRankFactor = BETA * (double) this.numDanglingVertices
                    / ((double) this.numVertices * (double) this.numVertices);
        } else {
            PageRankStats previousAggregate = (PageRankStats) this.getIterationRuntimeContext()
                    .getPreviousIterationAggregate(AGGREGATOR_NAME);
            this.danglingRankFactor = BETA * previousAggregate.danglingRank() / (double) this.numVertices;
        }
    }

    /**
     * Computes and emits the new rank of one vertex.
     *
     * @param currentPageRankIterator records holding the vertex's current state; only the first
     *        record is read (field 0: vertex id, field 1: current rank, field 2: dangling flag)
     * @param partialRanks partial rank contributions for the vertex (field 1: rank share)
     * @param collector receives the {@code (vertexID, newRank, isDangling)} record
     * @throws IllegalStateException if there is no current rank record for the vertex
     */
    public void coGroup(Iterator<Record> currentPageRankIterator, Iterator<Record> partialRanks,
            Collector<Record> collector) {
        if (!currentPageRankIterator.hasNext()) {
            long missingVertex = partialRanks.next().getField(0, LongValue.class).getValue();
            throw new IllegalStateException("No current page rank for vertex [" + missingVertex + "]!");
        }
        Record currentPageRank = currentPageRankIterator.next();

        // Sum all incoming contributions and count them as edges.
        long edges = 0L;
        double summedRank = 0.0;
        while (partialRanks.hasNext()) {
            summedRank += partialRanks.next().getField(1, this.doubleInstance).getValue();
            ++edges;
        }

        double rank = BETA * summedRank + this.dampingFactor + this.danglingRankFactor;
        double currentRank = currentPageRank.getField(1, this.doubleInstance).getValue();
        this.isDangling = currentPageRank.getField(2, this.isDangling);

        // Only dangling vertices contribute to the dangling statistics.
        boolean dangling = this.isDangling.get();
        double danglingRankToAggregate = dangling ? rank : 0.0;
        long danglingVerticesToAggregate = dangling ? 1L : 0L;
        double diff = Math.abs(currentRank - rank);

        this.aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate,
                1L, edges, summedRank, 0.0);

        this.newRank.setValue(rank);
        this.accumulator.setField(0, currentPageRank.getField(0, this.vertexID));
        this.accumulator.setField(1, this.newRank);
        this.accumulator.setField(2, this.isDangling);
        collector.collect(this.accumulator);
    }

    /**
     * Resets the aggregator if this subtask is configured to fail in the current superstep.
     *
     * @throws Exception declared for compatibility with callers; nothing visible here throws a checked exception
     */
    public void close() throws Exception {
        if (this.currentIteration == this.failingIteration && this.failingWorkers.contains(this.workerIndex)) {
            this.aggregator.reset();
        }
    }
}

