/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.iterative.nephele.customdanglingpagerank;

import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypePairComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.DistributionPattern;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationIntermediatePactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationTailPactTask;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.BuildSecondCachedMatchDriver;
import eu.stratosphere.pact.runtime.task.CoGroupDriver;
import eu.stratosphere.pact.runtime.task.CollectorMapDriver;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.test.iterative.nephele.JobGraphUtils;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.CustomCompensatableDotProductCoGroup;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.CustomCompensatableDotProductMatch;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.CustomCompensatingMap;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.CustomImprovedAdjacencyListInputFormat;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.CustomImprovedDanglingPageRankInputFormat;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.CustomPageWithRankOutFormat;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyList;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyListComparatorFactory;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithAdjacencyListSerializerFactory;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithRank;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDangling;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDanglingComparatorFactory;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankAndDanglingSerializerFactory;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankComparatorFactory;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankDanglingToVertexWithRankPairComparatorFactory;
import eu.stratosphere.test.iterative.nephele.customdanglingpagerank.types.VertexWithRankSerializerFactory;
import eu.stratosphere.test.iterative.nephele.danglingpagerank.DiffL1NormConvergenceCriterion;
import eu.stratosphere.test.iterative.nephele.danglingpagerank.PageRankStatsAggregator;

/**
 * Builds the {@link JobGraph} named {@code "CompensatableDanglingPageRank"}.
 *
 * <p>The graph consists of two file inputs (pages with rank, adjacency lists), an
 * iteration head, an intermediate task, an iteration tail, a file output, a fake
 * output attached to the tail, and a sync vertex. All vertices are configured to
 * share instances with the iteration head (the fake tail output does so via the tail).
 */
public class CustomCompensatableDanglingPageRank {
    /** File-handle count passed to every sorting input of this job. */
    private static final int NUM_FILE_HANDLES_PER_SORT = 64;
    /** Spilling threshold passed to every sorting input of this job. */
    private static final float SORT_SPILL_THRESHOLD = 0.85f;
    /** Iteration id shared by head, intermediate, tail and sync. */
    private static final int ITERATION_ID = 1;
    /** Factor (2^20) applied to the memory arguments before they reach a task config. */
    private static final long BYTES_PER_MEGABYTE = 0x100000L;

    private static final TypeSerializerFactory<VertexWithRank> vertexWithRankSerializer = new VertexWithRankSerializerFactory();
    private static final TypeSerializerFactory<VertexWithRankAndDangling> vertexWithRankAndDanglingSerializer = new VertexWithRankAndDanglingSerializerFactory();
    private static final TypeSerializerFactory<VertexWithAdjacencyList> vertexWithAdjacencyListSerializer = new VertexWithAdjacencyListSerializerFactory();
    private static final TypeComparatorFactory<VertexWithRank> vertexWithRankComparator = new VertexWithRankComparatorFactory();
    private static final TypeComparatorFactory<VertexWithRankAndDangling> vertexWithRankAndDanglingComparator = new VertexWithRankAndDanglingComparatorFactory();
    private static final TypeComparatorFactory<VertexWithAdjacencyList> vertexWithAdjacencyListComparator = new VertexWithAdjacencyListComparatorFactory();
    private static final TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithAdjacencyList> matchComparator = new VertexWithRankDanglingToVertexWithAdjacencyListPairComparatorFactory();
    private static final TypePairComparatorFactory<VertexWithRankAndDangling, VertexWithRank> coGroupComparator = new VertexWithRankDanglingToVertexWithRankPairComparatorFactory();

