/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.iterative;

import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.ExecutionEnvironment;
import eu.stratosphere.api.java.IterativeDataSet;
import eu.stratosphere.api.java.functions.GroupReduceFunction;
import eu.stratosphere.api.java.io.LocalCollectionOutputFormat;
import eu.stratosphere.api.java.operators.DataSource;
import eu.stratosphere.api.java.operators.SingleInputUdfOperator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.test.util.JavaProgramTestBase;
import eu.stratosphere.util.Collector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import junit.framework.Assert;

public class BulkIterationWithAllReducerITCase
extends JavaProgramTestBase {
    protected void testProgram() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setDegreeOfParallelism(1);
        DataSource data = env.fromElements((Object[])new Integer[]{1, 2, 3, 4, 5, 6, 7, 8});
        IterativeDataSet iteration = data.iterate(10);
        SingleInputUdfOperator result = data.reduceGroup((GroupReduceFunction)new PickOneAllReduce()).withBroadcastSet((DataSet)iteration, "bc");
        ArrayList resultList = new ArrayList();
        iteration.closeWith((DataSet)result).output((OutputFormat)new LocalCollectionOutputFormat(resultList));
        env.execute();
        Assert.assertEquals((int)8, (int)((Integer)resultList.get(0)));
    }

    public static class PickOneAllReduce
    extends GroupReduceFunction<Integer, Integer> {
        private Integer bcValue;

        public void open(Configuration parameters) {
            Collection bc = this.getRuntimeContext().getBroadcastVariable("bc");
            this.bcValue = bc.isEmpty() ? null : (Integer)bc.iterator().next();
        }

        public void reduce(Iterator<Integer> records, Collector<Integer> out) {
            if (this.bcValue == null) {
                return;
            }
            int x = this.bcValue;
            while (records.hasNext()) {
                int y = records.next();
                if (y <= x) continue;
                out.collect((Object)y);
                return;
            }
            out.collect((Object)this.bcValue);
        }
    }
}

