/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.javaApiOperators;

import eu.stratosphere.api.common.operators.Order;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.functions.KeySelector;
import eu.stratosphere.api.java.functions.MapFunction;
import eu.stratosphere.api.java.operators.Operator;
import eu.stratosphere.api.java.operators.ReduceGroupOperator;
import eu.stratosphere.api.java.operators.SingleInputUdfOperator;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple5;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.javaApiOperators.util.CollectionDataSets;
import eu.stratosphere.test.util.JavaProgramTestBase;
import eu.stratosphere.util.Collector;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class GroupReduceITCase
extends JavaProgramTestBase {
    private static int NUM_PROGRAMS = 13;
    private int curProgId;
    private String resultPath;
    private String expectedResult;

    public GroupReduceITCase(Configuration config) {
        super(config);
        this.curProgId = this.config.getInteger("ProgramId", -1);
    }

    protected void preSubmit() throws Exception {
        this.resultPath = this.getTempDirPath("result");
    }

    protected void testProgram() throws Exception {
        this.expectedResult = GroupReduceProgs.runProgram(this.curProgId, this.resultPath);
    }

    protected void postSubmit() throws Exception {
        this.compareResultsByLinesInMemory(this.expectedResult, this.resultPath);
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
        LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
        for (int i = 1; i <= NUM_PROGRAMS; ++i) {
            Configuration config = new Configuration();
            config.setInteger("ProgramId", i);
            tConfigs.add(config);
        }
        return GroupReduceITCase.toParameterList(tConfigs);
    }

    public static final class IdentityMapper<T>
    extends MapFunction<T, T> {
        public T map(T value) {
            return value;
        }
    }

    @GroupReduceFunction.Combinable
    public static class CustomTypeGroupReduceWithCombine
    extends GroupReduceFunction<CollectionDataSets.CustomType, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        public void combine(Iterator<CollectionDataSets.CustomType> values, Collector<CollectionDataSets.CustomType> out) throws Exception {
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType();
            while (values.hasNext()) {
                CollectionDataSets.CustomType c = values.next();
                o.myInt = c.myInt;
                o.myLong += c.myLong;
                o.myString = "test" + c.myInt;
            }
            out.collect((Object)o);
        }

        public void reduce(Iterator<CollectionDataSets.CustomType> values, Collector<CollectionDataSets.CustomType> out) throws Exception {
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType(0, 0L, "");
            while (values.hasNext()) {
                CollectionDataSets.CustomType c = values.next();
                o.myInt = c.myInt;
                o.myLong += c.myLong;
                o.myString = c.myString;
            }
            out.collect((Object)o);
        }
    }

    @GroupReduceFunction.Combinable
    public static class Tuple3AllGroupReduceWithCombine
    extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1L;

        public void combine(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) {
            Tuple3 o = new Tuple3((Object)0, (Object)0L, (Object)"");
            while (values.hasNext()) {
                Tuple3<Integer, Long, String> t = values.next();
                Tuple3 tuple3 = o;
                Integer.valueOf((Integer)tuple3.f0 + (Integer)t.f0);
                tuple3.f0 = tuple3.f0;
                tuple3 = o;
                Long.valueOf((Long)tuple3.f1 + (Long)t.f1);
                tuple3.f1 = tuple3.f1;
                o.f2 = o.f2 + "test";
            }
            out.collect((Object)o);
        }

        public void reduce(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) {
            int i = 0;
            String s = "";
            while (values.hasNext()) {
                Tuple3<Integer, Long, String> t = values.next();
                i = (int)((long)i + ((long)((Integer)t.f0).intValue() + (Long)t.f1));
                s = s + (String)t.f2;
            }
            out.collect((Object)new Tuple2((Object)i, (Object)s));
        }
    }

    @GroupReduceFunction.Combinable
    public static class Tuple3GroupReduceWithCombine
    extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1L;

        public void combine(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            Tuple3 o = new Tuple3((Object)0, (Object)0L, (Object)"");
            while (values.hasNext()) {
                Tuple3<Integer, Long, String> t = values.next();
                Tuple3 tuple3 = o;
                Integer.valueOf((Integer)tuple3.f0 + (Integer)t.f0);
                tuple3.f0 = tuple3.f0;
                o.f1 = t.f1;
                o.f2 = "test" + o.f1;
            }
            out.collect((Object)o);
        }

        public void reduce(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, String>> out) throws Exception {
            int i = 0;
            String s = "";
            while (values.hasNext()) {
                Tuple3<Integer, Long, String> t = values.next();
                i += ((Integer)t.f0).intValue();
                s = (String)t.f2;
            }
            out.collect((Object)new Tuple2((Object)i, (Object)s));
        }
    }

    public static class BCTuple3GroupReduce
    extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;
        private String f2Replace = "";

        public void open(Configuration config) {
            Collection ints = this.getRuntimeContext().getBroadcastVariable("ints");
            int sum = 0;
            for (Integer i : ints) {
                sum += i.intValue();
            }
            this.f2Replace = sum + "";
        }

        public void reduce(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            int i = 0;
            long l = 0L;
            while (values.hasNext()) {
                Tuple3<Integer, Long, String> t = values.next();
                i += ((Integer)t.f0).intValue();
                l = (Long)t.f1;
            }
            out.collect((Object)new Tuple3((Object)i, (Object)l, (Object)this.f2Replace));
        }
    }

    public static class AllAddingCustomTypeGroupReduce
    extends GroupReduceFunction<CollectionDataSets.CustomType, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterator<CollectionDataSets.CustomType> values, Collector<CollectionDataSets.CustomType> out) throws Exception {
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType();
            CollectionDataSets.CustomType c = values.next();
            o.myString = "Hello!";
            o.myInt = c.myInt;
            o.myLong = c.myLong;
            while (values.hasNext()) {
                c = values.next();
                o.myInt += c.myInt;
                o.myLong += c.myLong;
            }
            out.collect((Object)o);
        }
    }

    public static class AllAddingTuple3GroupReduce
    extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            int i = 0;
            long l = 0L;
            while (values.hasNext()) {
                Tuple3<Integer, Long, String> t = values.next();
                i += ((Integer)t.f0).intValue();
                l += ((Long)t.f1).longValue();
            }
            out.collect((Object)new Tuple3((Object)i, (Object)l, (Object)"Hello World"));
        }
    }

    public static class InputReturningTuple3GroupReduce
    extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            while (values.hasNext()) {
                Tuple3<Integer, Long, String> t = values.next();
                if ((Integer)t.f0 >= 4) continue;
                t.f2 = "Hi!";
                Tuple3<Integer, Long, String> tuple3 = t;
                Integer.valueOf((Integer)tuple3.f0 + 10);
                tuple3.f0 = tuple3.f0;
                out.collect(t);
                tuple3 = t;
                Integer.valueOf((Integer)tuple3.f0 + 10);
                tuple3.f0 = tuple3.f0;
                t.f2 = "Hi again!";
                out.collect(t);
            }
        }
    }

    public static class CustomTypeGroupReduce
    extends GroupReduceFunction<CollectionDataSets.CustomType, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterator<CollectionDataSets.CustomType> values, Collector<CollectionDataSets.CustomType> out) throws Exception {
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType();
            CollectionDataSets.CustomType c = values.next();
            o.myString = "Hello!";
            o.myInt = c.myInt;
            o.myLong = c.myLong;
            while (values.hasNext()) {
                c = values.next();
                o.myLong += c.myLong;
            }
            out.collect((Object)o);
        }
    }

    public static class Tuple5GroupReduce
    extends GroupReduceFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterator<Tuple5<Integer, Long, Integer, String, Long>> values, Collector<Tuple5<Integer, Long, Integer, String, Long>> out) throws Exception {
            int i = 0;
            long l = 0L;
            long l2 = 0L;
            while (values.hasNext()) {
                Tuple5<Integer, Long, Integer, String, Long> t = values.next();
                i = (Integer)t.f0;
                l += ((Long)t.f1).longValue();
                l2 = (Long)t.f4;
            }
            out.collect((Object)new Tuple5((Object)i, (Object)l, (Object)0, (Object)"P-)", (Object)l2));
        }
    }

    public static class Tuple3SortedGroupReduce
    extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            Tuple3<Integer, Long, String> t = values.next();
            int sum = (Integer)t.f0;
            long key = (Long)t.f1;
            String concat = (String)t.f2;
            while (values.hasNext()) {
                t = values.next();
                sum += ((Integer)t.f0).intValue();
                concat = concat + "-" + (String)t.f2;
            }
            out.collect((Object)new Tuple3((Object)sum, (Object)key, (Object)concat));
        }
    }

    public static class Tuple3GroupReduce
    extends GroupReduceFunction<Tuple3<Integer, Long, String>, Tuple2<Integer, Long>> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterator<Tuple3<Integer, Long, String>> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
            int i = 0;
            long l = 0L;
            while (values.hasNext()) {
                Tuple3<Integer, Long, String> t = values.next();
                i += ((Integer)t.f0).intValue();
                l = (Long)t.f1;
            }
            out.collect((Object)new Tuple2((Object)i, (Object)l));
        }
    }

    private static class GroupReduceProgs {
        private GroupReduceProgs() {
        }

        public static String runProgram(int progId, String resultPath) throws Exception {
            switch (progId) {
                case 1: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    ReduceGroupOperator reduceDs = ds.groupBy(new int[]{1}).reduceGroup((GroupReduceFunction)new Tuple3GroupReduce());
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1\n5,2\n15,3\n34,4\n65,5\n111,6\n";
                }
                case 2: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
                    ReduceGroupOperator reduceDs = ds.groupBy(new int[]{4, 0}).reduceGroup((GroupReduceFunction)new Tuple5GroupReduce());
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1,0,P-),1\n2,3,0,P-),1\n2,2,0,P-),2\n3,9,0,P-),2\n3,6,0,P-),3\n4,17,0,P-),1\n4,17,0,P-),2\n5,11,0,P-),1\n5,29,0,P-),2\n5,25,0,P-),3\n";
                }
                case 3: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    env.setDegreeOfParallelism(1);
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    ReduceGroupOperator reduceDs = ds.groupBy(new int[]{1}).sortGroup(2, Order.ASCENDING).reduceGroup((GroupReduceFunction)new Tuple3SortedGroupReduce());
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1,Hi\n5,2,Hello-Hello world\n15,3,Hello world, how are you?-I am fine.-Luke Skywalker\n34,4,Comment#1-Comment#2-Comment#3-Comment#4\n65,5,Comment#5-Comment#6-Comment#7-Comment#8-Comment#9\n111,6,Comment#10-Comment#11-Comment#12-Comment#13-Comment#14-Comment#15\n";
                }
                case 4: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    ReduceGroupOperator reduceDs = ds.groupBy((KeySelector)new KeySelector<Tuple3<Integer, Long, String>, Long>(){
                        private static final long serialVersionUID = 1L;

                        public Long getKey(Tuple3<Integer, Long, String> in) {
                            return (Long)in.f1;
                        }
                    }).reduceGroup((GroupReduceFunction)new Tuple3GroupReduce());
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1\n5,2\n15,3\n34,4\n65,5\n111,6\n";
                }
                case 5: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
                    ReduceGroupOperator reduceDs = ds.groupBy((KeySelector)new KeySelector<CollectionDataSets.CustomType, Integer>(){
                        private static final long serialVersionUID = 1L;

                        public Integer getKey(CollectionDataSets.CustomType in) {
                            return in.myInt;
                        }
                    }).reduceGroup((GroupReduceFunction)new CustomTypeGroupReduce());
                    reduceDs.writeAsText(resultPath);
                    env.execute();
                    return "1,0,Hello!\n2,3,Hello!\n3,12,Hello!\n4,30,Hello!\n5,60,Hello!\n6,105,Hello!\n";
                }
                case 6: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    ReduceGroupOperator reduceDs = ds.reduceGroup((GroupReduceFunction)new AllAddingTuple3GroupReduce());
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "231,91,Hello World\n";
                }
                case 7: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
                    ReduceGroupOperator reduceDs = ds.reduceGroup((GroupReduceFunction)new AllAddingCustomTypeGroupReduce());
                    reduceDs.writeAsText(resultPath);
                    env.execute();
                    return "91,210,Hello!";
                }
                case 8: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    SingleInputUdfOperator reduceDs = ds.groupBy(new int[]{1}).reduceGroup((GroupReduceFunction)new BCTuple3GroupReduce()).withBroadcastSet(intDs, "ints");
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1,55\n5,2,55\n15,3,55\n34,4,55\n65,5,55\n111,6,55\n";
                }
                case 9: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    ReduceGroupOperator reduceDs = ds.groupBy(new int[]{1}).reduceGroup((GroupReduceFunction)new InputReturningTuple3GroupReduce());
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "11,1,Hi!\n21,1,Hi again!\n12,2,Hi!\n22,2,Hi again!\n13,2,Hi!\n23,2,Hi again!\n";
                }
                case 10: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
                    ReduceGroupOperator reduceDs = ds.groupBy((KeySelector)new KeySelector<CollectionDataSets.CustomType, Integer>(){
                        private static final long serialVersionUID = 1L;

                        public Integer getKey(CollectionDataSets.CustomType in) {
                            return in.myInt;
                        }
                    }).reduceGroup((GroupReduceFunction)new CustomTypeGroupReduceWithCombine());
                    reduceDs.writeAsText(resultPath);
                    env.execute();
                    return "1,0,test1\n2,3,test2\n3,12,test3\n4,30,test4\n5,60,test5\n6,105,test6\n";
                }
                case 11: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    env.setDegreeOfParallelism(2);
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    ReduceGroupOperator reduceDs = ds.groupBy(new int[]{1}).reduceGroup((GroupReduceFunction)new Tuple3GroupReduceWithCombine());
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,test1\n5,test2\n15,test3\n34,test4\n65,test5\n111,test6\n";
                }
                case 12: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    Operator ds = CollectionDataSets.get3TupleDataSet(env).map(new IdentityMapper()).setParallelism(4);
                    Configuration cfg = new Configuration();
                    cfg.setString("INPUT_SHIP_STRATEGY", "SHIP_REPARTITION");
                    SingleInputUdfOperator reduceDs = ds.reduceGroup((GroupReduceFunction)new Tuple3AllGroupReduceWithCombine()).withParameters(cfg);
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "322,testtesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttesttest\n";
                }
                case 13: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    env.setDegreeOfParallelism(1);
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    ReduceGroupOperator reduceDs = ds.groupBy(new int[]{1}).sortGroup(2, Order.DESCENDING).reduceGroup((GroupReduceFunction)new Tuple3SortedGroupReduce());
                    reduceDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1,Hi\n5,2,Hello world-Hello\n15,3,Luke Skywalker-I am fine.-Hello world, how are you?\n34,4,Comment#4-Comment#3-Comment#2-Comment#1\n65,5,Comment#9-Comment#8-Comment#7-Comment#6-Comment#5\n111,6,Comment#15-Comment#14-Comment#13-Comment#12-Comment#11-Comment#10\n";
                }
            }
            throw new IllegalArgumentException("Invalid program id");
        }
    }
}

