/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop;

import cascading.PlatformTestCase;
import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.hadoop.MapReduceFlow;
import cascading.flow.hadoop.planner.HadoopPlanner;
import cascading.pipe.Pipe;
import cascading.platform.hadoop.BaseHadoopPlatform;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntryIterator;
import data.InputData;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;

/**
 * Platform tests that wrap raw Hadoop MapReduce jobs in {@link MapReduceFlow}
 * instances and run them either stand-alone ({@link #testFlow()}) or chained
 * together with regular Cascading flows inside a {@link Cascade}
 * ({@link #testCascade()}).
 */
public class MapReduceFlowPlatformTest extends PlatformTestCase {
    /** Line count asserted for the input file and for every validated job output. */
    private static final int EXPECTED_LINES = 10;

    /**
     * Creates the test case, passing {@code true} to the {@link PlatformTestCase}
     * constructor.
     * NOTE(review): the flag presumably requests a cluster-backed platform — confirm
     * against PlatformTestCase.
     */
    public MapReduceFlowPlatformTest() {
        super(true);
    }

    /**
     * Runs a single identity MapReduce job as a {@link MapReduceFlow} and checks
     * that both the input file and the job output contain {@link #EXPECTED_LINES} lines.
     *
     * @throws IOException if copying the input, opening a tap, or validating fails
     */
    @Test
    public void testFlow() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        JobConf defaultConf = (JobConf) ((BaseHadoopPlatform) getPlatform()).getConfiguration();
        String outputPath = getOutputPath("flowTest");
        JobConf conf = createIdentityJobConf(defaultConf, "mrflow", InputData.inputFileApache, outputPath);

        // NOTE(review): the boolean presumably asks the flow to delete its sink on
        // initialization — confirm against the MapReduceFlow constructor documentation.
        MapReduceFlow flow = new MapReduceFlow("mrflow", conf, true);

