/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.stream;

import cascading.flow.stream.CountingCollectStage;
import cascading.flow.stream.CountingItemStage;
import cascading.flow.stream.TestGate;
import cascading.flow.stream.TestSinkStage;
import cascading.flow.stream.TestSourceStage;
import cascading.flow.stream.graph.StreamGraph;
import java.util.ArrayList;
import junit.framework.TestCase;

public class StreamTest
extends TestCase {
    public void testStageStream() {
        ArrayList<String> values = new ArrayList<String>();
        for (int i = 0; i < 10; ++i) {
            values.add("value");
        }
        TestSourceStage source = new TestSourceStage(values);
        CountingItemStage lhsStage1 = new CountingItemStage();
        CountingItemStage lhsStage2 = new CountingItemStage();
        CountingItemStage rhsStage1 = new CountingItemStage();
        CountingItemStage rhsStage2 = new CountingItemStage();
        CountingItemStage mergeStage = new CountingItemStage();
        TestSinkStage lhsSink = new TestSinkStage();
        TestSinkStage rhsSink = new TestSinkStage();
        StreamGraph graph = new StreamGraph();
        graph.addHead(source);
        graph.addPath(source, lhsStage1);
        graph.addPath(lhsStage1, mergeStage);
        graph.addPath(mergeStage, lhsStage2);
        graph.addPath(lhsStage2, lhsSink);
        graph.addTail(lhsSink);
        graph.addPath(source, rhsStage1);
        graph.addPath(rhsStage1, mergeStage);
        graph.addPath(mergeStage, rhsStage2);
        graph.addPath(rhsStage2, rhsSink);
        graph.addTail(rhsSink);
        graph.bind();
        graph.prepare();
        source.receiveFirst(null);
        graph.cleanup();
        this.assertPrepareCleanup(lhsStage1);
        this.assertPrepareCleanup(rhsStage1);
        StreamTest.assertEquals((int)values.size(), (int)lhsStage1.getReceiveCount());
        StreamTest.assertEquals((int)values.size(), (int)rhsStage1.getReceiveCount());
        StreamTest.assertEquals((int)(values.size() * 2), (int)lhsStage2.getReceiveCount());
        StreamTest.assertEquals((int)(values.size() * 2), (int)rhsStage2.getReceiveCount());
        StreamTest.assertEquals((int)(values.size() * 2), (int)lhsSink.getResults().size());
        StreamTest.assertEquals((int)(values.size() * 2), (int)rhsSink.getResults().size());
    }

    public void testGateStageStream() {
        ArrayList<String> values = new ArrayList<String>();
        for (int i = 0; i < 10; ++i) {
            values.add("value");
        }
        TestSourceStage source = new TestSourceStage(values);
        TestGate gate = new TestGate();
        CountingItemStage stage = new CountingItemStage();
        TestSinkStage sink = new TestSinkStage();
        StreamGraph graph = new StreamGraph();
        graph.addHead(source);
        graph.addPath(source, gate);
        graph.addPath(gate, stage);
        graph.addPath(stage, sink);
        graph.addTail(sink);
        graph.bind();
        graph.prepare();
        source.receiveFirst(null);
        graph.cleanup();
        this.assertPrepareCleanup(stage);
        StreamTest.assertEquals((int)values.size(), (int)stage.getReceiveCount());
        StreamTest.assertEquals((int)values.size(), (int)sink.getResults().size());
    }

    public void testGateGroupStream() {
        ArrayList<String> values = new ArrayList<String>();
        for (int i = 0; i < 10; ++i) {
            values.add("value");
        }
        TestSourceStage source = new TestSourceStage(values);
        TestGate gate = new TestGate();
        CountingCollectStage stage1 = new CountingCollectStage();
        CountingCollectStage stage2 = new CountingCollectStage();
        TestSinkStage sink = new TestSinkStage();
        StreamGraph graph = new StreamGraph();
        graph.addHead(source);
        graph.addPath(source, gate);
        graph.addPath(gate, stage1);
        graph.addPath(stage1, stage2);
        graph.addPath(stage2, sink);
        graph.addTail(sink);
        graph.bind();
        graph.prepare();
        source.receiveFirst(null);
        graph.cleanup();
        this.assertPrepareCleanup(stage1);
        StreamTest.assertEquals((int)values.size(), (int)stage1.getReceiveCount());
        StreamTest.assertEquals((int)1, (int)stage1.getStartCount());
        StreamTest.assertEquals((int)1, (int)stage1.getCompleteCount());
        StreamTest.assertEquals((int)values.size(), (int)stage2.getReceiveCount());
        StreamTest.assertEquals((int)1, (int)stage2.getStartCount());
        StreamTest.assertEquals((int)1, (int)stage2.getCompleteCount());
        StreamTest.assertEquals((int)1, (int)sink.getResults().size());
    }

    public void testMergeGateGroupStream() {
        ArrayList<String> values = new ArrayList<String>();
        for (int i = 0; i < 10; ++i) {
            values.add("value");
        }
        TestSourceStage source1 = new TestSourceStage(values);
        TestSourceStage source2 = new TestSourceStage(values);
        TestGate gate = new TestGate();
        CountingCollectStage stage1 = new CountingCollectStage();
        CountingCollectStage stage2 = new CountingCollectStage();
        TestSinkStage sink = new TestSinkStage();
        StreamGraph graph = new StreamGraph();
        graph.addHead(source1);
        graph.addHead(source2);
        graph.addPath(source1, 0, gate);
        graph.addPath(source2, 1, gate);
        graph.addPath(gate, stage1);
        graph.addPath(stage1, stage2);
        graph.addPath(stage2, sink);
        graph.addTail(sink);
        graph.bind();
        graph.prepare();
        source1.receiveFirst(null);
        source2.receiveFirst(null);
        graph.cleanup();
        this.assertPrepareCleanup(stage1);
        StreamTest.assertEquals((int)(values.size() * 2), (int)stage1.getReceiveCount());
        StreamTest.assertEquals((int)1, (int)stage1.getStartCount());
        StreamTest.assertEquals((int)1, (int)stage1.getCompleteCount());
        StreamTest.assertEquals((int)(values.size() * 2), (int)stage2.getReceiveCount());
        StreamTest.assertEquals((int)1, (int)stage2.getStartCount());
        StreamTest.assertEquals((int)1, (int)stage2.getCompleteCount());
        StreamTest.assertEquals((int)1, (int)sink.getResults().size());
    }

    private void assertPrepareCleanup(CountingItemStage stage) {
        StreamTest.assertEquals((int)1, (int)stage.getPrepareCount());
        StreamTest.assertEquals((int)1, (int)stage.getCleanupCount());
    }

    private void assertPrepareCleanup(CountingCollectStage stage) {
        StreamTest.assertEquals((int)1, (int)stage.getPrepareCount());
        StreamTest.assertEquals((int)1, (int)stage.getCleanupCount());
    }
}

