package org.streampipes.connect.adapter.generic.protocol.stream;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.util.Shell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.streampipes.connect.SendToPipeline;
import org.streampipes.connect.adapter.generic.format.Format;
import org.streampipes.connect.adapter.generic.format.Parser;
import org.streampipes.connect.adapter.generic.guess.SchemaGuesser;
import org.streampipes.connect.adapter.generic.pipeline.AdapterPipeline;
import org.streampipes.connect.adapter.generic.protocol.Protocol;
import org.streampipes.connect.adapter.generic.sdk.ParameterExtractor;
import org.streampipes.connect.exception.ParseException;
import org.streampipes.model.AdapterType;
import org.streampipes.model.connect.grounding.ProtocolDescription;
import org.streampipes.model.connect.guess.GuessSchema;
import org.streampipes.sdk.builder.adapter.ProtocolDescriptionBuilder;
import org.streampipes.sdk.helpers.AdapterSourceType;
import org.streampipes.sdk.helpers.Labels;

/* loaded from: input_file:org/streampipes/connect/adapter/generic/protocol/stream/HDFSProtocol.class */
public class HDFSProtocol extends Protocol {
    public static final String ID = "https://streampipes.org/vocabulary/v1/protocol/stream/HDFS";
    private static String INTERVAL_PROPERTY = "intervalProperty";
    private static String URL_PROPERTY = "urlProperty";
    private static String USER_PROPERTY = "userProperty";
    private static String PASSWORD_PROPERTY = "passwordProperty";
    private static String DATA_PATH_PROPERTY = "dataPathProperty";
    private static String RECURSIVELY_PROPERTY = "recursively";
    private static String OPTIONS = "optionsFile";
    private long intervalProperty;
    private String dataPathProperty;
    private String urlProperty;
    private String userProperty;
    private String passwordProperty;
    private boolean recursively;
    private ScheduledExecutorService scheduler;
    private Logger logger;
    private long knownNewestFileDate;

    public HDFSProtocol() {
        this.logger = LoggerFactory.getLogger((Class<?>) HDFSProtocol.class);
    }

