/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm;

import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.eaio.uuid.UUID;
import eu.europeana.cloud.cassandra.CassandraConnectionProvider;
import eu.europeana.cloud.cassandra.CassandraConnectionProviderSingleton;
import eu.europeana.cloud.common.model.dps.RecordState;
import eu.europeana.cloud.common.model.dps.TaskInfo;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.service.dps.exception.DatabaseConnectionException;
import eu.europeana.cloud.service.dps.exception.TaskInfoDoesNotExistException;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.utils.CassandraSubTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.utils.CassandraTaskErrorsDAO;
import eu.europeana.cloud.service.dps.storm.utils.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.utils.ProcessedRecordsDAO;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusUpdater;
import eu.europeana.cloud.service.dps.util.LRUCache;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.Validate;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationBolt
extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationBolt.class);
    protected Map stormConfig;
    protected TopologyContext topologyContext;
    protected OutputCollector outputCollector;
    private final String hosts;
    private final int port;
    private final String keyspaceName;
    private final String userName;
    private final String password;
    protected LRUCache<Long, NotificationCache> cache = new LRUCache(50);
    protected String topologyName;
    private CassandraConnectionProvider cassandraConnectionProvider;
    private CassandraTaskInfoDAO taskInfoDAO;
    protected TaskStatusUpdater taskStatusUpdater;
    private CassandraSubTaskInfoDAO subTaskInfoDAO;
    protected ProcessedRecordsDAO processedRecordsDAO;
    private CassandraTaskErrorsDAO taskErrorDAO;
    private static final int COUNTER_UPDATE_INTERVAL_IN_MS = 5000;
    protected static final int DEFAULT_RETRIES = 3;
    protected static final int SLEEP_TIME = 5000;

    public NotificationBolt(String hosts, int port, String keyspaceName, String userName, String password) {
        this.hosts = hosts;
        this.port = port;
        this.keyspaceName = keyspaceName;
        this.userName = userName;
        this.password = password;
    }

    public void execute(Tuple tuple) {
        try {
            NotificationTuple notificationTuple = NotificationTuple.fromStormTuple(tuple);
            NotificationCache nCache = this.cache.get(notificationTuple.getTaskId());
            if (nCache == null) {
                nCache = new NotificationCache(notificationTuple.getTaskId());
                this.cache.put(notificationTuple.getTaskId(), nCache);
            }
            this.storeTaskDetails(notificationTuple, nCache);
        }
        catch (NoHostAvailableException | QueryExecutionException ex) {
            LOGGER.error("Cannot store notification to Cassandra because: {}", (Object)ex.getMessage());
            return;
        }
        catch (Exception ex) {
            LOGGER.error("Problem with store notification because: {}", (Object)ex.getMessage(), (Object)ex);
            return;
        }
    }

    private void storeTaskDetails(NotificationTuple notificationTuple, NotificationCache nCache) throws TaskInfoDoesNotExistException, DatabaseConnectionException {
        long taskId = notificationTuple.getTaskId();
        switch (notificationTuple.getInformationType()) {
            case UPDATE_TASK: {
                this.updateTask(taskId, notificationTuple.getParameters());
                break;
            }
            case NOTIFICATION: {
                this.notifyTask(notificationTuple, nCache, taskId);
                this.storeFinishState(notificationTuple);
            }
        }
    }

    private void notifyTask(NotificationTuple notificationTuple, NotificationCache nCache, long taskId) throws DatabaseConnectionException, TaskInfoDoesNotExistException {
        boolean error = this.isError(notificationTuple, nCache);
        int processesFilesCount = nCache.getProcessed();
        int errors = nCache.getErrors();
        this.storeNotification(processesFilesCount, taskId, notificationTuple.getParameters());
        if (error) {
            this.storeNotificationError(taskId, nCache, notificationTuple);
        }
        if (this.isCounterUpdateRequired(nCache)) {
            LOGGER.info("Updating task counter for task_id = {} and counter value: {}", (Object)taskId, (Object)processesFilesCount);
            this.taskStatusUpdater.setUpdateProcessedFiles(taskId, processesFilesCount, errors);
            nCache.setLastCounterUpdate(new Date());
        }
    }

    private boolean isCounterUpdateRequired(NotificationCache nCache) {
        return new Date().getTime() - nCache.getLastCounterUpdate().getTime() > 5000L;
    }

    private void storeNotificationError(long taskId, NotificationCache nCache, NotificationTuple notificationTuple) {
        Map<String, Object> parameters = notificationTuple.getParameters();
        Validate.notNull(parameters);
        String errorMessage = String.valueOf(parameters.get("info_text"));
        String additionalInformation = String.valueOf(parameters.get("additionalInfo"));
        if (!String.valueOf(notificationTuple.getParameters().get("state")).equalsIgnoreCase(RecordState.ERROR.toString()) && parameters.get("UNIFIED_ERROR_MESSAGE") != null) {
            errorMessage = String.valueOf(parameters.get("UNIFIED_ERROR_MESSAGE"));
            additionalInformation = String.valueOf(parameters.get("EXCEPTION_ERROR_MESSAGE"));
        }
        String errorType = nCache.getErrorType(errorMessage);
        String resource = String.valueOf(parameters.get("resource"));
        this.updateErrorCounter(taskId, errorType);
        this.insertError(taskId, errorMessage, additionalInformation, errorType, resource);
    }

    private void updateErrorCounter(long taskId, String errorType) {
        int retries = 3;
        while (true) {
            try {
                this.taskErrorDAO.updateErrorCounter(taskId, errorType);
            }
            catch (Exception e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while updating Error counter. Retries left: {}", (Object)retries);
                    this.waitForSpecificTime();
                    continue;
                }
                LOGGER.error("Error while updating Error counter.");
                throw e;
            }
            break;
        }
    }

    private void insertError(long taskId, String errorMessage, String additionalInformation, String errorType, String resource) {
        int retries = 3;
        while (true) {
            try {
                this.taskErrorDAO.insertError(taskId, errorType, errorMessage, resource, additionalInformation);
            }
            catch (Exception e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while inserting Error to cassandra. Retries left: {}", (Object)retries);
                    this.waitForSpecificTime();
                    continue;
                }
                LOGGER.error("Error while inserting Error to cassandra.");
                throw e;
            }
            break;
        }
    }

    protected void waitForSpecificTime() {
        try {
            Thread.sleep(5000L);
        }
        catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            LOGGER.error(e1.getMessage());
        }
    }

    private boolean isError(NotificationTuple notificationTuple, NotificationCache nCache) {
        if (String.valueOf(notificationTuple.getParameters().get("state")).equalsIgnoreCase(RecordState.ERROR.toString())) {
            nCache.inc(true);
            return true;
        }
        if (notificationTuple.getParameter("UNIFIED_ERROR_MESSAGE") != null) {
            nCache.inc(false);
            return true;
        }
        nCache.inc(false);
        return false;
    }

    public void prepare(Map stormConf, TopologyContext tc, OutputCollector oc) {
        this.cassandraConnectionProvider = CassandraConnectionProviderSingleton.getCassandraConnectionProvider(this.hosts, this.port, this.keyspaceName, this.userName, this.password);
        this.taskInfoDAO = CassandraTaskInfoDAO.getInstance(this.cassandraConnectionProvider);
        this.taskStatusUpdater = TaskStatusUpdater.getInstance(this.cassandraConnectionProvider);
        this.subTaskInfoDAO = CassandraSubTaskInfoDAO.getInstance(this.cassandraConnectionProvider);
        this.processedRecordsDAO = ProcessedRecordsDAO.getInstance(this.cassandraConnectionProvider);
        this.taskErrorDAO = CassandraTaskErrorsDAO.getInstance(this.cassandraConnectionProvider);
        this.topologyName = (String)stormConf.get("topology.name");
        this.stormConfig = stormConf;
        this.topologyContext = tc;
        this.outputCollector = oc;
    }

    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }

    private void updateTask(long taskId, Map<String, Object> parameters) throws DatabaseConnectionException {
        Validate.notNull(parameters);
        String state = String.valueOf(parameters.get("state"));
        String info = String.valueOf(parameters.get("info"));
        Date startDate = NotificationBolt.prepareDate(parameters.get("start_time"));
        this.updateTask(taskId, state, info, startDate);
    }

    private void updateTask(long taskId, String state, String info, Date startDate) {
        int retries = 3;
        while (true) {
            try {
                this.taskStatusUpdater.updateTask(taskId, info, state, startDate);
            }
            catch (Exception e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while Updating the task info. Retries left: {}", (Object)retries);
                    this.waitForSpecificTime();
                    continue;
                }
                LOGGER.error("Error while Updating the task info.");
                throw e;
            }
            break;
        }
    }

    private static Date prepareDate(Object dateObject) {
        Date date = null;
        if (dateObject instanceof Date) {
            return (Date)dateObject;
        }
        return date;
    }

    private void storeFinishState(NotificationTuple notificationTuple) throws TaskInfoDoesNotExistException, DatabaseConnectionException {
        int expectedSize;
        NotificationCache nCache;
        int count;
        long taskId = notificationTuple.getTaskId();
        TaskInfo task = this.taskInfoDAO.searchById(taskId);
        if (task != null && (count = (nCache = this.cache.get(taskId)).getProcessed()) == (expectedSize = task.getExpectedSize())) {
            this.endTask(notificationTuple, nCache.getErrors(), count);
        }
    }

    protected void endTask(NotificationTuple notificationTuple, int errors, int count) {
        this.taskStatusUpdater.endTask(notificationTuple.getTaskId(), count, errors, "Completely processed", String.valueOf((Object)TaskState.PROCESSED), new Date());
    }

    private void storeNotification(int resourceNum, long taskId, Map<String, Object> parameters) throws DatabaseConnectionException {
        Validate.notNull(parameters);
        String resource = String.valueOf(parameters.get("resource"));
        String state = String.valueOf(parameters.get("state"));
        String infoText = String.valueOf(parameters.get("info_text"));
        String additionalInfo = String.valueOf(parameters.get("additionalInfo"));
        String resultResource = String.valueOf(parameters.get("resultResource"));
        this.insertRecordDetailedInformation(resourceNum, taskId, resource, state, infoText, additionalInfo, resultResource);
    }

    protected void insertRecordDetailedInformation(int resourceNum, long taskId, String resource, String state, String infoText, String additionalInfo, String resultResource) {
        int retries = 3;
        while (true) {
            try {
                this.subTaskInfoDAO.insert(resourceNum, taskId, this.topologyName, resource, state, infoText, additionalInfo, resultResource);
            }
            catch (Exception e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while inserting detailed record information to cassandra. Retries left: {}", (Object)retries);
                    this.waitForSpecificTime();
                    continue;
                }
                LOGGER.error("Error while inserting detailed record information to cassandra.");
                throw e;
            }
            break;
        }
    }

    public void clearCache() {
        this.cache.clear();
    }

    protected class NotificationCache {
        int processed = 0;
        int errors = 0;
        private Date lastCounterUpdate = new Date();
        Map<String, String> errorTypes = new HashMap<String, String>();

        NotificationCache(long taskId) {
            this.processed = NotificationBolt.this.subTaskInfoDAO.getProcessedFilesCount(taskId);
            if (this.processed > 0) {
                this.errors = NotificationBolt.this.taskErrorDAO.getErrorCount(taskId);
                this.errorTypes = NotificationBolt.this.taskErrorDAO.getMessagesUuids(taskId);
                LOGGER.debug("Restored state of NotificationBolt from Cassandra for taskId={} processed={} errors={}\nerrorTypes={}", taskId, this.processed, this.errors, this.errorTypes);
            }
        }

        public void inc(boolean error) {
            ++this.processed;
            if (error) {
                ++this.errors;
            }
        }

        public int getProcessed() {
            return this.processed;
        }

        public int getErrors() {
            return this.errors;
        }

        public String getErrorType(String infoText) {
            String errorType = this.errorTypes.get(infoText);
            if (errorType == null) {
                errorType = new UUID().toString();
                this.errorTypes.put(infoText, errorType);
            }
            return errorType;
        }

        Date getLastCounterUpdate() {
            return this.lastCounterUpdate;
        }

        void setLastCounterUpdate(Date lastCounterUpdate) {
            this.lastCounterUpdate = lastCounterUpdate;
        }
    }
}

