package org.apache.streams.hdfs;

import com.google.common.base.Strings;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.fs.FileStatus;
import org.apache.streams.core.DatumStatus;
import org.apache.streams.core.StreamsDatum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/hdfs/WebHdfsPersistReaderTask.class */
/**
 * Background task that walks the file listing held by a {@link WebHdfsPersistReader},
 * reads every eligible file line by line and offers one {@link StreamsDatum} per
 * non-empty line to the reader's persist queue.
 *
 * <p>A file is eligible when {@code FileStatus.isFile()} is true and its name does not
 * start with an underscore. Each line is split on the tab character; the fields at
 * index 3 and index 0 are handed to the {@code StreamsDatum} constructor, in that order.
 *
 * <p>Per-line outcomes are recorded on {@code reader.countersCurrent}: every non-empty
 * line counts as an attempt and ends up as either {@code SUCCESS} or {@code FAIL}.
 */
public class WebHdfsPersistReaderTask implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReaderTask.class);

    /** Separator between the fields of a single input line. */
    private static final String FIELD_DELIMITER = "\t";

    private final WebHdfsPersistReader reader;

    /**
     * Creates a task bound to the given reader.
     *
     * @param webHdfsPersistReader reader supplying the file listing, the file system
     *                             client, the counters and the destination queue
     */
    public WebHdfsPersistReaderTask(WebHdfsPersistReader webHdfsPersistReader) {
        this.reader = webHdfsPersistReader;
    }

    /**
     * Processes every entry of {@code reader.status} in iteration order.
     *
     * <p>If a file cannot be opened the task stops immediately and the remaining
     * entries are not processed. A failure while reading an already opened file only
     * ends the processing of that file.
     */
    @Override // java.lang.Runnable
    public void run() {
        for (FileStatus fileStatus : this.reader.status) {
            String fileName = fileStatus.getPath().getName();
            LOGGER.info("Found {}", fileName);
            if (!fileStatus.isFile() || fileName.startsWith("_")) {
                continue;
            }
            LOGGER.info("Started Processing {}", fileName);
            if (!processFile(fileStatus, fileName)) {
                return;
            }
        }
    }

    /**
     * Reads one file to the end and forwards each non-empty line to {@link #processLine}.
     *
     * @param fileStatus entry describing the file to read
     * @param fileName   name of the file, used for log messages only
     * @return {@code false} when the file could not be opened, {@code true} otherwise
     */
    private boolean processFile(FileStatus fileStatus, String fileName) {
        BufferedReader lines;
        try {
            // NOTE(review): no charset is given, so the platform default is used for
            // decoding — confirm this matches the encoding used when the files are written.
            lines = new BufferedReader(new InputStreamReader(this.reader.client.open(fileStatus.getPath())));
        } catch (Exception e) {
            LOGGER.error("Unable to open " + fileName, e);
            return false;
        }

        try {
            String line;
            // readLine() returns null only at end of stream; an empty line is skipped
            // instead of being mistaken for the end of the file.
            while ((line = lines.readLine()) != null) {
                if (Strings.isNullOrEmpty(line)) {
                    continue;
                }
                processLine(line, fileName);
            }
            LOGGER.info("Finished Processing {}", fileName);
        } catch (IOException e) {
            // A broken stream is unlikely to recover; stop reading this file rather than
            // retrying the same failing read over and over.
            LOGGER.warn("Failed while reading " + fileName, e);
            this.reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
        } finally {
            try {
                lines.close();
            } catch (IOException e) {
                LOGGER.error("Unable to close " + fileName, e);
            }
        }
        return true;
    }

    /**
     * Converts one non-empty line into a datum, queues it and updates the counters.
     * Any failure (for example a line with fewer than four fields) is logged and
     * counted as {@code FAIL}; it never propagates to the caller.
     *
     * @param line     non-empty line read from the file
     * @param fileName name of the file the line came from, used for log messages only
     */
    private void processLine(String line, String fileName) {
        try {
            this.reader.countersCurrent.incrementAttempt();
            String[] fields = line.split(FIELD_DELIMITER);
            // NOTE(review): presumably field 3 is the document and field 0 its id —
            // confirm against the StreamsDatum constructor.
            write(new StreamsDatum(fields[3], fields[0]));
            this.reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
        } catch (Exception e) {
            LOGGER.warn("Failed to process a line of " + fileName, e);
            this.reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
        }
    }

    /**
     * Offers the datum to {@code reader.persistQueue}, retrying until the offer is
     * accepted. Each attempt is made while holding the {@code WebHdfsPersistReader}
     * class monitor, and the thread yields between attempts.
     *
     * @param streamsDatum datum to enqueue
     */
    private void write(StreamsDatum streamsDatum) {
        boolean offer;
        do {
            synchronized (WebHdfsPersistReader.class) {
                offer = this.reader.persistQueue.offer(streamsDatum);
            }
            Thread.yield();
        } while (!offer);
    }
}