        // Sanity-check the input before running, then the output afterwards.
        assertLineCount(InputData.inputFileApache, defaultConf);
        flow.complete();
        assertLineCount(outputPath, defaultConf);
    }

    /**
     * Optionally deletes {@code path} (recursively) and returns it unchanged, so the
     * call can be used inline wherever a path string is needed.
     *
     * @param path   the path to resolve a {@link FileSystem} for and possibly delete
     * @param delete whether to recursively delete {@code path}
     * @return {@code path}, unmodified
     * @throws IOException if the file system cannot be obtained or the delete fails
     */
    private String remove(String path, boolean delete) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(path), HadoopPlanner.createJobConf(getProperties()));
        if (delete) {
            fs.delete(new Path(path), true);
        }
        return path;
    }

    /**
     * Chains two Cascading flows and three MapReduce jobs through shared paths
     * (input -> flow4 -> flow5 -> flow1 -> flow2 -> flow3), hands them to a
     * {@link CascadeConnector}, and checks that the final output contains
     * {@link #EXPECTED_LINES} lines.
     *
     * @throws IOException if path cleanup, job setup, or validation fails
     */
    @Test
    public void testCascade() throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        // Two regular Cascading flows that produce the input of the first MapReduce job.
        Hfs source1 = new Hfs(new TextLine(new Fields("offset", "line")), remove(InputData.inputFileApache, false));
        String sinkPath4 = getOutputPath("flow4");
        Hfs sink1 = new Hfs(new TextLine(new Fields("offset", "line")), remove(sinkPath4, true), SinkMode.REPLACE);
        Flow firstFlow = getPlatform().getFlowConnector(getProperties()).connect(source1, sink1, new Pipe("first-flow"));

        String sinkPath5 = getOutputPath("flow5");
        Hfs sink2 = new Hfs(new TextLine(new Fields("offset", "line")), remove(sinkPath5, true), SinkMode.REPLACE);
        Flow secondFlow = getPlatform().getFlowConnector(getProperties()).connect(sink1, sink2, new Pipe("second-flow"));

        JobConf defaultConf = HadoopPlanner.createJobConf(getProperties());

        // First old-API job: flow5 -> flow1.
        String firstInput = remove(sinkPath5, true);
        String sinkPath1 = getOutputPath("flow1");
        JobConf firstConf = createIdentityJobConf(defaultConf, "first-mr", firstInput, remove(sinkPath1, true));
        MapReduceFlow firstMR = new MapReduceFlow(firstConf, true);

        // Second old-API job: flow1 -> flow2.
        String secondInput = remove(sinkPath1, true);
        String sinkPath2 = getOutputPath("flow2");
        JobConf secondConf = createIdentityJobConf(defaultConf, "second-mr", secondInput, remove(sinkPath2, true));
        MapReduceFlow secondMR = new MapReduceFlow(secondConf, true);

        // Third job uses the new org.apache.hadoop.mapreduce API: flow2 -> flow3.
        // The cast keeps the Configuration-taking Job constructor selected.
        Job job = new Job((Configuration) defaultConf);
        job.setJobName("third-mr");
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // The base Mapper/Reducer classes are used directly as the job's map and reduce steps.
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
        job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class);
        // Explicitly flag the configuration as using the new mapper/reducer API.
        job.getConfiguration().set("mapred.mapper.new-api", "true");
        job.getConfiguration().set("mapred.reducer.new-api", "true");
        org.apache.hadoop.mapreduce.lib.input.FileInputFormat.addInputPath(job, new Path(remove(sinkPath2, true)));
        String sinkPath3 = getOutputPath("flow3");
        FileOutputFormat.setOutputPath(job, new Path(remove(sinkPath3, true)));
        MapReduceFlow thirdMR = new MapReduceFlow(new JobConf(job.getConfiguration()), true);

        // The flows are deliberately passed out of data-dependency order (thirdMR
        // before firstMR and secondMR); presumably this exercises the cascade's own
        // ordering of flows — confirm against CascadeConnector.
        CascadeConnector cascadeConnector = new CascadeConnector(getProperties());
        Cascade cascade = cascadeConnector.connect(firstFlow, secondFlow, thirdMR, firstMR, secondMR);
        cascade.complete();

        assertLineCount(sinkPath3, defaultConf);
    }

    /**
     * Builds an old-API ({@code org.apache.hadoop.mapred}) job configuration that
     * reads text from {@code inputPath}, passes records through
     * {@link IdentityMapper} and {@link IdentityReducer}, and writes text to
     * {@code outputPath}.
     *
     * @param defaultConf the configuration copied as the starting point
     * @param jobName     the name to give the job
     * @param inputPath   the single input path of the job
     * @param outputPath  the output path of the job
     * @return a new {@link JobConf}; {@code defaultConf} itself is not modified
     */
    private static JobConf createIdentityJobConf(JobConf defaultConf, String jobName, String inputPath, String outputPath) {
        JobConf conf = new JobConf(defaultConf);
        conf.setJobName(jobName);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(IdentityMapper.class);
        conf.setReducerClass(IdentityReducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        // Fully qualified: the simple name FileOutputFormat is imported from the new API.
        org.apache.hadoop.mapred.FileOutputFormat.setOutputPath(conf, new Path(outputPath));
        return conf;
    }

    /**
     * Opens {@code path} through an {@link Hfs} tap with a default {@link TextLine}
     * scheme and validates that it yields {@link #EXPECTED_LINES} entries.
     *
     * @param path the path to read
     * @param conf the configuration backing the {@link HadoopFlowProcess} used to read
     * @throws IOException if the tap cannot be opened for reading
     */
    private static void assertLineCount(String path, JobConf conf) throws IOException {
        TupleEntryIterator lines = new Hfs(new TextLine(), path).openForRead(new HadoopFlowProcess(conf));
        validateLength(lines, EXPECTED_LINES);
    }
}

