package org.apache.streams.hdfs;

import com.google.common.base.Strings;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.hdfs.HdfsWriterConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/hdfs/WebHdfsPersistReaderTask.class */
/**
 * Runnable that iterates over the file listing held by a {@link WebHdfsPersistReader},
 * reads each regular file line by line, converts every non-empty line into a
 * {@link StreamsDatum} via the reader's line utility, and hands the datum back to the reader.
 *
 * <p>Files whose name starts with {@code "_"} and entries that are not regular files are skipped.
 * Files whose name ends with {@code ".gz"} are decompressed with {@link GZIPInputStream}.
 * Attempt/success/fail counts are recorded on the reader's current counters.
 */
public class WebHdfsPersistReaderTask implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReaderTask.class);

    private final WebHdfsPersistReader reader;

    /**
     * Creates a task bound to the given reader.
     *
     * @param webHdfsPersistReader reader supplying the file listing, client, configuration,
     *                             counters and the sink for produced datums
     */
    public WebHdfsPersistReaderTask(WebHdfsPersistReader webHdfsPersistReader) {
        this.reader = webHdfsPersistReader;
    }

    /**
     * Logs the files to process, then processes them in listing order.
     *
     * <p>If a file cannot be opened or set up for reading, the error is logged and the task
     * returns immediately: remaining files are not processed and the trailing sleep is skipped.
     */
    @Override
    public void run() {
        LOGGER.info("WebHdfsPersistReaderTask: files to process");
        for (FileStatus fileStatus : this.reader.status) {
            LOGGER.info("    " + fileStatus.getPath().getName());
        }
        for (FileStatus fileStatus : this.reader.status) {
            if (!isDataFile(fileStatus)) {
                continue;
            }
            try {
                processFile(fileStatus);
            } catch (Exception e) {
                // Pass the throwable itself so the stack trace is logged, not just a dropped message.
                LOGGER.error("Exception Opening {}", fileStatus.getPath(), e);
                return;
            }
        }
        LOGGER.info("WebHdfsPersistReaderTask Finished");
        // NOTE(review): the purpose of this fixed pause is not visible here; presumably it gives
        // downstream consumers time to drain before the task ends -- confirm.
        Uninterruptibles.sleepUninterruptibly(15L, TimeUnit.SECONDS);
    }

    /** Returns true for regular files whose name does not start with an underscore. */
    private static boolean isDataFile(FileStatus fileStatus) {
        return fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_");
    }

    /**
     * Opens one file, decompressing it when its name ends in {@code .gz}, reads all of its lines
     * and always closes the underlying stream afterwards.
     *
     * @throws Exception if the file cannot be opened or the decoding reader cannot be created
     */
    private void processFile(FileStatus fileStatus) throws Exception {
        String fileName = fileStatus.getPath().getName();
        HdfsWriterConfiguration.Compression compression = fileName.endsWith(".gz")
                ? HdfsWriterConfiguration.Compression.GZIP
                : HdfsWriterConfiguration.Compression.NONE;
        String encoding = this.reader.hdfsConfiguration.getEncoding();
        LOGGER.info("Started Processing: {} Encoding: {} Compression: {}",
                new Object[]{fileName, encoding, compression.toString()});

        InputStream in = this.reader.client.open(fileStatus.getPath());
        try {
            if (compression.equals(HdfsWriterConfiguration.Compression.GZIP)) {
                in = new GZIPInputStream(in);
            }
            readLines(new BufferedReader(new InputStreamReader(in, encoding)));
            LOGGER.info("Finished Processing " + fileName);
        } finally {
            // Closing the outermost stream releases the wrapped one as well; this also covers
            // the case where wrapping failed and 'in' still refers to the raw stream.
            try {
                in.close();
            } catch (Exception e) {
                // A failed close is logged but does not abort processing of later files.
                LOGGER.error("WebHdfsPersistReader Exception", e);
            }
        }
    }

    /**
     * Reads the stream until end of input. Blank lines are skipped instead of being mistaken
     * for end of file. A read failure is logged, counted as a failure, and stops reading this
     * file so that a broken stream cannot be retried forever.
     */
    private void readLines(BufferedReader bufferedReader) {
        while (true) {
            String line;
            try {
                line = bufferedReader.readLine();
            } catch (IOException e) {
                LOGGER.warn("WebHdfsPersistReader readLine Exception", e);
                this.reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
                return;
            }
            if (line == null) {
                return; // end of stream
            }
            if (!Strings.isNullOrEmpty(line)) {
                processLine(line);
            }
        }
    }

    /**
     * Converts one line to a datum and forwards it to the reader, updating the counters.
     * Any exception raised while handling the line is logged and counted as a failure.
     */
    private void processLine(String line) {
        try {
            this.reader.countersCurrent.incrementAttempt();
            StreamsDatum datum = this.reader.lineReaderUtil.processLine(line);
            if (datum != null) {
                this.reader.write(datum);
                this.reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
            } else {
                LOGGER.warn("processLine failed");
                this.reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
            }
        } catch (Exception e) {
            LOGGER.warn("WebHdfsPersistReader readLine Exception", e);
            this.reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
        }
    }
}
