/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.spouts.kafka;

import com.rits.cloning.Cloner;
import eu.europeana.cloud.common.model.CloudIdAndTimestampResponse;
import eu.europeana.cloud.common.model.dps.States;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.common.response.CloudTagsResponse;
import eu.europeana.cloud.common.response.ResultSlice;
import eu.europeana.cloud.mcs.driver.DataSetServiceClient;
import eu.europeana.cloud.mcs.driver.FileServiceClient;
import eu.europeana.cloud.mcs.driver.RecordServiceClient;
import eu.europeana.cloud.mcs.driver.RepresentationIterator;
import eu.europeana.cloud.mcs.driver.exception.DriverException;
import eu.europeana.cloud.service.commons.urls.UrlParser;
import eu.europeana.cloud.service.commons.urls.UrlPart;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.InputDataType;
import eu.europeana.cloud.service.dps.OAIPMHHarvestingDetails;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.CollectorWrapper;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.CustomKafkaSpout;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.QueueFiller;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.QueueFillerForLatestRevisionJob;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.QueueFillerForSpecificRevisionJob;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.TaskQueueFiller;
import eu.europeana.cloud.service.mcs.exception.MCSException;
import java.net.MalformedURLException;
import java.util.ConcurrentModificationException;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MCSReaderSpout
extends CustomKafkaSpout {
    // Collector handed over by Storm in open(); used for emitting task tuples and notifications.
    private SpoutOutputCollector collector;
    private static final Logger LOGGER = LoggerFactory.getLogger(MCSReaderSpout.class);
    // Retry budget for MCS chunk requests (the retry helpers in TaskDownloader use the same value).
    private static final int DEFAULT_RETRIES = 3;
    // Pause between retries; matches the 5000 passed to Thread.sleep by the retry helpers (milliseconds).
    private static final int SLEEP_TIME = 5000;
    // Background worker that turns queued DpsTasks into tuples; created in open() or in the
    // package-private constructor.
    TaskDownloader taskDownloader;
    // Base URL passed to the MCS clients created in TaskDownloader.execute();
    // only set by the public constructor.
    private String mcsClientURL;

    /**
     * Creates the spout and remembers the MCS endpoint.
     *
     * <p>All arguments except {@code mcsClientURL} are forwarded unchanged to the
     * {@link CustomKafkaSpout} constructor (presumably Kafka and Cassandra connection
     * settings - confirm against the superclass). The {@code taskDownloader} is not
     * created here; it is created in {@link #open}.
     *
     * @param mcsClientURL base URL later handed to the MCS service clients
     */
    public MCSReaderSpout(SpoutConfig spoutConf, String hosts, int port, String keyspaceName, String userName, String password, String mcsClientURL) {
        super(spoutConf, hosts, port, keyspaceName, userName, password);
        this.mcsClientURL = mcsClientURL;
    }

    /**
     * Package-private constructor that immediately creates (and thereby starts) a
     * {@link TaskDownloader}. It leaves {@code mcsClientURL} unset.
     * NOTE(review): looks intended for tests - confirm.
     */
    MCSReaderSpout(SpoutConfig spoutConf) {
        super(spoutConf);
        this.taskDownloader = new TaskDownloader();
    }

    /**
     * Stores the Storm collector, starts a fresh {@link TaskDownloader} and opens the
     * underlying Kafka spout with a {@link CollectorWrapper} built from both.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;

        // Creating the downloader also starts its worker thread.
        TaskDownloader downloader = new TaskDownloader();
        this.taskDownloader = downloader;

        CollectorWrapper wrappedCollector = new CollectorWrapper((ISpoutOutputCollector)collector, downloader);
        super.open(conf, context, wrappedCollector);
    }

    /**
     * Lets the Kafka spout run one polling step, then emits at most one tuple that the
     * {@link TaskDownloader} has prepared.
     *
     * <p>Any exception is logged together with its stack trace. If a tuple had already
     * been taken from the downloader when the failure happened, the task it belongs to
     * is marked as dropped.
     */
    @Override
    public void nextTuple() {
        StormTaskTuple stormTaskTuple = null;
        try {
            super.nextTuple();
            stormTaskTuple = this.taskDownloader.getTupleWithFileURL();
            if (stormTaskTuple != null) {
                this.collector.emit((List)stormTaskTuple.toStormTuple());
            }
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J print the stack trace as well.
            LOGGER.error("StaticDpsTaskSpout error: {}", e.getMessage(), e);
            if (stormTaskTuple != null) {
                this.cassandraTaskInfoDAO.dropTask(stormTaskTuple.getTaskId(), "The task was dropped because " + e.getMessage(), TaskState.DROPPED.toString());
            }
        }
    }

    /**
     * Declares two outputs: the default stream carrying {@link StormTaskTuple} fields and
     * a stream named "NotificationStream" carrying {@link NotificationTuple} fields.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(StormTaskTuple.getFields());
        declarer.declareStream("NotificationStream", NotificationTuple.getFields());
    }

    /**
     * Marks every task still waiting in the downloader queue, and then the task the
     * downloader last picked up, as dropped. Does not call {@code super.deactivate()}.
     */
    @Override
    public void deactivate() {
        LOGGER.info("Deactivate method was executed");
        // Waiting tasks first, then the one currently held by the downloader.
        this.deactivateWaitingTasks();
        this.deactivateCurrentTask();
        LOGGER.info("Deactivate method was finished");
    }

    /**
     * Drains the downloader's task queue and marks each drained task as dropped.
     */
    private void deactivateWaitingTasks() {
        ArrayBlockingQueue<DpsTask> pending = this.taskDownloader.taskQueue;
        for (DpsTask waiting = pending.poll(); waiting != null; waiting = pending.poll()) {
            this.cassandraTaskInfoDAO.dropTask(
                    waiting.getTaskId(),
                    "The task was dropped because of redeployment",
                    TaskState.DROPPED.toString());
        }
    }

    /**
     * Marks the task most recently taken by the downloader (if any) as dropped.
     */
    private void deactivateCurrentTask() {
        DpsTask inFlight = this.taskDownloader.getCurrentDpsTask();
        if (inFlight == null) {
            return;
        }
        this.cassandraTaskInfoDAO.dropTask(
                inFlight.getTaskId(),
                "The task was dropped because of redeployment",
                TaskState.DROPPED.toString());
    }

    final class TaskDownloader
    extends Thread
    implements TaskQueueFiller {
        // Capacity of the queue of tasks waiting to be processed.
        private static final int MAX_SIZE = 100;
        // Size of the worker pools used while reading revisions from MCS.
        private static final int INTERNAL_THREADS_NUMBER = 10;
        public static final int MAX_BATCH_SIZE = 100;
        // Tasks handed over through addNewTask() and consumed by run().
        ArrayBlockingQueue<DpsTask> taskQueue = new ArrayBlockingQueue<>(MAX_SIZE);
        // Tuples ready for emission; capacity evaluates to 1000.
        ArrayBlockingQueue<StormTaskTuple> tuplesWithFileUrls = new ArrayBlockingQueue<>(MAX_SIZE * INTERNAL_THREADS_NUMBER);
        // Written by the downloader thread in run() and read by other threads through
        // getCurrentDpsTask(); volatile guarantees those readers see the latest value.
        private volatile DpsTask currentDpsTask;

        /**
         * Creates the downloader and starts its thread right away, so {@link #run()}
         * begins waiting on the task queue as soon as the object is constructed.
         */
        public TaskDownloader() {
            this.start();
        }

        /**
         * Returns the next prepared tuple without blocking.
         *
         * @return the head of the tuple queue, or {@code null} when the queue is empty
         */
        public StormTaskTuple getTupleWithFileURL() {
            return this.tuplesWithFileUrls.poll();
        }

        /**
         * Queues a task for processing, blocking while the task queue is full.
         *
         * <p>If the calling thread is interrupted while waiting, the interrupt flag is
         * restored and the task is NOT queued; this is logged so the loss is visible.
         *
         * @param dpsTask task to enqueue
         */
        @Override
        public void addNewTask(DpsTask dpsTask) {
            try {
                this.taskQueue.put(dpsTask);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                LOGGER.warn("Interrupted while queueing task {}; the task was not added to the queue", dpsTask.getTaskId());
            }
        }

        /**
         * Worker loop: takes tasks from the task queue one at a time and converts them
         * into tuples.
         *
         * <ul>
         *   <li>Tasks carrying a kill flag are skipped.</li>
         *   <li>FILE_URLS tasks produce one tuple per file URL, each a deep clone of a
         *       per-task template with the URL stored under "DPS_TASK_INPUT_DATA".</li>
         *   <li>All other tasks are delegated to {@link #execute}.</li>
         * </ul>
         *
         * <p>When processing fails, the task that was being processed in this iteration
         * is marked as dropped (never a task from an earlier iteration) and the loop
         * continues. When the thread is interrupted the loop ends.
         */
        @Override
        public void run() {
            while (true) {
                // Reset per iteration so a failure can only ever be attributed to this task.
                DpsTask task = null;
                try {
                    task = this.taskQueue.take();
                    this.currentDpsTask = task;
                    if (CustomKafkaSpout.taskStatusChecker.hasKillFlag(task.getTaskId())) {
                        LOGGER.info("Skipping DROPPED task {}", task.getTaskId());
                        continue;
                    }
                    this.startProgressing(task);
                    String stream = this.getStream(task);
                    if (stream.equals(InputDataType.FILE_URLS.name())) {
                        OAIPMHHarvestingDetails oaipmhHarvestingDetails = task.getHarvestingDetails();
                        if (oaipmhHarvestingDetails == null) {
                            oaipmhHarvestingDetails = new OAIPMHHarvestingDetails();
                        }
                        StormTaskTuple template = new StormTaskTuple(task.getTaskId(), task.getTaskName(), null, null, task.getParameters(), task.getOutputRevision(), oaipmhHarvestingDetails);
                        // One cloner per task is enough; it is only used by this thread.
                        Cloner cloner = new Cloner();
                        for (String file : task.getDataEntry(InputDataType.valueOf(stream))) {
                            StormTaskTuple fileTuple = cloner.deepClone(template);
                            fileTuple.addParameter("DPS_TASK_INPUT_DATA", file);
                            this.tuplesWithFileUrls.put(fileTuple);
                        }
                    } else {
                        this.execute(stream, task);
                    }
                } catch (InterruptedException e) {
                    LOGGER.warn("TaskDownloader was interrupted; stopping the downloader thread");
                    if (task != null) {
                        // Drop before restoring the flag so the DAO call is not disturbed by it.
                        this.dropFailedTask(task, e);
                    }
                    Thread.currentThread().interrupt();
                    return;
                } catch (Exception e) {
                    LOGGER.error("StaticDpsTaskSpout error: {}", e.getMessage(), e);
                    if (task != null) {
                        this.dropFailedTask(task, e);
                    }
                }
            }
        }

        /**
         * Marks {@code task} as dropped because of {@code cause}. A failure of the drop
         * itself is logged instead of propagated, so it cannot end the worker loop.
         */
        private void dropFailedTask(DpsTask task, Exception cause) {
            try {
                MCSReaderSpout.this.cassandraTaskInfoDAO.dropTask(task.getTaskId(), "The task was dropped because " + cause.getMessage(), TaskState.DROPPED.toString());
            } catch (RuntimeException dropFailure) {
                LOGGER.error("Could not mark task {} as dropped", task.getTaskId(), dropFailure);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Processes a dataset-based task: walks every dataset URL of the task, queues
         * tuples for the matching records and stores the resulting expected size.
         *
         * <p>REPRESENTATION_NAME, REVISION_NAME and REVISION_PROVIDER are read from the
         * task parameters and then removed from them. Depending on whether a revision
         * (name and provider), a REVISION_TIMESTAMP and a SAMPLE_SIZE are present, one of
         * the {@code handle...} helpers or a plain {@link QueueFiller} walk is used.
         * URLs that are malformed or do not point to a dataset produce an error
         * notification and are skipped.
         *
         * <p>The three MCS clients are always closed; if nothing was counted
         * ({@code expectedSize == 0}) the task is marked as dropped.
         *
         * @param stream  name of the {@link InputDataType} holding the dataset URLs
         * @param dpsTask task to process
         * @throws Exception whatever the MCS calls or the helpers throw
         */
        public void execute(String stream, DpsTask dpsTask) throws Exception {
            List<String> dataSetUrls = dpsTask.getDataEntry(InputDataType.valueOf(stream));

            // Read-then-remove: these three must not travel further with the task parameters.
            String representationName = dpsTask.getParameter("REPRESENTATION_NAME");
            dpsTask.getParameters().remove("REPRESENTATION_NAME");
            String revisionName = dpsTask.getParameter("REVISION_NAME");
            dpsTask.getParameters().remove("REVISION_NAME");
            String revisionProvider = dpsTask.getParameter("REVISION_PROVIDER");
            dpsTask.getParameters().remove("REVISION_PROVIDER");
            String authorizationHeader = dpsTask.getParameter("AUTHORIZATION_HEADER");

            DataSetServiceClient dataSetServiceClient = new DataSetServiceClient(MCSReaderSpout.this.mcsClientURL);
            dataSetServiceClient.useAuthorizationHeader(authorizationHeader);
            RecordServiceClient recordServiceClient = new RecordServiceClient(MCSReaderSpout.this.mcsClientURL);
            recordServiceClient.useAuthorizationHeader(authorizationHeader);
            FileServiceClient fileClient = new FileServiceClient(MCSReaderSpout.this.mcsClientURL);
            fileClient.useAuthorizationHeader(authorizationHeader);

            StormTaskTuple stormTaskTuple = new StormTaskTuple(dpsTask.getTaskId(), dpsTask.getTaskName(), null, null, dpsTask.getParameters(), dpsTask.getOutputRevision(), new OAIPMHHarvestingDetails());
            boolean revisionRequested = revisionName != null && revisionProvider != null;
            int expectedSize = 0;
            try {
                for (String dataSetUrl : dataSetUrls) {
                    try {
                        UrlParser urlParser = new UrlParser(dataSetUrl);
                        if (!urlParser.isUrlToDataset()) {
                            LOGGER.warn("dataSet url is not formulated correctly {}", (Object)dataSetUrl);
                            this.emitErrorNotification(dpsTask.getTaskId(), dataSetUrl, "dataSet url is not formulated correctly", "");
                        } else if (!revisionRequested) {
                            // No revision given: walk every representation of the dataset.
                            QueueFiller queueFiller = new QueueFiller(CustomKafkaSpout.taskStatusChecker, MCSReaderSpout.this.collector, this.tuplesWithFileUrls);
                            RepresentationIterator representations = dataSetServiceClient.getRepresentationIterator(urlParser.getPart(UrlPart.DATA_PROVIDERS), urlParser.getPart(UrlPart.DATA_SETS));
                            while (representations.hasNext() && !CustomKafkaSpout.taskStatusChecker.hasKillFlag(dpsTask.getTaskId())) {
                                expectedSize += queueFiller.addTupleToQueue(stormTaskTuple, fileClient, representations.next());
                            }
                        } else {
                            String revisionTimestamp = dpsTask.getParameter("REVISION_TIMESTAMP");
                            String sampleSize = stormTaskTuple.getParameter("SAMPLE_SIZE");
                            String providerId = urlParser.getPart(UrlPart.DATA_PROVIDERS);
                            String dataSetId = urlParser.getPart(UrlPart.DATA_SETS);
                            if (revisionTimestamp != null) {
                                // Exact revision: helpers take (provider, dataset).
                                if (sampleSize == null) {
                                    expectedSize += this.handleExactRevisions(stormTaskTuple, dataSetServiceClient, recordServiceClient, fileClient, representationName, revisionName, revisionProvider, revisionTimestamp, providerId, dataSetId);
                                } else {
                                    expectedSize += this.handlePartialSizeExactRevisions(stormTaskTuple, dataSetServiceClient, recordServiceClient, fileClient, representationName, revisionName, revisionProvider, revisionTimestamp, providerId, dataSetId, Integer.parseInt(sampleSize));
                                }
                            } else {
                                // Latest revision: helpers take (dataset, provider).
                                if (sampleSize == null) {
                                    expectedSize += this.handleLatestRevisions(stormTaskTuple, dataSetServiceClient, recordServiceClient, fileClient, representationName, revisionName, revisionProvider, dataSetId, providerId);
                                } else {
                                    expectedSize += this.handlePartialSizeForLatestRevisions(stormTaskTuple, dataSetServiceClient, recordServiceClient, fileClient, representationName, revisionName, revisionProvider, dataSetId, providerId, Integer.parseInt(sampleSize));
                                }
                            }
                        }
                    } catch (MalformedURLException ex) {
                        LOGGER.error("MCSReaderSpout error, Error while parsing DataSet URL : {}", (Object)ex.getMessage());
                        this.emitErrorNotification(dpsTask.getTaskId(), dataSetUrl, ex.getMessage(), dpsTask.getParameters().toString());
                    }
                }
                MCSReaderSpout.this.cassandraTaskInfoDAO.setUpdateExpectedSize(dpsTask.getTaskId(), expectedSize);
            } finally {
                fileClient.close();
                dataSetServiceClient.close();
                recordServiceClient.close();
                if (expectedSize == 0) {
                    MCSReaderSpout.this.cassandraTaskInfoDAO.dropTask(dpsTask.getTaskId(), "The task was dropped because it is empty", TaskState.DROPPED.toString());
                }
            }
        }

        /**
         * Returns the task most recently taken from the task queue by {@link #run()}.
         *
         * @return that task, or {@code null} if none has been taken yet
         */
        DpsTask getCurrentDpsTask() {
            return this.currentDpsTask;
        }

        /**
         * Logs the start of processing and records the task as CURRENTLY_PROCESSING,
         * with an empty info text and the current date.
         */
        private void startProgressing(DpsTask dpsTask) {
            long taskId = dpsTask.getTaskId();
            LOGGER.info("Start progressing for Task with id {}", taskId);
            String processingState = TaskState.CURRENTLY_PROCESSING.toString();
            MCSReaderSpout.this.cassandraTaskInfoDAO.updateTask(taskId, "", processingState, new Date());
        }

        /**
         * Fetches one slice of "latest revision" cloud ids from MCS, retrying on failure.
         *
         * <p>A {@code null} slice or a slice without results counts as a failure. After a
         * failure the call is repeated up to {@code DEFAULT_RETRIES} times with a pause of
         * {@code SLEEP_TIME} milliseconds in between; every retried failure is logged
         * together with its cause. Once the retries are used up the last exception is
         * rethrown.
         *
         * @param startFrom slice token to continue from, or {@code null} for the first slice
         * @return a slice whose result list is not {@code null}
         * @throws MCSException    if the last attempt failed with it
         * @throws DriverException if the last attempt failed with it or returned nothing
         */
        private ResultSlice<CloudIdAndTimestampResponse> getLatestDataSetCloudIdByRepresentationAndRevisionChunk(DataSetServiceClient dataSetServiceClient, String representationName, String revisionName, String revisionProvider, String datasetName, String datasetProvider, String startFrom) throws MCSException, DriverException {
            int retries = MCSReaderSpout.DEFAULT_RETRIES;
            while (true) {
                try {
                    ResultSlice<CloudIdAndTimestampResponse> resultSlice = dataSetServiceClient.getLatestDataSetCloudIdByRepresentationAndRevisionChunk(datasetName, datasetProvider, revisionProvider, revisionName, representationName, false, startFrom);
                    if (resultSlice == null || resultSlice.getResults() == null) {
                        throw new DriverException("Getting cloud ids and revision tags: result chunk obtained but is empty.");
                    }
                    return resultSlice;
                } catch (Exception e) {
                    if (retries-- > 0) {
                        // Trailing throwable: keeps the cause of each failed attempt in the log.
                        LOGGER.warn("Error while getting slice of  latest cloud Id from data set.Retries Left{} ", retries, e);
                        this.waitForSpecificTime(MCSReaderSpout.SLEEP_TIME);
                    } else {
                        LOGGER.error("Error while getting slice of latest cloud Id from data set.");
                        throw e;
                    }
                }
            }
        }

        /**
         * Queues tuples for the latest revision of every record in a dataset.
         *
         * <p>Slices are fetched one after another; each slice is handed to a
         * {@link QueueFillerForLatestRevisionJob} running on a pool of
         * {@code INTERNAL_THREADS_NUMBER} threads. Whenever that many jobs are pending,
         * their results are collected before more slices are fetched. Fetching stops when
         * there is no next slice or the task has a kill flag.
         *
         * <p>The pool is shut down on every exit path, including exceptions.
         *
         * @return sum of the counts returned by all jobs
         */
        private int handleLatestRevisions(StormTaskTuple stormTaskTuple, DataSetServiceClient dataSetServiceClient, RecordServiceClient recordServiceClient, FileServiceClient fileServiceClient, String representationName, String revisionName, String revisionProvider, String datasetName, String datasetProvider) throws MCSException, DriverException, InterruptedException, ConcurrentModificationException, ExecutionException {
            int count = 0;
            String startFrom = null;
            long taskId = stormTaskTuple.getTaskId();
            ExecutorService executorService = Executors.newFixedThreadPool(INTERNAL_THREADS_NUMBER);
            try {
                Set<Future<Integer>> futures = new HashSet<>(INTERNAL_THREADS_NUMBER);
                do {
                    ResultSlice<CloudIdAndTimestampResponse> resultSlice = this.getLatestDataSetCloudIdByRepresentationAndRevisionChunk(dataSetServiceClient, representationName, revisionName, revisionProvider, datasetName, datasetProvider, startFrom);
                    List<CloudIdAndTimestampResponse> cloudIdAndTimestampResponseList = resultSlice.getResults();
                    Future<Integer> job = executorService.submit(new QueueFillerForLatestRevisionJob(fileServiceClient, recordServiceClient, MCSReaderSpout.this.collector, CustomKafkaSpout.taskStatusChecker, this.tuplesWithFileUrls, stormTaskTuple, representationName, revisionName, revisionProvider, cloudIdAndTimestampResponseList));
                    futures.add(job);
                    if (futures.size() == INTERNAL_THREADS_NUMBER) {
                        count += this.getCountAndWait(futures);
                    }
                    startFrom = resultSlice.getNextSlice();
                } while (startFrom != null && !CustomKafkaSpout.taskStatusChecker.hasKillFlag(taskId));
                if (!futures.isEmpty()) {
                    count += this.getCountAndWait(futures);
                }
                return count;
            } finally {
                // Without this the pool threads would stay alive after a failed slice or job.
                executorService.shutdown();
            }
        }

        /**
         * Like {@code handleLatestRevisions}, but hands at most {@code maxRecordsCount}
         * records to the jobs.
         *
         * <p>The slice that crosses the limit is truncated to the records still missing,
         * whatever size the slices have, and no further slice is fetched once the limit is
         * reached. With a limit of zero or less nothing is submitted.
         *
         * <p>The pool is shut down on every exit path, including exceptions.
         *
         * @param maxRecordsCount upper bound on the number of records to process
         * @return sum of the counts returned by all jobs
         */
        private int handlePartialSizeForLatestRevisions(StormTaskTuple stormTaskTuple, DataSetServiceClient dataSetServiceClient, RecordServiceClient recordServiceClient, FileServiceClient fileServiceClient, String representationName, String revisionName, String revisionProvider, String datasetName, String datasetProvider, int maxRecordsCount) throws MCSException, DriverException, InterruptedException, ConcurrentModificationException, ExecutionException {
            int count = 0;
            String startFrom = null;
            long taskId = stormTaskTuple.getTaskId();
            ExecutorService executorService = Executors.newFixedThreadPool(INTERNAL_THREADS_NUMBER);
            try {
                Set<Future<Integer>> futures = new HashSet<>(INTERNAL_THREADS_NUMBER);
                // Number of records already handed to jobs.
                int submitted = 0;
                do {
                    ResultSlice<CloudIdAndTimestampResponse> resultSlice = this.getLatestDataSetCloudIdByRepresentationAndRevisionChunk(dataSetServiceClient, representationName, revisionName, revisionProvider, datasetName, datasetProvider, startFrom);
                    List<CloudIdAndTimestampResponse> cloudIdAndTimestampResponseList = resultSlice.getResults();
                    int remaining = maxRecordsCount - submitted;
                    if (remaining <= 0) {
                        break;
                    }
                    if (cloudIdAndTimestampResponseList.size() > remaining) {
                        cloudIdAndTimestampResponseList = cloudIdAndTimestampResponseList.subList(0, remaining);
                    }
                    submitted += cloudIdAndTimestampResponseList.size();
                    Future<Integer> job = executorService.submit(new QueueFillerForLatestRevisionJob(fileServiceClient, recordServiceClient, MCSReaderSpout.this.collector, CustomKafkaSpout.taskStatusChecker, this.tuplesWithFileUrls, stormTaskTuple, representationName, revisionName, revisionProvider, cloudIdAndTimestampResponseList));
                    futures.add(job);
                    if (futures.size() == INTERNAL_THREADS_NUMBER) {
                        count += this.getCountAndWait(futures);
                    }
                    startFrom = resultSlice.getNextSlice();
                } while (submitted < maxRecordsCount && startFrom != null && !CustomKafkaSpout.taskStatusChecker.hasKillFlag(taskId));
                if (!futures.isEmpty()) {
                    count += this.getCountAndWait(futures);
                }
                return count;
            } finally {
                // Without this the pool threads would stay alive after a failed slice or job.
                executorService.shutdown();
            }
        }

        /**
         * Queues tuples for every record of a dataset that carries one specific revision
         * (name, provider and timestamp).
         *
         * <p>Slices are fetched one after another; each slice is handed to a
         * {@link QueueFillerForSpecificRevisionJob} running on a pool of
         * {@code INTERNAL_THREADS_NUMBER} threads. Whenever that many jobs are pending,
         * their results are collected before more slices are fetched. Fetching stops when
         * there is no next slice or the task has a kill flag.
         *
         * <p>The pool is shut down on every exit path, including exceptions.
         *
         * @return sum of the counts returned by all jobs
         */
        private int handleExactRevisions(StormTaskTuple stormTaskTuple, DataSetServiceClient dataSetServiceClient, RecordServiceClient recordServiceClient, FileServiceClient fileClient, String representationName, String revisionName, String revisionProvider, String revisionTimestamp, String datasetProvider, String datasetName) throws MCSException, DriverException, InterruptedException, ConcurrentModificationException, ExecutionException {
            int count = 0;
            String startFrom = null;
            long taskId = stormTaskTuple.getTaskId();
            ExecutorService executorService = Executors.newFixedThreadPool(INTERNAL_THREADS_NUMBER);
            try {
                Set<Future<Integer>> futures = new HashSet<>(INTERNAL_THREADS_NUMBER);
                do {
                    ResultSlice<CloudTagsResponse> resultSlice = this.getDataSetRevisionsChunk(dataSetServiceClient, representationName, revisionName, revisionProvider, revisionTimestamp, datasetProvider, datasetName, startFrom);
                    List<CloudTagsResponse> cloudTagsResponses = resultSlice.getResults();
                    Future<Integer> job = executorService.submit(new QueueFillerForSpecificRevisionJob(fileClient, recordServiceClient, MCSReaderSpout.this.collector, CustomKafkaSpout.taskStatusChecker, this.tuplesWithFileUrls, stormTaskTuple, representationName, revisionName, revisionProvider, revisionTimestamp, cloudTagsResponses));
                    futures.add(job);
                    if (futures.size() == INTERNAL_THREADS_NUMBER) {
                        count += this.getCountAndWait(futures);
                    }
                    startFrom = resultSlice.getNextSlice();
                } while (startFrom != null && !CustomKafkaSpout.taskStatusChecker.hasKillFlag(taskId));
                if (!futures.isEmpty()) {
                    count += this.getCountAndWait(futures);
                }
                return count;
            } finally {
                // Without this the pool threads would stay alive after a failed slice or job.
                executorService.shutdown();
            }
        }

        /**
         * Like {@code handleExactRevisions}, but hands at most {@code maxRecordsCount}
         * records to the jobs.
         *
         * <p>The slice that crosses the limit is truncated to the records still missing,
         * whatever size the slices have, and no further slice is fetched once the limit is
         * reached. With a limit of zero or less nothing is submitted.
         *
         * <p>The pool is shut down on every exit path, including exceptions.
         *
         * @param maxRecordsCount upper bound on the number of records to process
         * @return sum of the counts returned by all jobs
         */
        private int handlePartialSizeExactRevisions(StormTaskTuple stormTaskTuple, DataSetServiceClient dataSetServiceClient, RecordServiceClient recordServiceClient, FileServiceClient fileClient, String representationName, String revisionName, String revisionProvider, String revisionTimestamp, String datasetProvider, String datasetName, int maxRecordsCount) throws MCSException, DriverException, InterruptedException, ConcurrentModificationException, ExecutionException {
            int count = 0;
            String startFrom = null;
            long taskId = stormTaskTuple.getTaskId();
            ExecutorService executorService = Executors.newFixedThreadPool(INTERNAL_THREADS_NUMBER);
            try {
                Set<Future<Integer>> futures = new HashSet<>(INTERNAL_THREADS_NUMBER);
                // Number of records already handed to jobs.
                int submitted = 0;
                do {
                    ResultSlice<CloudTagsResponse> resultSlice = this.getDataSetRevisionsChunk(dataSetServiceClient, representationName, revisionName, revisionProvider, revisionTimestamp, datasetProvider, datasetName, startFrom);
                    List<CloudTagsResponse> cloudTagsResponses = resultSlice.getResults();
                    int remaining = maxRecordsCount - submitted;
                    if (remaining <= 0) {
                        break;
                    }
                    if (cloudTagsResponses.size() > remaining) {
                        cloudTagsResponses = cloudTagsResponses.subList(0, remaining);
                    }
                    submitted += cloudTagsResponses.size();
                    Future<Integer> job = executorService.submit(new QueueFillerForSpecificRevisionJob(fileClient, recordServiceClient, MCSReaderSpout.this.collector, CustomKafkaSpout.taskStatusChecker, this.tuplesWithFileUrls, stormTaskTuple, representationName, revisionName, revisionProvider, revisionTimestamp, cloudTagsResponses));
                    futures.add(job);
                    if (futures.size() == INTERNAL_THREADS_NUMBER) {
                        count += this.getCountAndWait(futures);
                    }
                    startFrom = resultSlice.getNextSlice();
                } while (submitted < maxRecordsCount && startFrom != null && !CustomKafkaSpout.taskStatusChecker.hasKillFlag(taskId));
                if (!futures.isEmpty()) {
                    count += this.getCountAndWait(futures);
                }
                return count;
            } finally {
                // Without this the pool threads would stay alive after a failed slice or job.
                executorService.shutdown();
            }
        }

        /**
         * Waits for every future in the set, adds up their results and then empties the set.
         *
         * @param futures pending jobs; cleared once all of them have completed
         * @return sum of all job results
         * @throws InterruptedException if the wait is interrupted
         * @throws ExecutionException   if one of the jobs failed
         */
        private int getCountAndWait(Set<Future<Integer>> futures) throws InterruptedException, ExecutionException {
            int total = 0;
            Iterator<Future<Integer>> pending = futures.iterator();
            while (pending.hasNext()) {
                Integer jobResult = pending.next().get();
                total += jobResult;
            }
            futures.clear();
            return total;
        }

        /**
         * Fetches one slice of records carrying a specific revision from MCS, retrying on
         * failure.
         *
         * <p>A {@code null} slice or a slice without results counts as a failure. After a
         * failure the call is repeated up to {@code DEFAULT_RETRIES} times with a pause of
         * {@code SLEEP_TIME} milliseconds in between; every retried failure is logged
         * together with its cause. Once the retries are used up the last exception is
         * rethrown.
         *
         * @param startFrom slice token to continue from, or {@code null} for the first slice
         * @return a slice whose result list is not {@code null}
         * @throws MCSException    if the last attempt failed with it
         * @throws DriverException if the last attempt failed with it or returned nothing
         */
        private ResultSlice<CloudTagsResponse> getDataSetRevisionsChunk(DataSetServiceClient dataSetServiceClient, String representationName, String revisionName, String revisionProvider, String revisionTimestamp, String datasetProvider, String datasetName, String startFrom) throws MCSException, DriverException {
            int retries = MCSReaderSpout.DEFAULT_RETRIES;
            while (true) {
                try {
                    ResultSlice<CloudTagsResponse> resultSlice = dataSetServiceClient.getDataSetRevisionsChunk(datasetProvider, datasetName, representationName, revisionName, revisionProvider, revisionTimestamp, startFrom, null);
                    if (resultSlice == null || resultSlice.getResults() == null) {
                        throw new DriverException("Getting cloud ids and revision tags: result chunk obtained but is empty.");
                    }
                    return resultSlice;
                } catch (Exception e) {
                    if (retries-- > 0) {
                        // Trailing throwable: keeps the cause of each failed attempt in the log.
                        LOGGER.warn("Error while getting Revisions from data set.Retries Left{} ", retries, e);
                        this.waitForSpecificTime(MCSReaderSpout.SLEEP_TIME);
                    } else {
                        LOGGER.error("Error while getting Revisions from data set.");
                        throw e;
                    }
                }
            }
        }

        /**
         * Sleeps for the given number of milliseconds. If the sleep is interrupted, the
         * interrupt flag is restored and the exception message is logged; nothing is thrown.
         *
         * @param milliSecond duration handed to {@link Thread#sleep(long)}
         */
        private void waitForSpecificTime(int milliSecond) {
            try {
                Thread.sleep(milliSecond);
            }
            catch (InterruptedException e1) {
                // Keep the interrupt visible to callers instead of swallowing it.
                Thread.currentThread().interrupt();
                LOGGER.error(e1.getMessage());
            }
        }

        /**
         * Builds an ERROR notification for the given resource and emits it on the
         * "NotificationStream" stream.
         */
        private void emitErrorNotification(long taskId, String resource, String message, String additionalInformations) {
            MCSReaderSpout.this.collector.emit(
                    "NotificationStream",
                    (List)NotificationTuple.prepareNotification(taskId, resource, States.ERROR, message, additionalInformations).toStormTuple());
        }

        /**
         * Picks the input type of a task: FILE_URLS when the task's input data has a
         * non-null entry for it, DATASET_URLS otherwise.
         *
         * @return the name of the chosen {@link InputDataType}
         */
        private String getStream(DpsTask task) {
            boolean hasFileUrls = task.getInputData().get(InputDataType.FILE_URLS) != null;
            InputDataType inputType = hasFileUrls ? InputDataType.FILE_URLS : InputDataType.DATASET_URLS;
            return inputType.name();
        }
    }
}

