/*
 * Decompiled with CFR 0.152.
 */
package cascading.flow.hadoop.util;

import cascading.flow.FlowException;
import cascading.flow.FlowProcess;
import cascading.flow.hadoop.HadoopFlowProcess;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.hadoop.Hfs;
import cascading.tap.hadoop.Lfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
import cascading.tuple.TupleEntryIterator;
import cascading.util.Util;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for Hadoop MapReduce based flows: passing serialized step state through the
 * distributed cache, registering classpath artifacts with a job configuration, and inspecting
 * a {@link JobConf}.
 */
public class HadoopMRUtil {
    private static final Logger LOG = LoggerFactory.getLogger(HadoopMRUtil.class);

    /**
     * Writes the given step state as a single text line to a temporary path and registers that
     * path as a distributed cache file on {@code conf}.
     *
     * @param conf      job configuration used to resolve the temp path and to register the cache file
     * @param id        identifier appended to the state file name
     * @param kind      prefix of the state file name ({@code <kind>-state-<id>})
     * @param stepState the serialized state to write
     * @return the path the state was written to, or {@code null} if {@code stepState} is empty
     * @throws FlowException if the state cannot be written
     */
    public static String writeStateToDistCache(JobConf conf, String id, String kind, String stepState) {
        if (Util.isEmpty(stepState)) {
            return null;
        }

        LOG.info("writing step state to dist cache, too large for job conf, size: {}", stepState.length());

        String statePath = Hfs.getTempPath(conf) + "/" + kind + "-state-" + id;
        Hfs temp = new Hfs(new TextLine(), statePath, SinkMode.REPLACE);

        try {
            TupleEntryCollector writer = temp.openForWrite(new HadoopFlowProcess(conf));
            try {
                writer.add(new Tuple(stepState));
            } finally {
                // always release the collector, even when add() fails
                writer.close();
            }
        } catch (IOException exception) {
            // keep the underlying cause so the failure can be diagnosed
            throw new FlowException("unable to write step state to Hadoop FS: " + temp.getIdentifier(), exception);
        }

        URI uri = new Path(statePath).toUri();
        DistributedCache.addCacheFile(uri, conf);

        LOG.info("using step state path: {}", uri);

        return statePath;
    }

    /**
     * Locates the localized distributed cache file whose path contains {@code <kind>-state-<id>}
     * and returns its first line.
     *
     * @param jobConf job configuration holding the distributed cache entries
     * @param id      identifier that was used when the state was written
     * @param kind    prefix that was used when the state was written
     * @return the first line of the state file
     * @throws IOException   if the local cache files cannot be listed
     * @throws FlowException if no matching cache file exists, the file is empty, or it cannot be read
     */
    public static String readStateFromDistCache(JobConf jobConf, String id, String kind) throws IOException {
        Path[] files = DistributedCache.getLocalCacheFiles(jobConf);
        String marker = kind + "-state-" + id;
        Path stepStatePath = null;

        // files may be null when no cache files are registered; treat that as "not found"
        if (files != null) {
            for (Path file : files) {
                if (file.toString().contains(marker)) {
                    stepStatePath = file;
                    break;
                }
            }
        }

        if (stepStatePath == null) {
            throw new FlowException("unable to find step state from distributed cache");
        }

        LOG.info("reading step state from local path: {}", stepStatePath);

        Lfs temp = new Lfs(new TextLine(new Fields("line")), stepStatePath.toString());
        TupleEntryIterator reader = null;

        try {
            reader = temp.openForRead(new HadoopFlowProcess(jobConf));

            if (!reader.hasNext()) {
                throw new FlowException("step state path is empty: " + temp.getIdentifier());
            }

            return reader.next().getString(0);
        } catch (IOException exception) {
            throw new FlowException("unable to find state path: " + temp.getIdentifier(), exception);
        } finally {
            try {
                if (reader != null) {
                    reader.close();
                }
            } catch (IOException exception) {
                // best effort: a close failure must not mask the result or the primary error
                LOG.warn("error closing state path reader", exception);
            }
        }
    }

    /**
     * Resolves the given classpath entries into local and remote paths and adds each of them to
     * the distributed cache classpath of {@code config}. Local entries whose key also appears
     * among the remote entries are skipped.
     *
     * @param config    configuration to update
     * @param classpath classpath entries to register, may be {@code null}
     * @return the result of {@code HadoopUtil.getCommonPaths} for the resolved local and remote
     *         paths, or {@code null} if {@code classpath} is {@code null}
     * @throws FlowException if a file system cannot be obtained or a path cannot be registered
     */
    public static Map<Path, Path> addToClassPath(Configuration config, List<String> classpath) {
        if (classpath == null) {
            return null;
        }

        Map<String, Path> localPaths = new HashMap<String, Path>();
        Map<String, Path> remotePaths = new HashMap<String, Path>();

        HadoopUtil.resolvePaths(config, classpath, null, null, localPaths, remotePaths);

        try {
            LocalFileSystem localFS = HadoopUtil.getLocalFS(config);

            for (Map.Entry<String, Path> entry : localPaths.entrySet()) {
                // a remote entry with the same key takes precedence over the local one
                if (remotePaths.containsKey(entry.getKey())) {
                    continue;
                }

                DistributedCache.addFileToClassPath(entry.getValue().makeQualified(localFS), config);
            }

            FileSystem defaultFS = HadoopUtil.getDefaultFS(config);

            for (Path artifact : remotePaths.values()) {
                DistributedCache.addFileToClassPath(artifact.makeQualified(defaultFS), config);
            }
        } catch (IOException exception) {
            throw new FlowException("unable to set distributed cache paths", exception);
        }

        return HadoopUtil.getCommonPaths(localPaths, remotePaths);
    }

    /**
     * Reports whether the given job configuration yields a reducer class.
     *
     * @param jobConf job configuration to inspect
     * @return {@code true} if {@code jobConf.getReducerClass()} is not {@code null}
     */
    public static boolean hasReducer(JobConf jobConf) {
        // NOTE(review): JobConf#getReducerClass may fall back to a default reducer class when
        // none is configured - confirm whether this check can ever be false.
        return jobConf.getReducerClass() != null;
    }
}

