/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.recordJobs.graph;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.base.GenericDataSinkBase;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.DeltaIteration;
import eu.stratosphere.api.java.record.operators.FileDataSink;
import eu.stratosphere.api.java.record.operators.FileDataSource;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.types.DoubleValue;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import java.util.Iterator;

/**
 * Record-API job that runs a delta-iterative PageRank starting from an initial
 * solution set (pageId, rank) and an initial workset of deltas (pageId, delta).
 *
 * <p>Each iteration step:
 * <ol>
 *   <li>joins the workset with the dependency set (srcId, trgId, out_links) on field 0
 *       and emits, per edge, the share of the delta sent to the target page;</li>
 *   <li>sums the shares per target page and keeps only sums whose magnitude exceeds
 *       {@code 1.0E-5};</li>
 *   <li>joins the summed deltas with the solution set on field 0 to produce the
 *       updated ranks.</li>
 * </ol>
 * The summed deltas become the next workset; the updated ranks become the solution
 * set delta.
 */
public class DeltaPageRankWithInitialDeltas implements Program, ProgramDescription {
    private static final long serialVersionUID = 1L;

    /**
     * Assembles the job plan.
     *
     * @param args positional arguments, all optional:
     *             {@code [0]} default parallelism (default {@code 1}),
     *             {@code [1]} path of the initial solution set (default {@code ""}),
     *             {@code [2]} path of the initial deltas (default {@code ""}),
     *             {@code [3]} path of the dependency set (default {@code ""}),
     *             {@code [4]} output path (default {@code ""}),
     *             {@code [5]} maximum number of iterations (default {@code 1})
     * @return the plan whose single sink writes the result of the delta iteration
     * @throws NumberFormatException if {@code args[0]} or {@code args[5]} is present
     *                               but is not a parsable integer
     */
    public Plan getPlan(String ... args) {
        int numSubTasks = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        String solutionSetInput = args.length > 1 ? args[1] : "";
        String deltasInput = args.length > 2 ? args[2] : "";
        String dependencySetInput = args.length > 3 ? args[3] : "";
        String output = args.length > 4 ? args[4] : "";
        int maxIterations = args.length > 5 ? Integer.parseInt(args[5]) : 1;

        // All three inputs are space-delimited text files.
        // NOTE(review): the raw casts below are kept from the existing code; they look
        // like erasure artifacts and are probably removable - confirm against the API.
        FileDataSource initialSolutionSet = new FileDataSource(
                (FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class, DoubleValue.class}),
                solutionSetInput, "Initial Solution Set");
        FileDataSource initialDeltaSet = new FileDataSource(
                (FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class, DoubleValue.class}),
                deltasInput, "Initial DeltaSet");
        FileDataSource dependencySet = new FileDataSource(
                (FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class, LongValue.class, LongValue.class}),
                dependencySetInput, "Dependency Set");

        DeltaIteration iteration = new DeltaIteration(0, "Delta PageRank");
        iteration.setInitialSolutionSet((Operator)initialSolutionSet);
        iteration.setInitialWorkset((Operator)initialDeltaSet);
        iteration.setMaximumNumberOfIterations(maxIterations);

        // Step 1: workset (pageId, delta) joined with edges (srcId, trgId, out_links) on field 0.
        JoinOperator dependenciesMatch = JoinOperator
                .builder(PRDependenciesComputationMatchDelta.class, LongValue.class, 0, 0)
                .input1(iteration.getWorkset())
                .input2((Operator)dependencySet)
                .name("calculate dependencies")
                .build();

        // Step 2: sum the partial deltas per target page (grouped on field 0).
        ReduceOperator updateRanks = ReduceOperator
                .builder(UpdateRankReduceDelta.class, LongValue.class, 0)
                .input((Operator)dependenciesMatch)
                .name("update ranks")
                .build();

        // Step 3: add each summed delta to the rank currently held in the solution set.
        JoinOperator oldRankComparison = JoinOperator
                .builder(RankComparisonMatch.class, LongValue.class, 0, 0)
                .input1((Operator)updateRanks)
                .input2(iteration.getSolutionSet())
                .name("comparison with old ranks")
                .build();

        iteration.setNextWorkset((Operator)updateRanks);
        iteration.setSolutionSetDelta((Operator)oldRankComparison);

        // Output: one "<pageId> <rank>" record per line.
        FileDataSink result = new FileDataSink(CsvOutputFormat.class, output, (Operator)iteration, "Final Ranks");
        // Same calls, in the same order, as the former single nested-cast expression.
        CsvOutputFormat.ConfigBuilder format =
                (CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat(result).recordDelimiter('\n');
        format = (CsvOutputFormat.ConfigBuilder)format.fieldDelimiter(' ');
        format = (CsvOutputFormat.ConfigBuilder)format.field(LongValue.class, 0);
        format.field(DoubleValue.class, 1);

        Plan plan = new Plan((GenericDataSinkBase)result, "Delta PageRank");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    /**
     * Describes the positional parameters accepted by {@link #getPlan(String...)}.
     *
     * @return a one-line usage string
     */
    public String getDescription() {
        return "Parameters: <numberOfSubTasks> <initialSolutionSet(pageId, rank)> <deltas(pageId, delta)> <dependencySet(srcId, trgId, out_links)> <out> <maxIterations>";
    }

    /**
     * Join function for the "calculate dependencies" step: for a (pageId, value) record
     * and a matching (srcId, trgId, out_links) edge it emits
     * (trgId, value / out_links).
     *
     * <p>Declared {@code static}, like the other user functions in this class: the plan
     * only receives the {@code Class} object, and a non-static inner class cannot be
     * created without an instance of the enclosing class.
     */
    public static class PRDependenciesComputationMatchDelta
    extends JoinFunction {
        private static final long serialVersionUID = 1L;
        /** Output record, reused across calls. */
        private final Record result = new Record();
        /** Holder for the emitted value, reused across calls. */
        private final DoubleValue partRank = new DoubleValue();

        /**
         * @param vertexWithRank record whose field 1 is read as a {@code DoubleValue}
         * @param edgeWithWeight record whose field 1 is the target id and whose field 2
         *                       is the out-link count
         * @param out            receives one (target id, value / out-links) record
         */
        public void join(Record vertexWithRank, Record edgeWithWeight, Collector<Record> out) throws Exception {
            this.result.setField(0, edgeWithWeight.getField(1, LongValue.class));

            LongValue outLinks = edgeWithWeight.getField(2, LongValue.class);
            DoubleValue rank = vertexWithRank.getField(1, DoubleValue.class);

            // No guard for an out-link count of 0: the floating-point division then
            // yields Infinity or NaN rather than throwing.
            this.partRank.setValue(rank.getValue() / (double)outLinks.getValue());
            this.result.setField(1, this.partRank);
            out.collect(this.result);
        }
    }

    /**
     * Reduce function for the "update ranks" step: sums field 1 over all records of a
     * group and emits a single record carrying the sum, but only if the magnitude of
     * the sum exceeds {@link #DELTA_THRESHOLD}.
     *
     * <p>NOTE(review): the class is annotated as combinable. If the framework runs
     * {@code reduce} as the combiner too, the threshold is also applied to partial
     * sums, which can drop contributions that would add up to more than the
     * threshold - confirm this is intended.
     */
    @ReduceOperator.Combinable
    @FunctionAnnotation.ConstantFields(value={0})
    public static final class UpdateRankReduceDelta
    extends ReduceFunction {
        private static final long serialVersionUID = 1L;
        /** Summed deltas whose absolute value does not exceed this are not emitted. */
        private static final double DELTA_THRESHOLD = 1.0E-5;
        /** Holder for the emitted sum, reused across calls. */
        private final DoubleValue newRank = new DoubleValue();

        /**
         * @param records the records of one group; field 1 of each is summed
         * @param out     receives at most one record: the last record read, with
         *                field 1 replaced by the sum
         */
        public void reduce(Iterator<Record> records, Collector<Record> out) {
            double rankSum = 0.0;
            Record rec = null;
            while (records.hasNext()) {
                rec = records.next();
                DoubleValue rank = rec.getField(1, DoubleValue.class);
                rankSum += rank.getValue();
            }
            // For an empty group rankSum stays 0.0, so the null 'rec' is never touched.
            // A NaN sum also fails this comparison and is therefore not emitted.
            if (Math.abs(rankSum) > DELTA_THRESHOLD) {
                this.newRank.setValue(rankSum);
                rec.setField(1, this.newRank);
                out.collect(rec);
            }
        }
    }

    /**
     * Join function for the "comparison with old ranks" step: adds the summed delta to
     * the rank stored in the matching solution-set record and emits that record with
     * the updated rank in field 1.
     */
    @FunctionAnnotation.ConstantFieldsSecond(value={0})
    public static final class RankComparisonMatch
    extends JoinFunction {
        private static final long serialVersionUID = 1L;
        /** Holder for the emitted rank, reused across calls. */
        private final DoubleValue newRank = new DoubleValue();

        /**
         * @param vertexWithDelta   record whose field 1 is the delta to apply
         * @param vertexWithOldRank solution-set record whose field 1 is the current
         *                          rank; it is updated in place and emitted
         * @param out               receives the updated solution-set record
         */
        public void join(Record vertexWithDelta, Record vertexWithOldRank, Collector<Record> out) throws Exception {
            DoubleValue deltaVal = vertexWithDelta.getField(1, DoubleValue.class);
            DoubleValue currentVal = vertexWithOldRank.getField(1, DoubleValue.class);
            this.newRank.setValue(deltaVal.getValue() + currentVal.getValue());
            vertexWithOldRank.setField(1, this.newRank);
            out.collect(vertexWithOldRank);
        }
    }
}

