/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.test.accumulators;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.stratosphere.api.common.JobExecutionResult;
import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.accumulators.Accumulator;
import eu.stratosphere.api.common.accumulators.AccumulatorHelper;
import eu.stratosphere.api.common.accumulators.DoubleCounter;
import eu.stratosphere.api.common.accumulators.Histogram;
import eu.stratosphere.api.common.accumulators.IntCounter;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.io.CsvOutputFormat;
import eu.stratosphere.api.java.record.io.TextInputFormat;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.io.IOReadableWritable;
import eu.stratosphere.core.io.StringRecord;
import eu.stratosphere.nephele.util.SerializableHashSet;
import eu.stratosphere.test.util.RecordAPITestBase;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import eu.stratosphere.util.SimpleStringUtils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class AccumulatorITCase
extends RecordAPITestBase {
    private static final String INPUT = "one\ntwo two\nthree three three\n";
    private static final String EXPECTED = "one 1\ntwo 2\nthree 3\n";
    private static final int NUM_SUBTASKS = 2;
    protected String dataPath;
    protected String resultPath;

    public AccumulatorITCase(Configuration config) {
        super(config);
    }

    protected void preSubmit() throws Exception {
        this.dataPath = this.createTempFile("datapoints.txt", INPUT);
        this.resultPath = this.getTempFilePath("result");
    }

    protected void postSubmit() throws Exception {
        this.compareResultsByLinesInMemory(EXPECTED, this.resultPath);
        System.out.println("Accumulator results:");
        JobExecutionResult res = this.getJobExecutionResult();
        System.out.println(AccumulatorHelper.getResultsFormated((Map)res.getAllAccumulatorResults()));
        Assert.assertEquals((Object)new Integer(3), (Object)((Integer)res.getAccumulatorResult("num-lines")));
        Assert.assertEquals((Object)new Double(2.0), (Object)((Double)res.getAccumulatorResult("open-close-counter")));
        HashMap dist = Maps.newHashMap();
        dist.put(1, 1);
        dist.put(2, 2);
        dist.put(3, 3);
        Assert.assertEquals((Object)dist, (Object)res.getAccumulatorResult("words-per-line"));
        HashSet distinctWords = Sets.newHashSet();
        distinctWords.add(new StringRecord("one"));
        distinctWords.add(new StringRecord("two"));
        distinctWords.add(new StringRecord("three"));
        Assert.assertEquals((Object)distinctWords, (Object)res.getAccumulatorResult("distinct-words"));
    }

    protected Plan getTestJob() {
        Plan plan = AccumulatorITCase.getTestPlanPlan(this.config.getInteger("IterationAllReducer#NoSubtasks", 1), this.dataPath, this.resultPath);
        return plan;
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        Configuration config1 = new Configuration();
        config1.setInteger("IterationAllReducer#NoSubtasks", 2);
        return AccumulatorITCase.toParameterList((Configuration[])new Configuration[]{config1});
    }

    static Plan getTestPlanPlan(int numSubTasks, String input, String output) {
        FileDataSource source = new FileDataSource((FileInputFormat)new TextInputFormat(), input, "Input Lines");
        source.setParameter("textformat.charset", "ASCII");
        MapOperator mapper = MapOperator.builder((MapFunction)new TokenizeLine()).input(new Operator[]{source}).name("Tokenize Lines").build();
        ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, (int)0).input(new Operator[]{mapper}).name("Count Words").build();
        FileDataSink out = new FileDataSink((FileOutputFormat)new CsvOutputFormat("\n", " ", new Class[]{StringValue.class, IntValue.class}), output, (Operator)reducer, "Word Counts");
        Plan plan = new Plan((GenericDataSink)out, "WordCount Example");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    public static class SetAccumulator<T extends IOReadableWritable>
    implements Accumulator<T, Set<T>> {
        private static final long serialVersionUID = 1L;
        private SerializableHashSet<T> set = new SerializableHashSet();

        public void add(T value) {
            this.set.add(value);
        }

        public Set<T> getLocalValue() {
            return this.set;
        }

        public void resetLocal() {
            this.set.clear();
        }

        public void merge(Accumulator<T, Set<T>> other) {
            this.set.addAll((Collection)((SetAccumulator)other).getLocalValue());
        }

        public void write(DataOutput out) throws IOException {
            this.set.write(out);
        }

        public void read(DataInput in) throws IOException {
            this.set.read(in);
        }
    }

    @ReduceOperator.Combinable
    @FunctionAnnotation.ConstantFields(value={0})
    public static class CountWords
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final IntValue cnt = new IntValue();
        private IntCounter reduceCalls = null;
        private IntCounter combineCalls = null;

        public void open(Configuration parameters) throws Exception {
            this.reduceCalls = this.getRuntimeContext().getIntCounter("reduce-calls");
            this.combineCalls = this.getRuntimeContext().getIntCounter("combine-calls");
        }

        public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
            this.reduceCalls.add(Integer.valueOf(1));
            this.reduceInternal(records, out);
        }

        public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
            this.combineCalls.add(Integer.valueOf(1));
            this.reduceInternal(records, out);
        }

        private void reduceInternal(Iterator<Record> records, Collector<Record> out) {
            Record element = null;
            int sum = 0;
            while (records.hasNext()) {
                element = records.next();
                IntValue i = (IntValue)element.getField(1, IntValue.class);
                sum += i.getValue();
            }
            this.cnt.setValue(sum);
            element.setField(1, (Value)this.cnt);
            out.collect((Object)element);
        }
    }

    public static class TokenizeLine
    extends MapFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Record outputRecord = new Record();
        private StringValue word;
        private final IntValue one = new IntValue(1);
        private final SimpleStringUtils.WhitespaceTokenizer tokenizer = new SimpleStringUtils.WhitespaceTokenizer();
        IntCounter cntNumLines = null;
        Histogram wordsPerLineDistribution = null;
        DoubleCounter openCloseCounter = new DoubleCounter();
        private SetAccumulator<StringRecord> distinctWords = null;

        public void open(Configuration parameters) throws Exception {
            this.cntNumLines = this.getRuntimeContext().getIntCounter("num-lines");
            this.wordsPerLineDistribution = this.getRuntimeContext().getHistogram("words-per-line");
            this.getRuntimeContext().addAccumulator("open-close-counter", (Accumulator)this.openCloseCounter);
            this.distinctWords = new SetAccumulator();
            this.getRuntimeContext().addAccumulator("distinct-words", this.distinctWords);
            IntCounter simpleCounter = this.getRuntimeContext().getIntCounter("simple-counter");
            simpleCounter.add(Integer.valueOf(1));
            Assert.assertEquals((long)simpleCounter.getLocalValue().intValue(), (long)1L);
            IntCounter simpleCounter2 = this.getRuntimeContext().getIntCounter("simple-counter");
            Assert.assertEquals((Object)simpleCounter.getLocalValue(), (Object)simpleCounter2.getLocalValue());
            try {
                DoubleCounter simpleCounter3 = this.getRuntimeContext().getDoubleCounter("simple-counter");
                Assert.fail((String)"Should not be able to obtain previously created counter with different type");
            }
            catch (UnsupportedOperationException ex) {
                // empty catch block
            }
            this.openCloseCounter.add(Double.valueOf(0.5));
        }

        public void map(Record record, Collector<Record> collector) {
            this.cntNumLines.add(Integer.valueOf(1));
            StringValue line = (StringValue)record.getField(0, StringValue.class);
            SimpleStringUtils.replaceNonWordChars((StringValue)line, (char)' ');
            SimpleStringUtils.toLowerCase((StringValue)line);
            this.tokenizer.setStringToTokenize(line);
            int wordsPerLine = 0;
            this.word = new StringValue();
            while (this.tokenizer.next(this.word)) {
                this.distinctWords.add(new StringRecord(this.word.getValue()));
                this.outputRecord.setField(0, (Value)this.word);
                this.outputRecord.setField(1, (Value)this.one);
                collector.collect((Object)this.outputRecord);
                ++wordsPerLine;
            }
            this.wordsPerLineDistribution.add(Integer.valueOf(wordsPerLine));
        }

        public void close() throws Exception {
            this.openCloseCounter.add(Double.valueOf(0.5));
            Assert.assertEquals((long)1L, (long)this.openCloseCounter.getLocalValue().intValue());
        }
    }
}

