/*
 * Decompiled with CFR 0.152.
 */
package cascading.tap.hadoop;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.DecoratorTap;
import cascading.tap.MultiSourceTap;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.Lfs;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseDistCacheTap
extends DecoratorTap<Void, Configuration, RecordReader, OutputCollector> {
    private static final Logger LOG = LoggerFactory.getLogger(BaseDistCacheTap.class);

    public BaseDistCacheTap(Tap<Configuration, RecordReader, OutputCollector> original) {
        super(original);
    }

    public void sourceConfInit(FlowProcess<? extends Configuration> process, Configuration conf) {
        if (HadoopUtil.isLocal(conf) || Tap.id((Tap)this).equals(conf.get("cascading.node.source")) || Tap.id((Tap)this).equals(conf.get("cascading.step.source"))) {
            LOG.info("can't use distributed cache. reading '{}' from hdfs", (Object)super.getIdentifier());
            super.sourceConfInit(process, (Object)conf);
            return;
        }
        try {
            this.registerHfs(process, conf, this.getHfs());
        }
        catch (IOException exception) {
            throw new TapException((Throwable)exception);
        }
    }

    public TupleEntryIterator openForRead(FlowProcess<? extends Configuration> flowProcess, RecordReader input) throws IOException {
        if (HadoopUtil.isLocal((Configuration)flowProcess.getConfig()) || input != null) {
            LOG.info("delegating to parent");
            return super.openForRead(flowProcess, (Object)input);
        }
        Path[] cachedFiles = this.getLocalCacheFiles(flowProcess);
        if (cachedFiles == null || cachedFiles.length == 0) {
            return super.openForRead(flowProcess, null);
        }
        ArrayList<Path> paths = new ArrayList<Path>();
        ArrayList<Lfs> taps = new ArrayList<Lfs>();
        if (this.isSimpleGlob()) {
            FileStatus[] statuses;
            FileSystem fs = FileSystem.get((Configuration)((Configuration)flowProcess.getConfig()));
            for (FileStatus fileStatus : statuses = fs.globStatus(this.getHfs().getPath())) {
                paths.add(fileStatus.getPath());
            }
        } else {
            paths.add(this.getHfs().getPath());
        }
        for (Path pathToFind : paths) {
            for (FileStatus fileStatus : cachedFiles) {
                if (!fileStatus.toString().endsWith(pathToFind.getName())) continue;
                LOG.info("found {} in distributed cache", (Object)fileStatus);
                taps.add(new Lfs(this.getScheme(), fileStatus.toString()));
            }
        }
        if (paths.isEmpty()) {
            LOG.info("could not find files in local resource path. delegating to parent: {}", (Object)super.getIdentifier());
            return super.openForRead(flowProcess, (Object)input);
        }
        return new MultiSourceTap(taps.toArray(new Tap[taps.size()])).openForRead(flowProcess, (Object)input);
    }

    private void registerHfs(FlowProcess<? extends Configuration> process, Configuration conf, Hfs hfs) throws IOException {
        if (this.isSimpleGlob()) {
            FileSystem fs = FileSystem.get((Configuration)conf);
            FileStatus[] statuses = fs.globStatus(this.getHfs().getPath());
            if (statuses == null || statuses.length == 0) {
                throw new TapException(String.format("glob expression %s does not match any files on the filesystem", this.getHfs().getPath()));
            }
            for (FileStatus fileStatus : statuses) {
                this.registerURI(conf, fileStatus.getPath());
            }
        } else {
            this.registerURI(conf, hfs.getPath());
        }
        hfs.sourceConfInitComplete(process, conf);
    }

    private void registerURI(Configuration conf, Path path) {
        URI uri = path.toUri();
        LOG.info("adding {} to local resource configuration ", (Object)uri);
        this.addLocalCacheFiles(conf, uri);
    }

    private Hfs getHfs() {
        return (Hfs)this.getOriginal();
    }

    private boolean isSimpleGlob() {
        if (Util.isEmpty((String)this.getHfs().getIdentifier())) {
            return false;
        }
        return this.getHfs().getIdentifier().contains("*");
    }

    protected abstract Path[] getLocalCacheFiles(FlowProcess<? extends Configuration> var1) throws IOException;

    protected abstract void addLocalCacheFiles(Configuration var1, URI var2);
}

