/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.iterative.nephele.customdanglingpagerank;

import eu.stratosphere.api.common.functions.AbstractFunction;
import eu.stratosphere.api.common.functions.GenericJoiner;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.iterative.nephele.ConfigUtils;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
import eu.stratosphere.util.Collector;
import java.util.Random;
import java.util.Set;

public class CustomCompensatableDotProductMatch
extends AbstractFunction
implements GenericJoiner<VertexWithRankAndDangling, VertexWithAdjacencyList, VertexWithRank> {
    private static final long serialVersionUID = 1L;
    private VertexWithRank record = new VertexWithRank();
    private Random random = new Random();
    private double messageLoss;
    private boolean isFailure;

    public void open(Configuration parameters) throws Exception {
        int workerIndex = this.getRuntimeContext().getIndexOfThisSubtask();
        int currentIteration = this.getIterationRuntimeContext().getSuperstepNumber();
        int failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
        Set<Integer> failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
        this.isFailure = currentIteration == failingIteration && failingWorkers.contains(workerIndex);
        this.messageLoss = ConfigUtils.asDouble("compensation.messageLoss", parameters);
    }

    public void join(VertexWithRankAndDangling pageWithRank, VertexWithAdjacencyList adjacencyList, Collector<VertexWithRank> collector) throws Exception {
        double rank = pageWithRank.getRank();
        long[] adjacentNeighbors = adjacencyList.getTargets();
        int numNeighbors = adjacencyList.getNumTargets();
        double rankToDistribute = rank / (double)numNeighbors;
        this.record.setRank(rankToDistribute);
        for (int n = 0; n < numNeighbors; ++n) {
            this.record.setVertexID(adjacentNeighbors[n]);
            if (this.isFailure) {
                if (!(this.random.nextDouble() >= this.messageLoss)) continue;
                collector.collect((Object)this.record);
                continue;
            }
            collector.collect((Object)this.record);
        }
    }
}

