package org.apache.asterix.external.input;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
import org.apache.asterix.external.api.IExternalIndexer;
import org.apache.asterix.external.api.IIndexibleExternalDataSource;
import org.apache.asterix.external.api.IRecordReader;
import org.apache.asterix.external.api.IRecordReaderFactory;
import org.apache.asterix.external.indexing.ExternalFile;
import org.apache.asterix.external.indexing.IndexingScheduler;
import org.apache.asterix.external.input.record.reader.IndexingStreamRecordReader;
import org.apache.asterix.external.input.record.reader.hdfs.HDFSRecordReader;
import org.apache.asterix.external.input.record.reader.stream.StreamRecordReader;
import org.apache.asterix.external.input.stream.HDFSInputStream;
import org.apache.asterix.external.provider.ExternalIndexerProvider;
import org.apache.asterix.external.provider.StreamRecordReaderProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.hdfs.dataflow.ConfFactory;
import org.apache.hyracks.hdfs.dataflow.InputSplitsFactory;
import org.apache.hyracks.hdfs.scheduler.Scheduler;

/**
 * Record-reader factory for external data sources stored in HDFS.
 * <p>
 * {@link #configure(Map)} builds a Hadoop {@link JobConf} from the adapter configuration and computes the input
 * splits. Splits come from the job's {@code InputFormat}, or from the file snapshot supplied through
 * {@link #setSnapshot(List, boolean)} when one is present. Each split is then assigned a location by a
 * process-wide scheduler.
 * <p>
 * At runtime, {@link #createRecordReader(IHyracksTaskContext, int)} returns one of two readers:
 * <ul>
 * <li>a stream-based reader, when a non-writable {@code format} is configured;</li>
 * <li>an {@link HDFSRecordReader}, otherwise.</li>
 * </ul>
 * <p>
 * The same {@code read} and {@code readSchedule} arrays are handed to every reader this instance creates.
 * NOTE(review): how readers coordinate through {@code read} is defined in HDFSInputStream/HDFSRecordReader;
 * confirm there.
 */
public class HDFSDataSourceFactory implements IRecordReaderFactory<Object>, IIndexibleExternalDataSource {
    protected static final long serialVersionUID = 1;
    // Not serialized; recomputed through HDFSUtils.getPartitionConstraints when requested.
    protected transient AlgebricksAbsolutePartitionConstraint clusterLocations;
    // Location assigned by the scheduler to each input split (same length as the split array).
    protected String[] readSchedule;
    // One flag per split, passed to every reader created by this factory.
    protected boolean[] read;
    protected InputSplitsFactory inputSplitsFactory;
    protected ConfFactory confFactory;
    // Process-wide schedulers, created once by init().
    protected static Scheduler hdfsScheduler;
    protected static IndexingScheduler indexingScheduler;
    // volatile is required for the unsynchronized fast-path check in init(). Without it, another thread could
    // observe true before the scheduler assignments made under the lock become visible.
    protected static volatile Boolean initialized = false;
    protected static Object initLock = new Object();
    // Snapshot of files to read; null when the data source is read without a snapshot.
    protected List<ExternalFile> files;
    protected Map<String, String> configuration;
    protected Class<?> recordClass;
    // Per-instance runtime state, lazily populated by createInputStream.
    private JobConf conf;
    private InputSplit[] inputSplits;
    private String nodeName;
    // Non-null only when a non-writable format is configured; selects the stream-reader path.
    private StreamRecordReaderProvider.Format format;
    protected boolean configured = false;
    protected boolean indexingOp = false;

    /**
     * Configures the factory from adapter properties.
     * <p>
     * This initializes the shared schedulers, builds the Hadoop job configuration, computes and schedules the
     * input splits, and determines the record class produced by readers.
     *
     * @param configuration adapter configuration properties
     * @throws AsterixException if split computation or reader creation fails with an I/O error, or if a writable
     *         format is requested but the input yields no splits
     */
    @Override
    public void configure(Map<String, String> configuration) throws AsterixException {
        try {
            init();
            this.configuration = configuration;
            JobConf jobConf = HDFSUtils.configureHDFSJobConf(configuration);
            this.confFactory = new ConfFactory(jobConf);
            this.clusterLocations = getPartitionConstraint();
            InputSplit[] splits = this.files == null
                    ? jobConf.getInputFormat().getSplits(jobConf, this.clusterLocations.getLocations().length)
                    : HDFSUtils.getSplits(jobConf, this.files);
            this.readSchedule = this.indexingOp ? indexingScheduler.getLocationConstraints(splits)
                    : hdfsScheduler.getLocationConstraints(splits);
            this.inputSplitsFactory = new InputSplitsFactory(splits);
            this.read = new boolean[this.readSchedule.length];
            Arrays.fill(this.read, false);
            String formatName = configuration.get(ExternalDataConstants.KEY_FORMAT);
            if (formatName == null || formatName.equals(ExternalDataConstants.FORMAT_HDFS_WRITABLE)) {
                this.recordClass = resolveWritableRecordClass(jobConf, splits);
            } else {
                this.format = StreamRecordReaderProvider.getReaderFormat(configuration);
                this.recordClass = char[].class;
            }
        } catch (IOException e) {
            throw new AsterixException(e);
        }
    }

