/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.iterative.nephele.danglingpagerank;

import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.iterative.nephele.ConfigUtils;
import eu.stratosphere.test.iterative.nephele.danglingpagerank.LongArrayView;
import eu.stratosphere.types.DoubleValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import java.util.Random;
import java.util.Set;

public class CompensatableDotProductMatch
extends JoinFunction {
    private static final long serialVersionUID = 1L;
    private Record record;
    private LongValue vertexID;
    private DoubleValue partialRank;
    private DoubleValue rank = new DoubleValue();
    private LongArrayView adjacentNeighbors = new LongArrayView();
    private int workerIndex;
    private int currentIteration;
    private int failingIteration;
    private Set<Integer> failingWorkers;
    private double messageLoss;
    private Random random;

    public void open(Configuration parameters) {
        this.record = new Record();
        this.vertexID = new LongValue();
        this.partialRank = new DoubleValue();
        this.workerIndex = this.getRuntimeContext().getIndexOfThisSubtask();
        this.currentIteration = this.getIterationRuntimeContext().getSuperstepNumber();
        this.failingIteration = ConfigUtils.asInteger("compensation.failingIteration", parameters);
        this.failingWorkers = ConfigUtils.asIntSet("compensation.failingWorker", parameters);
        this.messageLoss = ConfigUtils.asDouble("compensation.messageLoss", parameters);
        this.random = new Random();
    }

    public void join(Record pageWithRank, Record adjacencyList, Collector<Record> collector) {
        this.rank = (DoubleValue)pageWithRank.getField(1, (Value)this.rank);
        this.adjacentNeighbors = (LongArrayView)adjacencyList.getField(1, (Value)this.adjacentNeighbors);
        int numNeighbors = this.adjacentNeighbors.size();
        double rankToDistribute = this.rank.getValue() / (double)numNeighbors;
        this.partialRank.setValue(rankToDistribute);
        this.record.setField(1, (Value)this.partialRank);
        boolean isFailure = this.currentIteration == this.failingIteration && this.failingWorkers.contains(this.workerIndex);
        for (int n = 0; n < numNeighbors; ++n) {
            this.vertexID.setValue(this.adjacentNeighbors.getQuick(n));
            this.record.setField(0, (Value)this.vertexID);
            if (isFailure) {
                if (!(this.random.nextDouble() >= this.messageLoss)) continue;
                collector.collect((Object)this.record);
                continue;
            }
            collector.collect((Object)this.record);
        }
    }
}

