/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.hadoopcompatibility.example;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.Program;
import eu.stratosphere.api.common.ProgramDescription;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.java.record.functions.FunctionAnnotation;
import eu.stratosphere.api.java.record.functions.MapFunction;
import eu.stratosphere.api.java.record.functions.ReduceFunction;
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.client.LocalExecutor;
import eu.stratosphere.hadoopcompatibility.HadoopDataSink;
import eu.stratosphere.hadoopcompatibility.HadoopDataSource;
import eu.stratosphere.types.IntValue;
import eu.stratosphere.types.Record;
import eu.stratosphere.types.StringValue;
import eu.stratosphere.types.Value;
import eu.stratosphere.util.Collector;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Locale;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCountWithHadoopOutputFormat
implements Program,
ProgramDescription {
    public Plan getPlan(String ... args) {
        int numSubTasks = args.length > 0 ? Integer.parseInt(args[0]) : 1;
        String dataInput = args.length > 1 ? args[1] : "";
        String output = args.length > 2 ? args[2] : "";
        HadoopDataSource source = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines");
        TextInputFormat.addInputPath((JobConf)source.getJobConf(), (Path)new Path(dataInput));
        MapOperator mapper = MapOperator.builder((MapFunction)new TokenizeLine()).input(source).name("Tokenize Lines").build();
        ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, (int)0).input((Operator)mapper).name("Count Words").build();
        HadoopDataSink<Text, IntWritable> out = new HadoopDataSink<Text, IntWritable>((OutputFormat<Text, IntWritable>)new TextOutputFormat(), new JobConf(), "Hadoop TextOutputFormat", (Operator<Record>)reducer, Text.class, IntWritable.class);
        TextOutputFormat.setOutputPath((JobConf)out.getJobConf(), (Path)new Path(output));
        Plan plan = new Plan(out, "Hadoop OutputFormat Example");
        plan.setDefaultParallelism(numSubTasks);
        return plan;
    }

    public String getDescription() {
        return "Parameters: [numSubStasks] [input] [output]";
    }

    public static void main(String[] args) throws Exception {
        WordCountWithHadoopOutputFormat wc = new WordCountWithHadoopOutputFormat();
        if (args.length < 3) {
            System.err.println(wc.getDescription());
            System.exit(1);
        }
        Plan plan = wc.getPlan(args);
        LocalExecutor.execute((Plan)plan);
    }

    @ReduceOperator.Combinable
    @FunctionAnnotation.ConstantFields(value={0})
    public static class CountWords
    extends ReduceFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception {
            Record element = null;
            int sum = 0;
            while (records.hasNext()) {
                element = records.next();
                int cnt = ((IntValue)element.getField(1, IntValue.class)).getValue();
                sum += cnt;
            }
            element.setField(1, (Value)new IntValue(sum));
            out.collect((Object)element);
        }

        public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
            this.reduce(records, out);
        }
    }

    public static class TokenizeLine
    extends MapFunction
    implements Serializable {
        private static final long serialVersionUID = 1L;

        public void map(Record record, Collector<Record> collector) {
            String line = ((StringValue)record.getField(1, StringValue.class)).getValue();
            line = line.replaceAll("\\W+", " ").toLowerCase();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                String word = tokenizer.nextToken();
                collector.collect((Object)new Record((Value)new StringValue((CharSequence)word), (Value)new IntValue(1)));
            }
        }
    }
}

