/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.tools.distcp2.mapred.lib;

import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileRecordReader;
import org.apache.hadoop.tools.distcp2.util.DistCpUtils;

/**
 * One chunk file living in the "chunkDir" directory next to the DistCp
 * listing file.
 *
 * <p>An instance is opened either for writing (records of {@link Text} /
 * {@link FileStatus} appended to a {@link SequenceFile}) via
 * {@link #createChunkForWrite(String, Configuration)}, or for reading (through
 * a {@link SequenceFileRecordReader}) via {@link #acquire(TaskAttemptContext)}.
 *
 * <p>The chunk directory, file-name prefix, file system and configuration are
 * held in static fields that are populated lazily from the first
 * {@link Configuration} handed to this class. No synchronization is performed
 * on that shared state here.
 *
 * @param <K> key type produced by the record reader
 * @param <V> value type produced by the record reader
 */
class DynamicInputChunk<K, V> {
    private static final Log LOG = LogFactory.getLog(DynamicInputChunk.class);
    private static Configuration configuration;
    private static Path chunkRootPath;
    private static String chunkFilePrefix;
    /** Size of the most recent chunk listing; -1 until a listing has been taken. */
    private static int numChunksLeft = -1;
    private static FileSystem fs;
    private Path chunkFilePath;
    private SequenceFileRecordReader<K, V> reader;
    private SequenceFile.Writer writer;

    /**
     * Populates the shared static state (configuration, chunk directory, file
     * system and chunk-file prefix) from the given configuration.
     *
     * @param config configuration that carries the listing-file path
     * @throws IOException if the file system for the chunk directory cannot be obtained
     */
    private static void initializeChunkInvariants(Configuration config) throws IOException {
        configuration = config;
        Path listingFilePath = new Path(DynamicInputChunk.getListingFilePath(configuration));
        chunkRootPath = new Path(listingFilePath.getParent(), "chunkDir");
        fs = chunkRootPath.getFileSystem(configuration);
        chunkFilePrefix = listingFilePath.getName() + ".chunk.";
    }

    /**
     * Reads the listing-file path from the "distcp.listing.file.path" setting.
     *
     * @param configuration configuration to read from
     * @return the configured, non-empty listing-file path
     * @throws IllegalArgumentException if the setting is absent or empty
     */
    private static String getListingFilePath(Configuration configuration) {
        String listingFileString = configuration.get("distcp.listing.file.path", "");
        assert (!listingFileString.equals("")) : "Listing file not found.";
        // Assertions are usually disabled in production, so fail explicitly
        // here rather than letting the empty string reach the Path constructor.
        if (listingFileString.isEmpty()) {
            throw new IllegalArgumentException(
                    "Listing file not found: distcp.listing.file.path is not set.");
        }
        return listingFileString;
    }

    /** @return true once the shared chunk directory has been resolved */
    private static boolean areInvariantsInitialized() {
        return chunkRootPath != null;
    }

    /**
     * Creates a chunk named {@code <prefix><chunkId>} in the chunk directory
     * and opens it for writing.
     *
     * @param chunkId       suffix appended to the chunk-file prefix
     * @param configuration used to initialize the shared state if that has not happened yet
     * @throws IOException if the shared state or the writer cannot be set up
     */
    private DynamicInputChunk(String chunkId, Configuration configuration) throws IOException {
        if (!DynamicInputChunk.areInvariantsInitialized()) {
            DynamicInputChunk.initializeChunkInvariants(configuration);
        }
        this.chunkFilePath = new Path(chunkRootPath, chunkFilePrefix + chunkId);
        this.openForWrite();
    }

    /**
     * Opens an uncompressed SequenceFile writer (Text keys, FileStatus values)
     * on this chunk's path, using the shared static configuration.
     *
     * @throws IOException if the writer cannot be created
     */
    private void openForWrite() throws IOException {
        FileSystem chunkFs = this.chunkFilePath.getFileSystem(configuration);
        this.writer = SequenceFile.createWriter(
                chunkFs,
                configuration,
                this.chunkFilePath,
                Text.class,
                FileStatus.class,
                SequenceFile.CompressionType.NONE);
    }

    /**
     * Factory for a chunk opened for writing.
     *
     * @param chunkId       suffix appended to the chunk-file prefix
     * @param configuration used to initialize the shared state if that has not happened yet
     * @return a new chunk with an open writer
     * @throws IOException if the chunk file cannot be opened for writing
     */
    public static DynamicInputChunk createChunkForWrite(String chunkId, Configuration configuration) throws IOException {
        return new DynamicInputChunk(chunkId, configuration);
    }

    /**
     * Appends one record to the chunk. Only valid on a chunk opened for writing.
     *
     * @param key   record key
     * @param value record value
     * @throws IOException if the append fails
     */
    public void write(Text key, FileStatus value) throws IOException {
        this.writer.append(key, value);
    }

    /**
     * Closes whichever of the reader and writer is open, via
     * {@link IOUtils#cleanup}.
     */
    public void close() {
        IOUtils.cleanup(LOG, this.reader, this.writer);
    }

