/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop;

import cascading.PlatformTestCase;
import cascading.TestBuffer;
import cascading.TestFunction;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowConnectorProps;
import cascading.flow.FlowElement;
import cascading.flow.FlowStep;
import cascading.flow.hadoop.HadoopFlow;
import cascading.flow.hadoop.HadoopFlowStep;
import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.PlannerException;
import cascading.flow.planner.Scope;
import cascading.flow.planner.graph.ElementGraph;
import cascading.flow.planner.graph.ElementGraphs;
import cascading.operation.Aggregator;
import cascading.operation.Assertion;
import cascading.operation.AssertionLevel;
import cascading.operation.Buffer;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.aggregator.First;
import cascading.operation.aggregator.MaxValue;
import cascading.operation.aggregator.Sum;
import cascading.operation.assertion.AssertNotNull;
import cascading.operation.assertion.AssertNull;
import cascading.operation.expression.ExpressionFilter;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexParser;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.InnerJoin;
import cascading.pipe.joiner.Joiner;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.SequenceFile;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.util.TempHfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.junit.Ignore;
import org.junit.Test;

public class BuildJobsHadoopPlatformTest
extends PlatformTestCase {
    public BuildJobsHadoopPlatformTest() {
        super(false);
    }

    @Test
    public void testIdentity() throws Exception {
        Hfs source = new Hfs((Scheme)new TextLine(), "input/path");
        Hfs sink = new Hfs((Scheme)new TextLine(), "output/path", SinkMode.REPLACE);
        Pipe pipe = new Pipe("test");
        Flow flow = this.getPlatform().getFlowConnector().connect((Tap)source, (Tap)sink, pipe);
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"wrong size", (int)1, (int)steps.size());
        HadoopFlowStep step = (HadoopFlowStep)steps.get(0);
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: step.sources.size()", (int)1, (int)step.getSourceTaps().size());
        BuildJobsHadoopPlatformTest.assertNull((String)"not null: step.groupBy", (Object)step.getGroup());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.sink", (Object)step.getSink());
    }

    @Test
    public void testName() {
        Pipe count = new Pipe("count");
        GroupBy pipe = new GroupBy(count, new Fields(new Comparable[]{Integer.valueOf(1)}));
        pipe = new Every((Pipe)pipe, new Fields(new Comparable[]{Integer.valueOf(1)}), (Aggregator)new Count(), new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)}));
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: count.getName()", (String)"count", (String)count.getName());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: pipe.getName()", (String)"count", (String)pipe.getName());
        pipe = new Each(count, new Fields(new Comparable[]{Integer.valueOf(1)}), (Function)new RegexSplitter(Fields.size((int)2)));
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: pipe.getName()", (String)"count", (String)pipe.getName());
    }

    @Test
    public void testOneJob() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sources.put("count", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path"));
        sinks.put("count", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)})), "output/path"));
        Pipe pipe = new Pipe("count");
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{Integer.valueOf(1)}));
        pipe = new Every(pipe, new Fields(new Comparable[]{Integer.valueOf(1)}), (Aggregator)new Count(), new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)}));
        List steps = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{pipe}).getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"wrong size", (int)1, (int)steps.size());
        BaseFlowStep step = (BaseFlowStep)steps.get(0);
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: step.sources.size()", (int)1, (int)step.getSourceTaps().size());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.groupBy", (Object)step.getGroup());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.sink", (Object)step.getSink());
        int mapDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)((FlowElement)step.getSourceTaps().iterator().next()), (FlowElement)step.getGroup());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: mapDist", (int)1, (int)mapDist);
        int reduceDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)step.getGroup(), (FlowElement)step.getSink());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: reduceDist", (int)2, (int)reduceDist);
    }

    @Test
    public void testOneJob2() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sources.put("count", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path"));
        sinks.put("count", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)})), "output/path"));
        Pipe pipe = new Pipe("count");
        pipe = new Each(pipe, new Fields(new Comparable[]{Integer.valueOf(1)}), (Function)new Identity(), new Fields(new Comparable[]{Integer.valueOf(2)}));
        pipe = new Each(pipe, new Fields(new Comparable[]{Integer.valueOf(0)}), (Function)new Identity(new Fields(new Comparable[]{"_all"})), new Fields(new Comparable[]{Integer.valueOf(1)}));
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{Integer.valueOf(0)}));
        pipe = new Every(pipe, new Fields(new Comparable[]{Integer.valueOf(0)}), (Aggregator)new Count(), new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)}));
        List steps = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{pipe}).getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"wrong size", (int)1, (int)steps.size());
        BaseFlowStep step = (BaseFlowStep)steps.get(0);
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: step.sources.size()", (int)1, (int)step.getSourceTaps().size());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.groupBy", (Object)step.getGroup());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.sink", (Object)step.getSink());
        int mapDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)((FlowElement)step.getSourceTaps().iterator().next()), (FlowElement)step.getGroup());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: mapDist", (int)3, (int)mapDist);
        int reduceDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)step.getGroup(), (FlowElement)step.getSink());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: reduceDist", (int)2, (int)reduceDist);
    }

    @Test
    public void testOneJob3() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sources.put("a", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path/a"));
        sources.put("b", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"third", "fourth"})), "input/path/b"));
        Pipe pipeA = new Pipe("a");
        Pipe pipeB = new Pipe("b");
        CoGroup splice = new CoGroup(pipeA, new Fields(new Comparable[]{Integer.valueOf(1)}), pipeB, new Fields(new Comparable[]{Integer.valueOf(1)}));
        sinks.put(splice.getName(), new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)})), "output/path"));
        List steps = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{splice}).getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"wrong size", (int)1, (int)steps.size());
        BaseFlowStep step = (BaseFlowStep)steps.get(0);
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: step.sources.size()", (int)2, (int)step.getSourceTaps().size());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.groupBy", (Object)step.getGroup());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.sink", (Object)step.getSink());
        Iterator iterator = step.getSourceTaps().iterator();
        int mapDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)((FlowElement)iterator.next()), (FlowElement)step.getGroup());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: mapDist", (int)1, (int)mapDist);
        mapDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)((FlowElement)iterator.next()), (FlowElement)step.getGroup());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: mapDist", (int)1, (int)mapDist);
        int reduceDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)step.getGroup(), (FlowElement)step.getSink());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: reduceDist", (int)1, (int)reduceDist);
    }

    @Test
    public void testOneJob4() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sources.put("a", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path/a"));
        sources.put("b", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"third", "fourth"})), "input/path/b"));
        Pipe pipeA = new Pipe("a");
        Pipe pipeB = new Pipe("b");
        CoGroup cogroup = new CoGroup(pipeA, new Fields(new Comparable[]{Integer.valueOf(1)}), pipeB, new Fields(new Comparable[]{Integer.valueOf(1)}));
        cogroup = new Each((Pipe)cogroup, (Function)new Identity());
        sinks.put(cogroup.getName(), new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)})), "output/path"));
        List steps = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{cogroup}).getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"wrong size", (int)1, (int)steps.size());
        BaseFlowStep step = (BaseFlowStep)steps.get(0);
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: step.sources.size()", (int)2, (int)step.getSourceTaps().size());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.groupBy", (Object)step.getGroup());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.sink", (Object)step.getSink());
        int mapDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)((FlowElement)step.getSourceTaps().iterator().next()), (FlowElement)step.getGroup());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: mapDist", (int)1, (int)mapDist);
        int reduceDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)step.getGroup(), (FlowElement)step.getSink());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: reduceDist", (int)2, (int)reduceDist);
    }

    @Test
    public void testOneJob5() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sources.put("a", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path/a"));
        sources.put("b", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"third", "fourth"})), "input/path/b"));
        Pipe pipeA = new Pipe("a");
        Pipe pipeB = new Pipe("b");
        CoGroup splice = new CoGroup(new Pipe[]{pipeA, pipeB});
        splice = new Each((Pipe)splice, (Function)new Identity());
        sinks.put(splice.getName(), new Hfs((Scheme)new TextLine(), "output/path"));
        List steps = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{splice}).getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"wrong size", (int)1, (int)steps.size());
        BaseFlowStep step = (BaseFlowStep)steps.get(0);
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: step.sources.size()", (int)2, (int)step.getSourceTaps().size());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.groupBy", (Object)step.getGroup());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.sink", (Object)step.getSink());
        int mapDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)((FlowElement)step.getSourceTaps().iterator().next()), (FlowElement)step.getGroup());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: mapDist", (int)1, (int)mapDist);
        int reduceDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)step.getGroup(), (FlowElement)step.getSink());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: reduceDist", (int)2, (int)reduceDist);
    }

    @Test
    public void testNoGroup() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sources.put("count", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path"));
        sinks.put("count", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)})), "output/path"));
        Pipe pipe = new Pipe("count");
        pipe = new Each(pipe, (Function)new Identity());
        pipe = new Every(pipe, new Fields(new Comparable[]{Integer.valueOf(1)}), (Aggregator)new Count(), new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)}));
        try {
            Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{pipe});
            BuildJobsHadoopPlatformTest.fail((String)"did not throw flow exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testSplit() {
        Hfs source = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "foo/split2", SinkMode.REPLACE);
        Pipe pipe = new Pipe("split");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter("^68.*"));
        Each left = new Each(new Pipe("left", pipe), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right", pipe), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("split", source);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("left", sink1);
        sinks.put("right", sink2);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{left, right});
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)2, (int)steps.size());
    }

    @Test
    public void testSplitHangingTails() {
        Hfs source = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "foo/split2", SinkMode.REPLACE);
        Pipe pipe = new Pipe("split");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter("^68.*"));
        Each left = new Each(new Pipe("left", pipe), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right", pipe), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("split", source);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("left", sink1);
        sinks.put("right", sink2);
        try {
            this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{pipe});
            BuildJobsHadoopPlatformTest.fail((String)"did not catch missing tails");
        }
        catch (Exception exception) {
            System.out.println("exception.getMessage() = " + exception.getMessage());
            BuildJobsHadoopPlatformTest.assertTrue((boolean)exception.getMessage().contains("'left', 'right'"));
        }
    }

    @Test
    public void testSplitOnNonSafeOperations() {
        Hfs source = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "foo/split2", SinkMode.REPLACE);
        Pipe pipe = new Pipe("split");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new TestFunction(new Fields(new Comparable[]{"ignore"}), new Tuple(new Object[]{1}), false), new Fields(new Comparable[]{"line"}));
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter("^68.*"));
        Each left = new Each(new Pipe("left", pipe), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right", pipe), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("split", source);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("left", sink1);
        sinks.put("right", sink2);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{left, right});
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)3, (int)steps.size());
        FlowStep step = (FlowStep)steps.get(0);
        BuildJobsHadoopPlatformTest.assertEquals((String)"wrong number of operations", (int)2, (int)((BaseFlowStep)step).getAllOperations().size());
    }

    @Test
    public void testSplitOnNonSafeOperationsSimple() {
        Hfs source = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "foo/split2", SinkMode.REPLACE);
        Pipe pipe = new Pipe("split");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new TestFunction(new Fields(new Comparable[]{"ignore"}), new Tuple(new Object[]{1}), false), new Fields(new Comparable[]{"line"}));
        Each left = new Each(new Pipe("left", pipe), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right", pipe), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("split", source);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("left", sink1);
        sinks.put("right", sink2);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{left, right});
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)3, (int)steps.size());
        FlowStep step = (FlowStep)steps.get(0);
        BuildJobsHadoopPlatformTest.assertEquals((String)"wrong number of operations", (int)1, (int)((BaseFlowStep)step).getAllOperations().size());
    }

    @Test
    public void testSplitOnNonSafeOperations2() {
        Hfs source = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "foo/split2", SinkMode.REPLACE);
        Hfs sink3 = new Hfs((Scheme)new TextLine(), "foo/split3", SinkMode.REPLACE);
        Pipe pipe = new Pipe("split");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new TestFunction(new Fields(new Comparable[]{"ignore"}), new Tuple(new Object[]{1}), false), new Fields(new Comparable[]{"line"}));
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter("^68.*"));
        pipe = new Pipe("middle", pipe);
        Each left = new Each(new Pipe("left", pipe), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right", pipe), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("split", source);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("left", sink1);
        sinks.put("right", sink2);
        sinks.put("middle", sink3);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{left, right});
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)4, (int)steps.size());
        FlowStep step = (FlowStep)steps.get(0);
        BuildJobsHadoopPlatformTest.assertEquals((String)"wrong number of operations", (int)2, (int)((BaseFlowStep)step).getAllOperations().size());
    }

    @Test
    public void testSplitComplex() {
        Hfs source = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "foo/split2", SinkMode.REPLACE);
        Pipe pipe = new Pipe("split");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"}));
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{"ip"}));
        pipe = new Every(pipe, new Fields(new Comparable[]{"ip"}), (Aggregator)new Count(), new Fields(new Comparable[]{"ip", "count"}));
        pipe = new Each(pipe, new Fields(new Comparable[]{"ip"}), (Filter)new RegexFilter("^68.*"));
        Each left = new Each(new Pipe("left", pipe), new Fields(new Comparable[]{"ip"}), (Filter)new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right", pipe), new Fields(new Comparable[]{"ip"}), (Filter)new RegexFilter(".*192.*"));
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("split", source);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("left", sink1);
        sinks.put("right", sink2);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{left, right});
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)3, (int)steps.size());
        BaseFlowStep step = (BaseFlowStep)steps.get(0);
        Scope nextScope = step.getNextScope((FlowElement)step.getGroup());
        FlowElement operator = step.getNextFlowElement(nextScope);
        BuildJobsHadoopPlatformTest.assertTrue((String)"not an Every", (boolean)(operator instanceof Every));
        nextScope = step.getNextScope(operator);
        operator = step.getNextFlowElement(nextScope);
        BuildJobsHadoopPlatformTest.assertTrue((String)"not a Each", (boolean)(operator instanceof Each));
        nextScope = step.getNextScope(operator);
        operator = step.getNextFlowElement(nextScope);
        BuildJobsHadoopPlatformTest.assertTrue((String)"not a TempHfs", (boolean)(operator instanceof TempHfs));
    }

    @Test
    public void testSplitComplex2() {
        Hfs source = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "foo/split1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "foo/split2", SinkMode.REPLACE);
        Pipe pipe = new Pipe("split");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new RegexParser(new Fields(new Comparable[]{"ip"}), "^[^ ]*"), new Fields(new Comparable[]{"ip"}));
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{"ip"}));
        pipe = new Every(pipe, new Fields(new Comparable[]{"ip"}), (Aggregator)new Count(), new Fields(new Comparable[]{"ip", "count"}));
        pipe = new Each(pipe, new Fields(new Comparable[]{"ip"}), (Filter)new RegexFilter("^68.*"));
        Each left = new Each(pipe, new Fields(new Comparable[]{"ip"}), (Filter)new RegexFilter(".*46.*"));
        left = new Pipe("left", (Pipe)left);
        Each right = new Each(pipe, new Fields(new Comparable[]{"ip"}), (Filter)new RegexFilter(".*192.*"));
        right = new Pipe("right", (Pipe)right);
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("split", source);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("left", sink1);
        sinks.put("right", sink2);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{left, right});
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)3, (int)steps.size());
        BaseFlowStep step = (BaseFlowStep)steps.get(0);
        Scope nextScope = step.getNextScope((FlowElement)step.getGroup());
        FlowElement operator = step.getNextFlowElement(nextScope);
        BuildJobsHadoopPlatformTest.assertTrue((String)"not an Every", (boolean)(operator instanceof Every));
        nextScope = step.getNextScope(operator);
        operator = step.getNextFlowElement(nextScope);
        BuildJobsHadoopPlatformTest.assertTrue((String)"not a Each", (boolean)(operator instanceof Each));
        nextScope = step.getNextScope(operator);
        operator = step.getNextFlowElement(nextScope);
        BuildJobsHadoopPlatformTest.assertTrue((String)"not a TempHfs", (boolean)(operator instanceof TempHfs));
    }

    @Test
    public void testMerge() {
        Hfs source1 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge1");
        Hfs source2 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge2");
        Hfs sink = new Hfs((Scheme)new TextLine(), "foo");
        Each left = new Each(new Pipe("left"), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right"), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        GroupBy merge = new GroupBy("merge", Pipe.pipes((Pipe[])new Pipe[]{left, right}), new Fields(new Comparable[]{"offset"}));
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("left", source1);
        sources.put("right", source2);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("merge", sink);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)1, (int)steps.size());
    }

    @Test
    public void testDupeSource() {
        Hfs source1 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge");
        Hfs source2 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge");
        Hfs sink = new Hfs((Scheme)new TextLine(), "foo");
        Each left = new Each(new Pipe("left"), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right"), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        right = new Each((Pipe)right, new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        right = new Each((Pipe)right, new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        right = new Each((Pipe)right, new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        GroupBy merge = new GroupBy("merge", Pipe.pipes((Pipe[])new Pipe[]{left, right}), new Fields(new Comparable[]{"offset"}));
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("left", source1);
        sources.put("right", source2);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("merge", sink);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)1, (int)steps.size());
    }

    @Test
    public void testDupeSourceRepeat() {
        Hfs source1 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge");
        Hfs sink = new Hfs((Scheme)new TextLine(), "foo");
        Pipe pipe = new Pipe("pipe");
        CoGroup merge = new CoGroup("cogroup", pipe, new Fields(new Comparable[]{"offset"}), 1, Fields.size((int)4));
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("pipe", source1);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("cogroup", sink);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)1, (int)steps.size());
    }

    @Test
    public void testDupeSource2() {
        Hfs source1 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge");
        Hfs sink = new Hfs((Scheme)new TextLine(), "foo");
        Pipe left = new Pipe("left");
        Pipe right = new Pipe("right");
        CoGroup merge = new CoGroup("cogroup", left, new Fields(new Comparable[]{"offset"}), right, new Fields(new Comparable[]{"offset"}), Fields.size((int)4));
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("left", source1);
        sources.put("right", source1);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("cogroup", sink);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
    }

    @Test
    public void testDupeSource3() {
        Hfs source1 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge");
        Hfs source2 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "bar/merge");
        Hfs sink = new Hfs((Scheme)new TextLine(), "foo");
        Pipe left = new Pipe("left");
        Pipe middle = new Pipe("middle");
        Pipe right = new Pipe("right");
        Pipe[] pipes = Pipe.pipes((Pipe[])new Pipe[]{left, middle, right});
        Fields[] fields = Fields.fields((Fields[])new Fields[]{new Fields(new Comparable[]{"offset"}), new Fields(new Comparable[]{"offset"}), new Fields(new Comparable[]{"offset"})});
        CoGroup merge = new CoGroup("cogroup", pipes, fields, Fields.size((int)6));
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("left", source1);
        sources.put("middle", source2);
        sources.put("right", source1);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("cogroup", sink);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
    }

    @Test
    public void testMerge2() {
        Hfs source1 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge1");
        Hfs source2 = new Hfs((Scheme)new SequenceFile(new Fields(new Comparable[]{"offset", "line"})), "foo/merge2");
        Hfs sink = new Hfs((Scheme)new TextLine(), "foo");
        Each left = new Each(new Pipe("left"), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right"), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        GroupBy merge = new GroupBy("merge", Pipe.pipes((Pipe[])new Pipe[]{left, right}), new Fields(new Comparable[]{"offset"}));
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("left", source1);
        sources.put("right", source2);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("merge", sink);
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)1, (int)steps.size());
    }

    @Test
    public void testMergeSameSourceSplit() {
        Hfs source = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge1");
        Hfs sink = new Hfs((Scheme)new TextLine(), "foo");
        Pipe head = new Pipe("source");
        head = new Each(head, new Fields(new Comparable[]{"line"}), (Filter)new ExpressionFilter("line.length() != 0", String.class));
        Each left = new Each(new Pipe("left", head), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right", head), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        GroupBy merge = new GroupBy("merge", Pipe.pipes((Pipe[])new Pipe[]{left, right}), new Fields(new Comparable[]{"offset"}));
        Flow flow = this.getPlatform().getFlowConnector().connect((Tap)source, (Tap)sink, (Pipe)merge);
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)1, (int)steps.size());
    }

    @Test
    public void testCoGroupAroundCoGroup() throws Exception {
        Hfs source10 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num"})), "foo");
        Hfs source20 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num"})), "bar");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("source20", source20);
        sources.put("source101", source10);
        sources.put("source102", source10);
        Hfs sink = new Hfs((Scheme)new TextLine(), "baz", SinkMode.REPLACE);
        Pipe pipeNum20 = new Pipe("source20");
        Pipe pipeNum101 = new Pipe("source101");
        Pipe pipeNum102 = new Pipe("source102");
        CoGroup splice1 = new CoGroup(pipeNum20, new Fields(new Comparable[]{"num"}), pipeNum101, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "num2"}));
        CoGroup splice2 = new CoGroup((Pipe)splice1, new Fields(new Comparable[]{"num1"}), pipeNum102, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "num2", "num3"}));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, (Tap)sink, (Pipe)splice2);
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)2, (int)flow.getFlowSteps().size());
    }

    @Test
    public void testCoGroupAroundCoGroupOptimized() throws Exception {
        Hfs source10 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num"})), "foo");
        Hfs source20 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num"})), "bar");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("source20", source20);
        sources.put("source101", source10);
        sources.put("source102", source10);
        Hfs sink = new Hfs((Scheme)new TextLine(), "baz", SinkMode.REPLACE);
        Pipe pipeNum20 = new Pipe("source20");
        Pipe pipeNum101 = new Pipe("source101");
        Pipe pipeNum102 = new Pipe("source102");
        CoGroup splice1 = new CoGroup(pipeNum20, new Fields(new Comparable[]{"num"}), pipeNum101, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "num2"}));
        CoGroup splice2 = new CoGroup((Pipe)splice1, new Fields(new Comparable[]{"num1"}), pipeNum102, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "num2", "num3"}));
        Properties properties = new Properties();
        FlowConnectorProps.setIntermediateSchemeClass((Map)properties, TextLine.class);
        FlowConnector flowConnector = this.getPlatform().getFlowConnector((Map)properties);
        Flow flow = flowConnector.connect(sources, (Tap)sink, (Pipe)splice2);
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)2, (int)flow.getFlowSteps().size());
    }

    @Test
    public void testCoGroupAroundCoGroupAroundCoGroup() throws Exception {
        Hfs sourceLower = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs sourceUpper = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "bar");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("lower", sourceLower);
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Hfs sink = new Hfs((Scheme)new TextLine(), "output", SinkMode.REPLACE);
        Each pipeLower = new Each("lower", new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper1 = new Each("upper1", new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper2 = new Each("upper2", new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup splice1 = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper1, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        splice1 = new Each((Pipe)splice1, (Function)new Identity());
        splice1 = new GroupBy((Pipe)splice1, new Fields(new Comparable[]{Integer.valueOf(0)}));
        CoGroup splice2 = new CoGroup((Pipe)splice1, new Fields(new Comparable[]{"num1"}), (Pipe)pipeUpper2, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3"}));
        splice2 = new Each((Pipe)splice2, (Function)new Identity());
        splice2 = new GroupBy((Pipe)splice2, new Fields(new Comparable[]{Integer.valueOf(0)}));
        splice2 = new CoGroup((Pipe)splice2, new Fields(new Comparable[]{"num1"}), (Pipe)splice1, new Fields(new Comparable[]{"num1"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3", "num4", "char4", "num5", "char5"}));
        Flow flow = null;
        flow = this.getPlatform().getFlowConnector().connect(sources, (Tap)sink, (Pipe)splice2);
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)5, (int)flow.getFlowSteps().size());
    }

    @Test
    public void testCoGroupWithResultGroupFieldsDefault() throws Exception {
        Hfs sourceLower = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs sourceUpper = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "bar");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Hfs sink = new Hfs((Scheme)new TextLine(), "/complex/cogroup/", SinkMode.REPLACE);
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup splice = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "lhs", "num2", "rhs"}));
        splice = new Every((Pipe)splice, (Aggregator)new First(new Fields(new Comparable[]{"value"})), new Fields(new Comparable[]{"num1", "value"}));
        Flow countFlow = this.getPlatform().getFlowConnector().connect(sources, (Tap)sink, (Pipe)splice);
    }

    @Test
    public void testCoGroupWithResultGroupFields() throws Exception {
        Hfs sourceLower = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs sourceUpper = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "bar");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Hfs sink = new Hfs((Scheme)new TextLine(), "/complex/cogroup/", SinkMode.REPLACE);
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup splice = new CoGroup((Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "lhs", "num2", "rhs"}), new Fields(new Comparable[]{"somenum", "somenum2"}));
        splice = new Every((Pipe)splice, (Aggregator)new First(new Fields(new Comparable[]{"value"})), new Fields(new Comparable[]{"somenum", "value"}));
        Flow countFlow = this.getPlatform().getFlowConnector().connect(sources, (Tap)sink, (Pipe)splice);
    }

    @Test
    public void testDirectCoGroup() throws Exception {
        Hfs sourceLower = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num", "char"})), "foo");
        Hfs sourceUpper = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num", "char"})), "bar");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("lower1", sourceLower);
        sources.put("lower2", sourceLower);
        sources.put("upper1", sourceUpper);
        sources.put("upper2", sourceUpper);
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "output1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "output2", SinkMode.REPLACE);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("output1", sink1);
        sinks.put("output2", sink2);
        Pipe pipeLower1 = new Pipe("lower1");
        Pipe pipeLower2 = new Pipe("lower2");
        Pipe pipeUpper1 = new Pipe("upper1");
        Pipe pipeUpper2 = new Pipe("upper2");
        CoGroup splice1 = new CoGroup(pipeLower1, new Fields(new Comparable[]{"num"}), pipeUpper1, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        CoGroup splice2 = new CoGroup((Pipe)splice1, new Fields(new Comparable[]{"num1"}), pipeUpper2, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3"}));
        splice2 = new CoGroup("output1", (Pipe)splice2, new Fields(new Comparable[]{"num1"}), (Pipe)splice1, new Fields(new Comparable[]{"num1"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3", "num4", "char4", "num5", "char5"}));
        CoGroup splice3 = new CoGroup("output2", pipeLower2, new Fields(new Comparable[]{"num"}), (Pipe)splice2, new Fields(new Comparable[]{"num1"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3", "num4", "char4", "num5", "char5", "num6", "char6"}));
        Flow flow = null;
        flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{splice3});
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)5, (int)flow.getFlowSteps().size());
    }

    @Test
    public void testMultipleCoGroupSimilarSources() throws Exception {
        Hfs sourceLower = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num", "char"})), "foo");
        Hfs sourceUpper = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num", "char"})), "bar");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("lower1", sourceLower);
        sources.put("upper1", sourceUpper);
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "output1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "output2", SinkMode.REPLACE);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("output1", sink1);
        sinks.put("output2", sink2);
        Pipe pipeLower1 = new Pipe("lower1");
        Pipe pipeUpper1 = new Pipe("upper1");
        CoGroup splice1 = new CoGroup(pipeLower1, new Fields(new Comparable[]{"num"}), pipeUpper1, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2"}));
        CoGroup splice2 = new CoGroup((Pipe)splice1, new Fields(new Comparable[]{"num1"}), pipeUpper1, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3"}));
        splice2 = new CoGroup("output1", (Pipe)splice2, new Fields(new Comparable[]{"num1"}), (Pipe)splice1, new Fields(new Comparable[]{"num1"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3", "num4", "char4", "num5", "char5"}));
        CoGroup splice3 = new CoGroup("output2", pipeUpper1, new Fields(new Comparable[]{"num"}), (Pipe)splice2, new Fields(new Comparable[]{"num1"}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3", "num4", "char4", "num5", "char5", "num6", "char6"}));
        Flow flow = null;
        flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{splice3});
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)5, (int)flow.getFlowSteps().size());
    }

    @Test
    public void testMultipleCoGroupSplitSources() throws Exception {
        Hfs sourceLower = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num", "char"})), "foo");
        Hfs sourceUpper = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num", "char"})), "bar");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("lower1", sourceLower);
        sources.put("upper1", sourceUpper);
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "output1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "output2", SinkMode.REPLACE);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("output1", sink1);
        sinks.put("output2", sink2);
        Pipe pipeLower1 = new Pipe("lower1");
        Pipe pipeUpper1 = new Pipe("upper1");
        Each pipeLower2 = new Each(pipeLower1, (Function)new Identity());
        pipeLower2 = new GroupBy((Pipe)pipeLower2, new Fields(new Comparable[]{"num", "char"}));
        pipeLower2 = new Every((Pipe)pipeLower2, new Fields(new Comparable[]{"num", "char"}), (Aggregator)new Count(), new Fields(new Comparable[]{"num", "char"}));
        pipeLower1 = new Each(pipeLower1, (Function)new Identity());
        pipeLower1 = new Each(pipeLower1, (Function)new Identity());
        pipeLower1 = new Each(pipeLower1, (Function)new Identity());
        pipeLower1 = new Pipe("lower2", pipeLower1);
        pipeUpper1 = new Each(pipeUpper1, (Function)new Identity());
        pipeUpper1 = new Each(pipeUpper1, (Function)new Identity());
        pipeUpper1 = new Each(pipeUpper1, (Function)new Identity());
        CoGroup splice1 = new CoGroup("group", Pipe.pipes((Pipe[])new Pipe[]{pipeLower1, pipeLower2, pipeUpper1}), Fields.fields((Fields[])new Fields[]{new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num"})}), new Fields(new Comparable[]{"num1", "char1", "num2", "char2", "num3", "char3"}), (Joiner)new InnerJoin());
        Each output1 = new Each((Pipe)splice1, AssertionLevel.VALID, (Assertion)new AssertNotNull());
        output1 = new Each((Pipe)output1, (Function)new Identity());
        output1 = new Pipe("output1", (Pipe)output1);
        Each output2 = new Each((Pipe)splice1, AssertionLevel.VALID, (Assertion)new AssertNull());
        output2 = new Each((Pipe)output2, (Function)new Identity());
        output2 = new Pipe("output2", (Pipe)output2);
        Flow flow = null;
        flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{output1, output2});
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)4, (int)flow.getFlowSteps().size());
    }

    @Test
    public void testSplitEachOnGroup() throws Exception {
        Hfs sourceLower = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num", "char"})), "foo");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("lower1", sourceLower);
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "output1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "output2", SinkMode.REPLACE);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("output1", sink1);
        sinks.put("output2", sink2);
        Pipe pipeLower1 = new Pipe("lower1");
        GroupBy pipe = new GroupBy(pipeLower1, new Fields(new Comparable[]{Integer.valueOf(0)}));
        Each left = new Each(new Pipe("output1", (Pipe)pipe), (Function)new Identity());
        Each right = new Each(new Pipe("output2", (Pipe)pipe), (Function)new Identity());
        Flow flow = null;
        flow = this.getPlatform().getFlowConnector().connect(sources, sinks, Pipe.pipes((Pipe[])new Pipe[]{left, right}));
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)3, (int)steps.size());
    }

    @Test
    public void testSplitEveryOnGroup() throws Exception {
        Hfs sourceLower = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num", "char"})), "foo");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("lower1", sourceLower);
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "output1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "output2", SinkMode.REPLACE);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("output1", sink1);
        sinks.put("output2", sink2);
        Pipe pipeLower1 = new Pipe("lower1");
        GroupBy pipe = new GroupBy(pipeLower1, new Fields(new Comparable[]{Integer.valueOf(0)}));
        Every left = new Every(new Pipe("output1", (Pipe)pipe), (Buffer)new TestBuffer(new Fields(new Comparable[]{"left"}), (Comparable)Boolean.valueOf(true)));
        Every right = new Every(new Pipe("output2", (Pipe)pipe), (Buffer)new TestBuffer(new Fields(new Comparable[]{"right"}), (Comparable)Boolean.valueOf(true)));
        Flow flow = null;
        try {
            flow = this.getPlatform().getFlowConnector().connect(sources, sinks, Pipe.pipes((Pipe[])new Pipe[]{left, right}));
            BuildJobsHadoopPlatformTest.fail((String)"did not throw planner exception");
        }
        catch (PlannerException plannerException) {
            // empty catch block
        }
    }

    @Test
    public void testSplitOutput() throws Exception {
        Hfs sourceLower = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num", "char"})), "foo");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("lower1", sourceLower);
        Hfs sink1 = new Hfs((Scheme)new TextLine(), "output1", SinkMode.REPLACE);
        Hfs sink2 = new Hfs((Scheme)new TextLine(), "output2", SinkMode.REPLACE);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("output1", sink1);
        sinks.put("output2", sink2);
        Pipe pipeLower1 = new Pipe("lower1");
        GroupBy left = new GroupBy("output1", pipeLower1, new Fields(new Comparable[]{Integer.valueOf(0)}));
        GroupBy right = new GroupBy("output2", (Pipe)left, new Fields(new Comparable[]{Integer.valueOf(0)}));
        Flow flow = null;
        flow = this.getPlatform().getFlowConnector().connect(sources, sinks, Pipe.pipes((Pipe[])new Pipe[]{left, right}));
        List steps = flow.getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)3, (int)steps.size());
    }

    @Test
    public void testSameSourceForBranch() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sources.put("a", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path/a"));
        Pipe pipeA = new Pipe("a");
        GroupBy group1 = new GroupBy("a1", pipeA, Fields.FIRST);
        GroupBy group2 = new GroupBy("a2", pipeA, Fields.FIRST);
        GroupBy merge = new GroupBy("tail", Pipe.pipes((Pipe[])new Pipe[]{group1, group2}), new Fields(new Comparable[]{"first", "second"}));
        sinks.put(merge.getName(), new Hfs((Scheme)new TextLine(), "output/path"));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)3, (int)flow.getFlowSteps().size());
    }

    @Test
    public void testSameTaps() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        Hfs tap = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path/a");
        sources.put("a", tap);
        sources.put("b", tap);
        Pipe pipeA = new Pipe("a");
        Pipe pipeB = new Pipe("b");
        GroupBy group1 = new GroupBy(pipeA);
        GroupBy group2 = new GroupBy(pipeB);
        GroupBy merge = new GroupBy("tail", Pipe.pipes((Pipe[])new Pipe[]{group1, group2}), new Fields(new Comparable[]{"first", "second"}));
        sinks.put(merge.getName(), new Hfs((Scheme)new TextLine(), "output/path"));
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)3, (int)flow.getFlowSteps().size());
    }

    @Test
    public void testDanglingHead() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        Hfs source = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path/a");
        sources.put("a", source);
        Pipe pipeA = new Pipe("a");
        Pipe pipeB = new Pipe("b");
        GroupBy group1 = new GroupBy(pipeA);
        GroupBy group2 = new GroupBy(pipeB);
        GroupBy merge = new GroupBy("tail", Pipe.pipes((Pipe[])new Pipe[]{group1, group2}), new Fields(new Comparable[]{"first", "second"}));
        sinks.put(merge.getName(), new Hfs((Scheme)new TextLine(), "output/path"));
        try {
            Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
            BuildJobsHadoopPlatformTest.fail((String)"did not catch missing source tap");
        }
        catch (PlannerException flow) {
        }
        catch (Exception exception) {
            BuildJobsHadoopPlatformTest.fail((String)"threw wrong exception");
        }
    }

    @Test
    public void testDanglingTail() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap sinks = new HashMap();
        Hfs tap = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path/a");
        sources.put("a", tap);
        sources.put("b", tap);
        Pipe pipeA = new Pipe("a");
        Pipe pipeB = new Pipe("b");
        GroupBy group1 = new GroupBy(pipeA);
        GroupBy group2 = new GroupBy(pipeB);
        GroupBy merge = new GroupBy("tail", Pipe.pipes((Pipe[])new Pipe[]{group1, group2}), new Fields(new Comparable[]{"first", "second"}));
        try {
            Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
            BuildJobsHadoopPlatformTest.fail((String)"did not catch missing sink tap");
        }
        catch (PlannerException flow) {
        }
        catch (Exception exception) {
            BuildJobsHadoopPlatformTest.fail((String)"threw wrong exception");
        }
    }

    @Test
    public void testExtraSource() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        Hfs tap = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path/a");
        sources.put("a", tap);
        sources.put("b", tap);
        sources.put("c", tap);
        Pipe pipeA = new Pipe("a");
        Pipe pipeB = new Pipe("b");
        GroupBy group1 = new GroupBy(pipeA);
        GroupBy group2 = new GroupBy(pipeB);
        GroupBy merge = new GroupBy("tail", Pipe.pipes((Pipe[])new Pipe[]{group1, group2}), new Fields(new Comparable[]{"first", "second"}));
        sinks.put(merge.getName(), new Hfs((Scheme)new TextLine(), "output/path"));
        try {
            Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
            BuildJobsHadoopPlatformTest.fail((String)"did not catch extra source tap");
        }
        catch (PlannerException exception) {
            BuildJobsHadoopPlatformTest.assertTrue((boolean)exception.getMessage().contains("['c']"));
        }
        catch (Exception exception) {
            BuildJobsHadoopPlatformTest.fail((String)"threw wrong exception");
        }
    }

    @Test
    public void testExtraSink() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        Hfs tap = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path/a");
        sources.put("a", tap);
        sources.put("b", tap);
        Pipe pipeA = new Pipe("a");
        Pipe pipeB = new Pipe("b");
        GroupBy group1 = new GroupBy(pipeA);
        GroupBy group2 = new GroupBy(pipeB);
        GroupBy merge = new GroupBy("tail", Pipe.pipes((Pipe[])new Pipe[]{group1, group2}), new Fields(new Comparable[]{"first", "second"}));
        sinks.put(merge.getName(), new Hfs((Scheme)new TextLine(), "output/path"));
        sinks.put("c", new Hfs((Scheme)new TextLine(), "output/path"));
        try {
            Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
            BuildJobsHadoopPlatformTest.fail((String)"did not catch extra sink tap");
        }
        catch (PlannerException exception) {
            BuildJobsHadoopPlatformTest.assertTrue((boolean)exception.getMessage().contains("['c']"));
        }
        catch (Exception exception) {
            BuildJobsHadoopPlatformTest.fail((String)"threw wrong exception");
        }
    }

    @Test
    public void testBuffer() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sources.put("count", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path"));
        sinks.put("count", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)})), "output/path"));
        Pipe pipe = new Pipe("count");
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{Integer.valueOf(1)}));
        pipe = new Every(pipe, new Fields(new Comparable[]{Integer.valueOf(1)}), (Buffer)new TestBuffer(new Fields(new Comparable[]{"fourth"}), (Comparable)((Object)"value")), new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)}));
        List steps = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{pipe}).getFlowSteps();
        BuildJobsHadoopPlatformTest.assertEquals((String)"wrong size", (int)1, (int)steps.size());
        BaseFlowStep step = (BaseFlowStep)steps.get(0);
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: step.sources.size()", (int)1, (int)step.getSourceTaps().size());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.groupBy", (Object)step.getGroup());
        BuildJobsHadoopPlatformTest.assertNotNull((String)"null: step.sink", (Object)step.getSink());
        int mapDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)((FlowElement)step.getSourceTaps().iterator().next()), (FlowElement)step.getGroup());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: mapDist", (int)1, (int)mapDist);
        int reduceDist = ElementGraphs.shortestDistance((ElementGraph)step.getElementGraph(), (FlowElement)step.getGroup(), (FlowElement)step.getSink());
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: reduceDist", (int)2, (int)reduceDist);
    }

    @Test
    public void testBufferFail() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sources.put("count", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path"));
        sinks.put("count", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)})), "output/path"));
        Pipe pipe = new Pipe("count");
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{Integer.valueOf(1)}));
        pipe = new Every(pipe, new Fields(new Comparable[]{Integer.valueOf(1)}), (Buffer)new TestBuffer(new Fields(new Comparable[]{"fourth"}), (Comparable)((Object)"value")), new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)}));
        pipe = new Every(pipe, new Fields(new Comparable[]{Integer.valueOf(1)}), (Aggregator)new Count(), new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)}));
        try {
            this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{pipe});
            BuildJobsHadoopPlatformTest.fail((String)"did not throw planner exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testBufferFail2() throws IOException {
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sources.put("count", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"first", "second"})), "input/path"));
        sinks.put("count", new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)})), "output/path"));
        Pipe pipe = new Pipe("count");
        pipe = new GroupBy(pipe, new Fields(new Comparable[]{Integer.valueOf(1)}));
        pipe = new Every(pipe, new Fields(new Comparable[]{Integer.valueOf(1)}), (Aggregator)new Count(), new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)}));
        pipe = new Every(pipe, new Fields(new Comparable[]{Integer.valueOf(1)}), (Buffer)new TestBuffer(new Fields(new Comparable[]{"fourth"}), (Comparable)((Object)"value")), new Fields(new Comparable[]{Integer.valueOf(0), Integer.valueOf(1)}));
        try {
            this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{pipe});
            BuildJobsHadoopPlatformTest.fail((String)"did not throw planner exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testErrorMessages() throws Exception {
        Hfs source10 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num"})), "foo");
        Hfs source20 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"num"})), "bar");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("source20", source20);
        sources.put("source101", source10);
        sources.put("source102", source10);
        Hfs sink = new Hfs((Scheme)new TextLine(), "baz", SinkMode.REPLACE);
        Pipe pipeNum20 = new Pipe("source20");
        Pipe pipeNum101 = new Pipe("source101");
        Pipe pipeNum102 = new Pipe("source102");
        CoGroup splice1 = new CoGroup(pipeNum20, new Fields(new Comparable[]{"num"}), pipeNum101, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "num2"}));
        CoGroup splice2 = new CoGroup((Pipe)splice1, new Fields(new Comparable[]{"num9"}), pipeNum102, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num1", "num2", "num3"}));
        FlowConnector flowConnector = this.getPlatform().getFlowConnector();
        try {
            Flow flow = flowConnector.connect(sources, (Tap)sink, (Pipe)splice2);
            BuildJobsHadoopPlatformTest.fail((String)"did not fail on bad field");
        }
        catch (Exception exception) {
            BuildJobsHadoopPlatformTest.assertTrue((String)"missing message", (boolean)exception.getMessage().contains("BuildJobsHadoopPlatformTest.testErrorMessages"));
        }
    }

    @Test
    public void testSplitInMiddleBeforePipeOptimized() throws Exception {
        this.splitMiddle(true, true);
    }

    @Test
    public void testSplitInMiddleBeforePipe() throws Exception {
        this.splitMiddle(true, false);
    }

    @Test
    public void testSplitInMiddleAfterPipe() throws Exception {
        this.splitMiddle(false, false);
    }

    private void splitMiddle(boolean before, boolean testTempReplaced) {
        Hfs sourceLower = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "lower");
        Hfs sourceUpper = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "upper");
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);
        SequenceFile leftScheme = testTempReplaced ? new SequenceFile(new Fields(new Comparable[]{"num", "lower", "num2", "upper"})) : new TextLine(new Fields(new Comparable[]{"offset", "line"}), new Fields(new Comparable[]{"lower"}));
        Hfs sinkLeft = new Hfs((Scheme)leftScheme, "/splitmiddle/left", SinkMode.REPLACE);
        SequenceFile rightScheme = testTempReplaced ? new SequenceFile(new Fields(new Comparable[]{"lower"})) : new TextLine(new Fields(new Comparable[]{"offset", "line"}), new Fields(new Comparable[]{"lower"}));
        Hfs sinkRight = new Hfs((Scheme)rightScheme, "/splitmiddle/right", SinkMode.REPLACE);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("left", sinkLeft);
        sinks.put("right", sinkRight);
        RegexSplitter splitter = new RegexSplitter(new Fields(new Comparable[]{"num", "char"}), " ");
        Each pipeLower = new Each(new Pipe("lower"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        Each pipeUpper = new Each(new Pipe("upper"), new Fields(new Comparable[]{"line"}), (Function)splitter);
        CoGroup splice = new CoGroup("both", (Pipe)pipeLower, new Fields(new Comparable[]{"num"}), (Pipe)pipeUpper, new Fields(new Comparable[]{"num"}), new Fields(new Comparable[]{"num", "lower", "num2", "upper"}));
        CoGroup left = splice = new Each((Pipe)splice, new Fields(new Comparable[]{"num"}), (Filter)new RegexFilter(".*"));
        if (before) {
            left = new Pipe("left", (Pipe)left);
        }
        left = new Each((Pipe)left, new Fields(new Comparable[]{"num"}), (Filter)new RegexFilter(".*"));
        if (!before) {
            left = new Pipe("left", (Pipe)left);
        }
        CoGroup right = left;
        if (before) {
            right = new Pipe("right", (Pipe)right);
        }
        right = new Each((Pipe)right, new Fields(new Comparable[]{"num"}), (Filter)new RegexFilter(".*"));
        if (!before) {
            right = new Pipe("right", (Pipe)right);
        }
        FlowConnector flowConnector = this.getPlatform().getFlowConnector();
        Flow flow = flowConnector.connect("splitmiddle", sources, sinks, new Pipe[]{left, right});
        List steps = flow.getFlowSteps();
        if (testTempReplaced && flowConnector.getRuleRegistrySet().findRegistryWith("CombineAdjacentTapTransformer") == null) {
            testTempReplaced = false;
        }
        BuildJobsHadoopPlatformTest.assertEquals((String)"not equal: steps.size()", (int)(testTempReplaced ? 2 : 3), (int)steps.size());
        BaseFlowStep step = (BaseFlowStep)steps.get(0);
        Scope nextScope = step.getNextScope((FlowElement)step.getGroup());
        FlowElement operator = step.getNextFlowElement(nextScope);
        BuildJobsHadoopPlatformTest.assertTrue((String)"not an Each", (boolean)(operator instanceof Each));
        nextScope = step.getNextScope(operator);
        operator = step.getNextFlowElement(nextScope);
        BuildJobsHadoopPlatformTest.assertTrue((String)"not a Each", (boolean)(operator instanceof Each));
        nextScope = step.getNextScope(operator);
        operator = step.getNextFlowElement(nextScope);
        if (testTempReplaced) {
            BuildJobsHadoopPlatformTest.assertEquals((String)"not proper sink", (Object)sinkLeft, (Object)operator);
        } else {
            BuildJobsHadoopPlatformTest.assertTrue((String)"not a TempHfs", (boolean)(operator instanceof TempHfs));
        }
    }

    @Test
    public void testSourceIsSink() {
        Hfs tap = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge");
        Pipe pipe = new Pipe("left");
        try {
            Flow flow = this.getPlatform().getFlowConnector().connect((Tap)tap, (Tap)tap, pipe);
            BuildJobsHadoopPlatformTest.fail((String)"did not throw planner exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testReplaceFail() throws Exception {
        Hfs source = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo");
        Hfs sink = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"}), new Fields(new Comparable[]{"offset", "line2"})), "bar", SinkMode.REPLACE);
        Pipe pipe = new Pipe("test");
        RegexParser parser = new RegexParser(new Fields(new Comparable[]{Integer.valueOf(0)}), "^[^ ]*");
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)parser, Fields.REPLACE);
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new Identity(Fields.ARGS), Fields.REPLACE);
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new Identity(new Fields(new Comparable[]{"line2"})), Fields.REPLACE);
        try {
            Flow flow = this.getPlatform().getFlowConnector().connect((Tap)source, (Tap)sink, pipe);
            BuildJobsHadoopPlatformTest.fail((String)"did not fail");
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Test
    public void testNestedProperties() throws IOException {
        Hfs source = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"line"})), "/input");
        Pipe pipe = new Pipe("test");
        pipe = new Each(pipe, (Function)new RegexSplitter(new Fields(new Comparable[]{"first", "second", "third"}), "\\s"), Fields.ALL);
        Hfs sink = new Hfs((Scheme)new TextLine(), "output", SinkMode.REPLACE);
        Properties defaultProperties = new Properties();
        defaultProperties.setProperty("test.key", "test.value");
        HadoopFlow flow = (HadoopFlow)this.getPlatform().getFlowConnector((Map)new Properties(defaultProperties)).connect((Tap)source, (Tap)sink, pipe);
        BuildJobsHadoopPlatformTest.assertEquals((String)"test flow", (String)"test.value", (String)flow.getProperty("test.key"));
        BuildJobsHadoopPlatformTest.assertEquals((String)"test step", (String)"test.value", (String)((HadoopFlowStep)flow.getFlowSteps().get(0)).createInitializedConfig(flow.getFlowProcess(), flow.getConfig()).get("test.key"));
    }

    @Test
    public void testEveryAfterJoin() {
        Hfs source1 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge1");
        Hfs source2 = new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"offset", "line"})), "foo/merge2");
        Hfs sink = new Hfs((Scheme)new TextLine(), "foo");
        Each left = new Each(new Pipe("left"), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*46.*"));
        Each right = new Each(new Pipe("right"), new Fields(new Comparable[]{"line"}), (Filter)new RegexFilter(".*192.*"));
        Fields[] fields = Fields.fields((Fields[])new Fields[]{new Fields(new Comparable[]{"offset"}), new Fields(new Comparable[]{"offset"})});
        HashJoin merge = new HashJoin("join", Pipe.pipes((Pipe[])new Pipe[]{left, right}), fields, Fields.size((int)4), (Joiner)new InnerJoin());
        merge = new Every((Pipe)merge, (Aggregator)new MaxValue());
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        sources.put("left", source1);
        sources.put("right", source2);
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        sinks.put("join", sink);
        try {
            Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, new Pipe[]{merge});
            BuildJobsHadoopPlatformTest.fail();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    @Ignore
    public void testManyJoins() {
        int n = 50;
        HashMap<String, Hfs> sources = new HashMap<String, Hfs>();
        HashMap<String, Hfs> sinks = new HashMap<String, Hfs>();
        Pipe[] pipes = new Pipe[n];
        int count = 0;
        for (int i = 0; i < n; ++i) {
            String nameIn = "in" + i;
            String nameOut = "out" + i;
            Pipe pipe = new Pipe(nameIn);
            sources.put(nameIn, new Hfs((Scheme)new TextLine(new Fields(new Comparable[]{"key" + i})), "foo/in" + i));
            sinks.put(nameOut, new Hfs((Scheme)new TextLine(), "foo/out" + i));
            count += 2;
            if (i > 0) {
                pipe = new CoGroup(pipes[i - 1], new Fields(new Comparable[]{"key" + (i - 1)}), pipe, new Fields(new Comparable[]{"key" + i}));
                pipe = new Every(pipe, new Fields(new Comparable[]{"key" + (i - 1)}), (Aggregator)new Sum());
                count += 2;
            }
            pipes[i] = new Pipe(nameOut, pipe);
            ++count;
        }
        long start = System.currentTimeMillis();
        Flow flow = this.getPlatform().getFlowConnector().connect(sources, sinks, pipes);
        long end = System.currentTimeMillis();
        System.out.printf("n = %d: elements: %d: %.03f seconds\n", n, count, (double)(end - start) / 1000.0);
    }
}

