/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.spout;

import com.rits.cloning.Cloner;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.InputDataType;
import eu.europeana.cloud.service.dps.OAIPMHHarvestingDetails;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.spout.CollectorWrapper;
import eu.europeana.cloud.service.dps.storm.spout.CustomKafkaSpout;
import eu.europeana.cloud.service.dps.storm.spout.TaskQueueFiller;
import eu.europeana.cloud.service.dps.storm.spout.job.TaskExecutor;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
public class MCSReaderSpout
extends CustomKafkaSpout {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(MCSReaderSpout.class);
    private SpoutOutputCollector collector;
    TaskDownloader taskDownloader;
    private String mcsClientURL;

    public MCSReaderSpout(KafkaSpoutConfig spoutConf, String hosts, int port, String keyspaceName, String userName, String password, String mcsClientURL) {
        super(spoutConf, hosts, port, keyspaceName, userName, password);
        this.mcsClientURL = mcsClientURL;
    }

    public MCSReaderSpout(KafkaSpoutConfig spoutConf) {
        super(spoutConf);
        this.taskDownloader = new TaskDownloader();
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.taskDownloader = new TaskDownloader();
        super.open(conf, context, new CollectorWrapper((ISpoutOutputCollector)collector, this.taskDownloader));
    }

    @Override
    public void nextTuple() {
        block3: {
            StormTaskTuple stormTaskTuple = null;
            try {
                super.nextTuple();
                stormTaskTuple = this.taskDownloader.getTupleWithFileURL();
                if (stormTaskTuple != null) {
                    this.collector.emit((List)stormTaskTuple.toStormTuple());
                }
            }
            catch (Exception e) {
                LOGGER.error("StaticDpsTaskSpout error: " + e.getMessage(), e);
                if (stormTaskTuple == null) break block3;
                this.taskStatusUpdater.setTaskDropped(stormTaskTuple.getTaskId(), "The task was dropped because " + e.getMessage());
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(StormTaskTuple.getFields());
        declarer.declareStream("NotificationStream", NotificationTuple.getFields());
    }

    @Override
    public void deactivate() {
        LOGGER.info("Deactivate method was executed");
        this.deactivateWaitingTasks();
        this.deactivateCurrentTask();
        LOGGER.info("Deactivate method was finished");
    }

    private void deactivateWaitingTasks() {
        DpsTask dpsTask;
        while ((dpsTask = (DpsTask)this.taskDownloader.taskQueue.poll()) != null) {
            this.taskStatusUpdater.setTaskDropped(dpsTask.getTaskId(), "The task was dropped because of redeployment");
        }
    }

    private void deactivateCurrentTask() {
        DpsTask currentDpsTask = this.taskDownloader.getCurrentDpsTask();
        if (currentDpsTask != null) {
            this.taskStatusUpdater.setTaskDropped(currentDpsTask.getTaskId(), "The task was dropped because of redeployment");
        }
    }

    public TaskDownloader getTaskDownloader() {
        return this.taskDownloader;
    }

    public final class TaskDownloader
    extends Thread
    implements TaskQueueFiller {
        private static final int MAX_SIZE = 100;
        private static final int INTERNAL_THREADS_NUMBER = 10;
        private ArrayBlockingQueue<DpsTask> taskQueue = new ArrayBlockingQueue(100);
        private ArrayBlockingQueue<StormTaskTuple> tuplesWithFileUrls = new ArrayBlockingQueue(1000);
        private DpsTask currentDpsTask;
        private ExecutorService executorService = Executors.newFixedThreadPool(10);

        public TaskDownloader() {
            this.start();
        }

        public StormTaskTuple getTupleWithFileURL() {
            return this.tuplesWithFileUrls.poll();
        }

        @Override
        public void addNewTask(DpsTask dpsTask) {
            try {
                this.taskQueue.put(dpsTask);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public void run() {
            StormTaskTuple stormTaskTuple = null;
            while (true) {
                try {
                    block3: while (true) {
                        this.currentDpsTask = this.taskQueue.take();
                        if (!CustomKafkaSpout.taskStatusChecker.hasKillFlag(this.currentDpsTask.getTaskId())) {
                            String stream;
                            this.startProgressing(this.currentDpsTask);
                            OAIPMHHarvestingDetails oaipmhHarvestingDetails = this.currentDpsTask.getHarvestingDetails();
                            if (oaipmhHarvestingDetails == null) {
                                oaipmhHarvestingDetails = new OAIPMHHarvestingDetails();
                            }
                            if ((stream = this.getStream(this.currentDpsTask)).equals(InputDataType.FILE_URLS.name())) {
                                stormTaskTuple = new StormTaskTuple(this.currentDpsTask.getTaskId(), this.currentDpsTask.getTaskName(), null, null, this.currentDpsTask.getParameters(), this.currentDpsTask.getOutputRevision(), oaipmhHarvestingDetails);
                                List<String> files = this.currentDpsTask.getDataEntry(InputDataType.valueOf(stream));
                                Iterator<String> iterator = files.iterator();
                                while (true) {
                                    if (!iterator.hasNext()) continue block3;
                                    String file = iterator.next();
                                    StormTaskTuple fileTuple = new Cloner().deepClone(stormTaskTuple);
                                    fileTuple.addParameter("CLOUD_LOCAL_IDENTIFIER", file);
                                    this.tuplesWithFileUrls.put(fileTuple);
                                }
                            }
                            this.executorService.submit(new TaskExecutor(MCSReaderSpout.this.collector, CustomKafkaSpout.taskStatusChecker, MCSReaderSpout.this.taskStatusUpdater, this.tuplesWithFileUrls, MCSReaderSpout.this.mcsClientURL, stream, this.currentDpsTask));
                            continue;
                        }
                        LOGGER.info("Skipping DROPPED task {}", (Object)this.currentDpsTask.getTaskId());
                    }
                }
                catch (Exception e) {
                    LOGGER.error("StaticDpsTaskSpout error: " + e.getMessage(), e);
                    if (stormTaskTuple == null) continue;
                    MCSReaderSpout.this.taskStatusUpdater.setTaskDropped(stormTaskTuple.getTaskId(), "The task was dropped because " + e.getMessage());
                    continue;
                }
                break;
            }
        }

        private DpsTask getCurrentDpsTask() {
            return this.currentDpsTask;
        }

        private void startProgressing(DpsTask dpsTask) {
            LOGGER.info("Start progressing for Task with id {}", (Object)dpsTask.getTaskId());
            MCSReaderSpout.this.taskStatusUpdater.updateTask(dpsTask.getTaskId(), "", String.valueOf((Object)TaskState.CURRENTLY_PROCESSING), new Date());
        }

        private String getStream(DpsTask task) {
            if (task.getInputData().get((Object)InputDataType.FILE_URLS) != null) {
                return InputDataType.FILE_URLS.name();
            }
            return InputDataType.DATASET_URLS.name();
        }

        public ArrayBlockingQueue<DpsTask> getTaskQueue() {
            return this.taskQueue;
        }
    }
}

