/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.iterative.nephele;

import eu.stratosphere.api.common.operators.util.UserCodeClassWrapper;
import eu.stratosphere.api.common.operators.util.UserCodeWrapper;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordComparatorFactory;
import eu.stratosphere.api.java.typeutils.runtime.record.RecordSerializerFactory;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.nephele.jobgraph.AbstractJobVertex;
import eu.stratosphere.nephele.jobgraph.DistributionPattern;
import eu.stratosphere.nephele.jobgraph.JobGraph;
import eu.stratosphere.nephele.jobgraph.JobGraphDefinitionException;
import eu.stratosphere.nephele.jobgraph.JobInputVertex;
import eu.stratosphere.nephele.jobgraph.JobOutputVertex;
import eu.stratosphere.nephele.jobgraph.JobTaskVertex;
import eu.stratosphere.pact.runtime.iterative.task.IterationHeadPactTask;
import eu.stratosphere.pact.runtime.iterative.task.IterationTailPactTask;
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;
import eu.stratosphere.pact.runtime.task.CollectorMapDriver;
import eu.stratosphere.pact.runtime.task.DriverStrategy;
import eu.stratosphere.pact.runtime.task.GroupReduceDriver;
import eu.stratosphere.pact.runtime.task.chaining.ChainedCollectorMapDriver;
import eu.stratosphere.pact.runtime.task.util.LocalStrategy;
import eu.stratosphere.pact.runtime.task.util.TaskConfig;
import eu.stratosphere.runtime.io.channels.ChannelType;
import eu.stratosphere.test.iterative.nephele.JobGraphUtils;
import eu.stratosphere.test.recordJobs.kmeans.udfs.CoordVector;
import eu.stratosphere.test.recordJobs.kmeans.udfs.PointInFormat;
import eu.stratosphere.test.recordJobs.kmeans.udfs.PointOutFormat;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import java.util.Collection;
import java.util.Iterator;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class IterationWithChainingNepheleITCase
extends RecordAPITestBase {
    private static final String INPUT_STRING = "0|%d.25|\n1|%d.25|\n";
    private String dataPath;
    private String resultPath;

    public IterationWithChainingNepheleITCase(Configuration config) {
        super(config);
    }

    protected void preSubmit() throws Exception {
        String initialInput = String.format(INPUT_STRING, 1, 2);
        this.dataPath = this.createTempFile("data_points.txt", initialInput);
        this.resultPath = this.getTempFilePath("result");
    }

    protected void postSubmit() throws Exception {
        int maxIterations = this.config.getInteger("ChainedMapperNepheleITCase#MaxIterations", 1);
        String result = String.format(INPUT_STRING, 1 + maxIterations, 2 + maxIterations);
        this.compareResultsByLinesInMemory(result, this.resultPath);
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        Configuration config = new Configuration();
        config.setInteger("ChainedMapperNepheleITCase#NoSubtasks", 2);
        config.setInteger("ChainedMapperNepheleITCase#MaxIterations", 2);
        return IterationWithChainingNepheleITCase.toParameterList((Configuration[])new Configuration[]{config});
    }

    protected JobGraph getJobGraph() throws Exception {
        int numSubTasks = this.config.getInteger("ChainedMapperNepheleITCase#NoSubtasks", 1);
        int maxIterations = this.config.getInteger("ChainedMapperNepheleITCase#MaxIterations", 1);
        return this.getTestJobGraph(this.dataPath, this.resultPath, numSubTasks, maxIterations);
    }

    private JobGraph getTestJobGraph(String inputPath, String outputPath, int numSubTasks, int maxIterations) throws JobGraphDefinitionException {
        JobGraph jobGraph = new JobGraph("Iteration Tail with Chaining");
        RecordSerializerFactory serializer = RecordSerializerFactory.get();
        RecordComparatorFactory comparator = new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class});
        long MEM_PER_CONSUMER = 2L;
        boolean ITERATION_ID = true;
        JobInputVertex input = JobGraphUtils.createInput(new PointInFormat(), inputPath, "Input", jobGraph, numSubTasks, numSubTasks);
        TaskConfig inputConfig = new TaskConfig(input.getConfiguration());
        inputConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        inputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        JobTaskVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks, numSubTasks);
        TaskConfig headConfig = new TaskConfig(head.getConfiguration());
        headConfig.setIterationId(1);
        headConfig.addInputToGroup(0);
        headConfig.setInputSerializer((TypeSerializerFactory)serializer, 0);
        headConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
        headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
        headConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        headConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        headConfig.setOutputComparator((TypeComparatorFactory)comparator, 0);
        TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
        headFinalOutConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
        headConfig.setIterationHeadIndexOfSyncOutput(2);
        headConfig.setDriver(CollectorMapDriver.class);
        headConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        headConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(DummyMapper.class));
        headConfig.setBackChannelMemory(0x200000L);
        JobTaskVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "Chained Iteration Tail", jobGraph, numSubTasks, numSubTasks);
        TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
        tailConfig.setIterationId(1);
        tailConfig.addInputToGroup(0);
        tailConfig.setInputSerializer((TypeSerializerFactory)serializer, 0);
        tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        tailConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        tailConfig.setDriver(GroupReduceDriver.class);
        tailConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        tailConfig.setDriverComparator((TypeComparatorFactory)comparator, 0);
        tailConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(DummyReducer.class));
        TaskConfig chainedMapperConfig = new TaskConfig(new Configuration());
        chainedMapperConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        chainedMapperConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(IncrementCoordinatesMapper.class));
        chainedMapperConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
        chainedMapperConfig.setInputSerializer((TypeSerializerFactory)serializer, 0);
        chainedMapperConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        chainedMapperConfig.setOutputSerializer((TypeSerializerFactory)serializer);
        chainedMapperConfig.setIsWorksetUpdate();
        tailConfig.addChainedTask(ChainedCollectorMapDriver.class, chainedMapperConfig, "Chained ID Mapper");
        JobOutputVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks, numSubTasks);
        TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
        outputConfig.addInputToGroup(0);
        outputConfig.setInputSerializer((TypeSerializerFactory)serializer, 0);
        outputConfig.setStubWrapper((UserCodeWrapper)new UserCodeClassWrapper(PointOutFormat.class));
        outputConfig.setStubParameter("stratosphere.output.file", outputPath);
        JobOutputVertex fakeTail = JobGraphUtils.createFakeOutput(jobGraph, "Fake Tail", numSubTasks, numSubTasks);
        JobOutputVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
        TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
        syncConfig.setNumberOfIterations(maxIterations);
        syncConfig.setIterationId(1);
        JobGraphUtils.connect((AbstractJobVertex)input, (AbstractJobVertex)head, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)tail, ChannelType.IN_MEMORY, DistributionPattern.BIPARTITE);
        tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)output, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)head, (AbstractJobVertex)sync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        JobGraphUtils.connect((AbstractJobVertex)tail, (AbstractJobVertex)fakeTail, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
        input.setVertexToShareInstancesWith((AbstractJobVertex)head);
        tail.setVertexToShareInstancesWith((AbstractJobVertex)head);
        output.setVertexToShareInstancesWith((AbstractJobVertex)head);
        sync.setVertexToShareInstancesWith((AbstractJobVertex)head);
        fakeTail.setVertexToShareInstancesWith((AbstractJobVertex)tail);
        return jobGraph;
    }

    public static final class IncrementCoordinatesMapper
    extends MapFunction {
        private static final long serialVersionUID = 1L;

        public void map(Record rec, Collector<Record> out) {
            CoordVector coord = (CoordVector)rec.getField(1, CoordVector.class);
            double[] vector = coord.getCoordinates();
            int i = 0;
            while (i < vector.length) {
                int n = i++;
                vector[n] = vector[n] + 1.0;
            }
            rec.setField(1, (Value)coord);
            out.collect((Object)rec);
        }
    }

    public static final class DummyReducer
    extends ReduceFunction {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterator<Record> it, Collector<Record> out) {
            while (it.hasNext()) {
                out.collect((Object)it.next());
            }
        }
    }

    public static final class DummyMapper
    extends MapFunction {
        private static final long serialVersionUID = 1L;

        public void map(Record rec, Collector<Record> out) {
            out.collect((Object)rec);
        }
    }
}

