/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.compiler.iterations;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.DeltaIteration;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.api.java.record.functions.JoinFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.operators.JoinOperator;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.compiler.dag.TempMode;
import eu.stratosphere.compiler.plan.DualInputPlanNode;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plan.SingleInputPlanNode;
import eu.stratosphere.compiler.plan.SinkPlanNode;
import eu.stratosphere.compiler.plan.SourcePlanNode;
import eu.stratosphere.compiler.plan.WorksetIterationPlanNode;
import eu.stratosphere.compiler.plantranslate.NepheleJobGraphGenerator;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.test.compiler.CompilerTestBase;
import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;
import java.io.Serializable;
import org.junit.Assert;
import org.junit.Test;

public class ConnectedComponentsTest
extends CompilerTestBase {
    private static final String VERTEX_SOURCE = "Vertices";
    private static final String ITERATION_NAME = "Connected Components Iteration";
    private static final String EDGES_SOURCE = "Edges";
    private static final String JOIN_NEIGHBORS_MATCH = "Join Candidate Id With Neighbor";
    private static final String MIN_ID_REDUCER = "Find Minimum Candidate Id";
    private static final String UPDATE_ID_MATCH = "Update Component Id";
    private static final String SINK = "Result";
    private static final boolean PRINT_PLAN = false;
    private final FieldList set0 = new FieldList(0);

    @Test
    public void testWorksetConnectedComponents() {
        WorksetConnectedComponents cc = new WorksetConnectedComponents();
        Plan plan = cc.getPlan(String.valueOf(8), IN_FILE, IN_FILE, OUT_FILE, String.valueOf(100));
        OptimizedPlan optPlan = this.compileNoStats(plan);
        CompilerTestBase.OptimizerPlanNodeResolver or = ConnectedComponentsTest.getOptimizerPlanNodeResolver(optPlan);
        SourcePlanNode vertexSource = (SourcePlanNode)or.getNode(VERTEX_SOURCE);
        SourcePlanNode edgesSource = (SourcePlanNode)or.getNode(EDGES_SOURCE);
        SinkPlanNode sink = (SinkPlanNode)or.getNode(SINK);
        WorksetIterationPlanNode iter = (WorksetIterationPlanNode)or.getNode(ITERATION_NAME);
        DualInputPlanNode neighborsJoin = (DualInputPlanNode)or.getNode(JOIN_NEIGHBORS_MATCH);
        SingleInputPlanNode minIdReducer = (SingleInputPlanNode)or.getNode(MIN_ID_REDUCER);
        SingleInputPlanNode minIdCombiner = (SingleInputPlanNode)minIdReducer.getPredecessor();
        DualInputPlanNode updatingMatch = (DualInputPlanNode)or.getNode(UPDATE_ID_MATCH);
        Assert.assertEquals((Object)DriverStrategy.NONE, (Object)sink.getDriverStrategy());
        Assert.assertEquals((Object)DriverStrategy.NONE, (Object)vertexSource.getDriverStrategy());
        Assert.assertEquals((Object)DriverStrategy.NONE, (Object)edgesSource.getDriverStrategy());
        Assert.assertEquals((Object)this.set0, (Object)neighborsJoin.getKeysForInput1());
        Assert.assertEquals((Object)this.set0, (Object)neighborsJoin.getKeysForInput2());
        Assert.assertEquals((Object)DriverStrategy.HYBRIDHASH_BUILD_SECOND, (Object)updatingMatch.getDriverStrategy());
        Assert.assertEquals((Object)this.set0, (Object)updatingMatch.getKeysForInput1());
        Assert.assertEquals((Object)this.set0, (Object)updatingMatch.getKeysForInput2());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)sink.getInput().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)iter.getInitialSolutionSetInput().getShipStrategy());
        Assert.assertEquals((Object)this.set0, (Object)iter.getInitialSolutionSetInput().getShipStrategyKeys());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)iter.getInitialWorksetInput().getShipStrategy());
        Assert.assertEquals((Object)this.set0, (Object)iter.getInitialWorksetInput().getShipStrategyKeys());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)neighborsJoin.getInput1().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)neighborsJoin.getInput2().getShipStrategy());
        Assert.assertEquals((Object)this.set0, (Object)neighborsJoin.getInput2().getShipStrategyKeys());
        Assert.assertTrue((boolean)neighborsJoin.getInput2().getTempMode().isCached());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)minIdReducer.getInput().getShipStrategy());
        Assert.assertEquals((Object)this.set0, (Object)minIdReducer.getInput().getShipStrategyKeys());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)minIdCombiner.getInput().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)updatingMatch.getInput1().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)updatingMatch.getInput2().getShipStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)sink.getInput().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)iter.getInitialSolutionSetInput().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)neighborsJoin.getInput1().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)neighborsJoin.getInput2().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.COMBININGSORT, (Object)minIdReducer.getInput().getLocalStrategy());
        Assert.assertEquals((Object)this.set0, (Object)minIdReducer.getInput().getLocalStrategyKeys());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)minIdCombiner.getInput().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)updatingMatch.getInput1().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)updatingMatch.getInput2().getLocalStrategy());
        Assert.assertTrue((TempMode.PIPELINE_BREAKER == iter.getInitialWorksetInput().getTempMode() || LocalStrategy.SORT == iter.getInitialWorksetInput().getLocalStrategy() ? 1 : 0) != 0);
        NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
        jgg.compileJobGraph(optPlan);
    }

    @Test
    public void testWorksetConnectedComponentsWithSolutionSetAsFirstInput() {
        Plan plan = ConnectedComponentsTest.getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(8, IN_FILE, IN_FILE, OUT_FILE, 100);
        OptimizedPlan optPlan = this.compileNoStats(plan);
        CompilerTestBase.OptimizerPlanNodeResolver or = ConnectedComponentsTest.getOptimizerPlanNodeResolver(optPlan);
        SourcePlanNode vertexSource = (SourcePlanNode)or.getNode(VERTEX_SOURCE);
        SourcePlanNode edgesSource = (SourcePlanNode)or.getNode(EDGES_SOURCE);
        SinkPlanNode sink = (SinkPlanNode)or.getNode(SINK);
        WorksetIterationPlanNode iter = (WorksetIterationPlanNode)or.getNode(ITERATION_NAME);
        DualInputPlanNode neighborsJoin = (DualInputPlanNode)or.getNode(JOIN_NEIGHBORS_MATCH);
        SingleInputPlanNode minIdReducer = (SingleInputPlanNode)or.getNode(MIN_ID_REDUCER);
        SingleInputPlanNode minIdCombiner = (SingleInputPlanNode)minIdReducer.getPredecessor();
        DualInputPlanNode updatingMatch = (DualInputPlanNode)or.getNode(UPDATE_ID_MATCH);
        Assert.assertEquals((Object)DriverStrategy.NONE, (Object)sink.getDriverStrategy());
        Assert.assertEquals((Object)DriverStrategy.NONE, (Object)vertexSource.getDriverStrategy());
        Assert.assertEquals((Object)DriverStrategy.NONE, (Object)edgesSource.getDriverStrategy());
        Assert.assertEquals((Object)this.set0, (Object)neighborsJoin.getKeysForInput1());
        Assert.assertEquals((Object)this.set0, (Object)neighborsJoin.getKeysForInput2());
        Assert.assertEquals((Object)DriverStrategy.HYBRIDHASH_BUILD_FIRST, (Object)updatingMatch.getDriverStrategy());
        Assert.assertEquals((Object)this.set0, (Object)updatingMatch.getKeysForInput1());
        Assert.assertEquals((Object)this.set0, (Object)updatingMatch.getKeysForInput2());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)sink.getInput().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)iter.getInitialSolutionSetInput().getShipStrategy());
        Assert.assertEquals((Object)this.set0, (Object)iter.getInitialSolutionSetInput().getShipStrategyKeys());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)iter.getInitialWorksetInput().getShipStrategy());
        Assert.assertEquals((Object)this.set0, (Object)iter.getInitialWorksetInput().getShipStrategyKeys());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)neighborsJoin.getInput1().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)neighborsJoin.getInput2().getShipStrategy());
        Assert.assertEquals((Object)this.set0, (Object)neighborsJoin.getInput2().getShipStrategyKeys());
        Assert.assertTrue((boolean)neighborsJoin.getInput2().getTempMode().isCached());
        Assert.assertEquals((Object)ShipStrategyType.PARTITION_HASH, (Object)minIdReducer.getInput().getShipStrategy());
        Assert.assertEquals((Object)this.set0, (Object)minIdReducer.getInput().getShipStrategyKeys());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)minIdCombiner.getInput().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)updatingMatch.getInput1().getShipStrategy());
        Assert.assertEquals((Object)ShipStrategyType.FORWARD, (Object)updatingMatch.getInput2().getShipStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)sink.getInput().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)iter.getInitialSolutionSetInput().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)neighborsJoin.getInput1().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)neighborsJoin.getInput2().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.COMBININGSORT, (Object)minIdReducer.getInput().getLocalStrategy());
        Assert.assertEquals((Object)this.set0, (Object)minIdReducer.getInput().getLocalStrategyKeys());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)minIdCombiner.getInput().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)updatingMatch.getInput1().getLocalStrategy());
        Assert.assertEquals((Object)LocalStrategy.NONE, (Object)updatingMatch.getInput2().getLocalStrategy());
        Assert.assertTrue((TempMode.PIPELINE_BREAKER == iter.getInitialWorksetInput().getTempMode() || LocalStrategy.SORT == iter.getInitialWorksetInput().getLocalStrategy() ? 1 : 0) != 0);
        NepheleJobGraphGenerator jgg = new NepheleJobGraphGenerator();
        jgg.compileJobGraph(optPlan);
    }

    private static Plan getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(int numSubTasks, String verticesInput, String edgeInput, String output, int maxIterations) {
        FileDataSource initialVertices = new FileDataSource((FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class}), verticesInput, VERTEX_SOURCE);
        MapOperator verticesWithId = MapOperator.builder(WorksetConnectedComponents.DuplicateLongMap.class).input(new Operator[]{initialVertices}).name("Assign Vertex Ids").build();
        DeltaIteration iteration = new DeltaIteration(0, ITERATION_NAME);
        iteration.setInitialSolutionSet((Operator)verticesWithId);
        iteration.setInitialWorkset((Operator)verticesWithId);
        iteration.setMaximumNumberOfIterations(maxIterations);
        FileDataSource edges = new FileDataSource((FileInputFormat)new CsvInputFormat(' ', new Class[]{LongValue.class, LongValue.class}), edgeInput, EDGES_SOURCE);
        JoinOperator joinWithNeighbors = JoinOperator.builder((JoinFunction)new WorksetConnectedComponents.NeighborWithComponentIDJoin(), LongValue.class, (int)0, (int)0).input1(new Operator[]{iteration.getWorkset()}).input2(new Operator[]{edges}).name(JOIN_NEIGHBORS_MATCH).build();
        ReduceOperator minCandidateId = ReduceOperator.builder((ReduceFunction)new WorksetConnectedComponents.MinimumComponentIDReduce(), LongValue.class, (int)0).input(new Operator[]{joinWithNeighbors}).name(MIN_ID_REDUCER).build();
        JoinOperator updateComponentId = JoinOperator.builder((JoinFunction)new UpdateComponentIdMatchMirrored(), LongValue.class, (int)0, (int)0).input1(new Operator[]{iteration.getSolutionSet()}).input2(new Operator[]{minCandidateId}).name(UPDATE_ID_MATCH).build();
        iteration.setNextWorkset((Operator)updateComponentId);
        iteration.setSolutionSetDelta((Operator)updateComponentId);
        FileDataSink result = new FileDataSink((FileOutputFormat)new CsvOutputFormat(), output, (Operator)iteration, SINK);
        ((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat((FileDataSink)result).recordDelimiter('\n')).fieldDelimiter(' ')).field(LongValue.class, 0)).field(LongValue.class, 1);
        Plan plan = new Plan((GenericDataSink)result, "Workset Connected Components");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    @FunctionAnnotation.ConstantFieldsSecond(value={0})
    public static final class UpdateComponentIdMatchMirrored
    extends JoinFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        public void join(Record currentVertexWithComponent, Record newVertexWithComponent, Collector<Record> out) {
            long currentComponentID;
            long candidateComponentID = ((LongValue)newVertexWithComponent.getField(1, LongValue.class)).getValue();
            if (candidateComponentID < (currentComponentID = ((LongValue)currentVertexWithComponent.getField(1, LongValue.class)).getValue())) {
                out.collect((Object)newVertexWithComponent);
            }
        }
    }
}

