/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop;

import cascading.PlatformTestCase;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.hadoop.MultiMapReduceFlow;
import cascading.flow.hadoop.planner.HadoopPlanner;
import cascading.platform.hadoop.BaseHadoopPlatform;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;
import data.InputData;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.junit.Test;

public class MultiMapReduceFlowPlatformTest
extends PlatformTestCase {
    public MultiMapReduceFlowPlatformTest() {
        super(true);
    }

    @Test
    public void testFlow() throws IOException {
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        String outputPath1 = this.getOutputPath("flowTest1");
        String outputPath2 = this.getOutputPath("flowTest2");
        String outputPath3 = this.getOutputPath("flowTest3");
        this.remove(outputPath1, true);
        this.remove(outputPath2, true);
        this.remove(outputPath3, true);
        JobConf defaultConf = (JobConf)((BaseHadoopPlatform)this.getPlatform()).getConfiguration();
        JobConf conf1 = this.createJob(defaultConf, "mr1", InputData.inputFileApache, outputPath1);
        JobConf conf2 = this.createJob(defaultConf, "mr2", outputPath1, outputPath2);
        JobConf conf3 = this.createJob(defaultConf, "mr3", outputPath2, outputPath3);
        MultiMapReduceFlow flow = new MultiMapReduceFlow("mrflow", conf1, new JobConf[]{conf2, conf3});
        MultiMapReduceFlowPlatformTest.validateLength((TupleEntryIterator)new Hfs((Scheme)new TextLine(), InputData.inputFileApache).openForRead((FlowProcess)new HadoopFlowProcess(defaultConf)), (int)10);
        flow.complete();
        MultiMapReduceFlowPlatformTest.validateLength((TupleEntryIterator)new Hfs((Scheme)new TextLine(), outputPath1).openForRead((FlowProcess)new HadoopFlowProcess(defaultConf)), (int)10);
        Collection sinks = flow.getSinks().values();
        MultiMapReduceFlowPlatformTest.assertEquals((int)1, (int)sinks.size());
        String identifier = ((Tap)sinks.iterator().next()).getIdentifier();
        MultiMapReduceFlowPlatformTest.assertEquals((String)"flowTest3", (String)identifier.substring(identifier.lastIndexOf(47) + 1));
    }

    @Test
    public void testFlowLazy() throws IOException {
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        String outputPath1 = this.getOutputPath("flowTest1");
        String outputPath2 = this.getOutputPath("flowTest2");
        String outputPath3 = this.getOutputPath("flowTest3");
        this.remove(outputPath1, true);
        this.remove(outputPath2, true);
        this.remove(outputPath3, true);
        JobConf defaultConf = (JobConf)((BaseHadoopPlatform)this.getPlatform()).getConfiguration();
        JobConf conf1 = this.createJob(defaultConf, "mr1", InputData.inputFileApache, outputPath1);
        JobConf conf2 = this.createJob(defaultConf, "mr2", outputPath1, outputPath2);
        JobConf conf3 = this.createJob(defaultConf, "mr3", outputPath2, outputPath3);
        MultiMapReduceFlowPlatformTest.validateLength((TupleEntryIterator)new Hfs((Scheme)new TextLine(), InputData.inputFileApache).openForRead((FlowProcess)new HadoopFlowProcess(defaultConf)), (int)10);
        MultiMapReduceFlow flow = new MultiMapReduceFlow("mrflow", conf1, new JobConf[0]);
        flow.start();
        Util.safeSleep((long)3000L);
        flow.attachFlowStep(conf2);
        Util.safeSleep((long)3000L);
        flow.attachFlowStep(conf3);
        flow.complete();
        MultiMapReduceFlowPlatformTest.validateLength((TupleEntryIterator)new Hfs((Scheme)new TextLine(), outputPath1).openForRead((FlowProcess)new HadoopFlowProcess(defaultConf)), (int)10);
        Collection sinks = flow.getSinks().values();
        MultiMapReduceFlowPlatformTest.assertEquals((int)1, (int)sinks.size());
        String identifier = ((Tap)sinks.iterator().next()).getIdentifier();
        MultiMapReduceFlowPlatformTest.assertEquals((String)"flowTest3", (String)identifier.substring(identifier.lastIndexOf(47) + 1));
    }

    @Test(expected=IllegalStateException.class)
    public void testFlowLazyFail() throws IOException {
        this.getPlatform().copyFromLocal(InputData.inputFileApache);
        String outputPath1 = this.getOutputPath("flowTest1");
        String outputPath2 = this.getOutputPath("flowTest2");
        this.remove(outputPath1, true);
        this.remove(outputPath2, true);
        JobConf defaultConf = (JobConf)((BaseHadoopPlatform)this.getPlatform()).getConfiguration();
        JobConf conf1 = this.createJob(defaultConf, "mr1", InputData.inputFileApache, outputPath1);
        JobConf conf2 = this.createJob(defaultConf, "mr2", outputPath1, outputPath2);
        MultiMapReduceFlowPlatformTest.validateLength((TupleEntryIterator)new Hfs((Scheme)new TextLine(), InputData.inputFileApache).openForRead((FlowProcess)new HadoopFlowProcess(defaultConf)), (int)10);
        MultiMapReduceFlow flow = new MultiMapReduceFlow("mrflow", conf1, new JobConf[0]);
        flow.complete();
        flow.attachFlowStep(conf2);
    }

    protected JobConf createJob(JobConf defaultConf, String name, String inputPath, String outputPath) {
        JobConf conf = new JobConf((Configuration)defaultConf);
        conf.setJobName(name);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setMapperClass(IdentityMapper.class);
        conf.setReducerClass(IdentityReducer.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        FileInputFormat.setInputPaths((JobConf)conf, (Path[])new Path[]{new Path(inputPath)});
        FileOutputFormat.setOutputPath((JobConf)conf, (Path)new Path(outputPath));
        return conf;
    }

    private String remove(String path, boolean delete) throws IOException {
        FileSystem fs = FileSystem.get((URI)URI.create(path), (Configuration)HadoopPlanner.createJobConf((Map)this.getProperties()));
        if (delete) {
            fs.delete(new Path(path), true);
        }
        return path;
    }
}

