/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.spout;

import eu.europeana.cloud.cassandra.CassandraConnectionProvider;
import eu.europeana.cloud.cassandra.CassandraConnectionProviderSingleton;
import eu.europeana.cloud.common.model.dps.ProcessedRecord;
import eu.europeana.cloud.common.model.dps.RecordState;
import eu.europeana.cloud.common.model.dps.TaskDiagnosticInfo;
import eu.europeana.cloud.common.model.dps.TaskInfo;
import eu.europeana.cloud.service.commons.utils.DateHelper;
import eu.europeana.cloud.service.dps.DpsRecord;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.InputDataType;
import eu.europeana.cloud.service.dps.exception.TaskInfoDoesNotExistException;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.dao.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.ProcessedRecordsDAO;
import eu.europeana.cloud.service.dps.storm.dao.TaskDiagnosticInfoDAO;
import eu.europeana.cloud.service.dps.storm.spout.TasksCache;
import eu.europeana.cloud.service.dps.storm.utils.DiagnosticContextWrapper;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusChecker;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusUpdater;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.collections.CollectionUtils;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ECloudSpout
extends KafkaSpout<String, DpsRecord> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ECloudSpout.class);
    private static final int MAX_RETRIES = 3;
    private String topologyName;
    private String hosts;
    private int port;
    private String keyspaceName;
    private String userName;
    private String password;
    protected transient CassandraTaskInfoDAO taskInfoDAO;
    protected transient TaskDiagnosticInfoDAO taskDiagnosticInfoDAO;
    protected transient TaskStatusUpdater taskStatusUpdater;
    protected transient TaskStatusChecker taskStatusChecker;
    protected transient ProcessedRecordsDAO processedRecordsDAO;
    protected transient TasksCache tasksCache;

    public ECloudSpout(String topologyName, KafkaSpoutConfig<String, DpsRecord> kafkaSpoutConfig, String hosts, int port, String keyspaceName, String userName, String password) {
        super(kafkaSpoutConfig);
        this.topologyName = topologyName;
        this.hosts = hosts;
        this.port = port;
        this.keyspaceName = keyspaceName;
        this.userName = userName;
        this.password = password;
    }

    @Override
    public void ack(Object messageId) {
        LOGGER.info("Record acknowledged {}", messageId);
        super.ack(messageId);
    }

    private void ackIgnoredMessage(Object messageId) {
        super.ack(messageId);
    }

    @Override
    public void fail(Object messageId) {
        LOGGER.error("Record failed {}", messageId);
        super.fail(messageId);
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        super.open(conf, context, new ECloudOutputCollector((ISpoutOutputCollector)collector));
        CassandraConnectionProvider cassandraConnectionProvider = CassandraConnectionProviderSingleton.getCassandraConnectionProvider(this.hosts, this.port, this.keyspaceName, this.userName, this.password);
        this.taskInfoDAO = CassandraTaskInfoDAO.getInstance(cassandraConnectionProvider);
        this.taskStatusUpdater = TaskStatusUpdater.getInstance(cassandraConnectionProvider);
        TaskStatusChecker.init(cassandraConnectionProvider);
        this.taskStatusChecker = TaskStatusChecker.getTaskStatusChecker();
        this.processedRecordsDAO = ProcessedRecordsDAO.getInstance(cassandraConnectionProvider);
        this.taskDiagnosticInfoDAO = TaskDiagnosticInfoDAO.getInstance(cassandraConnectionProvider);
        this.tasksCache = new TasksCache(cassandraConnectionProvider);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(StormTaskTuple.getFields());
        declarer.declareStream("NotificationStream", NotificationTuple.getFields());
    }

    public class ECloudOutputCollector
    extends SpoutOutputCollector {
        public ECloudOutputCollector(ISpoutOutputCollector delegate) {
            super(delegate);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            DpsRecord message = null;
            try {
                message = this.readMessageFromTuple(tuple);
                DiagnosticContextWrapper.putValuesFrom(message);
                LOGGER.info("Reading message from Queue");
                if (ECloudSpout.this.taskStatusChecker.hasDroppedStatus(message.getTaskId())) {
                    List<Integer> list = this.omitMessageFromDroppedTask(messageId);
                    return list;
                }
                ProcessedRecord aRecord = this.prepareRecordForExecution(message);
                if (this.isFinished(aRecord)) {
                    List<Integer> list = this.omitAlreadyProcessedRecord(messageId);
                    return list;
                }
                if (this.maxTriesReached(aRecord)) {
                    List<Integer> list = this.emitMaxTriesReachedNotification(message, messageId);
                    return list;
                }
                List<Integer> list = this.emitRecordForProcessing(streamId, message, aRecord, messageId);
                return list;
            }
            catch (IOException | NullPointerException e) {
                LOGGER.error("Unable to read message", e);
                List<Integer> list = Collections.emptyList();
                return list;
            }
            catch (TaskInfoDoesNotExistException e) {
                LOGGER.error("Task definition not found in DB");
                List<Integer> list = Collections.emptyList();
                return list;
            }
            finally {
                DiagnosticContextWrapper.clear();
            }
        }

        List<Integer> omitAlreadyProcessedRecord(Object messageId) {
            LOGGER.info("Dropping kafka message because record was already processed");
            ECloudSpout.this.ackIgnoredMessage(messageId);
            return Collections.emptyList();
        }

        List<Integer> emitRecordForProcessing(String streamId, DpsRecord message, ProcessedRecord aRecord, Object compositeMessageId) throws TaskInfoDoesNotExistException, IOException {
            TaskInfo taskInfo = this.getTaskInfo(message);
            this.updateDiagnosticCounters(aRecord);
            StormTaskTuple stormTaskTuple = this.prepareTaskForEmission(taskInfo, message, aRecord);
            LOGGER.info("Emitting a record to the subsequent bolt");
            return super.emit(streamId, (List)stormTaskTuple.toStormTuple(), compositeMessageId);
        }

        List<Integer> emitMaxTriesReachedNotification(DpsRecord message, Object compositeMessageId) {
            LOGGER.info("Emitting record to the notification bolt directly because of max_retries reached");
            NotificationTuple notificationTuple = NotificationTuple.prepareNotification(message.getTaskId(), message.isMarkedAsDeleted(), message.getRecordId(), RecordState.ERROR, "Max retries reached", "Max retries reached", System.currentTimeMillis());
            return super.emit("NotificationStream", (List)notificationTuple.toStormTuple(), compositeMessageId);
        }

        List<Integer> omitMessageFromDroppedTask(Object messageId) {
            ECloudSpout.this.ackIgnoredMessage(messageId);
            LOGGER.info("Dropping kafka message because task was dropped");
            return Collections.emptyList();
        }

        private boolean maxTriesReached(ProcessedRecord aRecord) {
            return aRecord.getAttemptNumber() > 3;
        }

        private boolean isFinished(ProcessedRecord aRecord) {
            return aRecord.getState() == RecordState.SUCCESS || aRecord.getState() == RecordState.ERROR;
        }

        private DpsRecord readMessageFromTuple(List<Object> tuple) {
            return (DpsRecord)tuple.get(4);
        }

        private TaskInfo getTaskInfo(DpsRecord message) throws TaskInfoDoesNotExistException {
            return ECloudSpout.this.tasksCache.getTaskInfo(message);
        }

        private StormTaskTuple prepareTaskForEmission(TaskInfo taskInfo, DpsRecord dpsRecord, ProcessedRecord aRecord) throws IOException {
            DpsTask dpsTask = DpsTask.fromTaskInfo(taskInfo);
            StormTaskTuple stormTaskTuple = new StormTaskTuple(dpsTask.getTaskId(), dpsTask.getTaskName(), dpsRecord.getRecordId(), null, dpsTask.getParameters(), dpsTask.getOutputRevision(), dpsTask.getHarvestingDetails());
            stormTaskTuple.addParameter("CLOUD_LOCAL_IDENTIFIER", dpsRecord.getRecordId());
            stormTaskTuple.addParameter("SCHEMA_NAME", dpsRecord.getMetadataPrefix());
            stormTaskTuple.addParameter("SENT_DATE", DateHelper.format(taskInfo.getSentTimestamp()));
            stormTaskTuple.addParameter("START_TIME", "" + new Date().getTime());
            List<String> repositoryUrlList = dpsTask.getDataEntry(InputDataType.REPOSITORY_URLS);
            if (!CollectionUtils.isEmpty(repositoryUrlList)) {
                stormTaskTuple.addParameter("DPS_TASK_INPUT_DATA", repositoryUrlList.get(0));
            }
            stormTaskTuple.setRecordAttemptNumber(aRecord.getAttemptNumber());
            stormTaskTuple.setMarkedAsDeleted(dpsRecord.isMarkedAsDeleted());
            return stormTaskTuple;
        }

        private void updateDiagnosticCounters(ProcessedRecord aRecord) {
            TaskDiagnosticInfo taskInfo = ECloudSpout.this.tasksCache.getDiagnosticInfo(aRecord.getTaskId());
            if (taskInfo.getStartedRecordsCount() == 0) {
                taskInfo.setStartOnStormTime(Instant.now());
                ECloudSpout.this.taskDiagnosticInfoDAO.updateStartOnStormTime(taskInfo.getTaskId(), taskInfo.getStartOnStormTime());
                ECloudSpout.this.taskDiagnosticInfoDAO.updateRecordsRetryCount(taskInfo.getTaskId(), 0);
            }
            if (aRecord.getAttemptNumber() > 1) {
                LOGGER.info("Record is repeated - {} attempt!", (Object)aRecord.getAttemptNumber());
                int retryCount = taskInfo.getRecordsRetryCount();
                taskInfo.setRecordsRetryCount(++retryCount);
                ECloudSpout.this.taskDiagnosticInfoDAO.updateRecordsRetryCount(taskInfo.getTaskId(), retryCount);
            } else {
                taskInfo.setStartedRecordsCount(taskInfo.getStartedRecordsCount() + 1);
                ECloudSpout.this.taskDiagnosticInfoDAO.updateStartedRecordsCount(taskInfo.getTaskId(), taskInfo.getStartedRecordsCount());
            }
        }

        private ProcessedRecord prepareRecordForExecution(DpsRecord message) {
            ProcessedRecord aRecord;
            Optional<ProcessedRecord> recordInDb = ECloudSpout.this.processedRecordsDAO.selectByPrimaryKey(message.getTaskId(), message.getRecordId());
            if (recordInDb.isPresent()) {
                aRecord = recordInDb.get();
                aRecord.setAttemptNumber(aRecord.getAttemptNumber() + 1);
                ECloudSpout.this.processedRecordsDAO.updateAttempNumber(aRecord.getTaskId(), aRecord.getRecordId(), aRecord.getAttemptNumber());
            } else {
                aRecord = ProcessedRecord.builder().taskId(message.getTaskId()).recordId(message.getRecordId()).attemptNumber(1).state(RecordState.QUEUED).topologyName(ECloudSpout.this.topologyName).build();
                ECloudSpout.this.processedRecordsDAO.insert(aRecord);
            }
            return aRecord;
        }
    }
}

