/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.arraymodel.example;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.io.FileInputFormat;
import eu.stratosphere.api.common.io.FileOutputFormat;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.base.MapOperatorBase;
import eu.stratosphere.api.common.operators.base.ReduceOperatorBase;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.arraymodel.ArrayModelPlan;
import eu.stratosphere.arraymodel.functions.DataTypes;
import eu.stratosphere.arraymodel.functions.MapFunction;
import eu.stratosphere.arraymodel.functions.ReduceFunction;
import eu.stratosphere.arraymodel.io.ArrayOutputFormat;
import eu.stratosphere.arraymodel.io.StringInputFormat;
import eu.stratosphere.arraymodel.io.StringIntOutputFormat;
import eu.stratosphere.client.LocalExecutor;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.AsciiUtils;
import eu.stratosphere.util.Collector;
import java.util.Iterator;

/**
 * Word-count example job expressed with the array-model API.
 *
 * <p>The plan built by {@link #getPlan(String...)} wires four operators together:
 * a file source using {@link StringInputFormat}, the {@link TokenizeLine} mapper,
 * the {@link CountWords} reducer, and a file sink using {@link StringIntOutputFormat}.
 */
public class WordCountArrayTuples
implements Program,
ProgramDescription {
    /**
     * Assembles the word-count plan.
     *
     * @param args optional positional arguments:
     *             {@code args[0]} is the default parallelism (falls back to {@code 1}),
     *             {@code args[1]} is the input location (falls back to {@code ""}),
     *             {@code args[2]} is the output location (falls back to {@code ""})
     * @return the plan, with its default parallelism set from {@code args[0]}
     * @throws NumberFormatException if {@code args[0]} is present but is not a parsable integer
     */
    public Plan getPlan(String ... args) {
        int numSubTasks = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        String dataInput = args.length > 1 ? args[1] : "";
        String output = args.length > 2 ? args[2] : "";

        FileDataSource source = new FileDataSource((FileInputFormat)new StringInputFormat(), dataInput, "Input Lines");
        source.setParameter("textformat.charset", "ASCII");

        MapOperatorBase mapper = new MapOperatorBase(TokenizeLine.class, "Tokenize Lines");
        mapper.setInput(new Operator[]{source});

        // NOTE(review): the int[]{0} argument is presumably the key field position (the word) - confirm.
        ReduceOperatorBase reducer = new ReduceOperatorBase(CountWords.class, new int[]{0}, "Count Words");
        reducer.setInput(new Operator[]{mapper});

        FileDataSink out = new FileDataSink((FileOutputFormat)new StringIntOutputFormat(), output, (Operator)reducer, "Word Counts");

        // Configure the sink step by step: '\n' as record delimiter, ' ' as field delimiter,
        // lenient mode switched on. The casts mirror the original call chain exactly.
        ArrayOutputFormat.ConfigBuilder withRecordDelimiter =
                (ArrayOutputFormat.ConfigBuilder)((Object)StringIntOutputFormat.configureArrayFormat(out).recordDelimiter('\n'));
        ArrayOutputFormat.ConfigBuilder withFieldDelimiter =
                (ArrayOutputFormat.ConfigBuilder)((Object)withRecordDelimiter.fieldDelimiter(' '));
        withFieldDelimiter.lenient(true);

        ArrayModelPlan plan = new ArrayModelPlan((GenericDataSink)out, "WordCount Example");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    /**
     * Returns the usage string listing the positional parameters accepted by
     * {@link #getPlan(String...)}.
     *
     * @return the parameter description
     */
    public String getDescription() {
        return "Parameters: [numSubTasks] [input] [output]";
    }

    /**
     * Command-line entry point: builds the plan from {@code args} and runs it with
     * {@link LocalExecutor}.
     *
     * <p>When fewer than three arguments are supplied, the usage string is printed to
     * standard error and the JVM exits with status {@code 1}.
     *
     * @param args parallelism, input location and output location, in that order
     * @throws Exception propagated from plan construction or execution
     */
    public static void main(String[] args) throws Exception {
        WordCountArrayTuples wc = new WordCountArrayTuples();
        if (args.length < 3) {
            System.err.println(wc.getDescription());
            System.exit(1);
        }
        Plan plan = wc.getPlan(args);
        LocalExecutor.execute(plan);
    }

    /**
     * Reducer that adds up the integer in field 1 of every record it is handed and
     * emits a single two-field record: field 0 copied from the last record read,
     * field 1 holding the sum.
     *
     * <p>One output array and one {@link IntValue} are allocated per instance and
     * reused for every call to {@link #reduce(Iterator, Collector)}.
     */
    @ReduceOperatorBase.Combinable
    @FunctionAnnotation.ConstantFields(value={0})
    public static class CountWords
    extends ReduceFunction {
        private final IntValue cnt = new IntValue();
        private final Value[] result = new Value[]{null, this.cnt};

        /**
         * Sums field 1 across {@code records} and collects one result record.
         *
         * @param records the records to aggregate; field 1 of each must be an {@link IntValue}
         * @param out     receives the single aggregated record; nothing is collected
         *                when {@code records} is empty
         * @throws Exception declared by the overridden method
         */
        @Override
        @DataTypes(value={StringValue.class, IntValue.class})
        public void reduce(Iterator<Value[]> records, Collector<Value[]> out) throws Exception {
            Value[] element = null;
            int sum = 0;
            while (records.hasNext()) {
                element = records.next();
                sum += ((IntValue)element[1]).getValue();
            }
            if (element == null) {
                // No record was read, so there is no field 0 to copy; without this guard
                // the element[0] access below would throw NullPointerException.
                return;
            }
            this.cnt.setValue(sum);
            this.result[0] = element[0];
            // Pass the Value[] directly: the collector is typed Collector<Value[]>, so an
            // upcast to Object would not match a collect(Value[]) parameter.
            out.collect(this.result);
        }
    }

    /**
     * Mapper that splits a line into tokens and emits one two-field record per token:
     * the token in field 0 and the constant {@code 1} in field 1.
     *
     * <p>The output array, the token holder and the constant are allocated once per
     * instance; the same array instance is handed to the collector for every token.
     */
    public static class TokenizeLine
    extends MapFunction {
        private final StringValue word = new StringValue();
        private final IntValue one = new IntValue(1);
        private final Value[] outputRecord = new Value[]{this.word, this.one};
        private final AsciiUtils.WhitespaceTokenizer tokenizer = new AsciiUtils.WhitespaceTokenizer();

        /**
         * Normalizes the line in field 0 and collects one record per token.
         *
         * @param record    input record; field 0 must be a {@link StringValue}
         * @param collector receives the shared output record once for every token found
         */
        @Override
        @DataTypes(value={StringValue.class})
        public void map(Value[] record, Collector<Value[]> collector) {
            StringValue line = (StringValue)record[0];
            // Both helpers are invoked purely for their effect on `line`; nothing they
            // return is used here.
            AsciiUtils.replaceNonWordChars(line, ' ');
            AsciiUtils.toLowerCase(line);
            this.tokenizer.setStringToTokenize(line);
            // Each successful next(...) call is followed by one emit of the shared record,
            // whose field 0 is the `word` instance passed to the tokenizer.
            while (this.tokenizer.next(this.word)) {
                collector.collect(this.outputRecord);
            }
        }
    }
}

