/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.local;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.Identity;
import cascading.pipe.Each;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.TupleEntryStream;
import data.InputData;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Test;

public class FlowPlatformTest
extends PlatformTestCase {
    @Test
    public void testStop() throws Exception {
        this.getPlatform().copyFromLocal(InputData.inputFileApache200);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"offset", "line"}), InputData.inputFileApache200);
        Pipe pipe = new Pipe("test");
        final int limit = 10;
        final Semaphore start = new Semaphore(0);
        final AtomicInteger count = new AtomicInteger(0);
        pipe = new Each(pipe, new Fields(new Comparable[]{"line"}), (Function)new Identity(Fields.ARGS){

            public void operate(FlowProcess flowProcess, FunctionCall<Identity.Functor> functionCall) {
                int i = count.getAndAdd(1);
                if (i >= limit) {
                    start.release();
                    return;
                }
                super.operate(flowProcess, functionCall);
            }
        });
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("simple"), SinkMode.REPLACE);
        Flow flow = this.getPlatform().getFlowConnector().connect(source, sink, pipe);
        flow.start();
        start.acquire();
        flow.stop();
        long sourceSize = TupleEntryStream.entryStream((Tap)source, (FlowProcess)flow.getFlowProcess()).count();
        long sinkSize = TupleEntryStream.entryStream((Tap)sink, (FlowProcess)flow.getFlowProcess()).count();
        FlowPlatformTest.assertNotSame((Object)sourceSize, (Object)sinkSize);
        FlowPlatformTest.assertEquals((long)limit, (long)sinkSize);
    }
}