    /**
     * Renames this chunk file to a file named after the given task ID inside
     * the chunk directory. A failed rename is logged as a warning, not thrown.
     *
     * <p>Note: the path returned by {@link #getPath()} is not updated by this
     * method; it keeps referring to the pre-rename location.
     *
     * @param taskId task to which the chunk is handed
     * @throws IOException if the rename call itself fails
     */
    public void assignTo(TaskID taskId) throws IOException {
        Path newPath = new Path(chunkRootPath, taskId.toString());
        if (!fs.rename(this.chunkFilePath, newPath)) {
            LOG.warn(this.chunkFilePath + " could not be assigned to " + taskId);
        }
    }

    /**
     * Wraps an existing chunk file and opens it for reading.
     *
     * @param chunkFilePath      location of the chunk file
     * @param taskAttemptContext context supplying the configuration and used to initialize the reader
     * @throws IOException          if the reader cannot be initialized
     * @throws InterruptedException if reader initialization is interrupted
     */
    private DynamicInputChunk(Path chunkFilePath, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        if (!DynamicInputChunk.areInvariantsInitialized()) {
            DynamicInputChunk.initializeChunkInvariants(taskAttemptContext.getConfiguration());
        }
        this.chunkFilePath = chunkFilePath;
        this.openForRead(taskAttemptContext);
    }

    /**
     * Creates the record reader and initializes it with a split that covers
     * the chunk file from offset 0 to its full size.
     *
     * @param taskAttemptContext context handed to the reader
     * @throws IOException          if the file size lookup or reader initialization fails
     * @throws InterruptedException if reader initialization is interrupted
     */
    private void openForRead(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        this.reader = new SequenceFileRecordReader<K, V>();
        long chunkLength = DistCpUtils.getFileSize(this.chunkFilePath, configuration);
        FileSplit wholeChunk = new FileSplit(this.chunkFilePath, 0L, chunkLength, null);
        this.reader.initialize(wholeChunk, taskAttemptContext);
    }

    /**
     * Obtains a chunk for the calling task.
     *
     * <p>If a file named after the task ID already exists in the chunk
     * directory it is used directly. Otherwise each remaining chunk file is
     * tried in turn: the first one that can be renamed to the task-ID name is
     * taken; failed renames are logged and the next file is tried.
     *
     * @param taskAttemptContext context identifying the task and supplying the configuration
     * @return the acquired chunk opened for reading, or {@code null} if no chunk could be acquired
     * @throws IOException          on file-system or reader failure
     * @throws InterruptedException if reader initialization is interrupted
     */
    public static DynamicInputChunk acquire(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        if (!DynamicInputChunk.areInvariantsInitialized()) {
            DynamicInputChunk.initializeChunkInvariants(taskAttemptContext.getConfiguration());
        }

        String taskId = taskAttemptContext.getTaskAttemptID().getTaskID().toString();
        Path acquiredFilePath = new Path(chunkRootPath, taskId);

        if (fs.exists(acquiredFilePath)) {
            LOG.info("Acquiring pre-assigned chunk: " + acquiredFilePath);
            return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
        }

        for (FileStatus chunkFile : DynamicInputChunk.getListOfChunkFiles()) {
            if (fs.rename(chunkFile.getPath(), acquiredFilePath)) {
                LOG.info(taskId + " acquired " + chunkFile.getPath());
                return new DynamicInputChunk(acquiredFilePath, taskAttemptContext);
            }
            LOG.warn(taskId + " could not acquire " + chunkFile.getPath());
        }
        return null;
    }

    /**
     * Closes the chunk and deletes its file (non-recursively).
     *
     * @throws IOException if the file system reports that the delete did not succeed
     */
    public void release() throws IOException {
        this.close();
        if (!fs.delete(this.chunkFilePath, false)) {
            LOG.error("Unable to release chunk at path: " + this.chunkFilePath);
            throw new IOException("Unable to release chunk at path: " + this.chunkFilePath);
        }
    }

    /**
     * Lists the files in the chunk directory whose names start with the
     * chunk-file prefix, and records how many were found so that
     * {@link #getNumChunksLeft()} can report it.
     *
     * <p>This method does not initialize the shared state itself; it relies on
     * that having been done already.
     *
     * @return the matching chunk files; never {@code null} (empty if none match)
     * @throws IOException if the glob lookup fails
     */
    static FileStatus[] getListOfChunkFiles() throws IOException {
        Path chunkFilePattern = new Path(chunkRootPath, chunkFilePrefix + "*");
        FileStatus[] chunkFiles = fs.globStatus(chunkFilePattern);
        if (chunkFiles == null) {
            // globStatus may report "nothing there" as null; treat it as an
            // empty listing so callers can iterate without a null check.
            chunkFiles = new FileStatus[0];
        }
        numChunksLeft = chunkFiles.length;
        return chunkFiles;
    }

    /** @return the path this chunk was created with */
    public Path getPath() {
        return this.chunkFilePath;
    }

    /**
     * @return the record reader for this chunk; only set on chunks opened for reading
     */
    public SequenceFileRecordReader<K, V> getReader() {
        assert (this.reader != null) : "Reader un-initialized!";
        return this.reader;
    }

    /**
     * @return the number of chunk files seen by the most recent
     *         {@link #getListOfChunkFiles()} call, or -1 if no listing has been taken yet
     */
    public static int getNumChunksLeft() {
        return numChunksLeft;
    }
}

