/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.iterative;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.DeltaIteration;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents;
import eu.stratosphere.test.testdata.ConnectedComponentsData;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;
import java.io.BufferedReader;
import java.io.Serializable;
import java.util.Collection;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ConnectedComponentsWithDeferredUpdateITCase
extends RecordAPITestBase {
    private static final long SEED = 3287269182979823L;
    private static final int NUM_VERTICES = 1000;
    private static final int NUM_EDGES = 10000;
    protected String verticesPath;
    protected String edgesPath;
    protected String resultPath;

    public ConnectedComponentsWithDeferredUpdateITCase(Configuration config) {
        super(config);
    }

    protected void preSubmit() throws Exception {
        this.verticesPath = this.createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices((int)1000));
        this.edgesPath = this.createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges((int)10000, (int)1000, (long)3287269182979823L));
        this.resultPath = this.getTempFilePath("results");
    }

    protected Plan getTestJob() {
        boolean extraMapper = this.config.getBoolean("ExtraMapper", false);
        return ConnectedComponentsWithDeferredUpdateITCase.getPlan(4, this.verticesPath, this.edgesPath, this.resultPath, 100, extraMapper);
    }

    protected void postSubmit() throws Exception {
        for (BufferedReader reader : this.getResultReader(this.resultPath)) {
            ConnectedComponentsData.checkOddEvenResult((BufferedReader)reader);
        }
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        Configuration config1 = new Configuration();
        config1.setBoolean("ExtraMapper", false);
        Configuration config2 = new Configuration();
        config2.setBoolean("ExtraMapper", true);
        return ConnectedComponentsWithDeferredUpdateITCase.toParameterList((Configuration[])new Configuration[]{config1, config2});
    }

    public static Plan getPlan(int numSubTasks, String verticesInput, String edgeInput, String output, int maxIterations, boolean extraMap) {
        FileDataSource initialVertices = new FileDataSource((FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class}), verticesInput, "Vertices");
        MapOperator verticesWithId = MapOperator.builder(WorksetConnectedComponents.DuplicateLongMap.class).input(new Operator[]{initialVertices}).name("Assign Vertex Ids").build();
        DeltaIteration iteration = new DeltaIteration(0, "Connected Components Iteration");
        iteration.setInitialSolutionSet((Operator)verticesWithId);
        iteration.setInitialWorkset((Operator)verticesWithId);
        iteration.setMaximumNumberOfIterations(maxIterations);
        FileDataSource edges = new FileDataSource((FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class, LongValue.class}), edgeInput, "Edges");
        JoinOperator joinWithNeighbors = JoinOperator.builder((JoinFunction)new WorksetConnectedComponents.NeighborWithComponentIDJoin(), LongValue.class, (int)0, (int)0).input1(new Operator[]{iteration.getWorkset()}).input2(new Operator[]{edges}).name("Join Candidate Id With Neighbor").build();
        ReduceOperator minCandidateId = ReduceOperator.builder((ReduceFunction)new WorksetConnectedComponents.MinimumComponentIDReduce(), LongValue.class, (int)0).input(new Operator[]{joinWithNeighbors}).name("Find Minimum Candidate Id").build();
        JoinOperator updateComponentId = JoinOperator.builder((JoinFunction)new UpdateComponentIdMatchNonPreserving(), LongValue.class, (int)0, (int)0).input1(new Operator[]{minCandidateId}).input2(new Operator[]{iteration.getSolutionSet()}).name("Update Component Id").build();
        if (extraMap) {
            MapOperator mapper = MapOperator.builder(IdentityMap.class).input(new Operator[]{updateComponentId}).name("idmap").build();
            iteration.setSolutionSetDelta((Operator)mapper);
        } else {
            iteration.setSolutionSetDelta((Operator)updateComponentId);
        }
        iteration.setNextWorkset((Operator)updateComponentId);
        FileDataSink result = new FileDataSink((FileOutputFormat)new CsvOutputFormat("\n", " ", new Class[]{LongValue.class, LongValue.class}), output, (Operator)iteration, "Result");
        Plan plan = new Plan((GenericDataSink)result, "Workset Connected Components");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    public static final class IdentityMap
    extends MapFunction {
        private static final long serialVersionUID = 1L;

        public void map(Record record, Collector<Record> out) throws Exception {
            out.collect((Object)record);
        }
    }

    public static final class UpdateComponentIdMatchNonPreserving
    extends JoinFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        public void join(Record newVertexWithComponent, Record currentVertexWithComponent, Collector<Record> out) {
            long currentComponentID;
            long candidateComponentID = ((LongValue)newVertexWithComponent.getField(1, LongValue.class)).getValue();
            if (candidateComponentID < (currentComponentID = ((LongValue)currentVertexWithComponent.getField(1, LongValue.class)).getValue())) {
                out.collect((Object)newVertexWithComponent);
            }
        }
    }
}

