/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm;

import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.europeana.cloud.cassandra.CassandraConnectionProvider;
import eu.europeana.cloud.cassandra.CassandraConnectionProviderSingleton;
import eu.europeana.cloud.common.model.dps.ProcessedRecord;
import eu.europeana.cloud.common.model.dps.RecordState;
import eu.europeana.cloud.common.model.dps.TaskInfo;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.exception.TaskInfoDoesNotExistException;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.dao.CassandraSubTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.CassandraTaskErrorsDAO;
import eu.europeana.cloud.service.dps.storm.dao.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.ProcessedRecordsDAO;
import eu.europeana.cloud.service.dps.storm.dao.TaskDiagnosticInfoDAO;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusUpdater;
import eu.europeana.cloud.service.dps.util.LRUCache;
import java.io.IOException;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.commons.lang3.Validate;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.StandardToStringStyle;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationBolt
extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationBolt.class);
    private final String hosts;
    private final int port;
    private final String keyspaceName;
    private final String userName;
    private final String password;
    protected transient OutputCollector outputCollector;
    protected LRUCache<Long, NotificationCache> cache = new LRUCache(50);
    protected String topologyName;
    protected transient TaskStatusUpdater taskStatusUpdater;
    protected transient ProcessedRecordsDAO processedRecordsDAO;
    protected transient CassandraTaskInfoDAO taskInfoDAO;
    private transient TaskDiagnosticInfoDAO taskDiagnosticInfoDAO;
    private transient CassandraSubTaskInfoDAO subTaskInfoDAO;
    private transient CassandraTaskErrorsDAO taskErrorDAO;

    /**
     * Creates the bolt and remembers the Cassandra connection settings.
     * The values are only stored here; they are handed to the connection provider in
     * {@code prepare}.
     *
     * @param hosts        Cassandra contact hosts
     * @param port         Cassandra port
     * @param keyspaceName keyspace to connect to
     * @param userName     Cassandra user name
     * @param password     Cassandra password
     */
    public NotificationBolt(String hosts, int port, String keyspaceName, String userName, String password) {
        // Connection endpoint.
        this.hosts = hosts;
        this.port = port;
        // Credentials.
        this.userName = userName;
        this.password = password;
        // Target keyspace.
        this.keyspaceName = keyspaceName;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes a single notification tuple.
     * <p>
     * The tuple is converted to a {@link NotificationTuple}, the per-task {@link NotificationCache}
     * is looked up (and created and cached when missing), and the notification is stored.
     * Any failure is logged together with its stack trace and otherwise ignored; the tuple is
     * acknowledged in every case, so a failed notification is not replayed by this method.
     *
     * @param tuple the incoming Storm tuple
     */
    public void execute(Tuple tuple) {
        try {
            NotificationTuple notificationTuple = NotificationTuple.fromStormTuple(tuple);
            long taskId = notificationTuple.getTaskId();
            NotificationCache nCache = this.cache.get(taskId);
            if (nCache == null) {
                // First notification seen for this task (or the entry is no longer cached).
                nCache = new NotificationCache(taskId);
                this.cache.put(taskId, nCache);
            }
            this.storeNotificationInfo(notificationTuple, nCache);
        }
        catch (NoHostAvailableException | QueryExecutionException ex) {
            // Pass the exception as the last argument so the stack trace and cause are not lost.
            LOGGER.error("Cannot store notification to Cassandra because: {}", ex.getMessage(), ex);
        }
        catch (Exception ex) {
            LOGGER.error("Problem with store notification because: {}", ex.getMessage(), ex);
        }
        finally {
            // Acknowledge unconditionally, also after a failure.
            this.outputCollector.ack(tuple);
        }
    }

    /**
     * Initialises the transient collaborators of this bolt.
     * <p>
     * Keeps the output collector, obtains a Cassandra connection provider for the settings given
     * to the constructor, fetches the DAO/updater instances bound to that provider and reads the
     * topology name from the {@code "topology.name"} entry of the configuration.
     *
     * @param stormConf       topology configuration map
     * @param tc              topology context (not used here)
     * @param outputCollector collector used to acknowledge tuples
     */
    public void prepare(Map stormConf, TopologyContext tc, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;

        CassandraConnectionProvider provider = CassandraConnectionProviderSingleton.getCassandraConnectionProvider(
                this.hosts, this.port, this.keyspaceName, this.userName, this.password);

        this.taskInfoDAO = CassandraTaskInfoDAO.getInstance(provider);
        this.taskDiagnosticInfoDAO = TaskDiagnosticInfoDAO.getInstance(provider);
        this.taskStatusUpdater = TaskStatusUpdater.getInstance(provider);
        this.subTaskInfoDAO = CassandraSubTaskInfoDAO.getInstance(provider);
        this.processedRecordsDAO = ProcessedRecordsDAO.getInstance(provider);
        this.taskErrorDAO = CassandraTaskErrorsDAO.getInstance(provider);

        Object configuredTopologyName = stormConf.get("topology.name");
        this.topologyName = (String) configuredTopologyName;
    }

    /**
     * Declares no output fields: the body is empty, so nothing is registered on the declarer.
     *
     * @param ofd the declarer supplied by Storm (left untouched)
     */
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }

    /**
     * Stores the outcome carried by a notification tuple unless the record is already finished.
     * <p>
     * The record is identified by the task id and the {@code "resource"} parameter. When no
     * processed-record row exists, or the existing one is not yet in a final state, the task
     * counters and notification rows are written, the record state is set to ERROR or SUCCESS
     * and the task is checked for completion. The "last record finished" diagnostic timestamp is
     * refreshed in every case.
     *
     * @param notificationTuple the notification being handled
     * @param nCache            cached counters of the task the tuple belongs to
     * @throws TaskInfoDoesNotExistException when the task cannot be found during the completion check
     */
    private void storeNotificationInfo(NotificationTuple notificationTuple, NotificationCache nCache) throws TaskInfoDoesNotExistException {
        long taskId = notificationTuple.getTaskId();
        String recordId = String.valueOf(notificationTuple.getParameters().get("resource"));

        Optional<ProcessedRecord> storedRecord = this.processedRecordsDAO.selectByPrimaryKey(taskId, recordId);
        boolean alreadyFinished = storedRecord.isPresent() && this.isFinished(storedRecord.get());

        if (!alreadyFinished) {
            this.notifyTask(notificationTuple, nCache, taskId);

            RecordState newRecordState;
            if (this.isErrorTuple(notificationTuple)) {
                newRecordState = RecordState.ERROR;
            } else {
                newRecordState = RecordState.SUCCESS;
            }
            this.processedRecordsDAO.updateProcessedRecordState(taskId, recordId, newRecordState);
            this.storeFinishState(notificationTuple);
        }

        this.taskDiagnosticInfoDAO.updateLastRecordFinishedOnStormTime(taskId, Instant.now());
    }

    /**
     * Updates the cached counters for the tuple and persists the notification, the error details
     * (when the tuple reports an error) and the progress counters.
     *
     * @param notificationTuple the notification being handled
     * @param nCache            cached counters of the task
     * @param taskId            id of the task the tuple belongs to
     */
    private void notifyTask(NotificationTuple notificationTuple, NotificationCache nCache, long taskId) {
        // Determined before the counters change, as in the original flow.
        final boolean reportsError = this.isError(notificationTuple);

        nCache.incrementCounters(notificationTuple);

        int processedSoFar = nCache.getProcessed();
        Map<String, Object> tupleParameters = notificationTuple.getParameters();
        this.storeNotification(processedSoFar, taskId, tupleParameters);

        if (reportsError) {
            this.storeNotificationError(taskId, nCache, notificationTuple);
        }

        this.saveProgressCounters(taskId, nCache);
    }

    /**
     * Logs the current counters of the task and writes them through the task status updater.
     *
     * @param taskId id of the task whose counters are saved
     * @param nCache cached counters to persist
     */
    private void saveProgressCounters(long taskId, NotificationCache nCache) {
        String countersText = nCache.getCountersAsText();
        LOGGER.info("Updating task counter for task_id = {} and counters: {}", taskId, countersText);

        this.taskStatusUpdater.setUpdateProcessedFiles(
                taskId,
                nCache.getProcessedRecordsCount(),
                nCache.getIgnoredRecordsCount(),
                nCache.getDeletedRecordsCount(),
                nCache.getProcessedErrorsCount(),
                nCache.getDeletedErrorsCount());
    }

    /**
     * Persists the error described by a notification tuple.
     * <p>
     * For a tuple whose state is ERROR the message and details come from {@code "info_text"} and
     * {@code "additionalInfo"}. For a non-error tuple that carries a
     * {@code "UNIFIED_ERROR_MESSAGE"} they come from {@code "UNIFIED_ERROR_MESSAGE"} and
     * {@code "EXCEPTION_ERROR_MESSAGE"} instead. The error type is resolved from the message via
     * the cache, its counter is updated and the error row is inserted.
     *
     * @param taskId            id of the task the error belongs to
     * @param nCache            cache providing the message-to-error-type mapping
     * @param notificationTuple tuple carrying the error parameters
     */
    private void storeNotificationError(long taskId, NotificationCache nCache, NotificationTuple notificationTuple) {
        Map<String, Object> parameters = notificationTuple.getParameters();
        Validate.notNull(parameters);

        boolean useUnifiedMessage = !this.isErrorTuple(notificationTuple)
                && parameters.get("UNIFIED_ERROR_MESSAGE") != null;
        String messageKey = useUnifiedMessage ? "UNIFIED_ERROR_MESSAGE" : "info_text";
        String detailsKey = useUnifiedMessage ? "EXCEPTION_ERROR_MESSAGE" : "additionalInfo";

        String errorMessage = String.valueOf(parameters.get(messageKey));
        String additionalInformation = String.valueOf(parameters.get(detailsKey));
        String errorType = nCache.getErrorType(errorMessage);
        String resource = String.valueOf(parameters.get("resource"));

        this.updateErrorCounter(taskId, errorType);
        this.insertError(taskId, errorMessage, additionalInformation, errorType, resource);
    }

    /**
     * Delegates to the task errors DAO to update the counter of the given error type.
     *
     * @param taskId    id of the task
     * @param errorType identifier of the error type
     */
    private void updateErrorCounter(long taskId, String errorType) {
        taskErrorDAO.updateErrorCounter(taskId, errorType);
    }

    /**
     * Inserts an error row unless the stored count for the error type has passed the limit
     * checked by {@code maximumNumberOfErrorsReached}; in that case only a warning is logged.
     *
     * @param taskId                id of the task
     * @param errorMessage          error message text
     * @param additionalInformation additional details of the error
     * @param errorType             error type identifier, parsed as a {@link UUID} for the count lookup
     * @param resource              resource the error refers to
     */
    private void insertError(long taskId, String errorMessage, String additionalInformation, String errorType, String resource) {
        long storedErrors = this.taskErrorDAO.selectErrorCountsForErrorType(taskId, UUID.fromString(errorType));

        if (this.maximumNumberOfErrorsReached(storedErrors)) {
            LOGGER.warn("Will not store the error message because threshold reached for taskId={}. ", taskId);
            return;
        }

        this.taskErrorDAO.insertError(taskId, errorType, errorMessage, resource, additionalInformation);
    }

    /**
     * Tells whether the given error count is strictly greater than 100.
     *
     * @param errorCount number of errors counted so far
     * @return {@code true} when {@code errorCount} exceeds 100
     */
    private boolean maximumNumberOfErrorsReached(long errorCount) {
        final long threshold = 100L;
        return errorCount > threshold;
    }

    /**
     * Tells whether the tuple reports an error: either its state is ERROR or it carries a
     * non-null {@code "UNIFIED_ERROR_MESSAGE"} parameter.
     *
     * @param notificationTuple tuple to inspect
     * @return {@code true} when the tuple reports an error
     */
    private boolean isError(NotificationTuple notificationTuple) {
        if (this.isErrorTuple(notificationTuple)) {
            return true;
        }
        return notificationTuple.getParameter("UNIFIED_ERROR_MESSAGE") != null;
    }

    /**
     * Ends the task when the number of processed notifications held in the cache equals the
     * expected number of records stored in the task info.
     *
     * @param notificationTuple the notification that was just stored
     * @throws TaskInfoDoesNotExistException when no task info exists for the tuple's task id
     */
    private void storeFinishState(NotificationTuple notificationTuple) throws TaskInfoDoesNotExistException {
        long taskId = notificationTuple.getTaskId();
        // orElseThrow never yields null, so no further null check is needed.
        TaskInfo task = this.taskInfoDAO.findById(taskId).orElseThrow(TaskInfoDoesNotExistException::new);
        NotificationCache nCache = this.cache.get(taskId);
        if (nCache.getProcessed() == task.getExpectedRecordsNumber()) {
            this.endTask(notificationTuple, nCache);
        }
    }

    /**
     * Builds the detailed notification row from the tuple parameters and inserts it.
     * <p>
     * Reads {@code "resource"}, {@code "state"}, {@code "info_text"}, {@code "additionalInfo"} and
     * {@code "resultResource"} as strings and appends {@code " Processing time: <n>"} to the
     * additional info, where {@code n} is the current epoch-millisecond time minus the
     * {@code "START_TIME"} parameter (cast to {@code Long}; presumably epoch milliseconds as well -
     * verify against the producer of the tuple).
     *
     * @param resourceNum sequence number of the notification within the task
     * @param taskId      id of the task
     * @param parameters  tuple parameters; must not be null
     */
    private void storeNotification(int resourceNum, long taskId, Map<String, Object> parameters) {
        Validate.notNull(parameters);

        String resource = String.valueOf(parameters.get("resource"));
        String state = String.valueOf(parameters.get("state"));
        String infoText = String.valueOf(parameters.get("info_text"));
        String baseAdditionalInfo = String.valueOf(parameters.get("additionalInfo"));
        String resultResource = String.valueOf(parameters.get("resultResource"));

        long elapsed = Instant.now().toEpochMilli() - (Long) parameters.get("START_TIME");
        String additionalInfo = baseAdditionalInfo + " Processing time: " + elapsed;

        this.insertRecordDetailedInformation(resourceNum, taskId, resource, state, infoText, additionalInfo, resultResource);
    }

    /**
     * Tells whether the tuple's {@code "state"} parameter equals {@code RecordState.ERROR}
     * (compared as text, ignoring case).
     *
     * @param notificationTuple tuple to inspect
     * @return {@code true} when the tuple state is ERROR
     */
    private boolean isErrorTuple(NotificationTuple notificationTuple) {
        String tupleState = String.valueOf(notificationTuple.getParameters().get("state"));
        String errorState = RecordState.ERROR.toString();
        return tupleState.equalsIgnoreCase(errorState);
    }

    /**
     * Tells whether the record is in a final state, i.e. SUCCESS or ERROR.
     *
     * @param theRecord processed record to inspect
     * @return {@code true} when the record state is SUCCESS or ERROR
     */
    private boolean isFinished(ProcessedRecord theRecord) {
        RecordState currentState = theRecord.getState();
        if (currentState == RecordState.SUCCESS) {
            return true;
        }
        return currentState == RecordState.ERROR;
    }

    /**
     * Finishes the topology stage of a task.
     * <p>
     * Depending on {@code needsPostProcessing}, the task is either marked as completely processed
     * or moved to the "ready for post processing" state; afterwards the finish-on-Storm timestamp
     * is stored. If any of these steps throws, the failure is logged and the task is marked as
     * dropped.
     *
     * @param notificationTuple the notification that completed the task
     * @param nCache            cached counters of the task, used for logging
     */
    protected void endTask(NotificationTuple notificationTuple, NotificationCache nCache) {
        try {
            boolean postProcessingRequired = this.needsPostProcessing(notificationTuple);
            if (!postProcessingRequired) {
                this.setTaskProcessed(notificationTuple, nCache);
            } else {
                this.setTaskStatusToReadyForPostprocessing(notificationTuple, nCache);
            }
            this.taskDiagnosticInfoDAO.updateFinishOnStormTime(notificationTuple.getTaskId(), Instant.now());
        } catch (Exception failure) {
            LOGGER.error("Unable to end the task. id: {} ", notificationTuple.getTaskId(), failure);
            this.taskStatusUpdater.setTaskDropped(notificationTuple.getTaskId(), "Unable to end the task");
        }
    }

    /**
     * Tells whether the task needs a post-processing stage after the topology stage.
     * This implementation always returns {@code false}; {@code endTask} uses the result to choose
     * between marking the task as processed and marking it ready for post processing.
     * NOTE(review): the method is protected and declares checked exceptions it never throws here,
     * so it is presumably meant to be overridden by subclasses - confirm.
     *
     * @param tuple the notification that completed the task (unused in this implementation)
     * @return always {@code false} in this implementation
     * @throws TaskInfoDoesNotExistException declared, never thrown by this implementation
     * @throws IOException                   declared, never thrown by this implementation
     */
    protected boolean needsPostProcessing(NotificationTuple tuple) throws TaskInfoDoesNotExistException, IOException {
        return false;
    }

    /**
     * Marks the task as completely processed and logs its counters.
     *
     * @param notificationTuple tuple providing the task id
     * @param nCache            cached counters, used for the log entry
     */
    private void setTaskProcessed(NotificationTuple notificationTuple, NotificationCache nCache) {
        long taskId = notificationTuple.getTaskId();
        this.taskStatusUpdater.setTaskCompletelyProcessed(taskId, "Completely processed");

        String countersText = nCache.getCountersAsText();
        LOGGER.info("Task id={} completely processed! Counters: {} ", taskId, countersText);
    }

    /**
     * Moves the task to {@code TaskState.READY_FOR_POST_PROCESSING} and logs its counters.
     *
     * @param notificationTuple tuple providing the task id
     * @param nCache            cached counters, used for the log entry
     */
    private void setTaskStatusToReadyForPostprocessing(NotificationTuple notificationTuple, NotificationCache nCache) {
        long taskId = notificationTuple.getTaskId();
        this.taskStatusUpdater.updateState(
                taskId,
                TaskState.READY_FOR_POST_PROCESSING,
                "Ready for post processing after topology stage is finished");

        String countersText = nCache.getCountersAsText();
        LOGGER.info("Task id={} finished topology stage. Now it is waiting for post processing. Counters: {}", taskId, countersText);
    }

    /**
     * Loads the task info of the tuple's task and deserialises its definition into a
     * {@link DpsTask} using Jackson.
     *
     * @param tuple tuple providing the task id
     * @return the task described by the stored definition
     * @throws TaskInfoDoesNotExistException when no task info exists for the task id
     * @throws IOException                   when the definition cannot be read as a {@code DpsTask}
     */
    protected DpsTask loadDpsTask(NotificationTuple tuple) throws TaskInfoDoesNotExistException, IOException {
        long taskId = tuple.getTaskId();
        TaskInfo storedTask = this.taskInfoDAO.findById(taskId).orElseThrow(TaskInfoDoesNotExistException::new);
        String definitionJson = storedTask.getDefinition();

        ObjectMapper mapper = new ObjectMapper();
        return mapper.readValue(definitionJson, DpsTask.class);
    }

    /**
     * Inserts one detailed notification row through the sub-task info DAO, adding this bolt's
     * topology name to the supplied values.
     *
     * @param resourceNum    sequence number of the notification within the task
     * @param taskId         id of the task
     * @param resource       resource the notification refers to
     * @param state          state reported for the resource
     * @param infoText       informational text
     * @param additionalInfo additional details
     * @param resultResource resulting resource
     */
    protected void insertRecordDetailedInformation(int resourceNum, long taskId, String resource, String state, String infoText, String additionalInfo, String resultResource) {
        subTaskInfoDAO.insert(
                resourceNum,
                taskId,
                topologyName,
                resource,
                state,
                infoText,
                additionalInfo,
                resultResource);
    }

    protected class NotificationCache {
        int processed;
        int processedRecordsCount;
        int ignoredRecordsCount;
        int deletedRecordsCount;
        int processedErrorsCount;
        int deletedErrorsCount;
        Map<String, String> errorTypes = new HashMap<String, String>();

        NotificationCache(long taskId) {
            this.processed = NotificationBolt.this.subTaskInfoDAO.getProcessedFilesCount(taskId);
            if (this.processed > 0) {
                TaskInfo taskInfo = NotificationBolt.this.taskInfoDAO.findById(taskId).orElseThrow();
                this.processedRecordsCount = taskInfo.getProcessedRecordsCount();
                this.ignoredRecordsCount = taskInfo.getIgnoredRecordsCount();
                this.deletedRecordsCount = taskInfo.getDeletedRecordsCount();
                this.processedErrorsCount = taskInfo.getProcessedErrorsCount();
                this.deletedErrorsCount = taskInfo.getDeletedErrorsCount();
                this.errorTypes = this.getMessagesUUIDsMap(taskId);
                LOGGER.info("Restored state of NotificationBolt from Cassandra for taskId={} counters={}", (Object)taskId, (Object)this.getCountersAsText());
            }
        }

        public void incrementCounters(NotificationTuple notificationTuple) {
            ++this.processed;
            if (notificationTuple.isMarkedAsDeleted()) {
                ++this.deletedRecordsCount;
                if (NotificationBolt.this.isErrorTuple(notificationTuple)) {
                    ++this.deletedErrorsCount;
                }
            } else if (notificationTuple.isIgnoredRecord()) {
                if (NotificationBolt.this.isErrorTuple(notificationTuple)) {
                    LOGGER.error("Tuple is marked as ignored and error in the same time! It should not occur. Tuple: {}", (Object)notificationTuple);
                    ++this.processedRecordsCount;
                    ++this.processedErrorsCount;
                } else {
                    ++this.ignoredRecordsCount;
                }
            } else {
                ++this.processedRecordsCount;
                if (NotificationBolt.this.isErrorTuple(notificationTuple)) {
                    ++this.processedErrorsCount;
                }
            }
        }

        private Map<String, String> getMessagesUUIDsMap(long taskId) {
            HashMap<String, String> errorMessageToUuidMap = new HashMap<String, String>();
            Iterator<String> it = NotificationBolt.this.taskErrorDAO.getMessagesUuids(taskId);
            while (it.hasNext()) {
                String errorType = it.next();
                Optional<String> message = NotificationBolt.this.taskErrorDAO.getErrorMessage(taskId, errorType);
                message.ifPresent(s2 -> errorMessageToUuidMap.put((String)s2, errorType));
            }
            return errorMessageToUuidMap;
        }

        public String getErrorType(String infoText) {
            return this.errorTypes.computeIfAbsent(infoText, key -> new com.eaio.uuid.UUID().toString());
        }

        public String getCountersAsText() {
            StandardToStringStyle style = new StandardToStringStyle();
            style.setUseClassName(false);
            style.setUseIdentityHashCode(false);
            style.setContentEnd("");
            style.setContentStart("");
            return new ReflectionToStringBuilder(this, style).setExcludeFieldNames("errorTypes").toString();
        }

        public int getProcessed() {
            return this.processed;
        }

        public int getProcessedRecordsCount() {
            return this.processedRecordsCount;
        }

        public int getIgnoredRecordsCount() {
            return this.ignoredRecordsCount;
        }

        public int getDeletedRecordsCount() {
            return this.deletedRecordsCount;
        }

        public int getProcessedErrorsCount() {
            return this.processedErrorsCount;
        }

        public int getDeletedErrorsCount() {
            return this.deletedErrorsCount;
        }

        public Map<String, String> getErrorTypes() {
            return this.errorTypes;
        }
    }
}

