/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.example.java.record.connectedcomponents;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.DeltaIteration;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import java.io.Serializable;
import java.util.Iterator;

public class WorksetConnectedComponents
implements Program,
ProgramDescription {
    /**
     * Assembles the plan for the delta-iteration based connected components job.
     *
     * <p>Positional arguments, all optional:
     * <ol>
     *   <li>number of sub tasks, applied as the plan's default parallelism (default {@code 1})</li>
     *   <li>path of the vertices input (default: empty string)</li>
     *   <li>path of the edges input (default: empty string)</li>
     *   <li>path of the result output (default: empty string)</li>
     *   <li>maximum number of iterations (default {@code 1})</li>
     * </ol>
     *
     * @param args the positional job parameters listed above
     * @return the plan whose sink writes the result of the iteration
     * @throws NumberFormatException if a supplied numeric argument is not a parsable int
     */
    public Plan getPlan(String ... args) {
        // Job parameters; every one falls back to its default when it is not supplied.
        int parallelism = 1;
        if (args.length > 0) {
            parallelism = Integer.parseInt(args[0]);
        }
        String vertexPath = "";
        if (args.length > 1) {
            vertexPath = args[1];
        }
        String edgePath = "";
        if (args.length > 2) {
            edgePath = args[2];
        }
        String resultPath = "";
        if (args.length > 3) {
            resultPath = args[3];
        }
        int iterationLimit = 1;
        if (args.length > 4) {
            iterationLimit = Integer.parseInt(args[4]);
        }

        // Vertices are read as single-long records; the map duplicates field 0 into field 1.
        FileInputFormat vertexFormat = new CsvInputFormat(' ', new Class[]{LongValue.class});
        FileDataSource vertexSource = new FileDataSource(vertexFormat, vertexPath, "Vertices");
        MapOperator labelledVertices = MapOperator.builder(DuplicateLongMap.class)
                .input(new Operator[]{vertexSource})
                .name("Assign Vertex Ids")
                .build();

        // The labelled vertices seed both the solution set and the workset of the iteration.
        DeltaIteration loop = new DeltaIteration(0, "Connected Components Iteration");
        loop.setInitialSolutionSet(new Operator[]{labelledVertices});
        loop.setInitialWorkset(new Operator[]{labelledVertices});
        loop.setMaximumNumberOfIterations(iterationLimit);

        // Edges are read as records of two longs.
        FileInputFormat edgeFormat = new CsvInputFormat(' ', new Class[]{LongValue.class, LongValue.class});
        FileDataSource edgeSource = new FileDataSource(edgeFormat, edgePath, "Edges");

        // Step 1: join the workset with the edges on field 0 of both inputs.
        JoinFunction neighborJoin = new NeighborWithComponentIDJoin();
        JoinOperator candidatesForNeighbors = JoinOperator.builder(neighborJoin, LongValue.class, 0, 0)
                .input1(new Operator[]{loop.getWorkset()})
                .input2(new Operator[]{edgeSource})
                .name("Join Candidate Id With Neighbor")
                .build();

        // Step 2: group the candidates on field 0 and keep the minimum per group.
        ReduceFunction minimumReduce = new MinimumComponentIDReduce();
        ReduceOperator smallestCandidate = ReduceOperator.builder(minimumReduce, LongValue.class, 0)
                .input(new Operator[]{candidatesForNeighbors})
                .name("Find Minimum Candidate Id")
                .build();

        // Step 3: join the minimum candidates with the solution set on field 0.
        JoinFunction updateJoin = new UpdateComponentIdMatch();
        JoinOperator changedVertices = JoinOperator.builder(updateJoin, LongValue.class, 0, 0)
                .input1(new Operator[]{smallestCandidate})
                .input2(new Operator[]{loop.getSolutionSet()})
                .name("Update Component Id")
                .build();

        // The same operator feeds the next workset and the solution set delta.
        Operator stepResult = changedVertices;
        loop.setNextWorkset(stepResult);
        loop.setSolutionSetDelta(stepResult);

        // Sink: two long fields per record, space separated, one record per line.
        FileOutputFormat sinkFormat = new CsvOutputFormat();
        Operator loopResult = loop;
        FileDataSink sink = new FileDataSink(sinkFormat, resultPath, loopResult, "Result");
        CsvOutputFormat.configureRecordFormat(sink)
                .recordDelimiter('\n')
                .fieldDelimiter(' ')
                .field(LongValue.class, 0)
                .field(LongValue.class, 1);

        GenericDataSink planSink = sink;
        Plan job = new Plan(planSink, "Workset Connected Components");
        job.setDefaultParallelism(parallelism);
        return job;
    }

    /**
     * Describes the positional parameters accepted by {@code getPlan}.
     *
     * @return the usage string naming the five parameters in order
     */
    public String getDescription() {
        final String usage = "Parameters: <numberOfSubTasks> <vertices> <edges> <out> <maxIterations>";
        return usage;
    }

    /**
     * Join function that forwards a candidate record only when its component id
     * (field 1) is strictly smaller than the component id (field 1) of the record
     * it is joined with.
     */
    @FunctionAnnotation.ConstantFieldsFirst(value={0})
    public static final class UpdateComponentIdMatch
    extends JoinFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * Compares the component ids of the two joined records and emits the candidate
         * if it is an improvement; otherwise nothing is emitted.
         *
         * @param newVertexWithComponent the candidate record; field 1 is read as its component id
         * @param currentVertexWithComponent the current record; field 1 is read as its component id
         * @param out collector that receives the candidate record when it has the smaller id
         */
        public void join(Record newVertexWithComponent, Record currentVertexWithComponent, Collector<Record> out) {
            long candidateComponentID = newVertexWithComponent.getField(1, LongValue.class).getValue();
            long currentComponentID = currentVertexWithComponent.getField(1, LongValue.class).getValue();
            // Equal ids are not forwarded: only a strictly smaller candidate counts as a change.
            if (candidateComponentID < currentComponentID) {
                out.collect(newVertexWithComponent);
            }
        }
    }

    /**
     * Reduce function that collapses a group of records into a single record holding
     * field 0 of the first record and the smallest value found in field 1 across the group.
     *
     * <p>The output record and its two field values are allocated once per function
     * instance and reused for every call to {@code reduce}.
     */
    @ReduceOperator.Combinable
    @FunctionAnnotation.ConstantFields(value={0})
    public static final class MinimumComponentIDReduce
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Record result = new Record();
        private final LongValue vertexId = new LongValue();
        private final LongValue minComponentId = new LongValue();

        /**
         * Emits one record: (field 0 of the first record, minimum of field 1 over all records).
         *
         * @param records the records of one group; {@code next()} is called without a prior
         *     {@code hasNext()} check, so the iterator must yield at least one record
         * @param out collector that receives the single result record
         */
        public void reduce(Iterator<Record> records, Collector<Record> out) {
            // Both values of the first record are copied to primitives before the iterator advances.
            Record first = records.next();
            long vertexID = first.getField(0, LongValue.class).getValue();
            long minimumComponentID = first.getField(1, LongValue.class).getValue();

            while (records.hasNext()) {
                long candidateComponentID = records.next().getField(1, LongValue.class).getValue();
                minimumComponentID = Math.min(minimumComponentID, candidateComponentID);
            }

            this.vertexId.setValue(vertexID);
            this.minComponentId.setValue(minimumComponentID);
            this.result.setField(0, this.vertexId);
            this.result.setField(1, this.minComponentId);
            out.collect(this.result);
        }
    }

    /**
     * Join function that pairs field 1 of an edge record with field 1 of the record it is
     * joined with, producing a two-field record.
     *
     * <p>A single output record is allocated per function instance and reused for every call.
     */
    public static final class NeighborWithComponentIDJoin
    extends JoinFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Record result = new Record();

        /**
         * Emits a record whose field 0 is field 1 of {@code edge} and whose field 1 is
         * field 1 of {@code vertexWithComponent}.
         *
         * @param vertexWithComponent record whose field 1 is copied into field 1 of the output
         * @param edge record whose field 1 is copied into field 0 of the output
         * @param out collector that receives the output record
         */
        public void join(Record vertexWithComponent, Record edge, Collector<Record> out) {
            this.result.setField(0, edge.getField(1, LongValue.class));
            this.result.setField(1, vertexWithComponent.getField(1, LongValue.class));
            out.collect(this.result);
        }
    }

    /**
     * Map function that duplicates the long in field 0 of each record into field 1.
     */
    public static final class DuplicateLongMap
    extends MapFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * Sets field 1 of the incoming record to the value read from its field 0 and emits
         * that same record instance.
         *
         * @param record the record to extend; it is modified in place
         * @param out collector that receives the modified record
         * @throws Exception declared by the signature; nothing in this body throws it explicitly
         */
        public void map(Record record, Collector<Record> out) throws Exception {
            record.setField(1, record.getField(0, LongValue.class));
            out.collect(record);
        }
    }
}

