/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.recordJobs.graph;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.base.GenericDataSinkBase;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.operators.FileDataSink;
import eu.stratosphere.api.java.record.operators.FileDataSource;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.test.recordJobs.graph.ComputeEdgeDegrees;
import eu.stratosphere.test.recordJobs.graph.EnumTrianglesOnEdgesWithDegrees;
import eu.stratosphere.test.recordJobs.graph.triangleEnumUtil.EdgeInputFormat;
import eu.stratosphere.test.recordJobs.graph.triangleEnumUtil.TriangleOutputFormat;
import eu.stratosphere.types.IntValue;

public class EnumTrianglesWithDegrees
implements Program,
ProgramDescription {
    private static final long serialVersionUID = 1L;

    public Plan getPlan(String ... args) {
        int numSubTasks = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        String edgeInput = args.length > 1 ? args[1] : "";
        String output = args.length > 2 ? args[2] : "";
        int delimiter = args.length > 3 ? (int)Integer.parseInt(args[3]) : 44;
        FileDataSource edges = new FileDataSource((FileInputFormat)new EdgeInputFormat(), edgeInput, "Input Edges");
        edges.setParameter("edgeinput.delimiter", delimiter);
        MapOperator projectEdge = MapOperator.builder((MapFunction)new ComputeEdgeDegrees.ProjectEdge()).input((Operator)edges).name("Project Edge").build();
        ReduceOperator edgeCounter = ReduceOperator.builder((ReduceFunction)new ComputeEdgeDegrees.CountEdges(), IntValue.class, (int)0).input((Operator)projectEdge).name("Count Edges for Vertex").build();
        ReduceOperator countJoiner = ReduceOperator.builder((ReduceFunction)new ComputeEdgeDegrees.JoinCountsAndUniquify(), IntValue.class, (int)0).keyField(IntValue.class, 1).input((Operator)edgeCounter).name("Join Counts").build();
        MapOperator toLowerDegreeEdge = MapOperator.builder((MapFunction)new EnumTrianglesOnEdgesWithDegrees.ProjectToLowerDegreeVertex()).input((Operator)countJoiner).name("Select lower-degree Edge").build();
        MapOperator projectOutCounts = MapOperator.builder((MapFunction)new EnumTrianglesOnEdgesWithDegrees.ProjectOutCounts()).input((Operator)countJoiner).name("Project out Counts").build();
        ReduceOperator buildTriads = ReduceOperator.builder((ReduceFunction)new EnumTrianglesOnEdgesWithDegrees.BuildTriads(), IntValue.class, (int)0).input((Operator)toLowerDegreeEdge).name("Build Triads").build();
        JoinOperator closeTriads = JoinOperator.builder((JoinFunction)new EnumTrianglesOnEdgesWithDegrees.CloseTriads(), IntValue.class, (int)1, (int)0).keyField(IntValue.class, 2, 1).input1((Operator)buildTriads).input2((Operator)projectOutCounts).name("Close Triads").build();
        closeTriads.setParameter("INPUT_SHIP_STRATEGY", "SHIP_REPARTITION_HASH");
        closeTriads.setParameter("LOCAL_STRATEGY", "LOCAL_STRATEGY_HASH_BUILD_SECOND");
        FileDataSink triangles = new FileDataSink((FileOutputFormat)new TriangleOutputFormat(), output, (Operator)closeTriads, "Triangles");
        Plan p = new Plan((GenericDataSinkBase)triangles, "Enumerate Triangles");
        p.setDefaultParallelism(numSubTasks);
        return p;
    }

    public String getDescription() {
        return "Parameters: [noSubStasks] [input file] [output file]";
    }
}