    /**
     * Assembles the job graph, optionally overriding the built-in defaults.
     *
     * <p>The overrides are applied only when at least 15 arguments are given; with
     * fewer arguments every default is kept. Positions read from {@code args}:
     * <ol start="0">
     *   <li>degree of parallelism</li>
     *   <li>number of sub tasks per instance</li>
     *   <li>page-with-rank input path</li>
     *   <li>adjacency-list input path</li>
     *   <li>output path</li>
     *   <li>(not read by this method)</li>
     *   <li>minor consumer memory</li>
     *   <li>match memory</li>
     *   <li>co-group sort memory</li>
     *   <li>number of iterations</li>
     *   <li>number of vertices</li>
     *   <li>number of dangling vertices</li>
     *   <li>failing workers (passed through as a string)</li>
     *   <li>failing iteration</li>
     *   <li>message loss</li>
     * </ol>
     * The three memory values are multiplied by 2^20 before use, i.e. they are
     * presumably meant as megabytes.
     *
     * @param args command-line style arguments as described above; must not be {@code null}
     * @return the fully wired job graph
     * @throws Exception if a numeric argument cannot be parsed or one of the
     *         graph-building calls fails
     */
    public static JobGraph getJobGraph(String[] args) throws Exception {
        // Defaults, kept whenever fewer than 15 arguments are supplied.
        int parallelism = 2;
        int subTasksPerInstance = parallelism;
        String rankInputPath = "";
        String adjacencyInputPath = "";
        String resultPath = Path.constructTestURI("stratosphere_iterations");
        int minorConsumer = 2;
        int matchMemory = 5;
        int coGroupSortMemory = 5;
        int numIterations = 25;
        long numVertices = 5L;
        long numDanglingVertices = 1L;
        String failingWorkers = "1";
        int failingIteration = 2;
        double messageLoss = 0.75;

        if (args.length >= 15) {
            parallelism = Integer.parseInt(args[0]);
            subTasksPerInstance = Integer.parseInt(args[1]);
            rankInputPath = args[2];
            adjacencyInputPath = args[3];
            resultPath = args[4];
            // args[5] is intentionally not consumed here.
            minorConsumer = Integer.parseInt(args[6]);
            matchMemory = Integer.parseInt(args[7]);
            coGroupSortMemory = Integer.parseInt(args[8]);
            numIterations = Integer.parseInt(args[9]);
            numVertices = Long.parseLong(args[10]);
            numDanglingVertices = Long.parseLong(args[11]);
            failingWorkers = args[12];
            failingIteration = Integer.parseInt(args[13]);
            messageLoss = Double.parseDouble(args[14]);
        }

        final String numVerticesText = Long.toString(numVertices);
        final long minorConsumerBytes = megabytes(minorConsumer);

        JobGraph graph = new JobGraph("CompensatableDanglingPageRank");

        // --- input: pages with rank -------------------------------------------------
        JobInputVertex rankSource = JobGraphUtils.createInput(
                new CustomImprovedDanglingPageRankInputFormat(), rankInputPath,
                "DanglingPageWithRankInput", graph, parallelism, subTasksPerInstance);
        TaskConfig rankSourceCfg = new TaskConfig(rankSource.getConfiguration());
        rankSourceCfg.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        rankSourceCfg.setOutputComparator(vertexWithRankAndDanglingComparator, 0);
        rankSourceCfg.setOutputSerializer(vertexWithRankAndDanglingSerializer);
        rankSourceCfg.setStubParameter("pageRank.numVertices", numVerticesText);

        // --- input: adjacency lists -------------------------------------------------
        JobInputVertex adjacencySource = JobGraphUtils.createInput(
                new CustomImprovedAdjacencyListInputFormat(), adjacencyInputPath,
                "AdjancencyListInput", graph, parallelism, subTasksPerInstance);
        TaskConfig adjacencySourceCfg = new TaskConfig(adjacencySource.getConfiguration());
        adjacencySourceCfg.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        adjacencySourceCfg.setOutputSerializer(vertexWithAdjacencyListSerializer);
        adjacencySourceCfg.setOutputComparator(vertexWithAdjacencyListComparator, 0);

        // --- iteration head ---------------------------------------------------------
        JobTaskVertex headVertex = JobGraphUtils.createTask(
                IterationHeadPactTask.class, "IterationHead", graph, parallelism, subTasksPerInstance);
        TaskConfig headCfg = new TaskConfig(headVertex.getConfiguration());
        headCfg.setIterationId(ITERATION_ID);
        // Input 0 (the initial ranks) is sorted locally before entering the iteration.
        headCfg.addInputToGroup(0);
        headCfg.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
        headCfg.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
        headCfg.setInputComparator(vertexWithRankAndDanglingComparator, 0);
        headCfg.setInputLocalStrategy(0, LocalStrategy.SORT);
        headCfg.setMemoryInput(0, minorConsumerBytes);
        headCfg.setFilehandlesInput(0, NUM_FILE_HANDLES_PER_SORT);
        headCfg.setSpillingThresholdInput(0, SORT_SPILL_THRESHOLD);
        headCfg.setBackChannelMemory(minorConsumerBytes);
        headCfg.setOutputSerializer(vertexWithRankAndDanglingSerializer);
        // Two forward outputs are registered on the head itself.
        headCfg.addOutputShipStrategy(ShipStrategyType.FORWARD);
        headCfg.addOutputShipStrategy(ShipStrategyType.FORWARD);
        // The final result leaves the head through a separately configured output.
        TaskConfig headFinalOutCfg = new TaskConfig(new Configuration());
        headFinalOutCfg.setOutputSerializer(vertexWithRankAndDanglingSerializer);
        headFinalOutCfg.addOutputShipStrategy(ShipStrategyType.FORWARD);
        headCfg.setIterationHeadFinalOutputConfig(headFinalOutCfg);
        // The sync vertex is the fourth target the head is connected to further down;
        // presumably that connection order is what index 3 refers to - confirm against TaskConfig.
        headCfg.setIterationHeadIndexOfSyncOutput(3);
        headCfg.setNumberOfIterations(numIterations);
        headCfg.setDriver(CollectorMapDriver.class);
        headCfg.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        headCfg.setStubWrapper(new UserCodeClassWrapper<CustomCompensatingMap>(CustomCompensatingMap.class));
        headCfg.setStubParameter("pageRank.numVertices", numVerticesText);
        setCompensationParameters(headCfg, failingWorkers, failingIteration, messageLoss);
        headCfg.addIterationAggregator("pagerank.aggregator", PageRankStatsAggregator.class);

        // --- intermediate: match of ranks against adjacency lists -------------------
        JobTaskVertex matchVertex = JobGraphUtils.createTask(
                IterationIntermediatePactTask.class, "IterationIntermediate", graph, parallelism, subTasksPerInstance);
        TaskConfig matchCfg = new TaskConfig(matchVertex.getConfiguration());
        matchCfg.setIterationId(ITERATION_ID);
        matchCfg.setDriver(BuildSecondCachedMatchDriver.class);
        matchCfg.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        matchCfg.setMemoryDriver(megabytes(matchMemory));
        matchCfg.addInputToGroup(0);
        matchCfg.addInputToGroup(1);
        matchCfg.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
        matchCfg.setInputSerializer(vertexWithAdjacencyListSerializer, 1);
        matchCfg.setDriverComparator(vertexWithRankAndDanglingComparator, 0);
        matchCfg.setDriverComparator(vertexWithAdjacencyListComparator, 1);
        matchCfg.setDriverPairComparator(matchComparator);
        matchCfg.setOutputSerializer(vertexWithRankSerializer);
        matchCfg.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        matchCfg.setOutputComparator(vertexWithRankComparator, 0);
        matchCfg.setStubWrapper(new UserCodeClassWrapper<CustomCompensatableDotProductMatch>(CustomCompensatableDotProductMatch.class));
        matchCfg.setStubParameter("pageRank.numVertices", numVerticesText);
        setCompensationParameters(matchCfg, failingWorkers, failingIteration, messageLoss);

        // --- iteration tail: co-group of previous ranks with new contributions ------
        JobTaskVertex tailVertex = JobGraphUtils.createTask(
                IterationTailPactTask.class, "IterationTail", graph, parallelism, subTasksPerInstance);
        TaskConfig tailCfg = new TaskConfig(tailVertex.getConfiguration());
        tailCfg.setIterationId(ITERATION_ID);
        tailCfg.setIsWorksetUpdate();
        tailCfg.setDriver(CoGroupDriver.class);
        tailCfg.setDriverStrategy(DriverStrategy.CO_GROUP);
        tailCfg.addInputToGroup(0);
        tailCfg.addInputToGroup(1);
        tailCfg.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
        tailCfg.setInputSerializer(vertexWithRankSerializer, 1);
        tailCfg.setDriverComparator(vertexWithRankAndDanglingComparator, 0);
        tailCfg.setDriverComparator(vertexWithRankComparator, 1);
        tailCfg.setDriverPairComparator(coGroupComparator);
        // Input 0 is materialized asynchronously; input 1 is sorted.
        tailCfg.setInputAsynchronouslyMaterialized(0, true);
        tailCfg.setInputMaterializationMemory(0, minorConsumerBytes);
        tailCfg.setInputLocalStrategy(1, LocalStrategy.SORT);
        tailCfg.setInputComparator(vertexWithRankComparator, 1);
        tailCfg.setMemoryInput(1, megabytes(coGroupSortMemory));
        tailCfg.setFilehandlesInput(1, NUM_FILE_HANDLES_PER_SORT);
        tailCfg.setSpillingThresholdInput(1, SORT_SPILL_THRESHOLD);
        tailCfg.addOutputShipStrategy(ShipStrategyType.FORWARD);
        tailCfg.setOutputSerializer(vertexWithRankAndDanglingSerializer);
        tailCfg.setStubWrapper(new UserCodeClassWrapper<CustomCompensatableDotProductCoGroup>(CustomCompensatableDotProductCoGroup.class));
        tailCfg.setStubParameter("pageRank.numVertices", numVerticesText);
        tailCfg.setStubParameter("pageRank.numDanglingVertices", Long.toString(numDanglingVertices));
        setCompensationParameters(tailCfg, failingWorkers, failingIteration, messageLoss);

        // --- final output -----------------------------------------------------------
        JobOutputVertex resultSink = JobGraphUtils.createFileOutput(
                graph, "FinalOutput", parallelism, subTasksPerInstance);
        TaskConfig resultSinkCfg = new TaskConfig(resultSink.getConfiguration());
        resultSinkCfg.addInputToGroup(0);
        resultSinkCfg.setInputSerializer(vertexWithRankAndDanglingSerializer, 0);
        resultSinkCfg.setStubWrapper(new UserCodeClassWrapper<CustomPageWithRankOutFormat>(CustomPageWithRankOutFormat.class));
        resultSinkCfg.setStubParameter("stratosphere.output.file", resultPath);

        // --- fake tail output and sync ----------------------------------------------
        JobOutputVertex tailSink = JobGraphUtils.createFakeOutput(
                graph, "FakeTailOutput", parallelism, subTasksPerInstance);

        JobOutputVertex syncVertex = JobGraphUtils.createSync(graph, parallelism);
        TaskConfig syncCfg = new TaskConfig(syncVertex.getConfiguration());
        syncCfg.setNumberOfIterations(numIterations);
        syncCfg.addIterationAggregator("pagerank.aggregator", PageRankStatsAggregator.class);
        syncCfg.setConvergenceCriterion("pagerank.aggregator", DiffL1NormConvergenceCriterion.class);
        syncCfg.setIterationId(ITERATION_ID);

        // --- wiring (the order of these calls is kept exactly as-is) -----------------
        JobGraphUtils.connect(rankSource, headVertex, ChannelType.NETWORK, DistributionPattern.BIPARTITE);

        JobGraphUtils.connect(headVertex, matchVertex, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
        matchCfg.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        JobGraphUtils.connect(adjacencySource, matchVertex, ChannelType.NETWORK, DistributionPattern.BIPARTITE);

        JobGraphUtils.connect(headVertex, tailVertex, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(matchVertex, tailVertex, ChannelType.NETWORK, DistributionPattern.BIPARTITE);
        tailCfg.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
        // Gate 1 is fed bipartitely by the intermediate task, hence one event per parallel sender.
        tailCfg.setGateIterativeWithNumberOfEventsUntilInterrupt(1, parallelism);

        JobGraphUtils.connect(headVertex, resultSink, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(tailVertex, tailSink, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(headVertex, syncVertex, ChannelType.NETWORK, DistributionPattern.POINTWISE);

        // --- instance sharing -------------------------------------------------------
        tailSink.setVertexToShareInstancesWith(tailVertex);
        tailVertex.setVertexToShareInstancesWith(headVertex);
        rankSource.setVertexToShareInstancesWith(headVertex);
        adjacencySource.setVertexToShareInstancesWith(headVertex);
        matchVertex.setVertexToShareInstancesWith(headVertex);
        resultSink.setVertexToShareInstancesWith(headVertex);
        syncVertex.setVertexToShareInstancesWith(headVertex);

        return graph;
    }

    /** Scales a memory argument by 2^20, widening to {@code long} before multiplying. */
    private static long megabytes(int amount) {
        return amount * BYTES_PER_MEGABYTE;
    }

    /** Writes the three {@code compensation.*} stub parameters onto the given task config. */
    private static void setCompensationParameters(TaskConfig config, String failingWorkers, int failingIteration, double messageLoss) {
        config.setStubParameter("compensation.failingWorker", failingWorkers);
        config.setStubParameter("compensation.failingIteration", Integer.toString(failingIteration));
        config.setStubParameter("compensation.messageLoss", Double.toString(messageLoss));
    }
}

