/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.javaApiOperators;

import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.functions.CoGroupFunction;
import eu.stratosphere.api.java.functions.KeySelector;
import eu.stratosphere.api.java.operators.CoGroupOperator;
import eu.stratosphere.api.java.operators.TwoInputUdfOperator;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.tuple.Tuple5;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.javaApiOperators.util.CollectionDataSets;
import eu.stratosphere.test.util.JavaProgramTestBase;
import eu.stratosphere.util.Collector;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class CoGroupITCase
extends JavaProgramTestBase {
    private static int NUM_PROGRAMS = 7;
    private int curProgId;
    private String resultPath;
    private String expectedResult;

    public CoGroupITCase(Configuration config) {
        super(config);
        this.curProgId = this.config.getInteger("ProgramId", -1);
    }

    protected void preSubmit() throws Exception {
        this.resultPath = this.getTempDirPath("result");
    }

    protected void testProgram() throws Exception {
        this.expectedResult = CoGroupProgs.runProgram(this.curProgId, this.resultPath);
    }

    protected void postSubmit() throws Exception {
        this.compareResultsByLinesInMemory(this.expectedResult, this.resultPath);
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() throws FileNotFoundException, IOException {
        LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
        for (int i = 1; i <= NUM_PROGRAMS; ++i) {
            Configuration config = new Configuration();
            config.setInteger("ProgramId", i);
            tConfigs.add(config);
        }
        return CoGroupITCase.toParameterList(tConfigs);
    }

    public static class Tuple5CoGroupBC
    extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        private int broadcast = 42;

        public void open(Configuration config) {
            Collection ints = this.getRuntimeContext().getBroadcastVariable("ints");
            int sum = 0;
            for (Integer i : ints) {
                sum += i.intValue();
            }
            this.broadcast = sum;
        }

        public void coGroup(Iterator<Tuple5<Integer, Long, Integer, String, Long>> first, Iterator<Tuple5<Integer, Long, Integer, String, Long>> second, Collector<Tuple3<Integer, Integer, Integer>> out) throws Exception {
            Tuple5<Integer, Long, Integer, String, Long> element;
            int sum = 0;
            int id = 0;
            while (first.hasNext()) {
                element = first.next();
                sum += ((Integer)element.f2).intValue();
                id = (Integer)element.f0;
            }
            while (second.hasNext()) {
                element = second.next();
                sum += ((Integer)element.f2).intValue();
                id = (Integer)element.f0;
            }
            out.collect((Object)new Tuple3((Object)id, (Object)sum, (Object)this.broadcast));
        }
    }

    public static class Tuple5ReturnRight
    extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
        private static final long serialVersionUID = 1L;

        public void coGroup(Iterator<Tuple5<Integer, Long, Integer, String, Long>> first, Iterator<Tuple5<Integer, Long, Integer, String, Long>> second, Collector<Tuple5<Integer, Long, Integer, String, Long>> out) throws Exception {
            while (second.hasNext()) {
                Tuple5<Integer, Long, Integer, String, Long> element = second.next();
                if ((Integer)element.f0 >= 4) continue;
                out.collect(element);
            }
        }
    }

    public static class Tuple3ReturnLeft
    extends CoGroupFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public void coGroup(Iterator<Tuple3<Integer, Long, String>> first, Iterator<Tuple3<Integer, Long, String>> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            while (first.hasNext()) {
                Tuple3<Integer, Long, String> element = first.next();
                if ((Integer)element.f0 >= 6) continue;
                out.collect(element);
            }
        }
    }

    public static class MixedCoGroup2
    extends CoGroupFunction<CollectionDataSets.CustomType, Tuple5<Integer, Long, Integer, String, Long>, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        public void coGroup(Iterator<CollectionDataSets.CustomType> first, Iterator<Tuple5<Integer, Long, Integer, String, Long>> second, Collector<CollectionDataSets.CustomType> out) throws Exception {
            Tuple5<Integer, Long, Integer, String, Long> element;
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType(0, 0L, "test");
            while (first.hasNext()) {
                element = first.next();
                o.myInt = element.myInt;
                o.myLong += element.myLong;
            }
            while (second.hasNext()) {
                element = second.next();
                o.myInt = (Integer)element.f2;
                o.myLong += (long)((Integer)element.f0).intValue();
            }
            out.collect((Object)o);
        }
    }

    public static class MixedCoGroup
    extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, CollectionDataSets.CustomType, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        public void coGroup(Iterator<Tuple5<Integer, Long, Integer, String, Long>> first, Iterator<CollectionDataSets.CustomType> second, Collector<Tuple3<Integer, Long, String>> out) throws Exception {
            Object element;
            long sum = 0L;
            int id = 0;
            while (first.hasNext()) {
                element = first.next();
                sum += (long)((Integer)element.f0).intValue();
                id = (Integer)element.f2;
            }
            while (second.hasNext()) {
                element = second.next();
                id = element.myInt;
                sum += element.myLong;
            }
            out.collect((Object)new Tuple3((Object)id, (Object)sum, (Object)"test"));
        }
    }

    public static class CustomTypeCoGroup
    extends CoGroupFunction<CollectionDataSets.CustomType, CollectionDataSets.CustomType, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        public void coGroup(Iterator<CollectionDataSets.CustomType> first, Iterator<CollectionDataSets.CustomType> second, Collector<CollectionDataSets.CustomType> out) throws Exception {
            CollectionDataSets.CustomType element;
            CollectionDataSets.CustomType o = new CollectionDataSets.CustomType(0, 0L, "test");
            while (first.hasNext()) {
                element = first.next();
                o.myInt = element.myInt;
                o.myLong += element.myLong;
            }
            while (second.hasNext()) {
                element = second.next();
                o.myInt = element.myInt;
                o.myLong += element.myLong;
            }
            out.collect((Object)o);
        }
    }

    public static class Tuple5CoGroup
    extends CoGroupFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        public void coGroup(Iterator<Tuple5<Integer, Long, Integer, String, Long>> first, Iterator<Tuple5<Integer, Long, Integer, String, Long>> second, Collector<Tuple2<Integer, Integer>> out) throws Exception {
            Tuple5<Integer, Long, Integer, String, Long> element;
            int sum = 0;
            int id = 0;
            while (first.hasNext()) {
                element = first.next();
                sum += ((Integer)element.f2).intValue();
                id = (Integer)element.f0;
            }
            while (second.hasNext()) {
                element = second.next();
                sum += ((Integer)element.f2).intValue();
                id = (Integer)element.f0;
            }
            out.collect((Object)new Tuple2((Object)id, (Object)sum));
        }
    }

    private static class CoGroupProgs {
        private CoGroupProgs() {
        }

        public static String runProgram(int progId, String resultPath) throws Exception {
            switch (progId) {
                case 1: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
                    DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
                    CoGroupOperator coGroupDs = ds.coGroup(ds2).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction)new Tuple5CoGroup());
                    coGroupDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,0\n2,6\n3,24\n4,60\n5,120\n";
                }
                case 2: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
                    DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
                    CoGroupOperator coGroupDs = ds.coGroup(ds2).where((KeySelector)new KeySelector<CollectionDataSets.CustomType, Integer>(){
                        private static final long serialVersionUID = 1L;

                        public Integer getKey(CollectionDataSets.CustomType in) {
                            return in.myInt;
                        }
                    }).equalTo((KeySelector)new KeySelector<CollectionDataSets.CustomType, Integer>(){
                        private static final long serialVersionUID = 1L;

                        public Integer getKey(CollectionDataSets.CustomType in) {
                            return in.myInt;
                        }
                    }).with((CoGroupFunction)new CustomTypeCoGroup());
                    coGroupDs.writeAsText(resultPath);
                    env.execute();
                    return "1,0,test\n2,6,test\n3,24,test\n4,60,test\n5,120,test\n6,210,test\n";
                }
                case 3: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
                    DataSet<Tuple3<Integer, Long, String>> ds2 = CollectionDataSets.get3TupleDataSet(env);
                    CoGroupOperator coGroupDs = ds.coGroup(ds2).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction)new Tuple3ReturnLeft());
                    coGroupDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1,Hi\n2,2,Hello\n3,2,Hello world\n4,3,Hello world, how are you?\n5,3,I am fine.\n";
                }
                case 4: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
                    DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
                    CoGroupOperator coGroupDs = ds.coGroup(ds2).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction)new Tuple5ReturnRight());
                    coGroupDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,1,0,Hallo,1\n2,2,1,Hallo Welt,2\n2,3,2,Hallo Welt wie,1\n3,4,3,Hallo Welt wie gehts?,2\n3,5,4,ABC,2\n3,6,5,BCD,3\n";
                }
                case 5: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
                    DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
                    DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
                    TwoInputUdfOperator coGroupDs = ds.coGroup(ds2).where(new int[]{0}).equalTo(new int[]{0}).with((CoGroupFunction)new Tuple5CoGroupBC()).withBroadcastSet(intDs, "ints");
                    coGroupDs.writeAsCsv(resultPath);
                    env.execute();
                    return "1,0,55\n2,6,55\n3,24,55\n4,60,55\n5,120,55\n";
                }
                case 6: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
                    DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
                    CoGroupOperator coGroupDs = ds.coGroup(ds2).where(new int[]{2}).equalTo((KeySelector)new KeySelector<CollectionDataSets.CustomType, Integer>(){
                        private static final long serialVersionUID = 1L;

                        public Integer getKey(CollectionDataSets.CustomType in) {
                            return in.myInt;
                        }
                    }).with((CoGroupFunction)new MixedCoGroup());
                    coGroupDs.writeAsCsv(resultPath);
                    env.execute();
                    return "0,1,test\n1,2,test\n2,5,test\n3,15,test\n4,33,test\n5,63,test\n6,109,test\n7,4,test\n8,4,test\n9,4,test\n10,5,test\n11,5,test\n12,5,test\n13,5,test\n14,5,test\n";
                }
                case 7: {
                    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                    DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
                    DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getCustomTypeDataSet(env);
                    CoGroupOperator coGroupDs = ds2.coGroup(ds).where((KeySelector)new KeySelector<CollectionDataSets.CustomType, Integer>(){
                        private static final long serialVersionUID = 1L;

                        public Integer getKey(CollectionDataSets.CustomType in) {
                            return in.myInt;
                        }
                    }).equalTo(new int[]{2}).with((CoGroupFunction)new MixedCoGroup2());
                    coGroupDs.writeAsText(resultPath);
                    env.execute();
                    return "0,1,test\n1,2,test\n2,5,test\n3,15,test\n4,33,test\n5,63,test\n6,109,test\n7,4,test\n8,4,test\n9,4,test\n10,5,test\n11,5,test\n12,5,test\n13,5,test\n14,5,test\n";
                }
            }
            throw new IllegalArgumentException("Invalid program id");
        }
    }
}

