/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.iterative.nephele;

import eu.stratosphere.api.common.aggregators.LongSumAggregator;
import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypePairComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.io.CsvInputFormat;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.io.DistributionPattern;
import eu.stratosphere.nephele.io.channels.ChannelType;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationIntermediatePactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationTailPactTask;
import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordComparatorFactory;
import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordPairComparatorFactory;
import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordSerializerFactory;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.BuildSecondCachedMatchDriver;
import eu.stratosphere.pact.runtime.task.CollectorMapDriver;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.pact.runtime.task.GroupReduceDriver;
import eu.stratosphere.pact.runtime.task.JoinWithSolutionSetSecondDriver;
import eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver;
import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.test.iterative.nephele.JobGraphUtils;
import eu.stratosphere.test.recordJobs.graph.WorksetConnectedComponents;
import eu.stratosphere.test.testdata.ConnectedComponentsData;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.LongValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.util.Collector;
import java.io.BufferedReader;
import java.util.Collection;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ConnectedComponentsNepheleITCase
extends RecordAPITestBase {
    // Seed for the random edge generator. preSubmit() hands this same value (as a literal) to
    // ConnectedComponentsData.getRandomOddEvenEdges.
    private static final long SEED = 3287269182979823L;
    // Vertex count. preSubmit() passes this same value (as a literal) to both
    // getEnumeratingVertices and getRandomOddEvenEdges.
    private static final int NUM_VERTICES = 1000;
    // Edge count. preSubmit() passes this same value (as a literal) to getRandomOddEvenEdges.
    private static final int NUM_EDGES = 10000;
    // Iteration id. The task configs visible in this class call setIterationId with this value (1).
    private static final int ITERATION_ID = 1;
    // NOTE(review): the memory setters in this class receive 0x300000L (= 3 * 2^20), so this is
    // presumably a per-consumer memory size in megabytes -- confirm against the original source.
    private static final long MEM_PER_CONSUMER = 3L;
    // Locations of the generated input files and of the job result; assigned in preSubmit().
    protected String verticesPath;
    protected String edgesPath;
    protected String resultPath;

    /**
     * Creates one test instance for a single parameter set.
     *
     * @param config the configuration supplied by {@link #getConfigurations()}; it is passed on
     *               unchanged to the base class constructor
     */
    public ConnectedComponentsNepheleITCase(final Configuration config) {
        super(config);
    }

    /**
     * Supplies the parameter sets for the {@link Parameterized} runner.
     *
     * <p>Four configurations are produced. Each carries the integer key {@code "testcase"} with
     * a value from 1 to 4; {@code getJobGraph()} switches on that value to choose which job
     * graph variant to build.
     *
     * @return the configurations, wrapped by {@code toParameterList}
     */
    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        Configuration[] variants = new Configuration[4];
        for (int index = 0; index < variants.length; index++) {
            Configuration variant = new Configuration();
            // Test case numbers are 1-based.
            variant.setInteger("testcase", index + 1);
            variants[index] = variant;
        }
        return ConnectedComponentsNepheleITCase.toParameterList(variants);
    }

    /**
     * Generates the test input and records where everything lives.
     *
     * <p>Writes the vertex data to a temp file named {@code vertices.txt}, the edge data to a
     * temp file named {@code edges.txt}, and resolves the temp path {@code results} for the
     * job output. The three resulting paths are stored in the corresponding fields.
     *
     * @throws Exception if creating the temp files fails
     */
    protected void preSubmit() throws Exception {
        // Same values as the NUM_VERTICES, NUM_EDGES and SEED constants of this class.
        final int vertexCount = 1000;
        final int edgeCount = 10000;
        final long edgeSeed = 3287269182979823L;

        this.verticesPath = this.createTempFile(
                "vertices.txt",
                ConnectedComponentsData.getEnumeratingVertices(vertexCount));
        this.edgesPath = this.createTempFile(
                "edges.txt",
                ConnectedComponentsData.getRandomOddEvenEdges(edgeCount, vertexCount, edgeSeed));
        this.resultPath = this.getTempFilePath("results");
    }

    /**
     * Builds the job graph variant selected by the {@code "testcase"} entry of the test
     * configuration (1 to 4). All variants use a parallelism of 4 and at most 100 iterations.
     *
     * @return the job graph for the configured test case
     * @throws RuntimeException if the configured test case is not one of 1 to 4 (a missing key
     *                          reads as 0 and ends up here as well)
     * @throws Exception if building the job graph fails
     */
    protected JobGraph getJobGraph() throws Exception {
        final int parallelism = 4;
        final int iterationLimit = 100;
        final int testCase = this.config.getInteger("testcase", 0);

        switch (testCase) {
            case 1:
                return this.createJobGraphUnifiedTails(this.verticesPath, this.edgesPath, this.resultPath, parallelism, iterationLimit);
            case 2:
                return this.createJobGraphSeparateTails(this.verticesPath, this.edgesPath, this.resultPath, parallelism, iterationLimit);
            case 3:
                return this.createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(this.verticesPath, this.edgesPath, this.resultPath, parallelism, iterationLimit);
            case 4:
                return this.createJobGraphSolutionSetUpdateAndWorksetTail(this.verticesPath, this.edgesPath, this.resultPath, parallelism, iterationLimit);
            default:
                throw new RuntimeException("Broken test configuration");
        }
    }

    /**
     * Verifies the job output after the job has run.
     *
     * <p>Every reader returned by {@code getResultReader} for the result path is checked with
     * {@code ConnectedComponentsData.checkOddEvenResult}. Each reader is closed once it has
     * been checked, whether or not the check succeeded, so that the result files are not left
     * open.
     *
     * @throws Exception if a result check fails or a reader cannot be read or closed
     */
    protected void postSubmit() throws Exception {
        for (BufferedReader resultReader : this.getResultReader(this.resultPath)) {
            try {
                ConnectedComponentsData.checkOddEvenResult(resultReader);
            } finally {
                // Closing a reader that is already closed has no effect, so this is safe even
                // if the check closed the reader itself.
                resultReader.close();
            }
        }
    }

    /**
     * Creates the input vertex named "VerticesInput" and chains an {@code IdDuplicator} mapper
     * to it.
     *
     * <p>The input reads blank-separated lines with a single {@link LongValue} field. The input
     * task forwards to the chained mapper; the mapper is configured with two outputs, both
     * hash-partitioned using the given comparator.
     *
     * @param jobGraph     the graph the vertex is added to
     * @param verticesPath location of the vertex file
     * @param numSubTasks  passed to {@code JobGraphUtils.createInput} twice
     * @param serializer   serializer for the records on all inputs and outputs
     * @param comparator   comparator used for both partitioned outputs
     * @return the configured input vertex
     */
    private static JobInputVertex createVerticesInput(JobGraph jobGraph, String verticesPath, int numSubTasks, TypeSerializerFactory<?> serializer, TypeComparatorFactory<?> comparator) {
        CsvInputFormat format = new CsvInputFormat(' ', new Class[]{LongValue.class});
        JobInputVertex source = JobGraphUtils.createInput(format, verticesPath, "VerticesInput", jobGraph, numSubTasks, numSubTasks);

        // The input task itself only forwards into the chained mapper.
        TaskConfig sourceConfig = new TaskConfig(source.getConfiguration());
        sourceConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        sourceConfig.setOutputSerializer(serializer);

        // Chained mapper running IdDuplicator.
        TaskConfig duplicatorConfig = new TaskConfig(new Configuration());
        UserCodeWrapper duplicatorStub = new UserCodeClassWrapper(IdDuplicator.class);
        duplicatorConfig.setStubWrapper(duplicatorStub);
        duplicatorConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        duplicatorConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
        duplicatorConfig.setInputSerializer(serializer, 0);
        duplicatorConfig.setOutputSerializer(serializer);

        // Two outputs: first register both ship strategies, then both comparators.
        final int outputCount = 2;
        for (int output = 0; output < outputCount; output++) {
            duplicatorConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        }
        for (int output = 0; output < outputCount; output++) {
            duplicatorConfig.setOutputComparator(comparator, output);
        }

        sourceConfig.addChainedTask(ChainedCollectorMapDriver.class, duplicatorConfig, "ID Duplicator");
        return source;
    }

    /**
     * Creates the input vertex named "EdgesInput".
     *
     * <p>The input reads blank-separated lines with two {@link LongValue} fields and has a
     * single hash-partitioned output that uses the given comparator.
     *
     * @param jobGraph    the graph the vertex is added to
     * @param edgesPath   location of the edge file
     * @param numSubTasks passed to {@code JobGraphUtils.createInput} twice
     * @param serializer  serializer for the emitted records
     * @param comparator  comparator used for the partitioned output
     * @return the configured input vertex
     */
    private static JobInputVertex createEdgesInput(JobGraph jobGraph, String edgesPath, int numSubTasks, TypeSerializerFactory<?> serializer, TypeComparatorFactory<?> comparator) {
        CsvInputFormat format = new CsvInputFormat(' ', new Class[]{LongValue.class, LongValue.class});
        JobInputVertex source = JobGraphUtils.createInput(format, edgesPath, "EdgesInput", jobGraph, numSubTasks, numSubTasks);

        TaskConfig sourceConfig = new TaskConfig(source.getConfiguration());
        sourceConfig.setOutputSerializer(serializer);
        sourceConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        sourceConfig.setOutputComparator(comparator, 0);

        return source;
    }

    /**
     * Creates and configures the iteration head task named
     * "Join With Edges (Iteration Head)".
     *
     * <p>The head has three inputs: input 0 is registered as the partial-solution/workset
     * input, input 1 is cached, and input 2 is registered as the solution set input. It runs
     * {@code WorksetConnectedComponents.NeighborWithComponentIDJoin} through
     * {@link BuildSecondCachedMatchDriver}, has one hash-partitioned output, a separate
     * forwarding configuration for the final output, and reports to the sync task on output
     * index 2.
     *
     * @param jobGraph       the graph the vertex is added to
     * @param numSubTasks    passed to {@code JobGraphUtils.createTask} twice
     * @param serializer     serializer for all inputs, outputs and the solution set
     * @param comparator     comparator for all inputs, the output, the solution set and the driver
     * @param pairComparator pair comparator for the driver
     * @return the configured head vertex
     */
    private static JobTaskVertex createIterationHead(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer, TypeComparatorFactory<?> comparator, TypePairComparatorFactory<?, ?> pairComparator) {
        final int worksetInput = 0;
        final int cachedInput = 1;
        final int solutionSetInput = 2;
        // 3 * 2^20; used for every memory budget configured on the head.
        final long memoryBytes = 0x300000L;

        JobTaskVertex headVertex = JobGraphUtils.createTask(IterationHeadPactTask.class, "Join With Edges (Iteration Head)", jobGraph, numSubTasks, numSubTasks);
        TaskConfig headTaskConfig = new TaskConfig(headVertex.getConfiguration());
        headTaskConfig.setIterationId(1);

        // Input 0: partial solution / workset.
        headTaskConfig.addInputToGroup(worksetInput);
        headTaskConfig.setInputSerializer(serializer, worksetInput);
        headTaskConfig.setInputComparator(comparator, worksetInput);
        headTaskConfig.setInputLocalStrategy(worksetInput, LocalStrategy.NONE);
        headTaskConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(worksetInput);

        // Input 1: cached, with its own materialization memory.
        headTaskConfig.addInputToGroup(cachedInput);
        headTaskConfig.setInputSerializer(serializer, cachedInput);
        headTaskConfig.setInputComparator(comparator, cachedInput);
        headTaskConfig.setInputLocalStrategy(cachedInput, LocalStrategy.NONE);
        headTaskConfig.setInputCached(cachedInput, true);
        headTaskConfig.setInputMaterializationMemory(cachedInput, memoryBytes);

        // Input 2: solution set.
        headTaskConfig.addInputToGroup(solutionSetInput);
        headTaskConfig.setInputSerializer(serializer, solutionSetInput);
        headTaskConfig.setInputComparator(comparator, solutionSetInput);
        headTaskConfig.setInputLocalStrategy(solutionSetInput, LocalStrategy.NONE);
        headTaskConfig.setIterationHeadSolutionSetInputIndex(solutionSetInput);
        headTaskConfig.setSolutionSetSerializer(serializer);
        headTaskConfig.setSolutionSetComparator(comparator);

        // Workset iteration settings and memory budgets.
        headTaskConfig.setIsWorksetIteration();
        headTaskConfig.setBackChannelMemory(memoryBytes);
        headTaskConfig.setSolutionSetMemory(memoryBytes);

        // Regular output of the head.
        headTaskConfig.setOutputSerializer(serializer);
        headTaskConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        headTaskConfig.setOutputComparator(comparator, 0);

        // Final output gets its own config and simply forwards.
        TaskConfig finalOutputConfig = new TaskConfig(new Configuration());
        finalOutputConfig.setOutputSerializer(serializer);
        finalOutputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        headTaskConfig.setIterationHeadFinalOutputConfig(finalOutputConfig);

        headTaskConfig.setIterationHeadIndexOfSyncOutput(2);

        // Driver and user code.
        headTaskConfig.setDriver(BuildSecondCachedMatchDriver.class);
        headTaskConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        UserCodeWrapper joinStub = new UserCodeClassWrapper(WorksetConnectedComponents.NeighborWithComponentIDJoin.class);
        headTaskConfig.setStubWrapper(joinStub);
        headTaskConfig.setDriverComparator(comparator, 0);
        headTaskConfig.setDriverComparator(comparator, 1);
        headTaskConfig.setDriverPairComparator(pairComparator);
        headTaskConfig.setMemoryDriver(memoryBytes);

        // Same aggregator name that createSync registers on the sync task.
        headTaskConfig.addIterationAggregator("pact.runtime.workset-empty-aggregator", LongSumAggregator.class);

        return headVertex;
    }

    /**
     * Creates and configures the intermediate iteration task named "Find Min Component-ID".
     *
     * <p>The task sorts its single input and runs
     * {@code WorksetConnectedComponents.MinimumComponentIDReduce} through
     * {@link GroupReduceDriver}; its single output forwards.
     *
     * @param jobGraph    the graph the vertex is added to
     * @param numSubTasks passed to {@code JobGraphUtils.createTask} twice
     * @param serializer  serializer for the input and the output
     * @param comparator  comparator for the input and the driver
     * @return the configured intermediate vertex
     */
    private static JobTaskVertex createIterationIntermediate(JobGraph jobGraph, int numSubTasks, TypeSerializerFactory<?> serializer, TypeComparatorFactory<?> comparator) {
        final int input = 0;

        JobTaskVertex reduceVertex = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "Find Min Component-ID", jobGraph, numSubTasks, numSubTasks);
        TaskConfig reduceConfig = new TaskConfig(reduceVertex.getConfiguration());
        reduceConfig.setIterationId(1);

        // Sorted input with its memory, file handle and spilling settings.
        reduceConfig.addInputToGroup(input);
        reduceConfig.setInputSerializer(serializer, input);
        reduceConfig.setInputComparator(comparator, input);
        reduceConfig.setInputLocalStrategy(input, LocalStrategy.SORT);
        reduceConfig.setMemoryInput(input, 0x300000L);
        reduceConfig.setFilehandlesInput(input, 64);
        reduceConfig.setSpillingThresholdInput(input, 0.85f);

        // Output.
        reduceConfig.setOutputSerializer(serializer);
        reduceConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

        // Driver and user code.
        reduceConfig.setDriver(GroupReduceDriver.class);
        reduceConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        reduceConfig.setDriverComparator(comparator, 0);
        UserCodeWrapper reduceStub = new UserCodeClassWrapper(WorksetConnectedComponents.MinimumComponentIDReduce.class);
        reduceConfig.setStubWrapper(reduceStub);

        return reduceVertex;
    }

    /**
     * Creates the file output vertex named "Final Output".
     *
     * <p>The output uses {@link CsvOutputFormat} and is parameterized to write two
     * {@link LongValue} fields (record positions 0 and 1) per line, separated by a blank, to
     * {@code resultPath}.
     *
     * @param jobGraph    the graph the vertex is added to
     * @param resultPath  target location of the result
     * @param numSubTasks passed to {@code JobGraphUtils.createFileOutput} twice
     * @param serializer  serializer for the input
     * @return the configured output vertex
     */
    private static JobOutputVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks, TypeSerializerFactory<?> serializer) {
        JobOutputVertex sink = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks, numSubTasks);

        TaskConfig sinkConfig = new TaskConfig(sink.getConfiguration());
        sinkConfig.addInputToGroup(0);
        sinkConfig.setInputSerializer(serializer, 0);
        UserCodeWrapper formatStub = new UserCodeClassWrapper(CsvOutputFormat.class);
        sinkConfig.setStubWrapper(formatStub);
        sinkConfig.setStubParameter("stratosphere.output.file", resultPath);

        // Parameters for the output format.
        Configuration formatParameters = sinkConfig.getStubParameters();
        formatParameters.setString("output.record.delimiter", "\n");
        formatParameters.setString("output.record.field-delimiter", " ");
        // Field 0
        formatParameters.setClass("output.record.type_0", LongValue.class);
        formatParameters.setInteger("output.record.position_0", 0);
        // Field 1
        formatParameters.setClass("output.record.type_1", LongValue.class);
        formatParameters.setInteger("output.record.position_1", 1);
        formatParameters.setInteger("output.record.num-fields", 2);

        return sink;
    }

    /**
     * Creates the output vertex named "FakeTailOutput" via
     * {@code JobGraphUtils.createFakeOutput}. The graph builders connect an iteration tail to
     * it.
     *
     * @param jobGraph    the graph the vertex is added to
     * @param numSubTasks passed to {@code JobGraphUtils.createFakeOutput} twice
     * @return the created output vertex
     */
    private static JobOutputVertex createFakeTail(JobGraph jobGraph, int numSubTasks) {
        return JobGraphUtils.createFakeOutput(jobGraph, "FakeTailOutput", numSubTasks, numSubTasks);
    }

    /**
     * Creates and configures the sync vertex of the iteration.
     *
     * <p>The sync task is given the iteration limit and iteration id 1. A
     * {@link LongSumAggregator} is registered under the name
     * {@code "pact.runtime.workset-empty-aggregator"}, and
     * {@link WorksetEmptyConvergenceCriterion} is set as the convergence criterion under the
     * same name.
     *
     * @param jobGraph      the graph the vertex is added to
     * @param numSubTasks   passed to {@code JobGraphUtils.createSync}
     * @param maxIterations the number of iterations configured on the sync task
     * @return the configured sync vertex
     */
    private static JobOutputVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) {
        final String aggregatorName = "pact.runtime.workset-empty-aggregator";

        JobOutputVertex syncVertex = JobGraphUtils.createSync(jobGraph, numSubTasks);
        TaskConfig config = new TaskConfig(syncVertex.getConfiguration());
        config.setNumberOfIterations(maxIterations);
        config.setIterationId(1);
        config.addIterationAggregator(aggregatorName, LongSumAggregator.class);
        config.setConvergenceCriterion(aggregatorName, WorksetEmptyConvergenceCriterion.class);

        return syncVertex;
    }

    /**
     * Builds the job graph variant in which a single iteration tail is flagged as both the
     * workset update and the solution set update.
     *
     * <p>Wiring: vertices and edges feed the head; head -> intermediate -> tail; the head also
     * feeds the final output and the sync task; the tail feeds a fake output.
     *
     * @param verticesPath  location of the vertex file
     * @param edgesPath     location of the edge file
     * @param resultPath    target location of the result
     * @param numSubTasks   number of subtasks used for every vertex
     * @param maxIterations iteration limit configured on the sync task
     * @return the assembled job graph
     * @throws JobGraphDefinitionException declared for the graph wiring calls
     */
    public JobGraph createJobGraphUnifiedTails(String verticesPath, String edgesPath, String resultPath, int numSubTasks, int maxIterations) throws JobGraphDefinitionException {
        // Type utilities: records keyed on field 0, a LongValue.
        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        RecordComparatorFactory comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
        RecordPairComparatorFactory pairComparator = RecordPairComparatorFactory.get();
        JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");
        // Vertices shared by all variants.
        JobInputVertex vertices = ConnectedComponentsNepheleITCase.createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
        JobInputVertex edges = ConnectedComponentsNepheleITCase.createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
        JobTaskVertex head = ConnectedComponentsNepheleITCase.createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
        JobTaskVertex intermediate = ConnectedComponentsNepheleITCase.createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
        TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        JobOutputVertex output = ConnectedComponentsNepheleITCase.createOutput(jobGraph, resultPath, numSubTasks, serializer);
        JobOutputVertex fakeTail = ConnectedComponentsNepheleITCase.createFakeTail(jobGraph, numSubTasks);
        JobOutputVertex sync = ConnectedComponentsNepheleITCase.createSync(jobGraph, numSubTasks, maxIterations);
        // The single tail: flagged as workset update and as solution set update (without reprobe),
        // running UpdateComponentIdMatch through JoinWithSolutionSetSecondDriver.
        JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph, numSubTasks, numSubTasks);
        TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
        tailConfig.setIterationId(1);
        tailConfig.setIsWorksetIteration();
        tailConfig.setIsWorksetUpdate();
        tailConfig.setIsSolutionSetUpdate();
        tailConfig.setIsSolutionSetUpdateWithoutReprobe();
        tailConfig.addInputToGroup(0);
        tailConfig.setInputSerializer((TypeSerializerFactory)serializer, 0);
        tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        tailConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        tailConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
        tailConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        tailConfig.setDriverComparator((TypeComparatorFactory)comparator, 0);
        tailConfig.setDriverPairComparator((TypePairComparatorFactory)pairComparator);
        tailConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(WorksetConnectedComponents.UpdateComponentIdMatch.class));
        // Head inputs are connected in the order vertices, edges, vertices.
        // NOTE(review): presumably this order determines the head's input indices 0/1/2 as
        // configured in createIterationHead -- do not reorder without confirming.
        JobGraphUtils.connect((AbstractJobVertex)vertices, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect((AbstractJobVertex)edges, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect((AbstractJobVertex)vertices, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        // Inside the iteration: the intermediate's gate is set up with numSubTasks events
        // (bipartite connection from the head), the tail's gate with 1 (pointwise connection).
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
        JobGraphUtils.connect((AbstractJobVertex)intermediate, (AbstractJobVertex)tail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        // Leaving the iteration: final output, fake tail output, sync.
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)tail, (AbstractJobVertex)fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        // Instance sharing: everything shares with the head, except the fake tail, which
        // shares with the tail.
        vertices.setVertexToShareInstancesWith((AbstractJobVertex)head);
        edges.setVertexToShareInstancesWith((AbstractJobVertex)head);
        intermediate.setVertexToShareInstancesWith((AbstractJobVertex)head);
        tail.setVertexToShareInstancesWith((AbstractJobVertex)head);
        output.setVertexToShareInstancesWith((AbstractJobVertex)head);
        sync.setVertexToShareInstancesWith((AbstractJobVertex)head);
        fakeTail.setVertexToShareInstancesWith((AbstractJobVertex)tail);
        return jobGraph;
    }

    /**
     * Builds the job graph variant with two separate iteration tails: one flagged as the
     * solution set update and one flagged as the workset update.
     *
     * <p>Wiring: vertices and edges feed the head; head -> intermediate -> solution set join;
     * the join feeds both tails; the head also feeds the final output and the sync task; each
     * tail feeds its own fake output. The head is additionally configured to wait for the
     * solution set update.
     *
     * @param verticesPath  location of the vertex file
     * @param edgesPath     location of the edge file
     * @param resultPath    target location of the result
     * @param numSubTasks   number of subtasks used for every vertex
     * @param maxIterations iteration limit configured on the sync task
     * @return the assembled job graph
     * @throws JobGraphDefinitionException declared for the graph wiring calls
     */
    public JobGraph createJobGraphSeparateTails(String verticesPath, String edgesPath, String resultPath, int numSubTasks, int maxIterations) throws JobGraphDefinitionException {
        // Type utilities: records keyed on field 0, a LongValue.
        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        RecordComparatorFactory comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
        RecordPairComparatorFactory pairComparator = RecordPairComparatorFactory.get();
        // Named after this variant so it can be told apart from the unified-tails job.
        JobGraph jobGraph = new JobGraph("Connected Components (Separate Tails)");
        // Vertices shared by all variants.
        JobInputVertex vertices = ConnectedComponentsNepheleITCase.createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
        JobInputVertex edges = ConnectedComponentsNepheleITCase.createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
        JobTaskVertex head = ConnectedComponentsNepheleITCase.createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
        TaskConfig headConfig = new TaskConfig(head.getConfiguration());
        headConfig.setWaitForSolutionSetUpdate();
        JobTaskVertex intermediate = ConnectedComponentsNepheleITCase.createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
        TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        JobOutputVertex output = ConnectedComponentsNepheleITCase.createOutput(jobGraph, resultPath, numSubTasks, serializer);
        JobOutputVertex ssFakeTail = ConnectedComponentsNepheleITCase.createFakeTail(jobGraph, numSubTasks);
        JobOutputVertex wsFakeTail = ConnectedComponentsNepheleITCase.createFakeTail(jobGraph, numSubTasks);
        JobOutputVertex sync = ConnectedComponentsNepheleITCase.createSync(jobGraph, numSubTasks, maxIterations);
        // Solution set join: runs UpdateComponentIdMatch and has two forwarding outputs,
        // one per tail.
        JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "Solution Set Join", jobGraph, numSubTasks, numSubTasks);
        TaskConfig ssJoinIntermediateConfig = new TaskConfig(ssJoinIntermediate.getConfiguration());
        ssJoinIntermediateConfig.setIterationId(1);
        ssJoinIntermediateConfig.addInputToGroup(0);
        ssJoinIntermediateConfig.setInputSerializer((TypeSerializerFactory)serializer, 0);
        ssJoinIntermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        ssJoinIntermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        ssJoinIntermediateConfig.setOutputComparator((TypeComparatorFactory)comparator, 0);
        ssJoinIntermediateConfig.setOutputComparator((TypeComparatorFactory)comparator, 1);
        ssJoinIntermediateConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        ssJoinIntermediateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
        ssJoinIntermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        ssJoinIntermediateConfig.setDriverComparator((TypeComparatorFactory)comparator, 0);
        ssJoinIntermediateConfig.setDriverPairComparator((TypePairComparatorFactory)pairComparator);
        ssJoinIntermediateConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(WorksetConnectedComponents.UpdateComponentIdMatch.class));
        // Solution set tail: runs DummyMapper; its input is materialized asynchronously.
        JobTaskVertex ssTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail", jobGraph, numSubTasks, numSubTasks);
        TaskConfig ssTailConfig = new TaskConfig(ssTail.getConfiguration());
        ssTailConfig.setIterationId(1);
        ssTailConfig.setIsSolutionSetUpdate();
        ssTailConfig.setIsWorksetIteration();
        ssTailConfig.addInputToGroup(0);
        ssTailConfig.setInputSerializer((TypeSerializerFactory)serializer, 0);
        ssTailConfig.setInputAsynchronouslyMaterialized(0, true);
        ssTailConfig.setInputMaterializationMemory(0, 0x300000L);
        ssTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        ssTailConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        ssTailConfig.setDriver(CollectorMapDriver.class);
        ssTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        ssTailConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(DummyMapper.class));
        // Workset tail: runs DummyMapper.
        JobTaskVertex wsTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail", jobGraph, numSubTasks, numSubTasks);
        TaskConfig wsTailConfig = new TaskConfig(wsTail.getConfiguration());
        wsTailConfig.setIterationId(1);
        wsTailConfig.setIsWorksetIteration();
        wsTailConfig.setIsWorksetUpdate();
        wsTailConfig.addInputToGroup(0);
        wsTailConfig.setInputSerializer((TypeSerializerFactory)serializer, 0);
        wsTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        wsTailConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        wsTailConfig.setDriver(CollectorMapDriver.class);
        wsTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        wsTailConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(DummyMapper.class));
        // Head inputs are connected in the order vertices, edges, vertices.
        // NOTE(review): presumably this order determines the head's input indices 0/1/2 as
        // configured in createIterationHead -- do not reorder without confirming.
        JobGraphUtils.connect((AbstractJobVertex)vertices, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect((AbstractJobVertex)edges, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect((AbstractJobVertex)vertices, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        // Inside the iteration: the intermediate's gate is set up with numSubTasks events
        // (bipartite connection from the head), all pointwise-connected gates with 1.
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
        JobGraphUtils.connect((AbstractJobVertex)intermediate, (AbstractJobVertex)ssJoinIntermediate, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        ssJoinIntermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect((AbstractJobVertex)ssJoinIntermediate, (AbstractJobVertex)ssTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        ssTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect((AbstractJobVertex)ssJoinIntermediate, (AbstractJobVertex)wsTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        wsTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        // Leaving the iteration: final output, the two fake tail outputs, sync.
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)ssTail, (AbstractJobVertex)ssFakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)wsTail, (AbstractJobVertex)wsFakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        // Instance sharing: most vertices share with the head; the solution set tail shares
        // with the workset tail, and each fake tail shares with its own tail.
        vertices.setVertexToShareInstancesWith((AbstractJobVertex)head);
        edges.setVertexToShareInstancesWith((AbstractJobVertex)head);
        intermediate.setVertexToShareInstancesWith((AbstractJobVertex)head);
        ssJoinIntermediate.setVertexToShareInstancesWith((AbstractJobVertex)head);
        wsTail.setVertexToShareInstancesWith((AbstractJobVertex)head);
        output.setVertexToShareInstancesWith((AbstractJobVertex)head);
        sync.setVertexToShareInstancesWith((AbstractJobVertex)head);
        ssTail.setVertexToShareInstancesWith((AbstractJobVertex)wsTail);
        ssFakeTail.setVertexToShareInstancesWith((AbstractJobVertex)ssTail);
        wsFakeTail.setVertexToShareInstancesWith((AbstractJobVertex)wsTail);
        return jobGraph;
    }

    /**
     * Builds the job graph variant in which an intermediate task is flagged as the workset
     * update and the iteration tail is flagged as the solution set update.
     *
     * <p>Wiring: vertices and edges feed the head; head -> intermediate -> workset update ->
     * solution set tail; the head also feeds the final output and the sync task; the tail feeds
     * a fake output. The head is additionally configured to wait for the solution set update.
     *
     * @param verticesPath  location of the vertex file
     * @param edgesPath     location of the edge file
     * @param resultPath    target location of the result
     * @param numSubTasks   number of subtasks used for every vertex
     * @param maxIterations iteration limit configured on the sync task
     * @return the assembled job graph
     * @throws JobGraphDefinitionException declared for the graph wiring calls
     */
    public JobGraph createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(String verticesPath, String edgesPath, String resultPath, int numSubTasks, int maxIterations) throws JobGraphDefinitionException {
        // Type utilities: records keyed on field 0, a LongValue.
        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        RecordComparatorFactory comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
        RecordPairComparatorFactory pairComparator = RecordPairComparatorFactory.get();
        JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Workset Update, Solution Set Tail)");
        // Vertices shared by all variants.
        JobInputVertex vertices = ConnectedComponentsNepheleITCase.createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
        JobInputVertex edges = ConnectedComponentsNepheleITCase.createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
        JobTaskVertex head = ConnectedComponentsNepheleITCase.createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
        TaskConfig headConfig = new TaskConfig(head.getConfiguration());
        headConfig.setWaitForSolutionSetUpdate();
        JobTaskVertex intermediate = ConnectedComponentsNepheleITCase.createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
        TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        JobOutputVertex output = ConnectedComponentsNepheleITCase.createOutput(jobGraph, resultPath, numSubTasks, serializer);
        JobOutputVertex fakeTail = ConnectedComponentsNepheleITCase.createFakeTail(jobGraph, numSubTasks);
        JobOutputVertex sync = ConnectedComponentsNepheleITCase.createSync(jobGraph, numSubTasks, maxIterations);
        // Intermediate task flagged as the workset update; runs UpdateComponentIdMatch through
        // JoinWithSolutionSetSecondDriver.
        JobTaskVertex wsUpdateIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "WorksetUpdate", jobGraph, numSubTasks, numSubTasks);
        TaskConfig wsUpdateConfig = new TaskConfig(wsUpdateIntermediate.getConfiguration());
        wsUpdateConfig.setIterationId(1);
        wsUpdateConfig.setIsWorksetIteration();
        wsUpdateConfig.setIsWorksetUpdate();
        wsUpdateConfig.addInputToGroup(0);
        wsUpdateConfig.setInputSerializer((TypeSerializerFactory)serializer, 0);
        wsUpdateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        wsUpdateConfig.setOutputComparator((TypeComparatorFactory)comparator, 0);
        wsUpdateConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        wsUpdateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
        wsUpdateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        wsUpdateConfig.setDriverComparator((TypeComparatorFactory)comparator, 0);
        wsUpdateConfig.setDriverPairComparator((TypePairComparatorFactory)pairComparator);
        wsUpdateConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(WorksetConnectedComponents.UpdateComponentIdMatch.class));
        // Tail flagged as the solution set update; runs DummyMapper.
        JobTaskVertex ssTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail", jobGraph, numSubTasks, numSubTasks);
        TaskConfig ssTailConfig = new TaskConfig(ssTail.getConfiguration());
        ssTailConfig.setIterationId(1);
        ssTailConfig.setIsSolutionSetUpdate();
        ssTailConfig.setIsWorksetIteration();
        ssTailConfig.addInputToGroup(0);
        ssTailConfig.setInputSerializer((TypeSerializerFactory)serializer, 0);
        ssTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        ssTailConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        ssTailConfig.setDriver(CollectorMapDriver.class);
        ssTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        ssTailConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(DummyMapper.class));
        // Head inputs are connected in the order vertices, edges, vertices.
        // NOTE(review): presumably this order determines the head's input indices 0/1/2 as
        // configured in createIterationHead -- do not reorder without confirming.
        JobGraphUtils.connect((AbstractJobVertex)vertices, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect((AbstractJobVertex)edges, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect((AbstractJobVertex)vertices, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        // Inside the iteration: the intermediate's gate is set up with numSubTasks events
        // (bipartite connection from the head), all pointwise-connected gates with 1.
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
        JobGraphUtils.connect((AbstractJobVertex)intermediate, (AbstractJobVertex)wsUpdateIntermediate, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        wsUpdateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect((AbstractJobVertex)wsUpdateIntermediate, (AbstractJobVertex)ssTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        ssTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        // Leaving the iteration: final output, fake tail output, sync.
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)ssTail, (AbstractJobVertex)fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        // Instance sharing: everything shares with the head, except the fake tail, which
        // shares with the solution set tail.
        vertices.setVertexToShareInstancesWith((AbstractJobVertex)head);
        edges.setVertexToShareInstancesWith((AbstractJobVertex)head);
        intermediate.setVertexToShareInstancesWith((AbstractJobVertex)head);
        wsUpdateIntermediate.setVertexToShareInstancesWith((AbstractJobVertex)head);
        ssTail.setVertexToShareInstancesWith((AbstractJobVertex)head);
        output.setVertexToShareInstancesWith((AbstractJobVertex)head);
        sync.setVertexToShareInstancesWith((AbstractJobVertex)head);
        fakeTail.setVertexToShareInstancesWith((AbstractJobVertex)ssTail);
        return jobGraph;
    }

    /**
     * Builds the job graph named "Connected Components (Intermediate Solution Set Update, Workset Tail)".
     *
     * <p>In this variant the solution set update is configured on an
     * {@code IterationIntermediatePactTask} ("Solution Set Update") and the workset update is
     * configured on an {@code IterationTailPactTask} ("IterationWorksetTail").
     *
     * @param verticesPath  path handed to {@code createVerticesInput}
     * @param edgesPath     path handed to {@code createEdgesInput}
     * @param resultPath    path handed to {@code createOutput}
     * @param numSubTasks   passed to every vertex factory call in this method; also used as the
     *                      event count for input gate 0 of the intermediate task
     * @param maxIterations passed to {@code createSync}
     * @return the fully wired job graph
     * @throws JobGraphDefinitionException declared by this method; presumably raised while
     *                                     connecting vertices (the throwing call is not visible here)
     */
    public JobGraph createJobGraphSolutionSetUpdateAndWorksetTail(String verticesPath, String edgesPath, String resultPath, int numSubTasks, int maxIterations) throws JobGraphDefinitionException {
        // Shared type utilities. The comparator is built on field 0 read as LongValue;
        // the boolean flag is presumably the sort direction -- TODO confirm.
        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        RecordComparatorFactory comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{LongValue.class}, new boolean[]{true});
        RecordPairComparatorFactory pairComparator = RecordPairComparatorFactory.get();
        JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Solution Set Update, Workset Tail)");

        // Vertices created through the helpers shared with the other job graph variants of this class.
        JobInputVertex vertices = ConnectedComponentsNepheleITCase.createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
        JobInputVertex edges = ConnectedComponentsNepheleITCase.createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
        JobTaskVertex head = ConnectedComponentsNepheleITCase.createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
        JobTaskVertex intermediate = ConnectedComponentsNepheleITCase.createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
        TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
        JobOutputVertex output = ConnectedComponentsNepheleITCase.createOutput(jobGraph, resultPath, numSubTasks, serializer);
        JobOutputVertex fakeTail = ConnectedComponentsNepheleITCase.createFakeTail(jobGraph, numSubTasks);
        JobOutputVertex sync = ConnectedComponentsNepheleITCase.createSync(jobGraph, numSubTasks, maxIterations);

        // Intermediate task that performs the solution set update (iteration id 1):
        // one input, one FORWARD output, JoinWithSolutionSetSecondDriver running the
        // UpdateComponentIdMatch stub.
        JobTaskVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class, "Solution Set Update", jobGraph, numSubTasks, numSubTasks);
        TaskConfig ssJoinIntermediateConfig = new TaskConfig(ssJoinIntermediate.getConfiguration());
        ssJoinIntermediateConfig.setIterationId(1);
        ssJoinIntermediateConfig.setIsSolutionSetUpdate();
        ssJoinIntermediateConfig.setIsSolutionSetUpdateWithoutReprobe();
        ssJoinIntermediateConfig.addInputToGroup(0);
        ssJoinIntermediateConfig.setInputSerializer((TypeSerializerFactory)serializer, 0);
        ssJoinIntermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        ssJoinIntermediateConfig.setOutputComparator((TypeComparatorFactory)comparator, 0);
        ssJoinIntermediateConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        ssJoinIntermediateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
        ssJoinIntermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        ssJoinIntermediateConfig.setDriverComparator((TypeComparatorFactory)comparator, 0);
        ssJoinIntermediateConfig.setDriverPairComparator((TypePairComparatorFactory)pairComparator);
        ssJoinIntermediateConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(WorksetConnectedComponents.UpdateComponentIdMatch.class));

        // Tail task that performs the workset update (iteration id 1): one input,
        // one FORWARD output, CollectorMapDriver running the DummyMapper stub.
        JobTaskVertex wsTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail", jobGraph, numSubTasks, numSubTasks);
        TaskConfig wsTailConfig = new TaskConfig(wsTail.getConfiguration());
        wsTailConfig.setIterationId(1);
        wsTailConfig.setIsWorksetIteration();
        wsTailConfig.setIsWorksetUpdate();
        wsTailConfig.addInputToGroup(0);
        wsTailConfig.setInputSerializer((TypeSerializerFactory)serializer, 0);
        wsTailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        wsTailConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        wsTailConfig.setDriver(CollectorMapDriver.class);
        wsTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        wsTailConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(DummyMapper.class));

        // Wiring. NOTE(review): the order of connect calls presumably determines the gate
        // indices on each vertex -- do not reorder without checking JobGraphUtils.connect.
        // NOTE(review): vertices is connected to head twice (before and after edges). This
        // looks intentional, presumably to feed two distinct inputs of the head from the
        // same source -- confirm against createIterationHead.
        JobGraphUtils.connect((AbstractJobVertex)vertices, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect((AbstractJobVertex)edges, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        JobGraphUtils.connect((AbstractJobVertex)vertices, (AbstractJobVertex)head, ChannelType.NETWORK, DistributionPattern.BIPARTITE);

        // Iteration path: head -> intermediate -> solution set update -> workset tail.
        // Gate 0 of the BIPARTITE-connected intermediate is given numSubTasks events, the
        // POINTWISE-connected gates 1; presumably one event per upstream sender -- TODO confirm.
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)intermediate, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
        JobGraphUtils.connect((AbstractJobVertex)intermediate, (AbstractJobVertex)ssJoinIntermediate, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        ssJoinIntermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect((AbstractJobVertex)ssJoinIntermediate, (AbstractJobVertex)wsTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        wsTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);

        // Remaining edges: head to the result output, tail to the fake tail, head to the sync vertex.
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)output, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)wsTail, (AbstractJobVertex)fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);

        // Instance sharing: every vertex shares instances with the head, except the fake
        // tail, which shares with the workset tail it is connected to.
        vertices.setVertexToShareInstancesWith((AbstractJobVertex)head);
        edges.setVertexToShareInstancesWith((AbstractJobVertex)head);
        intermediate.setVertexToShareInstancesWith((AbstractJobVertex)head);
        ssJoinIntermediate.setVertexToShareInstancesWith((AbstractJobVertex)head);
        wsTail.setVertexToShareInstancesWith((AbstractJobVertex)head);
        output.setVertexToShareInstancesWith((AbstractJobVertex)head);
        sync.setVertexToShareInstancesWith((AbstractJobVertex)head);
        fakeTail.setVertexToShareInstancesWith((AbstractJobVertex)wsTail);
        return jobGraph;
    }

    /**
     * Identity map stub: hands every incoming record to the collector unchanged.
     */
    public static final class DummyMapper
    extends MapFunction {
        private static final long serialVersionUID = 1L;

        /**
         * Emits {@code rec} as-is.
         *
         * @param rec the incoming record
         * @param out the collector that receives the record
         */
        public void map(Record rec, Collector<Record> out) {
            // Pass the record with its declared type. The former (Object) upcast was a
            // decompiler artifact that does not match the Collector<Record> parameter.
            out.collect(rec);
        }
    }

    /**
     * Map stub that copies field 0 of each record into field 1 and emits the record.
     */
    public static final class IdDuplicator
    extends MapFunction {
        private static final long serialVersionUID = 1L;

        /**
         * Writes the value of field 0 (read as {@code LongValue}) into field 1 of the same
         * record, then emits that record.
         *
         * @param record the incoming record; modified in place
         * @param out    the collector that receives the modified record
         * @throws Exception declared by this method; nothing in the body throws explicitly
         */
        public void map(Record record, Collector<Record> out) throws Exception {
            record.setField(1, record.getField(0, LongValue.class));
            // Pass the record with its declared type. The former (Object) upcast was a
            // decompiler artifact that does not match the Collector<Record> parameter.
            out.collect(record);
        }
    }
}

