/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.iterative;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.operators.BulkIteration;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.io.TextInputFormat;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.util.Collector;
import java.io.Serializable;
import java.util.Iterator;
import org.junit.Assert;

public class IterationWithAllReducerITCase
extends RecordAPITestBase {
    private static final String INPUT = "1\n1\n1\n1\n1\n1\n1\n1\n";
    private static final String EXPECTED = "1\n";
    protected String dataPath;
    protected String resultPath;

    protected void preSubmit() throws Exception {
        this.dataPath = this.createTempFile("datapoints.txt", INPUT);
        this.resultPath = this.getTempFilePath("result");
    }

    protected void postSubmit() throws Exception {
        this.compareResultsByLinesInMemory(EXPECTED, this.resultPath);
    }

    protected Plan getTestJob() {
        Plan plan = IterationWithAllReducerITCase.getTestPlanPlan(4, this.dataPath, this.resultPath);
        return plan;
    }

    private static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
        FileDataSource initialInput = new FileDataSource(TextInputFormat.class, input, "input");
        BulkIteration iteration = new BulkIteration("Loop");
        iteration.setInput((Operator)initialInput);
        iteration.setMaximumNumberOfIterations(5);
        Assert.assertTrue((iteration.getMaximumNumberOfIterations() > 1 ? 1 : 0) != 0);
        ReduceOperator sumReduce = ReduceOperator.builder((ReduceFunction)new PickOneReducer()).input(new Operator[]{iteration.getPartialSolution()}).name("Compute sum (Reduce)").build();
        iteration.setNextPartialSolution((Operator)sumReduce);
        FileDataSink finalResult = new FileDataSink(CsvOutputFormat.class, output, (Operator)iteration, "Output");
        ((CsvOutputFormat.ConfigBuilder)((CsvOutputFormat.ConfigBuilder)CsvOutputFormat.configureRecordFormat((FileDataSink)finalResult).recordDelimiter('\n')).fieldDelimiter(' ')).field(StringValue.class, 0);
        Plan plan = new Plan((GenericDataSink)finalResult, "Iteration with AllReducer (keyless Reducer)");
        plan.setDefaultParallelism(numSubTasks);
        Assert.assertTrue((plan.getDefaultParallelism() > 1 ? 1 : 0) != 0);
        return plan;
    }

    public static final class PickOneReducer
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterator<Record> it, Collector<Record> out) {
            out.collect((Object)it.next());
        }
    }
}

