/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.iterative;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.base.GenericDataSinkBase;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.DeltaIteration;
import eu.stratosphere.api.java.record.operators.FileDataSink;
import eu.stratosphere.api.java.record.operators.FileDataSource;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents;
import eu.stratosphere.test.testdata.ConnectedComponentsData;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;
import java.io.BufferedReader;
import java.io.Serializable;

/**
 * Integration test for a workset (delta-iteration) connected-components plan in which the
 * iteration's solution set is wired as the <em>first</em> input of the join that updates the
 * component IDs, and the freshly computed candidates are the second input.
 *
 * <p>The test generates its input files in {@link #preSubmit()}, builds the plan in
 * {@link #getTestJob()} and validates every produced result file in {@link #postSubmit()}.
 */
public class ConnectedComponentsWithSolutionSetFirstITCase
extends RecordAPITestBase {
    /** Seed handed to the random edge generator so the generated graph is reproducible. */
    private static final long SEED = 3287269182979823L;
    /** Number of vertices written to the generated vertices file. */
    private static final int NUM_VERTICES = 1000;
    /** Number of edges written to the generated edges file. */
    private static final int NUM_EDGES = 10000;
    /** Default parallelism configured on the test plan. */
    private static final int PARALLELISM = 4;
    /** Upper bound on the number of delta-iteration supersteps. */
    private static final int MAX_ITERATIONS = 100;

    protected String verticesPath;
    protected String edgesPath;
    protected String resultPath;

    /**
     * Creates the temporary vertices and edges input files and resolves the result path.
     *
     * @throws Exception if one of the test-base helpers fails
     */
    protected void preSubmit() throws Exception {
        // Use the declared constants so the generated data cannot drift from them.
        verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
        edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
        resultPath = getTempFilePath("results");
    }

    /**
     * Builds the plan under test from the paths prepared in {@link #preSubmit()}.
     *
     * @return the connected-components plan with the solution set as first join input
     */
    protected Plan getTestJob() {
        return getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(
                PARALLELISM, verticesPath, edgesPath, resultPath, MAX_ITERATIONS);
    }

    /**
     * Runs the odd/even component check on every result reader and closes each reader afterwards.
     *
     * @throws Exception if reading, checking or closing a result fails
     */
    protected void postSubmit() throws Exception {
        for (BufferedReader reader : getResultReader(resultPath)) {
            try {
                ConnectedComponentsData.checkOddEvenResult(reader);
            } finally {
                // Closing an already-closed BufferedReader has no effect, so this is safe even
                // if the checker closes the reader itself.
                reader.close();
            }
        }
    }

    /**
     * Assembles the delta-iteration plan.
     *
     * <p>Data flow: the vertices file is mapped through
     * {@code WorksetConnectedComponents.DuplicateLongMap} and used as both the initial solution
     * set and the initial workset. Inside the iteration the workset is joined with the edges,
     * reduced per key with {@code MinimumComponentIDReduce}, and the result is joined against
     * the solution set (solution set = input 1). The output of that last join is both the next
     * workset and the solution-set delta.
     *
     * <p>The raw-type casts on operator and format arguments are kept from the original code
     * because the generic signatures of the operator API are not visible from this file.
     *
     * @param numSubTasks default parallelism set on the plan
     * @param verticesInput path of the vertices file (one long per line, space-delimited format)
     * @param edgeInput path of the edges file (two longs per line, space-delimited format)
     * @param output path the result sink writes to
     * @param maxIterations maximum number of iterations of the delta iteration
     * @return the assembled plan
     */
    private static Plan getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(int numSubTasks, String verticesInput, String edgeInput, String output, int maxIterations) {
        // Source: vertex ids, expanded by DuplicateLongMap before entering the iteration.
        FileDataSource initialVertices = new FileDataSource(
                (FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class}), verticesInput, "Vertices");
        MapOperator verticesWithId = MapOperator.builder(WorksetConnectedComponents.DuplicateLongMap.class)
                .input((Operator)initialVertices)
                .name("Assign Vertex Ids")
                .build();

        // The same operator seeds both the solution set and the workset.
        // NOTE(review): the first constructor argument (0) is presumably the solution-set key
        // field index - confirm against the DeltaIteration API.
        DeltaIteration iteration = new DeltaIteration(0, "Connected Components Iteration");
        iteration.setInitialSolutionSet((Operator)verticesWithId);
        iteration.setInitialWorkset((Operator)verticesWithId);
        iteration.setMaximumNumberOfIterations(maxIterations);

        FileDataSource edges = new FileDataSource(
                (FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class, LongValue.class}), edgeInput, "Edges");

        // Step function: workset x edges -> per-key minimum -> join against the solution set.
        JoinOperator joinWithNeighbors = JoinOperator.builder(
                        (JoinFunction)new WorksetConnectedComponents.NeighborWithComponentIDJoin(), LongValue.class, 0, 0)
                .input1(iteration.getWorkset())
                .input2((Operator)edges)
                .name("Join Candidate Id With Neighbor")
                .build();
        ReduceOperator minCandidateId = ReduceOperator.builder(
                        (ReduceFunction)new WorksetConnectedComponents.MinimumComponentIDReduce(), LongValue.class, 0)
                .input((Operator)joinWithNeighbors)
                .name("Find Minimum Candidate Id")
                .build();
        // The solution set is deliberately the FIRST input here; the candidates are the second.
        JoinOperator updateComponentId = JoinOperator.builder(
                        (JoinFunction)new UpdateComponentIdMatchMirrored(), LongValue.class, 0, 0)
                .input1(iteration.getSolutionSet())
                .input2((Operator)minCandidateId)
                .name("Update Component Id")
                .build();

        // The update join feeds both the next workset and the solution-set delta.
        iteration.setNextWorkset((Operator)updateComponentId);
        iteration.setSolutionSetDelta((Operator)updateComponentId);

        // Sink: two long fields per record, space-separated, one record per line.
        FileDataSink result = new FileDataSink((FileOutputFormat)new CsvOutputFormat(), output, (Operator)iteration, "Result");
        ((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat(result).recordDelimiter('\n')).fieldDelimiter(' ')).field(LongValue.class, 0)).field(LongValue.class, 1);

        Plan plan = new Plan((GenericDataSinkBase)result, "Workset Connected Components");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    /**
     * Join function for the component-id update with mirrored inputs: the first input is the
     * current solution-set record, the second input is the new candidate record.
     *
     * <p>The candidate record is emitted only when its component id (field 1) is strictly
     * smaller than the component id (field 1) of the current record; otherwise nothing is
     * emitted.
     */
    @FunctionAnnotation.ConstantFieldsSecondExcept(value={})
    public static final class UpdateComponentIdMatchMirrored
    extends JoinFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * Emits {@code newVertexWithComponent} if it carries a strictly smaller component id
         * than {@code currentVertexWithComponent}.
         *
         * @param currentVertexWithComponent record from the first input (the solution set)
         * @param newVertexWithComponent record from the second input (the candidate)
         * @param out collector receiving the candidate record when it improves the component id
         */
        public void join(Record currentVertexWithComponent, Record newVertexWithComponent, Collector<Record> out) {
            // Read order (candidate first, then current) matches the original code.
            long candidateComponentID = ((LongValue)newVertexWithComponent.getField(1, LongValue.class)).getValue();
            long currentComponentID = ((LongValue)currentVertexWithComponent.getField(1, LongValue.class)).getValue();

            if (candidateComponentID < currentComponentID) {
                // Pass the record with its own type; the collector is a Collector<Record>.
                out.collect(newVertexWithComponent);
            }
        }
    }
}

