/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.spout;

import eu.europeana.cloud.cassandra.CassandraConnectionProvider;
import eu.europeana.cloud.cassandra.CassandraConnectionProviderSingleton;
import eu.europeana.cloud.common.model.dps.ProcessedRecord;
import eu.europeana.cloud.common.model.dps.RecordState;
import eu.europeana.cloud.common.model.dps.TaskDiagnosticInfo;
import eu.europeana.cloud.common.model.dps.TaskInfo;
import eu.europeana.cloud.common.properties.CassandraProperties;
import eu.europeana.cloud.service.commons.utils.DateHelper;
import eu.europeana.cloud.service.dps.DpsRecord;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.InputDataType;
import eu.europeana.cloud.service.dps.exception.TaskInfoDoesNotExistException;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.dao.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.ProcessedRecordsDAO;
import eu.europeana.cloud.service.dps.storm.dao.TaskDiagnosticInfoDAO;
import eu.europeana.cloud.service.dps.storm.spout.ECloudSpoutMXBean;
import eu.europeana.cloud.service.dps.storm.spout.TasksCache;
import eu.europeana.cloud.service.dps.storm.utils.DiagnosticContextWrapper;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusChecker;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusUpdater;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import org.apache.commons.collections.CollectionUtils;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka spout that reads {@link DpsRecord} messages and turns them into {@link StormTaskTuple}s.
 *
 * <p>Every tuple produced by the underlying {@link KafkaSpout} passes through
 * {@link ECloudOutputCollector#emit(String, List, Object)}, which decides whether the record is
 * dropped (task dropped / record already finished), reported to the notification stream
 * (too many attempts) or emitted for regular processing.
 *
 * <p>The spout also registers a JMX bean ({@link ECloudSpoutMXBean}) exposing the ids of the
 * most recently consumed, acked and failed messages.
 */
public class ECloudSpout extends KafkaSpout<String, DpsRecord> {
    private static final Logger LOGGER = LoggerFactory.getLogger(ECloudSpout.class);
    /** A record whose attempt number exceeds this value is reported as an error instead of being processed. */
    private static final int MAX_RETRIES = 3;
    /**
     * Position of the {@link DpsRecord} inside the tuple handed to the collector.
     * NOTE(review): presumably the "value" field of the Kafka record translator output - confirm
     * against the configured translator.
     */
    private static final int MESSAGE_TUPLE_INDEX = 4;
    /** Marker that separates the offset-manager part from the emitted part of the spout's string form. */
    private static final String EMITTED_MARKER = "emitted=";
    private static final String NO_SPOUT_INFO_MESSAGE = "Could not find info in spout string!";

    private final String topologyName;
    private final String topic;
    private final String hosts;
    private final int port;
    private final String keyspaceName;
    private final String userName;
    private final String password;
    protected transient CassandraTaskInfoDAO taskInfoDAO;
    protected transient TaskDiagnosticInfoDAO taskDiagnosticInfoDAO;
    protected transient TaskStatusUpdater taskStatusUpdater;
    protected transient TaskStatusChecker taskStatusChecker;
    protected transient ProcessedRecordsDAO processedRecordsDAO;
    protected transient TasksCache tasksCache;
    protected transient ECloudSpoutSamplerMXBean eCloudSpoutSamplerMXBean;
    private transient ECloudOutputCollector eCloudOutputCollector;
    /** Upper bound of pending tuples; {@link #nextTuple()} polls Kafka only while below it. */
    protected long maxTaskPending = Long.MAX_VALUE;

    /**
     * Creates the spout and remembers the Cassandra connection settings; the actual connection
     * is established later in {@link #open(Map, TopologyContext, SpoutOutputCollector)}.
     *
     * @param topologyName        name stored in newly created {@link ProcessedRecord}s
     * @param topic               topic name, used to build the JMX object name
     * @param kafkaSpoutConfig    configuration passed to the parent {@link KafkaSpout}
     * @param cassandraProperties source of hosts, port, keyspace, user and password
     */
    public ECloudSpout(String topologyName, String topic, KafkaSpoutConfig<String, DpsRecord> kafkaSpoutConfig, CassandraProperties cassandraProperties) {
        super(kafkaSpoutConfig);
        this.topologyName = topologyName;
        this.topic = topic;
        this.hosts = cassandraProperties.getHosts();
        this.port = cassandraProperties.getPort();
        this.keyspaceName = cassandraProperties.getKeyspace();
        this.userName = cassandraProperties.getUser();
        this.password = cassandraProperties.getPassword();
    }

    /**
     * Records the acked message id for JMX, logs it and delegates to the parent spout.
     *
     * @param messageId id of the acknowledged message
     */
    @Override
    public void ack(Object messageId) {
        this.eCloudSpoutSamplerMXBean.lastAckedMessageId = String.valueOf(messageId);
        LOGGER.info("Record acknowledged {}", messageId);
        super.ack(messageId);
    }

    /**
     * Records the failed message id for JMX, logs it and delegates to the parent spout.
     *
     * @param messageId id of the failed message
     */
    @Override
    public void fail(Object messageId) {
        this.eCloudSpoutSamplerMXBean.lastFailedMessageId = String.valueOf(messageId);
        LOGGER.error("Record failed {}", messageId);
        super.fail(messageId);
    }

    /**
     * Registers the JMX sampler, wraps the given collector in an {@link ECloudOutputCollector},
     * opens the parent spout with that wrapper and initialises the Cassandra-backed helpers.
     *
     * @param conf      topology configuration, passed through to the parent spout
     * @param context   topology context, passed through to the parent spout
     * @param collector collector supplied by Storm; wrapped before being handed to the parent
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.eCloudSpoutSamplerMXBean = new ECloudSpoutSamplerMXBean();
        this.eCloudOutputCollector = new ECloudOutputCollector(collector);
        super.open(conf, context, this.eCloudOutputCollector);

        CassandraConnectionProvider cassandraConnectionProvider = CassandraConnectionProviderSingleton.getCassandraConnectionProvider(
                this.hosts, this.port, this.keyspaceName, this.userName, this.password);
        this.taskInfoDAO = CassandraTaskInfoDAO.getInstance(cassandraConnectionProvider);
        this.taskStatusUpdater = TaskStatusUpdater.getInstance(cassandraConnectionProvider);
        this.taskStatusChecker = TaskStatusChecker.getTaskStatusChecker(cassandraConnectionProvider);
        this.processedRecordsDAO = ProcessedRecordsDAO.getInstance(cassandraConnectionProvider);
        this.taskDiagnosticInfoDAO = TaskDiagnosticInfoDAO.getInstance(cassandraConnectionProvider);
        this.tasksCache = new TasksCache(cassandraConnectionProvider);
    }

    /**
     * Declares the default stream with the {@link StormTaskTuple} fields and the
     * {@code NotificationStream} with the {@link NotificationTuple} fields.
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(StormTaskTuple.getFields());
        declarer.declareStream("NotificationStream", NotificationTuple.getFields());
    }

    /**
     * Acks a message directly on the parent spout, bypassing the logging and JMX bookkeeping
     * done in {@link #ack(Object)}.
     */
    private void ackIgnoredMessage(Object messageId) {
        super.ack(messageId);
    }

    /** Builds a minimal tuple (task id, deleted flag, record id as file URL, start time) from a message. */
    private StormTaskTuple getStormTaskTupleFromMessage(DpsRecord message) {
        StormTaskTuple stormTaskTuple = new StormTaskTuple();
        stormTaskTuple.setTaskId(message.getTaskId());
        stormTaskTuple.setMarkedAsDeleted(message.isMarkedAsDeleted());
        stormTaskTuple.setFileUrl(message.getRecordId());
        stormTaskTuple.addParameter("START_TIME", String.valueOf(System.currentTimeMillis()));
        return stormTaskTuple;
    }

    /**
     * Sets {@link #maxTaskPending} to the parallelization parameter read from the given tuple.
     *
     * @param tuple tuple that is about to be emitted
     */
    protected void performThrottling(StormTaskTuple tuple) {
        this.maxTaskPending = tuple.readParallelizationParam();
    }

    /**
     * Asks the parent spout for the next tuple only while the collector's pending count is below
     * {@link #maxTaskPending}; otherwise does nothing.
     */
    @Override
    public void nextTuple() {
        if (this.eCloudOutputCollector.getPendingCount() < this.maxTaskPending) {
            super.nextTuple();
        }
    }

    /**
     * JMX bean exposing the last consumed / acked / failed message ids and views over the
     * spout's string representation. It registers itself with the platform MBean server on
     * construction.
     */
    private class ECloudSpoutSamplerMXBean implements ECloudSpoutMXBean {
        // Written by the spout thread and read through the JMX getters below, hence volatile.
        protected transient volatile String lastConsumedMessageId;
        protected transient volatile String lastConsumedMessage;
        protected transient volatile String lastAckedMessageId;
        protected transient volatile String lastFailedMessageId;
        protected transient volatile boolean lastConsumedMessageCanceled;

        public ECloudSpoutSamplerMXBean() {
            this.register();
        }

        /**
         * Registers this bean with the platform MBean server under {@link #getName()}.
         *
         * @throws RuntimeException wrapping any JMX registration failure
         */
        public void register() {
            try {
                ManagementFactory.getPlatformMBeanServer().registerMBean(this, new ObjectName(this.getName()));
            } catch (InstanceAlreadyExistsException | MBeanRegistrationException | MalformedObjectNameException | NotCompliantMBeanException e) {
                throw new RuntimeException(e);
            }
        }

        /** @return the JMX object name, built from a fixed prefix and the spout's topic */
        public String getName() {
            return "eu.europeana.cloud:executor=spouts,topic=" + ECloudSpout.this.topic;
        }

        @Override
        public String getLastConsumedMessageId() {
            return this.lastConsumedMessageId;
        }

        @Override
        public String getLastConsumedMessage() {
            return this.lastConsumedMessage;
        }

        @Override
        public String getLastAckedMessageId() {
            return this.lastAckedMessageId;
        }

        @Override
        public String getLastFailedMessageId() {
            return this.lastFailedMessageId;
        }

        @Override
        public long getMaxSpoutPending() {
            return ECloudSpout.this.maxTaskPending;
        }

        @Override
        public String showSpoutToString() {
            return this.getSpoutString();
        }

        /**
         * @return the part of the spout string before the last {@code emitted=} marker, with line
         *         breaks inserted around {@code ackedMsgs=} and after every {@code },}; a fixed
         *         message when the marker is absent
         */
        @Override
        public String showOffsetManagers() {
            String spoutString = this.getSpoutString();
            int emittedIndex = spoutString.lastIndexOf(EMITTED_MARKER);
            if (emittedIndex < 0) {
                return NO_SPOUT_INFO_MESSAGE;
            }
            // Literal replacement: none of the patterns needs regex semantics.
            return spoutString.substring(0, emittedIndex)
                    .replace("ackedMsgs=", "\nackedMsgs=\n")
                    .replace("},", "},\n");
        }

        /**
         * @return the part of the spout string starting at the last {@code emitted=} marker, with
         *         a line break after every {@code },}; a fixed message when the marker is absent
         */
        @Override
        public String showEmitted() {
            String spoutString = this.getSpoutString();
            int emittedIndex = spoutString.lastIndexOf(EMITTED_MARKER);
            if (emittedIndex < 0) {
                return NO_SPOUT_INFO_MESSAGE;
            }
            return spoutString.substring(emittedIndex).replace("},", "},\n");
        }

        private String getSpoutString() {
            return ECloudSpout.this.toString();
        }
    }

    /**
     * Collector wrapper handed to the parent {@link KafkaSpout}. It intercepts every emitted Kafka
     * tuple and converts it into the tuple that is actually sent downstream (or drops it).
     */
    public class ECloudOutputCollector extends SpoutOutputCollector {
        public ECloudOutputCollector(ISpoutOutputCollector delegate) {
            super(delegate);
        }

        /**
         * Handles a single tuple produced by the Kafka spout.
         *
         * <ul>
         *   <li>task has dropped status: the message is acked and nothing is emitted;</li>
         *   <li>record state is SUCCESS or ERROR: the message is acked and nothing is emitted;</li>
         *   <li>attempt number exceeds {@code MAX_RETRIES}: an error notification is emitted to
         *       {@code NotificationStream};</li>
         *   <li>otherwise the record is emitted to {@code streamId} for processing.</li>
         * </ul>
         *
         * <p>NOTE(review): when reading the message or loading the task fails, the method only logs
         * and returns an empty list - the message is neither emitted nor acked here. Confirm that
         * this is the intended handling.
         *
         * @param streamId  stream used when the record is emitted for processing
         * @param tuple     tuple produced by the Kafka spout; the {@link DpsRecord} is read from it
         * @param messageId message id forwarded to the delegate or acked when the record is skipped
         * @return task ids returned by the delegate, or an empty list when nothing was emitted
         */
        @Override
        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            ECloudSpoutSamplerMXBean sampler = ECloudSpout.this.eCloudSpoutSamplerMXBean;
            try {
                DpsRecord message = this.readMessageFromTuple(tuple);
                sampler.lastConsumedMessageId = String.valueOf(messageId);
                sampler.lastConsumedMessage = String.valueOf(message);
                DiagnosticContextWrapper.putValuesFrom(message);
                LOGGER.info("Reading message from Queue");

                if (ECloudSpout.this.taskStatusChecker.hasDroppedStatus(message.getTaskId())) {
                    sampler.lastConsumedMessageCanceled = true;
                    return this.omitMessageFromDroppedTask(messageId);
                }
                sampler.lastConsumedMessageCanceled = false;

                ProcessedRecord aRecord = this.prepareRecordForExecution(message);
                if (this.isFinished(aRecord)) {
                    return this.omitAlreadyProcessedRecord(messageId);
                }
                if (this.maxTriesReached(aRecord)) {
                    return this.emitMaxTriesReachedNotification(message, messageId);
                }
                return this.emitRecordForProcessing(streamId, message, aRecord, messageId);
            } catch (IOException | NullPointerException e) {
                // NullPointerException is caught on purpose (best effort): a missing message must not kill the spout.
                LOGGER.error("Unable to read message", e);
                return Collections.emptyList();
            } catch (TaskInfoDoesNotExistException e) {
                LOGGER.error("Task definition not found in DB", e);
                return Collections.emptyList();
            } finally {
                DiagnosticContextWrapper.clear();
            }
        }

        /** @return {@code true} when the record's attempt number is greater than {@code MAX_RETRIES} */
        private boolean maxTriesReached(ProcessedRecord aRecord) {
            return aRecord.getAttemptNumber() > MAX_RETRIES;
        }

        /** @return {@code true} when the record state is SUCCESS or ERROR */
        private boolean isFinished(ProcessedRecord aRecord) {
            RecordState state = aRecord.getState();
            return state == RecordState.SUCCESS || state == RecordState.ERROR;
        }

        private DpsRecord readMessageFromTuple(List<Object> tuple) {
            return (DpsRecord) tuple.get(MESSAGE_TUPLE_INDEX);
        }

        private TaskInfo getTaskInfo(DpsRecord message) throws TaskInfoDoesNotExistException {
            return ECloudSpout.this.tasksCache.getTaskInfo(message);
        }

        /**
         * Builds the tuple sent to the processing stream from the task definition and the record,
         * adding record-specific parameters, the attempt number and the deleted flag.
         */
        private StormTaskTuple prepareTaskForEmission(TaskInfo taskInfo, DpsTask dpsTask, DpsRecord dpsRecord, ProcessedRecord aRecord) {
            StormTaskTuple stormTaskTuple = new StormTaskTuple(
                    dpsTask.getTaskId(),
                    dpsTask.getTaskName(),
                    dpsRecord.getRecordId(),
                    null,
                    dpsTask.getParameters(),
                    dpsTask.getOutputRevision(),
                    dpsTask.getHarvestingDetails());
            stormTaskTuple.addParameter("CLOUD_LOCAL_IDENTIFIER", dpsRecord.getRecordId());
            stormTaskTuple.addParameter("SCHEMA_NAME", dpsRecord.getMetadataPrefix());
            stormTaskTuple.addParameter("SENT_DATE", DateHelper.format((Date) taskInfo.getSentTimestamp()));
            stormTaskTuple.addParameter("START_TIME", String.valueOf(System.currentTimeMillis()));

            // Only the first repository URL, if any, is passed on as the task input data.
            List<?> repositoryUrlList = dpsTask.getDataEntry(InputDataType.REPOSITORY_URLS);
            if (!CollectionUtils.isEmpty(repositoryUrlList)) {
                stormTaskTuple.addParameter("DPS_TASK_INPUT_DATA", (String) repositoryUrlList.get(0));
            }

            stormTaskTuple.setRecordAttemptNumber(aRecord.getAttemptNumber());
            stormTaskTuple.setMarkedAsDeleted(dpsRecord.isMarkedAsDeleted());
            return stormTaskTuple;
        }

        /**
         * Updates the cached diagnostic info of the record's task and persists the change:
         * stores the start time (and a zero retry count) while no record has been started yet,
         * then increments either the retry counter (repeated attempt) or the started-records counter.
         */
        private void updateDiagnosticCounters(ProcessedRecord aRecord) {
            TaskDiagnosticInfo diagnosticInfo = ECloudSpout.this.tasksCache.getDiagnosticInfo(aRecord.getTaskId());
            TaskDiagnosticInfoDAO diagnosticDao = ECloudSpout.this.taskDiagnosticInfoDAO;

            if (diagnosticInfo.getStartedRecordsCount() == 0) {
                diagnosticInfo.setStartOnStormTime(Instant.now());
                diagnosticDao.updateStartOnStormTime(diagnosticInfo.getTaskId(), diagnosticInfo.getStartOnStormTime());
                diagnosticDao.updateRecordsRetryCount(diagnosticInfo.getTaskId(), 0);
            }

            if (aRecord.getAttemptNumber() > 1) {
                LOGGER.info("Record is repeated - {} attempt!", aRecord.getAttemptNumber());
                int retryCount = diagnosticInfo.getRecordsRetryCount() + 1;
                diagnosticInfo.setRecordsRetryCount(retryCount);
                diagnosticDao.updateRecordsRetryCount(diagnosticInfo.getTaskId(), retryCount);
            } else {
                diagnosticInfo.setStartedRecordsCount(diagnosticInfo.getStartedRecordsCount() + 1);
                diagnosticDao.updateStartedRecordsCount(diagnosticInfo.getTaskId(), diagnosticInfo.getStartedRecordsCount());
            }
        }

        /**
         * Loads the processed-record row for the message. An existing row gets its attempt number
         * incremented and stored; otherwise a new QUEUED row with attempt number 1 is inserted.
         *
         * @return the updated or newly created record
         */
        private ProcessedRecord prepareRecordForExecution(DpsRecord message) {
            ProcessedRecordsDAO recordsDao = ECloudSpout.this.processedRecordsDAO;
            Optional<ProcessedRecord> recordInDb = recordsDao.selectByPrimaryKey(message.getTaskId(), message.getRecordId());

            if (recordInDb.isPresent()) {
                ProcessedRecord existing = recordInDb.get();
                existing.setAttemptNumber(existing.getAttemptNumber() + 1);
                recordsDao.updateAttempNumber(existing.getTaskId(), existing.getRecordId(), existing.getAttemptNumber());
                return existing;
            }

            ProcessedRecord created = ProcessedRecord.builder()
                    .taskId(message.getTaskId())
                    .recordId(message.getRecordId())
                    .attemptNumber(1)
                    .state(RecordState.QUEUED)
                    .topologyName(ECloudSpout.this.topologyName)
                    .build();
            recordsDao.insert(created);
            return created;
        }

        /** Acks the message of an already finished record without emitting anything. */
        List<Integer> omitAlreadyProcessedRecord(Object messageId) {
            LOGGER.info("Dropping kafka message because record was already processed");
            ECloudSpout.this.ackIgnoredMessage(messageId);
            return Collections.emptyList();
        }

        /**
         * Loads the task definition, updates diagnostic counters, applies throttling and emits the
         * record on the given stream through the delegate collector.
         *
         * @throws TaskInfoDoesNotExistException when the task cannot be found by the tasks cache
         * @throws IOException                   when the task cannot be built from its stored definition
         */
        List<Integer> emitRecordForProcessing(String streamId, DpsRecord message, ProcessedRecord aRecord, Object compositeMessageId) throws TaskInfoDoesNotExistException, IOException {
            TaskInfo taskInfo = this.getTaskInfo(message);
            DpsTask dpsTask = DpsTask.fromTaskInfo(taskInfo);
            this.updateDiagnosticCounters(aRecord);
            StormTaskTuple stormTaskTuple = this.prepareTaskForEmission(taskInfo, dpsTask, message, aRecord);
            ECloudSpout.this.performThrottling(stormTaskTuple);
            LOGGER.info("Emitting a record to the subsequent bolt maxPending: {}", ECloudSpout.this.maxTaskPending);
            return super.emit(streamId, stormTaskTuple.toStormTuple(), compositeMessageId);
        }

        /** Emits an ERROR notification for the record directly to {@code NotificationStream}. */
        List<Integer> emitMaxTriesReachedNotification(DpsRecord message, Object compositeMessageId) {
            LOGGER.info("Emitting record to the notification bolt directly because of max_retries reached");
            StormTaskTuple stormTaskTuple = ECloudSpout.this.getStormTaskTupleFromMessage(message);
            NotificationTuple notificationTuple = NotificationTuple.prepareNotification(stormTaskTuple, RecordState.ERROR, "Max retries reached", "Max retries reached");
            return super.emit("NotificationStream", notificationTuple.toStormTuple(), compositeMessageId);
        }

        /** Acks the message of a record whose task was dropped without emitting anything. */
        List<Integer> omitMessageFromDroppedTask(Object messageId) {
            ECloudSpout.this.ackIgnoredMessage(messageId);
            LOGGER.info("Dropping kafka message because task was dropped");
            return Collections.emptyList();
        }
    }
}