    /**
     * Finds the Hadoop value class by opening a reader on the first split and instantiating one value.
     * The reader is always closed, even if creating the value fails.
     */
    private static Class<?> resolveWritableRecordClass(JobConf jobConf, InputSplit[] splits)
            throws IOException, AsterixException {
        if (splits.length == 0) {
            throw new AsterixException(
                    "Cannot determine the HDFS record class: the configured input produced no input splits");
        }
        RecordReader<?, ?> reader = jobConf.getInputFormat().getRecordReader(splits[0], jobConf, Reporter.NULL);
        try {
            return reader.createValue().getClass();
        } finally {
            reader.close();
        }
    }

    /**
     * Sets the file snapshot to read and whether this is an indexing operation.
     * Must be called before {@link #configure(Map)} to take effect on split computation.
     *
     * @param files snapshot of external files, or null to read without a snapshot
     * @param indexingOp true to schedule splits with the indexing scheduler
     */
    @Override
    public void setSnapshot(List<ExternalFile> files, boolean indexingOp) {
        this.files = files;
        this.indexingOp = indexingOp;
    }

    /**
     * Creates an {@link HDFSInputStream} over this factory's splits.
     * <p>
     * On the first call, the job conf, the deserialized splits, and the local node id are cached on this
     * instance.
     *
     * @param ctx task context, used to obtain the local node id
     * @param partition partition index (not used by this method)
     * @param indexer indexer passed to the stream, or null
     * @return a new input stream
     * @throws HyracksDataException if configuration or stream creation fails
     */
    public AsterixInputStream createInputStream(IHyracksTaskContext ctx, int partition, IExternalIndexer indexer)
            throws HyracksDataException {
        try {
            if (!this.configured) {
                this.conf = this.confFactory.getConf();
                this.inputSplits = this.inputSplitsFactory.getSplits();
                this.nodeName = ctx.getJobletContext().getApplicationContext().getNodeId();
                this.configured = true;
            }
            return new HDFSInputStream(this.read, this.inputSplits, this.readSchedule, this.nodeName, this.conf,
                    this.configuration, this.files, indexer);
        } catch (Exception e) {
            throw asHyracksDataException(e);
        }
    }

    /**
     * Returns the partition constraint computed by {@link HDFSUtils#getPartitionConstraints}, caching it in
     * {@code clusterLocations}.
     */
    @Override
    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
        this.clusterLocations = HDFSUtils.getPartitionConstraints(this.clusterLocations);
        return this.clusterLocations;
    }

    /** Lazily creates the process-wide HDFS and indexing schedulers exactly once. */
    private static void init() throws HyracksDataException {
        if (initialized) {
            return;
        }
        synchronized (initLock) {
            if (!initialized) {
                hdfsScheduler = HDFSUtils.initializeHDFSScheduler();
                indexingScheduler = HDFSUtils.initializeIndexingHDFSScheduler();
                // Volatile write last: it publishes the scheduler assignments above to unsynchronized readers.
                initialized = true;
            }
        }
    }

    /** Returns the Hadoop job configuration produced by the conf factory. */
    public JobConf getJobConf() throws HyracksDataException {
        return this.confFactory.getConf();
    }

    /** Returns the data source type derived from the configuration. */
    @Override
    public IExternalDataSourceFactory.DataSourceType getDataSourceType() {
        return ExternalDataUtils.getDataSourceType(this.configuration);
    }

    /**
     * Creates a record reader for one partition.
     * <p>
     * The reader type depends on whether a stream format is configured:
     * <ul>
     * <li>Stream format set: a stream-based reader over {@link #createInputStream}, wrapped in an
     * {@link IndexingStreamRecordReader} when a file snapshot is present.</li>
     * <li>Otherwise: an {@link HDFSRecordReader}.</li>
     * </ul>
     *
     * @param ctx task context
     * @param partition partition index
     * @return a new record reader
     * @throws HyracksDataException if reader creation fails
     */
    @Override
    public IRecordReader<? extends Object> createRecordReader(IHyracksTaskContext ctx, int partition)
            throws HyracksDataException {
        try {
            // An indexer is only needed when reading a specific file snapshot.
            IExternalIndexer indexer = this.files == null ? null : ExternalIndexerProvider.getIndexer(this.configuration);
            if (this.format != null) {
                StreamRecordReader streamReader = StreamRecordReaderProvider.createRecordReader(this.format,
                        createInputStream(ctx, partition, indexer), this.configuration);
                return indexer != null ? new IndexingStreamRecordReader(streamReader, indexer) : streamReader;
            }
            return new HDFSRecordReader(this.read, this.inputSplitsFactory.getSplits(), this.readSchedule,
                    ctx.getJobletContext().getApplicationContext().getNodeId(), this.confFactory.getConf(),
                    this.files, indexer);
        } catch (Exception e) {
            throw asHyracksDataException(e);
        }
    }

    /** Wraps {@code e} in a HyracksDataException unless it already is one, to avoid redundant nesting. */
    private static HyracksDataException asHyracksDataException(Exception e) {
        return e instanceof HyracksDataException ? (HyracksDataException) e : new HyracksDataException(e);
    }

    /** Returns the record class determined by {@link #configure(Map)}. */
    @Override
    public Class<?> getRecordClass() {
        return this.recordClass;
    }

    /** HDFS data sources always support indexing. */
    @Override
    public boolean isIndexible() {
        return true;
    }

    /** Returns true only when a file snapshot was supplied and it was flagged as an indexing operation. */
    @Override
    public boolean isIndexingOp() {
        return this.files != null && this.indexingOp;
    }
}