    public HDFSProtocol(Parser parser, Format format, long j, String str, String str2, boolean z) {
        super(parser, format);
        this.logger = LoggerFactory.getLogger((Class<?>) HDFSProtocol.class);
        this.intervalProperty = j;
        this.dataPathProperty = str;
        this.urlProperty = str2;
        this.recursively = z;
    }

    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public Protocol getInstance(ProtocolDescription protocolDescription, Parser parser, Format format) {
        ParameterExtractor parameterExtractor = new ParameterExtractor(protocolDescription.getConfig());
        return new HDFSProtocol(parser, format, Long.parseLong(parameterExtractor.singleValue(INTERVAL_PROPERTY)), parameterExtractor.singleValue(DATA_PATH_PROPERTY), parameterExtractor.singleValue(URL_PROPERTY), this.recursively);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public ProtocolDescription declareModel() {
        return (ProtocolDescription) ((ProtocolDescriptionBuilder) ProtocolDescriptionBuilder.create(ID, "HDFS", "Reads messages from the Hadoop Distributed File System").sourceType(AdapterSourceType.STREAM).iconUrl("hdfs.png")).category(AdapterType.Generic).requiredTextParameter(Labels.from(URL_PROPERTY, "HDFS-Server", "Example: hdfs://server:8020")).requiredIntegerParameter(Labels.from(INTERVAL_PROPERTY, "Interval", "Polling interval in seconds")).requiredTextParameter(Labels.from(DATA_PATH_PROPERTY, "Data Path", "The Data Path to watch")).build();
    }

    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public GuessSchema getGuessSchema() throws ParseException {
        List<byte[]> parseNEvents = this.parser.parseNEvents(getInputStreamFromFile(getFiles().get(0)), 2);
        if (parseNEvents.size() < 2) {
            this.logger.error("Error in HDFS Protocol! Required: 2 elements but the resource just had: " + parseNEvents.size());
            parseNEvents.addAll(parseNEvents);
        }
        return SchemaGuesser.guessSchma(this.parser.getEventSchema(parseNEvents), getNElements(2));
    }

    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public List<Map<String, Object>> getNElements(int i) throws ParseException {
        ArrayList arrayList = new ArrayList();
        List<byte[]> parseNEvents = this.parser.parseNEvents(getInputStreamFromFile(getFiles().get(0)), i);
        if (parseNEvents.size() < i) {
            this.logger.error("Error in  HDFS Protocol! User required: " + i + " elements but the resource just had: " + parseNEvents.size());
        }
        Iterator<byte[]> it = parseNEvents.iterator();
        while (it.hasNext()) {
            arrayList.add(this.format.parse(it.next()));
        }
        return arrayList;
    }

    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public void run(AdapterPipeline adapterPipeline) {
        this.logger.info("Start HDFS Adapter");
        this.knownNewestFileDate = 0L;
        Runnable runnable = () -> {
            executeProtocolLogic(adapterPipeline);
        };
        this.scheduler = Executors.newScheduledThreadPool(1);
        this.scheduler.schedule(runnable, 0L, TimeUnit.MILLISECONDS);
    }

    private void executeProtocolLogic(AdapterPipeline adapterPipeline) {
        Runnable runnable = () -> {
            SendToPipeline sendToPipeline = new SendToPipeline(this.format, adapterPipeline);
            List<LocatedFileStatus> files = getFiles(this.knownNewestFileDate);
            if (files.size() > 0) {
                this.knownNewestFileDate = files.get(files.size() - 1).getModificationTime();
                this.logger.info("+++ New files found, newest file Date: " + this.knownNewestFileDate + " (in milliseconds form 1970)");
            } else {
                this.logger.info("No new files found");
            }
            try {
                files.forEach(locatedFileStatus -> {
                    this.parser.parse(getInputStreamFromFile(locatedFileStatus), sendToPipeline);
                });
            } catch (ParseException e) {
                this.logger.error("Error while parsing: " + e.getMessage());
            }
        };
        this.scheduler = Executors.newScheduledThreadPool(1);
        try {
            this.scheduler.scheduleAtFixedRate(runnable, 1L, this.intervalProperty, TimeUnit.SECONDS).get();
        } catch (InterruptedException e) {
            this.logger.error("Error", (Throwable) e);
        } catch (ExecutionException e2) {
            this.logger.error("Error", (Throwable) e2);
        }
    }

    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public void stop() {
        this.scheduler.shutdownNow();
    }

    @Override // org.streampipes.connect.adapter.generic.protocol.Protocol
    public String getId() {
        return ID;
    }

    private List<LocatedFileStatus> getFiles(long j) {
        return (List) getFiles().parallelStream().filter(locatedFileStatus -> {
            return locatedFileStatus.getModificationTime() > j;
        }).sorted((locatedFileStatus2, locatedFileStatus3) -> {
            return Long.valueOf(locatedFileStatus3.getModificationTime()).compareTo(Long.valueOf(locatedFileStatus2.getModificationTime()));
        }).collect(Collectors.toList());
    }

    public List<LocatedFileStatus> getFiles() {
        ArrayList arrayList = new ArrayList();
        FileSystem filesSystem = getFilesSystem();
        try {
            try {
                RemoteIterator<LocatedFileStatus> listFiles = filesSystem.listFiles(new Path(this.dataPathProperty), this.recursively);
                while (listFiles.hasNext()) {
                    arrayList.add(listFiles.next());
                }
                try {
                    filesSystem.close();
                } catch (IOException e) {
                    this.logger.error(e.toString());
                }
            } catch (Throwable th) {
                try {
                    filesSystem.close();
                } catch (IOException e2) {
                    this.logger.error(e2.toString());
                }
                throw th;
            }
        } catch (IOException e3) {
            this.logger.error(e3.toString());
            try {
                filesSystem.close();
            } catch (IOException e4) {
                this.logger.error(e4.toString());
            }
        }
        return arrayList;
    }

    public static List<String> getFileNames(FileSystem fileSystem, String str) throws IOException {
        ArrayList arrayList = new ArrayList();
        if (str.endsWith("/")) {
            RemoteIterator<LocatedFileStatus> listFiles = fileSystem.listFiles(new Path(str), false);
            while (listFiles.hasNext()) {
                String path = listFiles.next().getPath().toString();
                if (path.endsWith("/")) {
                    arrayList.addAll(getFileNames(fileSystem, path));
                } else {
                    arrayList.add(path);
                }
            }
        } else {
            arrayList.add(str);
        }
        return arrayList;
    }

    private FileSystem getFilesSystem() {
        FileSystem fileSystem = null;
        try {
            fileSystem = FileSystem.get(URI.create(this.urlProperty), getConfigutation());
        } catch (IOException e) {
            this.logger.error(e.toString());
        }
        return fileSystem;
    }

    private Configuration getConfigutation() {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", this.urlProperty);
        configuration.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
        configuration.set(CommonConfigurationKeysPublic.FS_FILE_IMPL_KEY, LocalFileSystem.class.getName());
        configuration.set(HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, "true");
        System.setProperty("HADOOP_USER_NAME", HdfsConstants.HDFS_URI_SCHEME);
        System.setProperty(Shell.SYSPROP_HADOOP_HOME_DIR, "/");
        return configuration;
    }

    private FSDataInputStream getInputStreamFromFile(LocatedFileStatus locatedFileStatus) throws ParseException {
        FSDataInputStream fSDataInputStream = null;
        try {
            fSDataInputStream = getFilesSystem().open(locatedFileStatus.getPath());
        } catch (IOException e) {
            if (fSDataInputStream == null) {
                throw new ParseException(e.getMessage());
            }
        }
        return fSDataInputStream;
    }
}
