/*
 * Decompiled with CFR 0.152.
 */
package hdfs.mapreduce;

import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.hdfs.internal.PersistedEventImpl;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred.GFInputFormat;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapred.GFOutputFormat;
import com.gemstone.gemfire.cache.hdfs.internal.hoplog.mapreduce.GFKey;
import hdfs.mapreduce.PEIWritable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import util.TestHelper;
import util.ValueHolder;

public class GetAllHDFSEventsForKey
extends Configured
implements Tool {
    public int run(String[] args) throws Exception {
        String locatorHost = args[0];
        int locatorPort = Integer.parseInt(args[1]);
        String hdfsHomeDir = args[2];
        StringBuffer searchKeys = new StringBuffer();
        for (int i = 3; i < args.length; ++i) {
            searchKeys.append(args[i] + " ");
        }
        System.out.println("GetAllHDFSEventsForKey invoked with args (locatorHost = " + locatorHost + " locatorPort = " + locatorPort + " hdfsHomeDir = " + hdfsHomeDir + " searchKeys = " + searchKeys);
        Configuration conf = this.getConf();
        JobConf jobConf = new JobConf(conf, GetAllHDFSEventsForKey.class);
        jobConf.setJobName("getAllHDFSEventsForKey");
        jobConf.set("getAllHDFSEventsForKey.searchKeys", searchKeys.toString());
        jobConf.set("mapreduce.input.gfinputformat.inputregion", "partitionedRegion");
        jobConf.set("mapreduce.input.gfinputformat.homedir", hdfsHomeDir);
        jobConf.setBoolean("mapreduce.input.gfinputformat.checkpoint", false);
        jobConf.set("mapreduce.output.gfoutputformat.outputregion", "hdfsResultRegion");
        jobConf.set("mapreduce.output.gfoutputformat.locatorhost", locatorHost);
        jobConf.setInt("mapreduce.output.gfoutputformat.locatorport", locatorPort);
        jobConf.setMapperClass(GetAllHDFSEventsForKeyMapper.class);
        jobConf.setInputFormat(GFInputFormat.class);
        jobConf.setMapOutputKeyClass(GFKey.class);
        jobConf.setMapOutputValueClass(PEIWritable.class);
        jobConf.setReducerClass(GetAllHDFSEventsForKeyReducer.class);
        jobConf.setOutputFormat(GFOutputFormat.class);
        JobClient.runJob((JobConf)jobConf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("GetAllHDFSEventsForKey.main() invoked with " + args);
        int rc = ToolRunner.run((Configuration)new Configuration(), (Tool)new GetAllHDFSEventsForKey(), (String[])args);
        System.exit(rc);
    }

    public static class GetAllHDFSEventsForKeyReducer
    extends MapReduceBase
    implements Reducer<GFKey, PEIWritable, Object, Object> {
        public void reduce(GFKey key, Iterator<PEIWritable> values, OutputCollector<Object, Object> output, Reporter reporter) throws IOException {
            String keyStr = (String)key.getKey();
            System.out.println("GetAllHDFSEventsForKey.reduce() invoked with " + keyStr);
            int eventCounter = 0;
            while (values.hasNext()) {
                ++eventCounter;
                PEIWritable peiWritable = values.next();
                PersistedEventImpl event = peiWritable.getEvent();
                Operation op = event.getOperation();
                StringBuffer newKey = new StringBuffer();
                newKey.append(keyStr + "_");
                newKey.append(eventCounter + "+");
                newKey.append(op.toString());
                System.out.println("GetAllHDFSEventsForKey.reduce() record: " + op.toString() + ": key = " + keyStr + " and op " + op.toString());
                Object o = null;
                if (!op.isDestroy()) {
                    try {
                        o = event.getDeserializedValue();
                    }
                    catch (ClassNotFoundException e) {
                        System.out.println("GetAllHDFSEventsForKey.reduce() caught " + e + " : " + TestHelper.getStackTrace(e));
                    }
                } else {
                    o = "DESTROYED";
                }
                output.collect((Object)newKey.toString(), o);
            }
        }
    }

    public static class GetAllHDFSEventsForKeyMapper
    extends MapReduceBase
    implements Mapper<GFKey, PersistedEventImpl, GFKey, PEIWritable> {
        private String searchKeys = null;

        public void configure(JobConf job) {
            this.searchKeys = job.get("getAllHDFSEventsForKey.searchKeys");
        }

        public void map(GFKey key, PersistedEventImpl value, OutputCollector<GFKey, PEIWritable> output, Reporter reporter) throws IOException {
            ArrayList<String> problemKeys = new ArrayList<String>();
            StringTokenizer tokenizer = new StringTokenizer(this.searchKeys);
            while (tokenizer.hasMoreTokens()) {
                problemKeys.add(tokenizer.nextToken());
            }
            StringBuffer aStr = new StringBuffer();
            Iterator it = problemKeys.iterator();
            while (it.hasNext()) {
                aStr.append(it.next() + " ");
            }
            System.out.println("GetAllHDFSEventsForKeyMapper.problemKeys = " + aStr.toString());
            String keyStr = (String)key.getKey();
            Operation op = value.getOperation();
            ValueHolder entryValue = null;
            if (problemKeys.contains(keyStr)) {
                System.out.println("map method invoked with " + keyStr + " " + op.toString());
            }
            try {
                entryValue = (ValueHolder)value.getDeserializedValue();
            }
            catch (ClassNotFoundException e) {
                System.out.println("GetAllHDFSEventsForKey.map() caught " + e + " : " + TestHelper.getStackTrace(e));
            }
            if (problemKeys.contains(keyStr)) {
                output.collect((Object)key, (Object)new PEIWritable(value));
            }
        }
    }
}

