package org.apache.streams.hdfs;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Queues;
import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.config.StreamsConfiguration;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.converter.LineReadWriteUtil;
import org.apache.streams.core.DatumStatusCountable;
import org.apache.streams.core.DatumStatusCounter;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistReader;
import org.apache.streams.core.StreamsResultSet;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/hdfs/WebHdfsPersistReader.class */
public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCountable {
    public static final String STREAMS_ID = "WebHdfsPersistReader";
    private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReader.class);
    protected static final char DELIMITER = '\t';
    protected FileSystem client;
    protected Path path;
    protected FileStatus[] status;
    protected volatile Queue<StreamsDatum> persistQueue;
    protected ObjectMapper mapper;
    protected LineReadWriteUtil lineReaderUtil;
    protected HdfsReaderConfiguration hdfsConfiguration;
    protected StreamsConfiguration streamsConfiguration;
    private ExecutorService executor;
    protected DatumStatusCounter countersTotal;
    protected DatumStatusCounter countersCurrent;
    private Future<?> task;

    public WebHdfsPersistReader() {
        this((HdfsReaderConfiguration) new ComponentConfigurator(HdfsReaderConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs")));
    }

    public WebHdfsPersistReader(HdfsReaderConfiguration hdfsReaderConfiguration) {
        this.countersTotal = new DatumStatusCounter();
        this.countersCurrent = new DatumStatusCounter();
        this.hdfsConfiguration = hdfsReaderConfiguration;
    }

    public URI getURI() throws URISyntaxException {
        StringBuilder sb = new StringBuilder();
        sb.append(this.hdfsConfiguration.getScheme());
        sb.append("://");
        if (StringUtils.isNotBlank(this.hdfsConfiguration.getHost())) {
            sb.append(this.hdfsConfiguration.getHost());
            if (this.hdfsConfiguration.getPort() != null) {
                sb.append(":" + this.hdfsConfiguration.getPort());
            }
        } else {
            sb.append("/");
        }
        return new URI(sb.toString());
    }

    public boolean isConnected() {
        return this.client != null;
    }

    public final synchronized FileSystem getFileSystem() {
        if (!isConnected()) {
            connectToWebHDFS();
        }
        return this.client;
    }

    private synchronized void connectToWebHDFS() {
        try {
            LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
            UserGroupInformation createRemoteUser = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
            createRemoteUser.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
            createRemoteUser.doAs(() -> {
                Configuration configuration = new Configuration();
                configuration.set("hadoop.security.authentication", "kerberos");
                configuration.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
                configuration.set("fs.file.impl", LocalFileSystem.class.getName());
                LOGGER.info("WebURI : {}", getURI().toString());
                this.client = FileSystem.get(getURI(), configuration);
                LOGGER.info("Connected to WebHDFS");
                return null;
            });
        } catch (Exception e) {
            LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again");
            e.printStackTrace();
        }
    }

    public String getId() {
        return STREAMS_ID;
    }

    public void prepare(Object obj) {
        LOGGER.debug("Prepare");
        this.lineReaderUtil = LineReadWriteUtil.getInstance(this.hdfsConfiguration);
        connectToWebHDFS();
        String str = this.hdfsConfiguration.getPath() + "/" + this.hdfsConfiguration.getReaderPath();
        LOGGER.info("Path : {}", str);
        this.path = new Path(str);
        try {
            if (this.client.isFile(this.path)) {
                LOGGER.info("Found File");
                FileStatus fileStatus = this.client.getFileStatus(this.path);
                this.status = new FileStatus[1];
                this.status[0] = fileStatus;
            } else if (this.client.isDirectory(this.path)) {
                this.status = this.client.listStatus(this.path);
                List asList = Arrays.asList(this.status);
                Collections.sort(asList);
                this.status = (FileStatus[]) asList.toArray(new FileStatus[0]);
                LOGGER.info("Found Directory : {} files", Integer.valueOf(this.status.length));
            } else {
                LOGGER.error("Neither file nor directory, wtf");
            }
        } catch (IOException e) {
            LOGGER.error("IOException", e);
        }
        this.streamsConfiguration = StreamsConfigurator.detectConfiguration();
        this.persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue(this.streamsConfiguration.getBatchSize().intValue()));
        this.executor = Executors.newSingleThreadExecutor();
        this.mapper = StreamsJacksonMapper.getInstance();
    }

    public void cleanUp() {
    }

    public StreamsResultSet readAll() {
        Thread thread = new Thread(new WebHdfsPersistReaderTask(this));
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            LOGGER.trace("ignored InterruptedException", e);
        }
        return new StreamsResultSet(this.persistQueue);
    }

    public void startStream() {
        LOGGER.debug("startStream");
        this.task = this.executor.submit(new WebHdfsPersistReaderTask(this));
    }

    public StreamsResultSet readCurrent() {
        StreamsResultSet streamsResultSet;
        synchronized (WebHdfsPersistReader.class) {
            streamsResultSet = new StreamsResultSet(Queues.newConcurrentLinkedQueue(this.persistQueue));
            streamsResultSet.setCounter(new DatumStatusCounter());
            streamsResultSet.getCounter().add(this.countersCurrent);
            this.countersTotal.add(this.countersCurrent);
            this.countersCurrent = new DatumStatusCounter();
            this.persistQueue.clear();
        }
        return streamsResultSet;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void write(StreamsDatum streamsDatum) {
        boolean offer;
        do {
            synchronized (WebHdfsPersistReader.class) {
                offer = this.persistQueue.offer(streamsDatum);
            }
            Thread.yield();
        } while (!offer);
    }

    public StreamsResultSet readNew(BigInteger bigInteger) {
        return null;
    }

    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
        return null;
    }

    public boolean isRunning() {
        return this.task == null || !(this.task.isDone() || this.task.isCancelled());
    }

    public DatumStatusCounter getDatumStatusCounter() {
        return this.countersTotal;
    }
}
