/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.spout;

import eu.europeana.cloud.cassandra.CassandraConnectionProvider;
import eu.europeana.cloud.cassandra.CassandraConnectionProviderSingleton;
import eu.europeana.cloud.common.model.dps.TaskInfo;
import eu.europeana.cloud.service.dps.DpsRecord;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.InputDataType;
import eu.europeana.cloud.service.dps.exception.TaskInfoDoesNotExistException;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.utils.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.utils.ProcessedRecordsDAO;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusChecker;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusUpdater;
import eu.europeana.cloud.service.dps.util.LRUCache;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.CollectionUtils;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka spout that converts incoming Kafka records into {@link StormTaskTuple}s.
 *
 * <p>The spout wraps the collector handed over by Storm in an
 * {@link ECloudOutputCollector}. Every tuple the underlying {@link KafkaSpout}
 * tries to emit is intercepted there, parsed as a JSON {@link DpsRecord},
 * enriched with the task definition read from Cassandra (through a small
 * cache) and only then passed on to the real collector.
 *
 * <p>The Cassandra-backed helpers are created in {@link #open}, not in the
 * constructor; until {@code open} has run they are {@code null}.
 */
public class ECloudSpout extends KafkaSpout {
    private static final Logger LOGGER = LoggerFactory.getLogger(ECloudSpout.class);

    /**
     * Shared JSON mapper. It is never reconfigured after construction, so a
     * single instance is reused instead of allocating one per Kafka record.
     */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Position of the serialized {@link DpsRecord} inside the tuple produced by
     * the Kafka spout.
     * NOTE(review): presumably the "value" slot of the default record translator
     * (topic, partition, offset, key, value) - confirm against the spout config.
     */
    private static final int MESSAGE_VALUE_INDEX = 4;

    /** Argument passed to the {@link LRUCache} constructor (presumably its capacity). */
    private static final int TASK_INFO_CACHE_SIZE = 50;

    private final String hosts;
    private final int port;
    private final String keyspaceName;
    private final String userName;
    private final String password;

    protected CassandraTaskInfoDAO taskInfoDAO;
    protected TaskStatusUpdater taskStatusUpdater;
    protected TaskStatusChecker taskStatusChecker;
    protected ProcessedRecordsDAO processedRecordsDAO;

    /**
     * Creates the spout and remembers the Cassandra connection settings; no
     * connection is opened here.
     *
     * @param kafkaSpoutConfig configuration forwarded to {@link KafkaSpout}
     * @param hosts            Cassandra hosts
     * @param port             Cassandra port
     * @param keyspaceName     Cassandra keyspace
     * @param userName         Cassandra user
     * @param password         Cassandra password
     */
    public ECloudSpout(KafkaSpoutConfig kafkaSpoutConfig, String hosts, int port, String keyspaceName, String userName, String password) {
        super(kafkaSpoutConfig);
        this.hosts = hosts;
        this.port = port;
        this.keyspaceName = keyspaceName;
        this.userName = userName;
        this.password = password;
    }

    /**
     * Opens the underlying Kafka spout with a wrapping {@link ECloudOutputCollector}
     * and then initialises the Cassandra-backed DAOs and status helpers.
     *
     * @param conf      topology configuration, forwarded unchanged
     * @param context   topology context, forwarded unchanged
     * @param collector collector supplied by Storm; used as the delegate of the wrapper
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        super.open(conf, context, new ECloudOutputCollector(collector));

        CassandraConnectionProvider connectionProvider =
                CassandraConnectionProviderSingleton.getCassandraConnectionProvider(
                        this.hosts, this.port, this.keyspaceName, this.userName, this.password);

        this.taskInfoDAO = CassandraTaskInfoDAO.getInstance(connectionProvider);
        this.taskStatusUpdater = TaskStatusUpdater.getInstance(connectionProvider);
        // The checker has to be initialised before the instance can be fetched.
        TaskStatusChecker.init(connectionProvider);
        this.taskStatusChecker = TaskStatusChecker.getTaskStatusChecker();
        this.processedRecordsDAO = ProcessedRecordsDAO.getInstance(connectionProvider);
    }

    /**
     * Declares the default stream with the {@link StormTaskTuple} fields and the
     * {@code "NotificationStream"} stream with the {@link NotificationTuple} fields.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(StormTaskTuple.getFields());
        declarer.declareStream("NotificationStream", NotificationTuple.getFields());
    }

    /**
     * Looks the task up through {@link #taskInfoDAO}.
     *
     * @param taskId identifier of the task
     * @return the stored task information
     * @throws TaskInfoDoesNotExistException propagated from the DAO lookup
     */
    private TaskInfo findTaskInDb(long taskId) throws TaskInfoDoesNotExistException {
        return this.taskInfoDAO.searchById(taskId);
    }

    /**
     * Collector wrapper that rewrites each tuple emitted by the Kafka spout into
     * a {@link StormTaskTuple} before delegating to the real collector.
     */
    public class ECloudOutputCollector extends SpoutOutputCollector {
        /** Task information keyed by task id, to avoid a DB read per record. */
        private final LRUCache<Long, TaskInfo> cache;

        /**
         * @param delegate collector that receives the rewritten tuples
         */
        public ECloudOutputCollector(ISpoutOutputCollector delegate) {
            super(delegate);
            this.cache = new LRUCache<>(TASK_INFO_CACHE_SIZE);
        }

        /**
         * Parses the Kafka record carried by {@code tuple}, resolves its task and
         * emits the resulting {@link StormTaskTuple} on {@code streamId}.
         *
         * <p>Nothing is emitted and an empty list is returned when the task has
         * its kill flag set, when the record or the task definition cannot be
         * parsed, or when the task is not found in the DB.
         * NOTE(review): in those cases {@code messageId} is neither emitted nor
         * acknowledged here - confirm that the Kafka offset handling copes with that.
         *
         * @param streamId  stream to emit on
         * @param tuple     tuple produced by the Kafka spout; the element at
         *                  {@link #MESSAGE_VALUE_INDEX} is read as the JSON record
         * @param messageId message id forwarded to the delegate
         * @return the result of the delegate's emit, or an empty list if the
         *         record was dropped
         */
        @Override
        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            DpsRecord message = null;
            try {
                message = this.parseMessage(tuple.get(MESSAGE_VALUE_INDEX).toString());
                if (ECloudSpout.this.taskStatusChecker.hasKillFlag(message.getTaskId())) {
                    LOGGER.info("Dropping kafka message because task was dropped: {}", message.getTaskId());
                    return Collections.emptyList();
                }
                TaskInfo taskInfo = this.prepareTaskInfo(message);
                StormTaskTuple stormTaskTuple = this.prepareTaskForEmission(taskInfo, message);
                LOGGER.info("Emitting record to the subsequent bolt: {}", message);
                List<Object> values = stormTaskTuple.toStormTuple();
                return super.emit(streamId, values, messageId);
            } catch (IOException e) {
                // Covers both the Kafka record and the stored task definition.
                LOGGER.error("Unable to read message", e);
                return Collections.emptyList();
            } catch (TaskInfoDoesNotExistException e) {
                // Trailing throwable keeps the cause in the log output.
                LOGGER.error("Task definition not found in DB for: {}", message, e);
                return Collections.emptyList();
            }
        }

        /**
         * Deserializes the raw JSON payload into a {@link DpsRecord}.
         *
         * @throws IOException if the payload cannot be read as a {@link DpsRecord}
         */
        private DpsRecord parseMessage(String rawMessage) throws IOException {
            return OBJECT_MAPPER.readValue(rawMessage, DpsRecord.class);
        }

        /**
         * Returns the task information for the record's task, reading it from the
         * DB and storing it in the cache on a cache miss.
         *
         * @throws TaskInfoDoesNotExistException propagated from the DB lookup
         */
        private TaskInfo prepareTaskInfo(DpsRecord message) throws TaskInfoDoesNotExistException {
            TaskInfo cached = this.findTaskInCache(message);
            if (this.taskFoundInCache(cached)) {
                LOGGER.debug("TaskInfo found in cache");
                return cached;
            }
            LOGGER.debug("TaskInfo NOT found in cache");
            TaskInfo loaded = this.readTaskFromDB(message.getTaskId());
            this.cache.put(message.getTaskId(), loaded);
            return loaded;
        }

        /** Cache lookup by the record's task id. */
        private TaskInfo findTaskInCache(DpsRecord kafkaMessage) {
            return this.cache.get(kafkaMessage.getTaskId());
        }

        /** A {@code null} cache result is treated as a miss. */
        private boolean taskFoundInCache(TaskInfo taskInfo) {
            return taskInfo != null;
        }

        /** Delegates to the enclosing spout's DB lookup. */
        private TaskInfo readTaskFromDB(long taskId) throws TaskInfoDoesNotExistException {
            return ECloudSpout.this.findTaskInDb(taskId);
        }

        /**
         * Builds the tuple to emit from the stored task definition and the record.
         *
         * <p>The record id is used as the tuple's file URL and as the
         * {@code CLOUD_LOCAL_IDENTIFIER} parameter; the metadata prefix becomes
         * {@code SCHEMA_NAME}. When the task lists repository URLs, the first one
         * is stored as {@code DPS_TASK_INPUT_DATA}.
         *
         * @throws IOException if the task definition cannot be read as a {@link DpsTask}
         */
        private StormTaskTuple prepareTaskForEmission(TaskInfo taskInfo, DpsRecord dpsRecord) throws IOException {
            DpsTask dpsTask = OBJECT_MAPPER.readValue(taskInfo.getTaskDefinition(), DpsTask.class);
            StormTaskTuple stormTaskTuple = new StormTaskTuple(
                    dpsTask.getTaskId(),
                    dpsTask.getTaskName(),
                    dpsRecord.getRecordId(),
                    null,
                    dpsTask.getParameters(),
                    dpsTask.getOutputRevision(),
                    dpsTask.getHarvestingDetails());
            stormTaskTuple.addParameter("CLOUD_LOCAL_IDENTIFIER", dpsRecord.getRecordId());
            stormTaskTuple.addParameter("SCHEMA_NAME", dpsRecord.getMetadataPrefix());

            List<String> repositoryUrlList = dpsTask.getDataEntry(InputDataType.REPOSITORY_URLS);
            if (!CollectionUtils.isEmpty(repositoryUrlList)) {
                stormTaskTuple.addParameter("DPS_TASK_INPUT_DATA", repositoryUrlList.get(0));
            }
            return stormTaskTuple;
        }
    }
}

