/*
 * Decompiled with CFR 0.152.
 */
package cascading.operation.local;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.StepCounters;
import cascading.operation.Aggregator;
import cascading.operation.Filter;
import cascading.operation.Function;
import cascading.operation.aggregator.Count;
import cascading.operation.filter.Limit;
import cascading.operation.filter.Stop;
import cascading.operation.regex.RegexParser;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.InnerJoin;
import cascading.pipe.joiner.Joiner;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryIterator;
import data.InputData;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Test;

/**
 * Platform tests that run flows containing a {@link Stop} filter wrapped
 * around a {@link Limit}, then verify both the number of tuples that reach
 * the sink and the {@link StepCounters} read/written counters of the flow.
 *
 * <p>NOTE(review): the expected {@code Tuples_Read} values suggest that
 * {@code Stop} ends consumption of the stream shortly after the wrapped
 * filter starts rejecting tuples; confirm against the documentation of
 * {@code Stop}, whose implementation is not visible from this file.
 */
public class StopFilterPlatformTest
extends PlatformTestCase {
    /**
     * Configures the underlying platform test case.
     *
     * <p>NOTE(review): the arguments are presumably
     * (useCluster, numMapTasks, numReduceTasks); confirm against
     * {@code PlatformTestCase}.
     */
    public StopFilterPlatformTest() {
        super(true, 5, 3);
    }

    /**
     * Builds a {@link Stop} filter wrapping a fresh {@link Limit} of the given size.
     *
     * @param limit the value handed to the {@link Limit} constructor
     * @return the wrapping filter, typed as {@link Filter} so that it selects
     *         the filter overload of the {@link Each} constructor
     */
    private static Filter stopAfter(long limit) {
        // Typed locals keep the static argument types identical to the
        // explicit (Filter) casts they replace.
        Filter limiter = new Limit(limit);
        return new Stop(limiter);
    }

    /**
     * Appends a step that parses the leading run of non-space characters of
     * the {@code line} field into a single {@code ip} field.
     *
     * @param previous the pipe to extend
     * @return the extended pipe, emitting only the {@code ip} field
     */
    private static Pipe parseIp(Pipe previous) {
        return new Each(previous, new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
    }

    /**
     * Asserts the written and read tuple counters of a completed flow.
     *
     * @param flow            the completed flow whose stats are inspected
     * @param expectedWritten expected {@link StepCounters#Tuples_Written} value
     * @param expectedRead    expected {@link StepCounters#Tuples_Read} value
     */
    private static void assertTupleCounters(Flow flow, long expectedWritten, long expectedRead) {
        assertEquals(expectedWritten, flow.getFlowStats().getCounterValue(StepCounters.Tuples_Written));
        assertEquals(expectedRead, flow.getFlowStats().getCounterValue(StepCounters.Tuples_Read));
    }

    /**
     * Parses ips from the Apache log fixture and applies {@code Stop(Limit(100))}
     * directly to the parsed stream; expects 100 tuples in the sink, 100
     * written and 101 read.
     *
     * @throws Exception if the fixture cannot be copied or the flow fails
     */
    @Test
    public void testSimple() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache200);
        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileApache200);

        Pipe pipe = parseIp(new Pipe("test"));
        pipe = new Each(pipe, new Fields("ip"), stopAfter(100L));

        Tap sink = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);
        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow.openSink(), 100);
        assertTupleCounters(flow, 100L, 101L);
    }

    /**
     * Same as {@link #testSimple()} but groups by {@code ip} and counts each
     * group before applying {@code Stop(Limit(100))}; expects 100 tuples in
     * the sink, 100 written and 200 read.
     *
     * @throws Exception if the fixture cannot be copied or the flow fails
     */
    @Test
    public void testSimpleGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache200);
        Tap source = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileApache200);

        Pipe pipe = parseIp(new Pipe("test"));
        pipe = new GroupBy(pipe, new Fields("ip"));
        pipe = new Every(pipe, new Count(), new Fields("ip", "count"));
        pipe = new Each(pipe, new Fields("ip"), stopAfter(100L));

        Tap sink = getPlatform().getTextFile(getOutputPath(), SinkMode.REPLACE);
        Flow flow = getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.complete();

        validateLength(flow.openSink(), 100);
        assertTupleCounters(flow, 100L, 200L);
    }

    /**
     * Applies {@code Stop(Limit(2))} to each side of an inner join on
     * {@code num} and once more to the joined stream; expects exactly the
     * two joined lines {@code 1 a 1 A} and {@code 2 b 2 B} (tab separated),
     * 2 tuples written and 6 read.
     *
     * @throws Exception if a fixture cannot be copied or the flow fails
     */
    @Test
    public void testCoGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);
        Tap sourceLower = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileLower);
        Tap sourceUpper = getPlatform().getTextFile(new Fields("offset", "line"), InputData.inputFileUpper);

        // Source taps are keyed by the name of the head pipe they feed.
        Map<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        Tap sink = getPlatform().getTextFile(new Fields("line"), getOutputPath("cogroup"), SinkMode.REPLACE);

        Function splitter = new RegexSplitter(new Fields("num", "char"), " ");

        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        pipeLower = new Each(pipeLower, new Fields("num", "char"), stopAfter(2L));

        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        pipeUpper = new Each(pipeUpper, new Fields("num", "char"), stopAfter(2L));

        // Declared as Pipe: the variable first holds a CoGroup and is then
        // rebound to an Each, which is not a CoGroup subtype.
        Pipe splice = new CoGroup(pipeLower, new Fields("num"), pipeUpper, new Fields("num"), new InnerJoin(Fields.size(4)));
        splice = new Each(splice, Fields.ALL, stopAfter(2L));

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, splice);
        flow.complete();

        validateLength(flow, 2);
        List values = getSinkAsList(flow);
        assertTrue(values.contains(new Tuple("1\ta\t1\tA")));
        assertTrue(values.contains(new Tuple("2\tb\t2\tB")));
        assertTupleCounters(flow, 2L, 6L);
    }
}

