/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.javaApiOperators;

import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.KeySelector;
import eu.stratosphere.api.java.functions.ReduceFunction;
import eu.stratosphere.api.java.operators.ReduceOperator;
import eu.stratosphere.api.java.operators.SingleInputUdfOperator;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple5;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.javaApiOperators.util.CollectionDataSets;
import eu.stratosphere.test.util.JavaProgramTestBase;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ReduceITCase
extends JavaProgramTestBase {
    private static int NUM_PROGRAMS = 8;
    private int curProgId;
    private String resultPath;
    private String expectedResult;

    public ReduceITCase(Configuration config) {
        super(config);
        this.curProgId = this.config.getInteger("ProgramId", -1);
    }

    /**
     * Pre-submission step: requests a temporary path named {@code "result"}
     * from the test base and stores it as the output location of the program.
     *
     * @throws Exception if obtaining the temporary path fails
     */
    protected void preSubmit() throws Exception {
        final String tempResultPath = getTempDirPath("result");
        resultPath = tempResultPath;
    }

    /**
     * Runs the reduce program selected by {@code curProgId}, writing to
     * {@code resultPath}, and stores the expected output it returns.
     *
     * @throws Exception if building or executing the program fails
     */
    protected void testProgram() throws Exception {
        final String expected = ReduceProgs.runProgram(curProgId, resultPath);
        expectedResult = expected;
    }

    /**
     * Post-submission step: delegates to {@code compareResultsByLinesInMemory}
     * with the expected output and the path the program wrote to.
     *
     * @throws Exception if the comparison helper throws
     */
    protected void postSubmit() throws Exception {
        final String expected = expectedResult;
        final String actualPath = resultPath;
        compareResultsByLinesInMemory(expected, actualPath);
    }

    /**
     * Supplies the parameter sets for the parameterized runner: one
     * {@link Configuration} per program, each carrying its id under the key
     * {@code "ProgramId"} (ids {@code 1} through {@code NUM_PROGRAMS}, in order).
     *
     * @return the configurations converted by {@code toParameterList}
     * @throws FileNotFoundException declared by the signature; not thrown by the visible code
     * @throws IOException declared by the signature; not thrown by the visible code
     */
    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
        final LinkedList<Configuration> programConfigs = new LinkedList<Configuration>();
        int programId = 1;
        while (programId <= NUM_PROGRAMS) {
            final Configuration programConfig = new Configuration();
            programConfig.setInteger("ProgramId", programId);
            programConfigs.add(programConfig);
            programId++;
        }
        return toParameterList(programConfigs);
    }

    /**
     * Reducer for {@code Tuple3<Integer, Long, String>} that uses a broadcast set.
     *
     * <p>{@link #open(Configuration)} sums all integers of the broadcast variable
     * {@code "ints"} and keeps the decimal string of that sum. Each
     * {@code reduce} call then emits: {@code f0} = sum of both inputs' {@code f0},
     * {@code f1} = the first input's {@code f1}, {@code f2} = the broadcast sum string.
     */
    public static class BCTuple3Reduce
    extends ReduceFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;
        /** Result tuple; the same instance is returned from every {@code reduce} call. */
        private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
        /** Replacement for field {@code f2}; empty until {@code open} has run. */
        private String f2Replace = "";

        /**
         * Reads the broadcast variable {@code "ints"} and stores the sum of its
         * elements as a string.
         *
         * @param config function configuration; not used here
         */
        public void open(Configuration config) {
            // The collection must be typed: iterating a raw Collection yields Object,
            // which cannot be assigned to the Integer loop variable.
            Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
            int sum = 0;
            for (Integer i : ints) {
                sum += i.intValue();
            }
            this.f2Replace = String.valueOf(sum);
        }

        /**
         * Combines two tuples into the shared result tuple.
         *
         * @param in1 first tuple; supplies {@code f1} of the result
         * @param in2 second tuple
         * @return the reused result tuple
         * @throws Exception declared by the signature
         */
        public Tuple3<Integer, Long, String> reduce(Tuple3<Integer, Long, String> in1, Tuple3<Integer, Long, String> in2) throws Exception {
            // Arguments are passed with their real types; casting them to Object
            // would not match the typed setFields parameters.
            this.out.setFields(in1.f0 + in2.f0, in1.f1, this.f2Replace);
            return this.out;
        }
    }

    /**
     * Reducer for {@code CustomType} that adds up {@code myInt} and
     * {@code myLong} of both inputs and sets {@code myString} to
     * {@code "Hello!"}.
     */
    public static class AllAddingCustomTypeReduce extends ReduceFunction<CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;
        /** Result object; the same instance is returned from every call. */
        private final CollectionDataSets.CustomType out = new CollectionDataSets.CustomType();

        /**
         * Writes the field-wise sums of both inputs into the shared result object.
         *
         * @param in1 first value
         * @param in2 second value
         * @return the reused result object
         * @throws Exception declared by the signature
         */
        public CollectionDataSets.CustomType reduce(CollectionDataSets.CustomType in1, CollectionDataSets.CustomType in2) throws Exception {
            final CollectionDataSets.CustomType result = this.out;
            result.myInt = in1.myInt + in2.myInt;
            result.myLong = in1.myLong + in2.myLong;
            result.myString = "Hello!";
            return result;
        }
    }

    /**
     * Reducer for {@code Tuple3<Integer, Long, String>} that adds up {@code f0}
     * and {@code f1} of both inputs and sets {@code f2} to {@code "Hello World"}.
     */
    public static class AllAddingTuple3Reduce
    extends ReduceFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;
        /** Result tuple; the same instance is returned from every call. */
        private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();

        /**
         * Writes the field-wise sums of both inputs into the shared result tuple.
         *
         * @param in1 first tuple
         * @param in2 second tuple
         * @return the reused result tuple
         * @throws Exception declared by the signature
         */
        public Tuple3<Integer, Long, String> reduce(Tuple3<Integer, Long, String> in1, Tuple3<Integer, Long, String> in2) throws Exception {
            // Sums are boxed to Integer/Long by the call; no Object casts, which
            // would not match the typed setFields parameters.
            this.out.setFields(in1.f0 + in2.f0, in1.f1 + in2.f1, "Hello World");
            return this.out;
        }
    }

    /**
     * Reducer for {@code Tuple3<Integer, Long, String>} that modifies and
     * returns its second input instead of using a separate result object.
     */
    public static class InputReturningTuple3Reduce extends ReduceFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        /**
         * Stores the sum of both {@code f0} fields in {@code in2.f0}, sets
         * {@code in2.f2} to {@code "Hi again!"} and returns {@code in2}.
         * {@code in2.f1} is left as it is.
         *
         * @param in1 first tuple; only {@code f0} is read
         * @param in2 second tuple; mutated in place and returned
         * @return {@code in2}
         * @throws Exception declared by the signature
         */
        public Tuple3<Integer, Long, String> reduce(Tuple3<Integer, Long, String> in1, Tuple3<Integer, Long, String> in2) throws Exception {
            final int summed = in1.f0 + in2.f0;
            in2.f0 = summed;
            in2.f2 = "Hi again!";
            return in2;
        }
    }

    /**
     * Reducer for {@code CustomType} that keeps {@code myInt} of the first
     * input, adds up {@code myLong} of both inputs and sets {@code myString}
     * to {@code "Hello!"}.
     */
    public static class CustomTypeReduce extends ReduceFunction<CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;
        /** Result object; the same instance is returned from every call. */
        private final CollectionDataSets.CustomType out = new CollectionDataSets.CustomType();

        /**
         * Combines two values into the shared result object.
         *
         * @param in1 first value; supplies {@code myInt} of the result
         * @param in2 second value; only {@code myLong} is read
         * @return the reused result object
         * @throws Exception declared by the signature
         */
        public CollectionDataSets.CustomType reduce(CollectionDataSets.CustomType in1, CollectionDataSets.CustomType in2) throws Exception {
            final CollectionDataSets.CustomType result = this.out;
            result.myInt = in1.myInt;
            result.myLong = in1.myLong + in2.myLong;
            result.myString = "Hello!";
            return result;
        }
    }

    /**
     * Reducer for {@code Tuple5<Integer, Long, Integer, String, Long>}.
     *
     * <p>The result keeps {@code f0} and {@code f4} of the first input, holds
     * the sum of both inputs' {@code f1}, and sets {@code f2} to {@code 0} and
     * {@code f3} to {@code "P-)"}.
     */
    public static class Tuple5Reduce
    extends ReduceFunction<Tuple5<Integer, Long, Integer, String, Long>> {
        private static final long serialVersionUID = 1L;
        /** Result tuple; the same instance is returned from every call. */
        private final Tuple5<Integer, Long, Integer, String, Long> out = new Tuple5<Integer, Long, Integer, String, Long>();

        /**
         * Combines two tuples into the shared result tuple.
         *
         * @param in1 first tuple; supplies {@code f0} and {@code f4} of the result
         * @param in2 second tuple; only {@code f1} is read
         * @return the reused result tuple
         * @throws Exception declared by the signature
         */
        public Tuple5<Integer, Long, Integer, String, Long> reduce(Tuple5<Integer, Long, Integer, String, Long> in1, Tuple5<Integer, Long, Integer, String, Long> in2) throws Exception {
            // Values are passed with their real types (boxed by the call); Object
            // casts would not match the typed setFields parameters.
            this.out.setFields(in1.f0, in1.f1 + in2.f1, 0, "P-)", in1.f4);
            return this.out;
        }
    }

    /**
     * Reducer for {@code Tuple3<Integer, Long, String>} that sums {@code f0} of
     * both inputs and keeps {@code f1} of the first input.
     *
     * <p>Field {@code f2} of the result is the replacement string given at
     * construction, or the first input's {@code f2} when no replacement was given.
     */
    public static class Tuple3Reduce
    extends ReduceFunction<Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;
        /** Result tuple; the same instance is returned from every call. */
        private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
        /** Replacement for {@code f2}; {@code null} means "keep the first input's f2". */
        private final String f2Replace;

        /** Creates a reducer that keeps {@code f2} of the first input. */
        public Tuple3Reduce() {
            this(null);
        }

        /**
         * Creates a reducer that writes {@code f2Replace} into {@code f2}.
         *
         * @param f2Replace replacement string; {@code null} behaves like the no-argument constructor
         */
        public Tuple3Reduce(String f2Replace) {
            this.f2Replace = f2Replace;
        }

        /**
         * Combines two tuples into the shared result tuple.
         *
         * @param in1 first tuple; supplies {@code f1} (and {@code f2} when no replacement is set)
         * @param in2 second tuple; only {@code f0} is read
         * @return the reused result tuple
         * @throws Exception declared by the signature
         */
        public Tuple3<Integer, Long, String> reduce(Tuple3<Integer, Long, String> in1, Tuple3<Integer, Long, String> in2) throws Exception {
            final String third = (this.f2Replace == null) ? in1.f2 : this.f2Replace;
            // All arguments are evaluated before setFields runs and are passed with
            // their real types; Object casts would not match the typed parameters.
            this.out.setFields(in1.f0 + in2.f0, in1.f1, third);
            return this.out;
        }
    }

    private static class ReduceProgs {
        private ReduceProgs() {
        }

        public static String runProgram(int progId, String resultPath) throws Exception {
            switch (progId) {
                case 1: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    ReduceOperator reduceDs = ds.groupBy(new int[]{1}).reduce((ReduceFunction)new Tuple3Reduce("B-)"));
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1,Hi\n5,2,B-)\n15,3,B-)\n34,4,B-)\n65,5,B-)\n111,6,B-)\n";
                }
                case 2: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
                    ReduceOperator reduceDs = ds.groupBy(new int[]{4, 0}).reduce((ReduceFunction)new Tuple5Reduce());
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1,0,Hallo,1\n2,3,2,Hallo Welt wie,1\n2,2,1,Hallo Welt,2\n3,9,0,P-),2\n3,6,5,BCD,3\n4,17,0,P-),1\n4,17,0,P-),2\n5,11,10,GHI,1\n5,29,0,P-),2\n5,25,0,P-),3\n";
                }
                case 3: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    ReduceOperator reduceDs = ds.groupBy((KeySelector)new KeySelector<Tuple3<Integer, Long, String>, Long>(){
                        private static final long serialVersionUID = 1L;

                        public Long getKey(Tuple3<Integer, Long, String> in) {
                            return (Long)in.f1;
                        }
                    }).reduce((ReduceFunction)new Tuple3Reduce("B-)"));
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1,Hi\n5,2,B-)\n15,3,B-)\n34,4,B-)\n65,5,B-)\n111,6,B-)\n";
                }
                case 4: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
                    ReduceOperator reduceDs = ds.groupBy((KeySelector)new KeySelector<CollectionDataSets.CustomType, Integer>(){
                        private static final long serialVersionUID = 1L;

                        public Integer getKey(CollectionDataSets.CustomType in) {
                            return in.myInt;
                        }
                    }).reduce((ReduceFunction)new CustomTypeReduce());
                    reduceDs.writeAsText(resultPath);
                    env.execute();
                    return "1,0,Hi\n2,3,Hello!\n3,12,Hello!\n4,30,Hello!\n5,60,Hello!\n6,105,Hello!\n";
                }
                case 5: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    ReduceOperator reduceDs = ds.reduce((ReduceFunction)new AllAddingTuple3Reduce());
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "231,91,Hello World\n";
                }
                case 6: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
                    ReduceOperator reduceDs = ds.reduce((ReduceFunction)new AllAddingCustomTypeReduce());
                    reduceDs.writeAsText(resultPath);
                    env.execute();
                    return "91,210,Hello!";
                }
                case 7: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    SingleInputUdfOperator reduceDs = ds.groupBy(new int[]{1}).reduce((ReduceFunction)new BCTuple3Reduce()).withBroadcastSet(intDs, "ints");
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1,Hi\n5,2,55\n15,3,55\n34,4,55\n65,5,55\n111,6,55\n";
                }
                case 8: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    ReduceOperator reduceDs = ds.groupBy(new int[]{1}).reduce((ReduceFunction)new InputReturningTuple3Reduce());
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1,Hi\n5,2,Hi again!\n15,3,Hi again!\n34,4,Hi again!\n65,5,Hi again!\n111,6,Hi again!\n";
                }
            }
            throw new IllegalArgumentException("Invalid program id");
        }
    }
}

