/*
 * Decompiled with CFR 0.152.
 */
package cascading.tap.hadoop.io;

import cascading.flow.FlowProcess;
import cascading.flow.hadoop.MapRed;
import cascading.flow.hadoop.util.HadoopUtil;
import cascading.tap.Tap;
import cascading.tap.TapException;
import cascading.tap.hadoop.util.Hadoop18TapUtil;
import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TapOutputCollector
implements OutputCollector,
Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(TapOutputCollector.class);
    public static final String PART_TASK_PATTERN = "%s%spart-%05d";
    public static final String PART_TASK_SEQ_PATTERN = "%s%spart-%05d-%05d";
    private Configuration conf;
    private RecordWriter writer;
    private String filenamePattern;
    private String filename;
    private Tap<Configuration, RecordReader, OutputCollector> tap;
    private String prefix;
    private long sequence;
    private boolean isFileOutputFormat;
    private final FlowProcess<? extends Configuration> flowProcess;

    public TapOutputCollector(FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap) throws IOException {
        this(flowProcess, tap, null);
    }

    public TapOutputCollector(FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, String prefix) throws IOException {
        this(flowProcess, tap, prefix, -1L);
    }

    public TapOutputCollector(FlowProcess<? extends Configuration> flowProcess, Tap<Configuration, RecordReader, OutputCollector> tap, String prefix, long sequence) throws IOException {
        this.tap = tap;
        this.sequence = sequence;
        this.prefix = prefix == null || prefix.length() == 0 ? null : prefix;
        this.flowProcess = flowProcess;
        this.conf = (Configuration)this.flowProcess.getConfigCopy();
        this.filenamePattern = this.conf.get("cascading.tapcollector.partname", sequence == -1L ? PART_TASK_PATTERN : PART_TASK_SEQ_PATTERN);
        this.initialize();
    }

    protected void initialize() throws IOException {
        this.tap.sinkConfInit(this.flowProcess, (Object)this.conf);
        OutputFormat outputFormat = HadoopUtil.asJobConfInstance(this.conf).getOutputFormat();
        this.isFileOutputFormat = outputFormat instanceof FileOutputFormat;
        if (this.isFileOutputFormat) {
            Hadoop18TapUtil.setupJob(this.conf);
            Hadoop18TapUtil.setupTask(this.conf);
            int partition = this.conf.getInt("mapred.task.partition", this.conf.getInt("mapreduce.task.partition", 0));
            long localSequence = this.sequence == -1L ? 0L : this.sequence;
            this.filename = this.prefix != null ? String.format(this.filenamePattern, this.prefix, "/", partition, localSequence) : String.format(this.filenamePattern, "", "", partition, localSequence);
        }
        LOG.info("creating path: {}", (Object)this.filename);
        this.writer = outputFormat.getRecordWriter(null, HadoopUtil.asJobConfInstance(this.conf), this.filename, (Progressable)this.getReporter());
    }

    private Reporter getReporter() {
        Reporter reporter = Reporter.NULL;
        if (this.flowProcess instanceof MapRed) {
            reporter = ((MapRed)this.flowProcess).getReporter();
        }
        return reporter;
    }

    public void collect(Object writableComparable, Object writable) throws IOException {
        this.flowProcess.keepAlive();
        this.writer.write(writableComparable, writable);
    }

    @Override
    public void close() {
        try {
            if (this.isFileOutputFormat) {
                LOG.info("closing tap collector for: {}", (Object)new Path(this.tap.getIdentifier(), this.filename));
            } else {
                LOG.info("closing tap collector for: {}", this.tap);
            }
            try {
                this.writer.close(this.getReporter());
            }
            finally {
                if (this.isFileOutputFormat) {
                    boolean needsTaskCommit = Hadoop18TapUtil.needsTaskCommit(this.conf);
                    boolean cleanJob = true;
                    if (needsTaskCommit) {
                        cleanJob = Hadoop18TapUtil.commitTask(this.conf);
                    }
                    if (cleanJob) {
                        Hadoop18TapUtil.cleanupJob(this.conf);
                    }
                    if (!HadoopUtil.isInflow(this.conf)) {
                        Hadoop18TapUtil.writeSuccessMarker(this.conf);
                    }
                }
            }
        }
        catch (IOException exception) {
            LOG.warn("exception closing: {}", (Object)this.filename, (Object)exception);
            throw new TapException("exception closing: " + this.filename, (Throwable)exception);
        }
    }
}

