/*
 * Decompiled with CFR 0.152.
 */
package cascading.tap.local;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.flow.local.LocalFlowConnector;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.scheme.local.CompressorScheme;
import cascading.scheme.local.Compressors;
import cascading.scheme.local.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.local.DirTap;
import cascading.tap.local.StdInTap;
import cascading.tap.local.StdOutTap;
import cascading.tuple.Fields;
import data.InputData;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.LineNumberReader;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.junit.Test;

public class LocalTapPlatformTest
extends PlatformTestCase
implements Serializable {
    @Test
    public void testIO() {
        String lines = "line1\nline2\n";
        System.setIn(new ByteArrayInputStream(lines.getBytes()));
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        System.setOut(new PrintStream(output));
        StdInTap source = new StdInTap((Scheme)new TextLine(new Fields(new Comparable[]{"line"})));
        StdOutTap sink = new StdOutTap((Scheme)new TextLine(new Fields(new Comparable[]{"line"})));
        Pipe pipe = new Pipe("io");
        Flow flow = new LocalFlowConnector().connect((Tap)source, (Tap)sink, pipe);
        flow.complete();
        LocalTapPlatformTest.assertEquals((String)lines, (String)output.toString());
    }

    @Test
    public void testSourceConfInit() throws IOException {
        this.getPlatform().copyFromLocal(InputData.inputFileNums20);
        SchemeWithProperties scheme = new SchemeWithProperties(new Fields(new Comparable[]{"line"}));
        Tap source = this.getPlatform().getTap((Scheme)scheme, InputData.inputFileNums20, SinkMode.KEEP);
        Pipe pipe = new Pipe("test");
        Tap sink = this.getPlatform().getTextFile(this.getOutputPath("sourceconfinit"), SinkMode.REPLACE);
        Properties properties = new Properties();
        properties.setProperty("default", "connector-default");
        properties.setProperty("replace", "connector-replace");
        Flow flow = this.getPlatform().getFlowConnector((Map)properties).connect(source, sink, pipe);
        flow.complete();
        LocalTapPlatformTest.assertTrue((boolean)flow.resourceExists(sink));
    }

    @Test
    public void testSinkConfInit() throws IOException {
        this.getPlatform().copyFromLocal(InputData.inputFileNums20);
        Tap source = this.getPlatform().getTextFile(new Fields(new Comparable[]{"line"}), InputData.inputFileNums20, SinkMode.KEEP);
        Pipe pipe = new Pipe("test");
        SchemeWithProperties scheme = new SchemeWithProperties(new Fields(new Comparable[]{"line"}));
        Tap sink = this.getPlatform().getTap((Scheme)scheme, this.getOutputPath("sinkconfinit"), SinkMode.REPLACE);
        Properties properties = new Properties();
        properties.setProperty("default", "connector-default");
        properties.setProperty("replace", "connector-replace");
        Flow flow = this.getPlatform().getFlowConnector((Map)properties).connect(source, sink, pipe);
        flow.complete();
        LocalTapPlatformTest.assertTrue((boolean)flow.resourceExists(sink));
    }

    @Test
    public void testDirTap() throws Exception {
        DirTap source = new DirTap((Scheme)new TextLine(), InputData.inputPath, "glob:**/*.txt");
        DirTap sink = new DirTap((Scheme)new TextLine(), this.getOutputPath(), SinkMode.REPLACE);
        Pipe pipe = new Pipe("copy");
        Flow flow = this.getPlatform().getFlowConnector().connect((Tap)source, (Tap)sink, pipe);
        flow.complete();
        List list = LocalTapPlatformTest.getSinkAsList((Flow)flow);
        LocalTapPlatformTest.assertEquals((int)674, (int)list.size());
    }

    @Test
    public void testSchemeCompression() throws Exception {
        DirTap source = new DirTap((Scheme)new TextLine(), InputData.inputPath, "glob:**/*.txt");
        DirTap compressed = new DirTap((Scheme)new TextLine((CompressorScheme.Compressor)Compressors.GZIP), this.getOutputPath("compressed"), SinkMode.REPLACE);
        DirTap sink = new DirTap((Scheme)new TextLine(), this.getOutputPath("uncompressed"), SinkMode.REPLACE);
        Flow first = this.getPlatform().getFlowConnector().connect("first", (Tap)source, (Tap)compressed, new Pipe("copy"));
        first.complete();
        Flow second = this.getPlatform().getFlowConnector().connect("second", (Tap)compressed, (Tap)sink, new Pipe("copy"));
        second.complete();
        List list = LocalTapPlatformTest.getSinkAsList((Flow)second);
        LocalTapPlatformTest.assertEquals((int)674, (int)list.size());
    }

    private static class SchemeWithProperties
    extends TextLine {
        public SchemeWithProperties(Fields sourceFields) {
            super(sourceFields);
        }

        public void sourceConfInit(FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf) {
            if (!"connector-default".equals(flowProcess.getProperty("default"))) {
                throw new RuntimeException("not default value");
            }
            conf.setProperty("replace", "source-replace");
            conf.setProperty("local", "source-local");
            super.sourceConfInit(flowProcess, tap, conf);
        }

        public void sinkConfInit(FlowProcess<? extends Properties> flowProcess, Tap<Properties, InputStream, OutputStream> tap, Properties conf) {
            if (!"connector-default".equals(flowProcess.getProperty("default"))) {
                throw new RuntimeException("not default value");
            }
            conf.setProperty("replace", "sink-replace");
            conf.setProperty("local", "sink-local");
            super.sinkConfInit(flowProcess, tap, conf);
        }

        public void sourcePrepare(FlowProcess<? extends Properties> flowProcess, SourceCall<LineNumberReader, InputStream> sourceCall) throws IOException {
            if (!"connector-default".equals(flowProcess.getProperty("default"))) {
                throw new RuntimeException("not default value");
            }
            if (!"source-replace".equals(flowProcess.getProperty("replace"))) {
                throw new RuntimeException("not replaced value");
            }
            if (!"source-local".equals(flowProcess.getProperty("local"))) {
                throw new RuntimeException("not local value");
            }
            super.sourcePrepare(flowProcess, sourceCall);
        }

        public void sinkPrepare(FlowProcess<? extends Properties> flowProcess, SinkCall<PrintWriter, OutputStream> sinkCall) throws IOException {
            if (!"connector-default".equals(flowProcess.getProperty("default"))) {
                throw new RuntimeException("not default value");
            }
            if (!"sink-replace".equals(flowProcess.getProperty("replace"))) {
                throw new RuntimeException("not replaced value");
            }
            if (!"sink-local".equals(flowProcess.getProperty("local"))) {
                throw new RuntimeException("not local value");
            }
            super.sinkPrepare(flowProcess, sinkCall);
        }
    }
}

