/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.http.spout;

import com.rits.cloning.Cloner;
import eu.europeana.cloud.common.model.dps.States;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.http.common.CompressionFileExtension;
import eu.europeana.cloud.http.common.UnpackingServiceFactory;
import eu.europeana.cloud.http.exceptions.CompressionExtensionNotRecognizedException;
import eu.europeana.cloud.http.service.FileUnpackingService;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.InputDataType;
import eu.europeana.cloud.service.dps.OAIPMHHarvestingDetails;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.CustomKafkaSpout;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.utils.TaskSpoutInfo;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLConnection;
import java.nio.file.FileVisitResult;
import java.nio.file.FileVisitor;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.FileAttribute;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpKafkaSpout
extends CustomKafkaSpout {
    private SpoutOutputCollector collector;
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpKafkaSpout.class);
    private static final int BATCH_MAX_SIZE = 4960;
    private static final String CLOUD_SEPARATOR = "_";
    private static final String MAC_TEMP_FOLDER = "__MACOSX";
    private static final String MAC_TEMP_FILE = ".DS_Store";
    private transient ConcurrentHashMap<Long, TaskSpoutInfo> cache;

    HttpKafkaSpout(SpoutConfig spoutConf) {
        super(spoutConf);
    }

    public HttpKafkaSpout(SpoutConfig spoutConf, String hosts, int port, String keyspaceName, String userName, String password) {
        super(spoutConf, hosts, port, keyspaceName, userName, password);
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.cache = new ConcurrentHashMap(50);
        super.open(conf, context, new CollectorWrapper((ISpoutOutputCollector)collector));
    }

    @Override
    public void nextTuple() {
        block3: {
            DpsTask dpsTask = null;
            try {
                super.nextTuple();
                Iterator iterator2 = ((ConcurrentHashMap.KeySetView)this.cache.keySet()).iterator();
                while (iterator2.hasNext()) {
                    long taskId = (Long)iterator2.next();
                    TaskSpoutInfo currentTask = this.cache.get(taskId);
                    if (currentTask.isStarted()) continue;
                    LOGGER.info("Start progressing for Task with id {}", (Object)currentTask.getDpsTask().getTaskId());
                    this.startProgress(currentTask);
                    dpsTask = currentTask.getDpsTask();
                    StormTaskTuple stormTaskTuple = new StormTaskTuple(dpsTask.getTaskId(), dpsTask.getTaskName(), dpsTask.getDataEntry(InputDataType.REPOSITORY_URLS).get(0), null, dpsTask.getParameters(), dpsTask.getOutputRevision(), new OAIPMHHarvestingDetails());
                    this.execute(stormTaskTuple);
                    this.cache.remove(taskId);
                }
            }
            catch (Exception e) {
                LOGGER.error("StaticDpsTaskSpout error: {}", (Object)e.getMessage());
                if (dpsTask == null) break block3;
                this.cassandraTaskInfoDAO.dropTask(dpsTask.getTaskId(), "The task was dropped because " + e.getMessage(), TaskState.DROPPED.toString());
            }
        }
    }

    private void startProgress(TaskSpoutInfo taskInfo) {
        taskInfo.startTheTask();
        DpsTask task = taskInfo.getDpsTask();
        this.cassandraTaskInfoDAO.updateTask(task.getTaskId(), "", String.valueOf((Object)TaskState.CURRENTLY_PROCESSING), new Date());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(StormTaskTuple.getFields());
        declarer.declareStream("NotificationStream", NotificationTuple.getFields());
    }

    private void emitErrorNotification(long taskId, String resource, String message, String additionalInformations) {
        NotificationTuple nt = NotificationTuple.prepareNotification(taskId, resource, States.ERROR, message, additionalInformations);
        this.collector.emit("NotificationStream", (List)nt.toStormTuple());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute(StormTaskTuple stormTaskTuple) throws CompressionExtensionNotRecognizedException {
        File file = null;
        try {
            String httpURL = stormTaskTuple.getFileUrl();
            file = this.downloadFile(httpURL);
            String compressingExtension = FilenameUtils.getExtension(file.getName());
            FileUnpackingService fileUnpackingService = UnpackingServiceFactory.createUnpackingService(compressingExtension);
            fileUnpackingService.unpackFile(file.getAbsolutePath(), file.getParent() + File.separator);
            Path start = Paths.get(new File(file.getParent()).toURI());
            this.emitFiles(start, stormTaskTuple);
            this.cassandraTaskInfoDAO.setUpdateExpectedSize(stormTaskTuple.getTaskId(), this.cache.get(stormTaskTuple.getTaskId()).getFileCount());
            this.removeTempFolder(file);
        }
        catch (IOException e) {
            try {
                LOGGER.error("HTTPHarvesterBolt error: {} ", (Object)e.getMessage());
                this.emitErrorNotification(stormTaskTuple.getTaskId(), stormTaskTuple.getFileUrl(), "Error while reading the files because of " + e.getMessage(), "");
                this.removeTempFolder(file);
            }
            catch (Throwable throwable) {
                this.removeTempFolder(file);
                throw throwable;
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private File downloadFile(String httpURL) throws IOException {
        URL url = new URL(httpURL);
        URLConnection conn = url.openConnection();
        InputStream inputStream = conn.getInputStream();
        OutputStream outputStream = null;
        try {
            String tempFileName = UUID.randomUUID().toString();
            String folderPath = Files.createTempDirectory(tempFileName, new FileAttribute[0]) + File.separator;
            File file = new File(folderPath + FilenameUtils.getName(httpURL));
            outputStream = new FileOutputStream(file.toPath().toString());
            byte[] buffer = new byte[4960];
            IOUtils.copyLarge(inputStream, outputStream, buffer);
            File file2 = file;
            return file2;
        }
        finally {
            if (outputStream != null) {
                outputStream.close();
            }
            if (inputStream != null) {
                inputStream.close();
            }
        }
    }

    private void emitFiles(final Path start, final StormTaskTuple stormTaskTuple) throws IOException {
        Files.walkFileTree(start, (FileVisitor<? super Path>)new SimpleFileVisitor<Path>(){

            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
                if (taskStatusChecker.hasKillFlag(stormTaskTuple.getTaskId())) {
                    return FileVisitResult.TERMINATE;
                }
                String fileName = HttpKafkaSpout.this.getFileNameFromPath(file);
                if (fileName.equals(HttpKafkaSpout.MAC_TEMP_FILE)) {
                    return FileVisitResult.CONTINUE;
                }
                String extension2 = FilenameUtils.getExtension(file.toString());
                if (!CompressionFileExtension.contains(extension2)) {
                    String mimeType = Files.probeContentType(file);
                    String filePath = file.toString();
                    String readableFileName = filePath.substring(start.toString().length() + 1).replaceAll("\\\\", "/");
                    HttpKafkaSpout.this.emitFileContent(stormTaskTuple, filePath, readableFileName, mimeType);
                    ((TaskSpoutInfo)HttpKafkaSpout.this.cache.get(stormTaskTuple.getTaskId())).inc();
                }
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) {
                String dirName = HttpKafkaSpout.this.getFileNameFromPath(dir);
                if (dirName.equals(HttpKafkaSpout.MAC_TEMP_FOLDER)) {
                    return FileVisitResult.SKIP_SUBTREE;
                }
                return FileVisitResult.CONTINUE;
            }
        });
    }

    private String getFileNameFromPath(Path path) {
        if (path != null) {
            return path.getFileName().toString();
        }
        throw new IllegalArgumentException("Path parameter should never be null");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void emitFileContent(StormTaskTuple stormTaskTuple, String filePath, String readableFilePath, String mimeType) throws IOException {
        try (FileInputStream fileInputStream = null;){
            StormTaskTuple tuple = new Cloner().deepClone(stormTaskTuple);
            File file = new File(filePath);
            fileInputStream = new FileInputStream(file);
            tuple.setFileData(fileInputStream);
            tuple.addParameter("OUTPUT_MIME_TYPE", mimeType);
            String localId = this.formulateLocalId(readableFilePath);
            tuple.addParameter("CLOUD_LOCAL_IDENTIFIER", localId);
            tuple.setFileUrl(readableFilePath);
            this.collector.emit((List)tuple.toStormTuple());
        }
    }

    private String formulateLocalId(String readableFilePath) {
        return readableFilePath + CLOUD_SEPARATOR + UUID.randomUUID().toString();
    }

    private void removeTempFolder(File file) {
        if (file != null) {
            try {
                FileUtils.deleteDirectory(new File(file.getParent()));
            }
            catch (IOException e) {
                LOGGER.error("ERROR while removing the temp Folder: {}", (Object)e.getMessage());
            }
        }
    }

    private class CollectorWrapper
    extends SpoutOutputCollector {
        CollectorWrapper(ISpoutOutputCollector delegate) {
            super(delegate);
        }

        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            try {
                DpsTask dpsTask = new ObjectMapper().readValue((String)tuple.get(0), DpsTask.class);
                if (dpsTask != null) {
                    long taskId = dpsTask.getTaskId();
                    HttpKafkaSpout.this.cache.putIfAbsent(taskId, new TaskSpoutInfo(dpsTask));
                }
            }
            catch (IOException e) {
                LOGGER.error(e.getMessage());
            }
            return Collections.emptyList();
        }
    }
}

