/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.http.spout;

import com.rits.cloning.Cloner;
import eu.europeana.cloud.common.model.dps.States;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.http.common.CompressionFileExtension;
import eu.europeana.cloud.http.common.UnpackingServiceFactory;
import eu.europeana.cloud.http.exceptions.CompressionExtensionNotRecognizedException;
import eu.europeana.cloud.http.service.FileUnpackingService;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.InputDataType;
import eu.europeana.cloud.service.dps.OAIPMHHarvestingDetails;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.CustomKafkaSpout;
import eu.europeana.metis.transformation.service.EuropeanaGeneratedIdsMap;
import eu.europeana.metis.transformation.service.EuropeanaIdCreator;
import eu.europeana.metis.transformation.service.EuropeanaIdException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttribute;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpKafkaSpout
extends CustomKafkaSpout {
    // Collector handed to open(); used directly to emit file tuples and notifications.
    private SpoutOutputCollector collector;
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpKafkaSpout.class);
    // Same value as the size (in bytes) of the copy buffer used by TaskDownloader.downloadFile.
    private static final int BATCH_MAX_SIZE = 4960;
    // Separator placed between the readable file path and the random UUID in default local identifiers.
    private static final String CLOUD_SEPARATOR = "_";
    // Directory name whose subtree is skipped while walking the unpacked archive.
    private static final String MAC_TEMP_FOLDER = "__MACOSX";
    // File name that is skipped while walking the unpacked archive.
    private static final String MAC_TEMP_FILE = ".DS_Store";
    // Background worker that turns queued tasks into per-file tuples;
    // assigned in the one-argument constructor and again in open().
    TaskDownloader taskDownloader;

    /**
     * Creates a spout from a Kafka spout configuration only and immediately
     * creates the {@link TaskDownloader}, whose constructor starts its thread.
     *
     * @param spoutConf Kafka spout configuration handed to the parent spout
     */
    HttpKafkaSpout(SpoutConfig spoutConf) {
        super(spoutConf);
        taskDownloader = new TaskDownloader();
    }

    /**
     * Creates a spout from a Kafka spout configuration plus connection settings.
     * All arguments are passed unchanged to the parent constructor. The task
     * downloader is not created here; {@code open} creates it.
     *
     * @param spoutConf    Kafka spout configuration handed to the parent spout
     * @param hosts        forwarded to the parent (presumably the Cassandra hosts - confirm against CustomKafkaSpout)
     * @param port         forwarded to the parent (presumably the Cassandra port - confirm)
     * @param keyspaceName forwarded to the parent (presumably the Cassandra keyspace - confirm)
     * @param userName     forwarded to the parent
     * @param password     forwarded to the parent
     */
    public HttpKafkaSpout(SpoutConfig spoutConf, String hosts, int port, String keyspaceName, String userName, String password) {
        super(spoutConf, hosts, port, keyspaceName, userName, password);
    }

    /**
     * Opens the spout: remembers the real collector, starts a fresh
     * {@link TaskDownloader} and opens the parent spout with a
     * {@link CollectorWrapper} so that Kafka messages are intercepted
     * instead of being emitted directly.
     *
     * @param conf      topology configuration
     * @param context   topology context
     * @param collector the real output collector
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.taskDownloader = new TaskDownloader();
        CollectorWrapper interceptingCollector = new CollectorWrapper(collector);
        super.open(conf, context, interceptingCollector);
    }

    /**
     * Lets the parent spout poll Kafka (messages are intercepted by
     * {@link CollectorWrapper}), then emits at most one file tuple prepared by
     * the {@link TaskDownloader}.
     *
     * <p>Any exception is logged together with its stack trace. If a tuple had
     * already been taken from the downloader when the failure happened, the
     * task it belongs to is marked as dropped.
     */
    @Override
    public void nextTuple() {
        StormTaskTuple stormTaskTuple = null;
        try {
            super.nextTuple();
            stormTaskTuple = this.taskDownloader.getTupleWithFileURL();
            if (stormTaskTuple != null) {
                this.collector.emit(stormTaskTuple.toStormTuple());
            }
        } catch (Exception e) {
            // Passing the exception as the last argument keeps the stack trace in the log.
            LOGGER.error("Spout error: {}", e.getMessage(), e);
            if (stormTaskTuple != null) {
                this.cassandraTaskInfoDAO.dropTask(stormTaskTuple.getTaskId(), "The task was dropped because " + e.getMessage(), TaskState.DROPPED.toString());
            }
        }
    }

    /**
     * Declares the default stream with the {@link StormTaskTuple} fields and a
     * separate "NotificationStream" with the {@link NotificationTuple} fields.
     *
     * @param declarer declarer supplied by the framework
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("NotificationStream", NotificationTuple.getFields());
        declarer.declare(StormTaskTuple.getFields());
    }

    /**
     * Marks every queued task, and then the task currently held by the
     * downloader, as dropped. Logs at the start and at the end.
     */
    @Override
    public void deactivate() {
        LOGGER.info("Deactivate method was executed");
        deactivateWaitingTasks();
        deactivateCurrentTask();
        LOGGER.info("Deactivate method was finished");
    }

    /**
     * Drains the downloader's task queue and marks each removed task as
     * dropped because of redeployment.
     */
    private void deactivateWaitingTasks() {
        for (DpsTask waiting = this.taskDownloader.taskQueue.poll();
             waiting != null;
             waiting = this.taskDownloader.taskQueue.poll()) {
            this.cassandraTaskInfoDAO.dropTask(waiting.getTaskId(), "The task was dropped because of redeployment", TaskState.DROPPED.toString());
        }
    }

    /**
     * Marks the task most recently taken by the downloader (if any) as dropped
     * because of redeployment.
     */
    private void deactivateCurrentTask() {
        DpsTask inProgress = this.taskDownloader.getCurrentDpsTask();
        if (inProgress == null) {
            return;
        }
        this.cassandraTaskInfoDAO.dropTask(inProgress.getTaskId(), "The task was dropped because of redeployment", TaskState.DROPPED.toString());
    }

    private class CollectorWrapper
    extends SpoutOutputCollector {
        CollectorWrapper(ISpoutOutputCollector delegate) {
            super(delegate);
        }

        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            try {
                DpsTask dpsTask = (DpsTask)new ObjectMapper().readValue((String)tuple.get(0), DpsTask.class);
                if (dpsTask != null) {
                    HttpKafkaSpout.this.taskDownloader.addNewTask(dpsTask);
                }
            }
            catch (IOException e) {
                LOGGER.error(e.getMessage());
            }
            return Collections.emptyList();
        }
    }

    class TaskDownloader
    extends Thread {
        /** Capacity of both bounded queues below. */
        private static final int MAX_SIZE = 100;
        /** Tasks received from Kafka that wait to be processed by this thread. */
        ArrayBlockingQueue<DpsTask> taskQueue = new ArrayBlockingQueue<>(MAX_SIZE);
        /** Per-file tuples produced by this thread and polled by the spout in nextTuple(). */
        ArrayBlockingQueue<StormTaskTuple> tuplesWithFileUrls = new ArrayBlockingQueue<>(MAX_SIZE);
        /**
         * Task most recently taken from {@link #taskQueue}. Written by this
         * thread and read by the spout thread during deactivate(), hence volatile.
         */
        private volatile DpsTask currentDpsTask;

        /**
         * Creates the downloader and starts its thread right away, so
         * {@link #run()} begins executing as soon as an instance is constructed.
         */
        public TaskDownloader() {
            this.start();
        }

        /**
         * Returns the next prepared file tuple without blocking.
         *
         * @return the head of the tuple queue, or {@code null} when it is empty
         */
        public StormTaskTuple getTupleWithFileURL() {
            return tuplesWithFileUrls.poll();
        }

        /**
         * Queues a task for processing, blocking while the task queue is full.
         *
         * <p>If the calling thread is interrupted while waiting, its interrupt
         * flag is restored and the task is NOT queued; this is logged so the
         * loss is visible.
         *
         * @param dpsTask task to queue
         */
        public void addNewTask(DpsTask dpsTask) {
            try {
                this.taskQueue.put(dpsTask);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.warn("Interrupted while queueing task {}; the task was not added to the queue", dpsTask.getTaskId());
            }
        }

        /**
         * Worker loop: takes tasks from {@link #taskQueue} one at a time, skips
         * tasks that carry a kill flag, and otherwise marks the task as in
         * progress and processes it with {@link #execute(StormTaskTuple)}.
         *
         * <p>The tuple reference is reset on every iteration, so a failure that
         * happens before the tuple of the new task is built can never drop the
         * previously processed task. A failure after the tuple is built marks
         * that task as dropped.
         *
         * <p>On interruption the task in progress (if any) is marked as
         * dropped, the interrupt flag is restored and the loop ends.
         */
        @Override
        public void run() {
            while (true) {
                // Must be reset per task; a stale reference would drop the wrong task on failure.
                StormTaskTuple stormTaskTuple = null;
                try {
                    this.currentDpsTask = this.taskQueue.take();
                    DpsTask task = this.currentDpsTask;
                    if (taskStatusChecker.hasKillFlag(task.getTaskId())) {
                        LOGGER.info("Skipping DROPPED task {}", task.getTaskId());
                        continue;
                    }
                    this.startProgress(task.getTaskId());
                    stormTaskTuple = new StormTaskTuple(task.getTaskId(), task.getTaskName(), (String) task.getDataEntry(InputDataType.REPOSITORY_URLS).get(0), null, task.getParameters(), task.getOutputRevision(), new OAIPMHHarvestingDetails());
                    this.execute(stormTaskTuple);
                } catch (InterruptedException e) {
                    LOGGER.error("StaticDpsTaskSpout error: {}", e.getMessage(), e);
                    if (stormTaskTuple != null) {
                        HttpKafkaSpout.this.cassandraTaskInfoDAO.dropTask(stormTaskTuple.getTaskId(), "The task was dropped because " + e.getMessage(), TaskState.DROPPED.toString());
                    }
                    // Restore the flag and stop instead of silently swallowing the interrupt.
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    LOGGER.error("StaticDpsTaskSpout error: {}", e.getMessage(), e);
                    if (stormTaskTuple != null) {
                        HttpKafkaSpout.this.cassandraTaskInfoDAO.dropTask(stormTaskTuple.getTaskId(), "The task was dropped because " + e.getMessage(), TaskState.DROPPED.toString());
                    }
                }
            }
        }

        /**
         * Logs the id of the current task and stores the
         * {@code CURRENTLY_PROCESSING} state, with an empty info text and the
         * current date, for the given task id.
         *
         * @param taskId id of the task whose state is updated
         */
        private void startProgress(long taskId) {
            LOGGER.info("Start progressing for Task with id {}", this.currentDpsTask.getTaskId());
            String processingState = TaskState.CURRENTLY_PROCESSING.toString();
            Date startedAt = new Date();
            HttpKafkaSpout.this.cassandraTaskInfoDAO.updateTask(taskId, "", processingState, startedAt);
        }

        /**
         * @return the task most recently taken from the task queue, or
         *         {@code null} if none has been taken yet
         */
        private DpsTask getCurrentDpsTask() {
            return currentDpsTask;
        }

        /**
         * Emits an ERROR notification for one resource on the
         * "NotificationStream" stream of the spout's real collector.
         *
         * <p>NOTE(review): this is reached from {@code iterateOverFiles}, i.e. it
         * runs on the TaskDownloader thread - confirm that emitting on the
         * collector from this thread is safe.
         *
         * @param taskId                 id of the task the resource belongs to
         * @param resource               name of the resource that failed
         * @param message                error message
         * @param additionalInformations extra details for the notification
         */
        private void emitErrorNotification(long taskId, String resource, String message, String additionalInformations) {
            NotificationTuple errorNotification = NotificationTuple.prepareNotification(taskId, resource, States.ERROR, message, additionalInformations);
            HttpKafkaSpout.this.collector.emit("NotificationStream", errorNotification.toStormTuple());
        }

        /**
         * Processes one task: downloads the archive referenced by the tuple,
         * unpacks it next to the downloaded file, queues one tuple per
         * contained file and stores the number of queued files as the task's
         * expected size.
         *
         * <p>If default identifiers are not requested and no
         * {@code METIS_DATASET_ID} parameter is present, the task is dropped
         * with that reason and nothing is downloaded. If processing finishes
         * without any file being queued, the task is dropped as empty.
         *
         * <p>The temporary folder is removed in a {@code finally} block, so it
         * is cleaned up on failure as well as on success.
         *
         * @param stormTaskTuple tuple describing the task; its file URL is the archive to download
         * @throws CompressionExtensionNotRecognizedException if no unpacking service exists for the file extension
         * @throws IOException          if downloading, unpacking or walking the files fails
         * @throws InterruptedException declared by the file iteration step
         */
        void execute(StormTaskTuple stormTaskTuple) throws CompressionExtensionNotRecognizedException, IOException, InterruptedException {
            boolean useDefaultIdentifiers = this.useDefaultIdentifier(stormTaskTuple);
            String metisDatasetId = null;
            if (!useDefaultIdentifiers) {
                metisDatasetId = stormTaskTuple.getParameter("METIS_DATASET_ID");
                if (StringUtils.isEmpty(metisDatasetId)) {
                    // Return right away so this reason is not overwritten by the generic "empty" drop.
                    HttpKafkaSpout.this.cassandraTaskInfoDAO.dropTask(stormTaskTuple.getTaskId(), "The task was dropped because METIS_DATASET_ID not provided", TaskState.DROPPED.toString());
                    return;
                }
            }

            File file = null;
            int expectedSize;
            try {
                file = this.downloadFile(stormTaskTuple.getFileUrl());
                String compressingExtension = FilenameUtils.getExtension(file.getName());
                FileUnpackingService fileUnpackingService = UnpackingServiceFactory.createUnpackingService(compressingExtension);
                fileUnpackingService.unpackFile(file.getAbsolutePath(), file.getParent() + File.separator);
                Path start = Paths.get(new File(file.getParent()).toURI());
                expectedSize = this.iterateOverFiles(start, stormTaskTuple, useDefaultIdentifiers, metisDatasetId);
                HttpKafkaSpout.this.cassandraTaskInfoDAO.setUpdateExpectedSize(stormTaskTuple.getTaskId(), expectedSize);
            } finally {
                // Always clean up, also when download/unpack/iteration throws.
                this.removeTempFolder(file);
            }

            if (expectedSize == 0) {
                HttpKafkaSpout.this.cassandraTaskInfoDAO.dropTask(stormTaskTuple.getTaskId(), "The task was dropped because it is empty", TaskState.DROPPED.toString());
            }
        }

        /**
         * Downloads the resource at the given URL into a newly created
         * temporary directory. The local file is named after the last path
         * segment of the URL.
         *
         * <p>Both streams are closed by try-with-resources. If creating or
         * writing the local file fails, the temporary directory is removed
         * before the exception is rethrown, so a failed download leaves
         * nothing behind.
         *
         * @param httpURL URL of the file to download
         * @return the downloaded file, located inside its own temporary directory
         * @throws IOException if the connection cannot be opened or the copy fails
         */
        private File downloadFile(String httpURL) throws IOException {
            URLConnection conn = new URL(httpURL).openConnection();
            try (InputStream inputStream = conn.getInputStream()) {
                Path tempDirectory = Files.createTempDirectory(UUID.randomUUID().toString());
                File file = new File(tempDirectory + File.separator + FilenameUtils.getName(httpURL));
                try (OutputStream outputStream = new FileOutputStream(file)) {
                    IOUtils.copyLarge(inputStream, outputStream, new byte[HttpKafkaSpout.BATCH_MAX_SIZE]);
                    return file;
                } catch (IOException | RuntimeException e) {
                    // The caller never receives the file on failure, so clean up here.
                    this.removeTempFolder(file);
                    throw e;
                }
            }
        }

        /**
         * Walks the file tree below {@code start} and queues one tuple for
         * every regular file that is neither {@code .DS_Store} nor a file with
         * a compression extension. The {@code __MACOSX} subtree is skipped.
         * The walk stops as soon as the task carries a kill flag.
         *
         * <p>A failure while preparing a single file is reported as an error
         * notification for that file and does not stop the walk.
         *
         * @param start                 root directory of the unpacked archive
         * @param stormTaskTuple        template tuple of the task
         * @param useDefaultIdentifiers whether local identifiers are derived from the file path
         * @param metisDatasetId        dataset id used when generating Europeana identifiers
         * @return number of files for which a tuple was queued
         * @throws IOException          if walking the tree fails
         * @throws InterruptedException declared for callers; not thrown by the visible code
         */
        private int iterateOverFiles(final Path start, final StormTaskTuple stormTaskTuple, final boolean useDefaultIdentifiers, final String metisDatasetId) throws IOException, InterruptedException {
            final AtomicInteger queuedFiles = new AtomicInteger();
            Files.walkFileTree(start, new SimpleFileVisitor<Path>() {

                @Override
                public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
                    String dirName = TaskDownloader.this.getFileNameFromPath(dir);
                    return dirName.equals(HttpKafkaSpout.MAC_TEMP_FOLDER)
                            ? FileVisitResult.SKIP_SUBTREE
                            : FileVisitResult.CONTINUE;
                }

                @Override
                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                    if (taskStatusChecker.hasKillFlag(stormTaskTuple.getTaskId())) {
                        return FileVisitResult.TERMINATE;
                    }
                    if (TaskDownloader.this.getFileNameFromPath(file).equals(HttpKafkaSpout.MAC_TEMP_FILE)) {
                        return FileVisitResult.CONTINUE;
                    }
                    String filePath = file.toString();
                    if (CompressionFileExtension.contains(FilenameUtils.getExtension(filePath))) {
                        // Archives (including the downloaded one) are not queued as records.
                        return FileVisitResult.CONTINUE;
                    }
                    String mimeType = Files.probeContentType(file);
                    // Path relative to the root, always with forward slashes.
                    int relativeStart = start.toString().length() + 1;
                    String readableFileName = filePath.substring(relativeStart).replaceAll("\\\\", "/");
                    try {
                        TaskDownloader.this.prepareTuple(stormTaskTuple, filePath, readableFileName, mimeType, useDefaultIdentifiers, metisDatasetId);
                        queuedFiles.incrementAndGet();
                    } catch (EuropeanaIdException | IOException | InterruptedException e) {
                        TaskDownloader.this.emitErrorNotification(stormTaskTuple.getTaskId(), readableFileName, "Error while reading the file because of " + e.getMessage(), "");
                    }
                    return FileVisitResult.CONTINUE;
                }
            });
            return queuedFiles.get();
        }

        /**
         * Returns the last name element of a path as a string.
         *
         * @param path path to inspect; must not be {@code null}
         * @return the file or directory name
         * @throws IllegalArgumentException if {@code path} is {@code null}
         */
        private String getFileNameFromPath(Path path) {
            if (path == null) {
                throw new IllegalArgumentException("Path parameter should never be null");
            }
            Path lastElement = path.getFileName();
            return lastElement.toString();
        }

        /**
         * Builds the tuple for a single unpacked file and puts it on the tuple
         * queue, blocking while the queue is full.
         *
         * <p>The task tuple is deep-cloned, loaded with the file content and
         * MIME type, given a local identifier (derived from the readable path
         * or generated from the document, depending on
         * {@code useDefaultIdentifiers}) and its file URL is set to the
         * readable path. The file stream is opened as a proper
         * try-with-resources resource and is closed when the method returns.
         *
         * @param stormTaskTuple        template tuple of the task
         * @param filePath              absolute path of the file on disk
         * @param readableFilePath      path of the file relative to the archive root
         * @param mimeType              MIME type stored as {@code OUTPUT_MIME_TYPE}
         * @param useDefaultIdentifiers whether the local identifier is derived from the readable path
         * @param datasetId             dataset id used when generating a Europeana identifier
         * @throws IOException          if the file cannot be opened or read
         * @throws InterruptedException if interrupted while waiting for space in the tuple queue
         * @throws EuropeanaIdException if the Europeana identifier cannot be generated
         */
        private void prepareTuple(StormTaskTuple stormTaskTuple, String filePath, String readableFilePath, String mimeType, boolean useDefaultIdentifiers, String datasetId) throws IOException, InterruptedException, EuropeanaIdException {
            StormTaskTuple tuple = new Cloner().deepClone(stormTaskTuple);
            try (FileInputStream fileInputStream = new FileInputStream(new File(filePath))) {
                tuple.setFileData(fileInputStream);
                tuple.addParameter("OUTPUT_MIME_TYPE", mimeType);
                String localId;
                if (useDefaultIdentifiers) {
                    localId = this.formulateLocalId(readableFilePath);
                } else {
                    EuropeanaGeneratedIdsMap europeanaIdentifier = this.getEuropeanaIdentifier(tuple, datasetId);
                    localId = europeanaIdentifier.getEuropeanaGeneratedId();
                    tuple.addParameter("ADDITIONAL_LOCAL_IDENTIFIER", europeanaIdentifier.getSourceProvidedChoAbout());
                }
                tuple.addParameter("CLOUD_LOCAL_IDENTIFIER", localId);
                tuple.setFileUrl(readableFilePath);
                this.tuplesWithFileUrls.put(tuple);
            }
        }

        /**
         * Tells whether the task asks for default identifiers.
         *
         * @param stormTaskTuple tuple whose parameters are inspected
         * @return {@code true} only when the {@code USE_DEFAULT_IDENTIFIERS}
         *         parameter equals the string "true"
         */
        private boolean useDefaultIdentifier(StormTaskTuple stormTaskTuple) {
            String flag = stormTaskTuple.getParameter("USE_DEFAULT_IDENTIFIERS");
            return "true".equals(flag);
        }

        /**
         * Generates the Europeana identifiers for the document carried by the tuple.
         *
         * <p>The file data is decoded explicitly as UTF-8, so the result does
         * not depend on the platform default charset of the worker JVM.
         * NOTE(review): assumes the records are UTF-8 encoded - confirm.
         *
         * @param stormTaskTuple tuple holding the document bytes
         * @param datasetId      dataset id passed to the id creator
         * @return the generated identifiers
         * @throws EuropeanaIdException if the identifier cannot be constructed
         */
        private EuropeanaGeneratedIdsMap getEuropeanaIdentifier(StormTaskTuple stormTaskTuple, String datasetId) throws EuropeanaIdException {
            String document = new String(stormTaskTuple.getFileData(), java.nio.charset.StandardCharsets.UTF_8);
            EuropeanaIdCreator europeanIdCreator = new EuropeanaIdCreator();
            return europeanIdCreator.constructEuropeanaId(document, datasetId);
        }

        /**
         * Builds a default local identifier: the readable file path, the cloud
         * separator and a random UUID.
         *
         * @param readableFilePath path of the file relative to the archive root
         * @return the generated local identifier
         */
        private String formulateLocalId(String readableFilePath) {
            UUID uniqueSuffix = UUID.randomUUID();
            return readableFilePath + HttpKafkaSpout.CLOUD_SEPARATOR + uniqueSuffix;
        }

        /**
         * Deletes the directory that contains the given file, including all of
         * its content. Does nothing for {@code null}; a failed deletion is
         * logged and otherwise ignored.
         *
         * @param file a file inside the temporary folder, or {@code null}
         */
        private void removeTempFolder(File file) {
            if (file == null) {
                return;
            }
            try {
                File tempFolder = new File(file.getParent());
                FileUtils.deleteDirectory(tempFolder);
            } catch (IOException e) {
                LOGGER.error("ERROR while removing the temp Folder: {}", e.getMessage());
            }
        }
    }
}

