/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.recordJobs.graph;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.DeltaIteration;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.CoGroupFunction;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.CoGroupOperator;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import java.io.Serializable;
import java.util.Iterator;

public class ConnectedComponentsWithCoGroup
implements Program {
    private static final long serialVersionUID = 1L;

    public Plan getPlan(String ... args) {
        int numSubTasks = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        String verticesInput = args.length > 1 ? args[1] : "";
        String edgeInput = args.length > 2 ? args[2] : "";
        String output = args.length > 3 ? args[3] : "";
        int maxIterations = args.length > 4 ? Integer.parseInt(args[4]) : 1;
        FileDataSource initialVertices = new FileDataSource((FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class}), verticesInput, "Vertices");
        MapOperator verticesWithId = MapOperator.builder(WorksetConnectedComponents.DuplicateLongMap.class).input(new Operator[]{initialVertices}).name("Assign Vertex Ids").build();
        DeltaIteration iteration = new DeltaIteration(0, "Connected Components Iteration");
        iteration.setInitialSolutionSet((Operator)verticesWithId);
        iteration.setInitialWorkset((Operator)verticesWithId);
        iteration.setMaximumNumberOfIterations(maxIterations);
        FileDataSource edges = new FileDataSource((FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class, LongValue.class}), edgeInput, "Edges");
        JoinOperator joinWithNeighbors = JoinOperator.builder((JoinFunction)new WorksetConnectedComponents.NeighborWithComponentIDJoin(), LongValue.class, (int)0, (int)0).input1(new Operator[]{iteration.getWorkset()}).input2(new Operator[]{edges}).name("Join Candidate Id With Neighbor").build();
        CoGroupOperator minAndUpdate = CoGroupOperator.builder((CoGroupFunction)new MinIdAndUpdate(), LongValue.class, (int)0, (int)0).input1(new Operator[]{joinWithNeighbors}).input2(new Operator[]{iteration.getSolutionSet()}).name("Min Id and Update").build();
        iteration.setNextWorkset((Operator)minAndUpdate);
        iteration.setSolutionSetDelta((Operator)minAndUpdate);
        FileDataSink result = new FileDataSink((FileOutputFormat)new CsvOutputFormat(), output, (Operator)iteration, "Result");
        ((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat((FileDataSink)result).recordDelimiter('\n')).fieldDelimiter(' ')).field(LongValue.class, 0)).field(LongValue.class, 1);
        Plan plan = new Plan((GenericDataSink)result, "Workset Connected Components");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    @FunctionAnnotation.ConstantFieldsFirst(value={0})
    @FunctionAnnotation.ConstantFieldsSecond(value={0})
    public static final class MinIdAndUpdate
    extends CoGroupFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final LongValue newComponentId = new LongValue();

        public void coGroup(Iterator<Record> candidates, Iterator<Record> current, Collector<Record> out) throws Exception {
            if (!current.hasNext()) {
                throw new Exception("Error: Id not encountered before.");
            }
            Record old = current.next();
            long oldId = ((LongValue)old.getField(1, LongValue.class)).getValue();
            long minimumComponentID = Long.MAX_VALUE;
            while (candidates.hasNext()) {
                long candidateComponentID = ((LongValue)candidates.next().getField(1, LongValue.class)).getValue();
                if (candidateComponentID >= minimumComponentID) continue;
                minimumComponentID = candidateComponentID;
            }
            if (minimumComponentID < oldId) {
                this.newComponentId.setValue(minimumComponentID);
                old.setField(1, (Value)this.newComponentId);
                out.collect((Object)old);
            }
        }
    }
}

