/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.spouts.kafka;

import com.rits.cloning.Cloner;
import eu.europeana.cloud.common.model.File;
import eu.europeana.cloud.common.model.Representation;
import eu.europeana.cloud.common.model.dps.RecordState;
import eu.europeana.cloud.mcs.driver.FileServiceClient;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusChecker;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.storm.spout.SpoutOutputCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueFiller {
    private TaskStatusChecker taskStatusChecker;
    private SpoutOutputCollector collector;
    ArrayBlockingQueue<StormTaskTuple> tuplesWithFileUrls;
    private static final Logger LOGGER = LoggerFactory.getLogger(QueueFiller.class);

    public QueueFiller(TaskStatusChecker taskStatusChecker, SpoutOutputCollector collector, ArrayBlockingQueue<StormTaskTuple> tuplesWithFileUrls) {
        this.taskStatusChecker = taskStatusChecker;
        this.collector = collector;
        this.tuplesWithFileUrls = tuplesWithFileUrls;
    }

    public int addTupleToQueue(StormTaskTuple stormTaskTuple, FileServiceClient fileServiceClient, Representation representation) {
        int count = 0;
        long taskId = stormTaskTuple.getTaskId();
        if (representation != null) {
            for (File file : representation.getFiles()) {
                String fileUrl = "";
                if (!this.taskStatusChecker.hasKillFlag(taskId)) {
                    try {
                        fileUrl = fileServiceClient.getFileUri(representation.getCloudId(), representation.getRepresentationName(), representation.getVersion(), file.getFileName()).toString();
                        StormTaskTuple fileTuple = this.buildNextStormTuple(stormTaskTuple, fileUrl);
                        this.tuplesWithFileUrls.put(fileTuple);
                        ++count;
                    }
                    catch (Exception e) {
                        LOGGER.warn("Error while getting File URI from MCS {}", (Object)e.getMessage());
                        ++count;
                        this.emitErrorNotification(taskId, fileUrl, "Error while getting File URI from MCS " + e.getMessage(), "");
                    }
                    continue;
                }
                break;
            }
        } else {
            LOGGER.warn("Problem while reading representation");
        }
        return count;
    }

    private void emitErrorNotification(long taskId, String resource, String message, String additionalInformations) {
        NotificationTuple nt = NotificationTuple.prepareNotification(taskId, resource, RecordState.ERROR, message, additionalInformations);
        this.collector.emit("NotificationStream", (List)nt.toStormTuple());
    }

    private StormTaskTuple buildNextStormTuple(StormTaskTuple stormTaskTuple, String fileUrl) {
        StormTaskTuple fileTuple = (StormTaskTuple)new Cloner().deepClone((Object)stormTaskTuple);
        fileTuple.addParameter("DPS_TASK_INPUT_DATA", fileUrl);
        return fileTuple;
    }
}

