/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm;

import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.core.exceptions.QueryExecutionException;
import com.eaio.uuid.UUID;
import eu.europeana.cloud.cassandra.CassandraConnectionProvider;
import eu.europeana.cloud.cassandra.CassandraConnectionProviderSingleton;
import eu.europeana.cloud.common.model.dps.States;
import eu.europeana.cloud.common.model.dps.TaskInfo;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.service.dps.exception.DatabaseConnectionException;
import eu.europeana.cloud.service.dps.exception.TaskInfoDoesNotExistException;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.utils.CassandraSubTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.utils.CassandraTaskErrorsDAO;
import eu.europeana.cloud.service.dps.storm.utils.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.util.LRUCache;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.Validate;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationBolt
extends BaseRichBolt {
    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationBolt.class);
    protected Map stormConfig;
    protected TopologyContext topologyContext;
    protected OutputCollector outputCollector;
    private final String hosts;
    private final int port;
    private final String keyspaceName;
    private final String userName;
    private final String password;
    private static LRUCache<Long, NotificationCache> cache = new LRUCache(50);
    private String topologyName;
    private static CassandraConnectionProvider cassandraConnectionProvider;
    private static CassandraTaskInfoDAO taskInfoDAO;
    private static CassandraSubTaskInfoDAO subTaskInfoDAO;
    private static CassandraTaskErrorsDAO taskErrorDAO;
    private static final int PROCESSED_INTERVAL = 100;
    private static final int DEFAULT_RETRIES = 3;
    private static final int SLEEP_TIME = 5000;

    public NotificationBolt(String hosts, int port, String keyspaceName, String userName, String password) {
        this.hosts = hosts;
        this.port = port;
        this.keyspaceName = keyspaceName;
        this.userName = userName;
        this.password = password;
    }

    public void execute(Tuple tuple) {
        try {
            NotificationTuple notificationTuple = NotificationTuple.fromStormTuple(tuple);
            NotificationCache nCache = cache.get(notificationTuple.getTaskId());
            if (nCache == null) {
                nCache = new NotificationCache();
                cache.put(notificationTuple.getTaskId(), nCache);
            }
            this.storeTaskDetails(notificationTuple, nCache);
        }
        catch (NoHostAvailableException | QueryExecutionException ex) {
            LOGGER.error("Cannot store notification to Cassandra because: {}", (Object)ex.getMessage());
            return;
        }
        catch (Exception ex) {
            LOGGER.error("Problem with store notification because: {}", (Object)ex.getMessage());
            return;
        }
    }

    private void storeTaskDetails(NotificationTuple notificationTuple, NotificationCache nCache) throws TaskInfoDoesNotExistException, DatabaseConnectionException {
        long taskId = notificationTuple.getTaskId();
        switch (notificationTuple.getInformationType()) {
            case UPDATE_TASK: {
                this.updateTask(taskId, notificationTuple.getParameters());
                break;
            }
            case NOTIFICATION: {
                this.notifyTask(notificationTuple, nCache, taskId);
                this.storeFinishState(notificationTuple.getTaskId());
            }
        }
    }

    private void notifyTask(NotificationTuple notificationTuple, NotificationCache nCache, long taskId) throws DatabaseConnectionException, TaskInfoDoesNotExistException {
        boolean error = this.isError(notificationTuple, nCache);
        int processesFilesCount = nCache.getProcessed();
        int errors = nCache.getErrors();
        this.storeNotification(processesFilesCount, taskId, notificationTuple.getParameters());
        if (error) {
            this.storeNotificationError(taskId, nCache, notificationTuple);
        }
        if (processesFilesCount % 100 == 0) {
            taskInfoDAO.setUpdateProcessedFiles(taskId, processesFilesCount, errors);
        }
    }

    private void storeNotificationError(long taskId, NotificationCache nCache, NotificationTuple notificationTuple) {
        Map<String, Object> parameters = notificationTuple.getParameters();
        Validate.notNull(parameters);
        String errorMessage = String.valueOf(parameters.get("info_text"));
        String additionalInformation = String.valueOf(parameters.get("additionalInfo"));
        if (!String.valueOf(notificationTuple.getParameters().get("state")).equalsIgnoreCase(States.ERROR.toString()) && parameters.get("UNIFIED_ERROR_MESSAGE") != null) {
            errorMessage = String.valueOf(parameters.get("UNIFIED_ERROR_MESSAGE"));
            additionalInformation = String.valueOf(parameters.get("EXCEPTION_ERROR_MESSAGE"));
        }
        String errorType = nCache.getErrorType(errorMessage);
        String resource = String.valueOf(parameters.get("resource"));
        this.updateErrorCounter(taskId, errorType);
        this.insertError(taskId, errorMessage, additionalInformation, errorType, resource);
    }

    private void updateErrorCounter(long taskId, String errorType) {
        int retries = 3;
        while (true) {
            try {
                taskErrorDAO.updateErrorCounter(taskId, errorType);
            }
            catch (Exception e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while updating Error counter. Retries left: {}", (Object)retries);
                    try {
                        Thread.sleep(5000L);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                        LOGGER.error(e1.getMessage());
                    }
                    continue;
                }
                LOGGER.error("Error while updating Error counter.");
                throw e;
            }
            break;
        }
    }

    private void insertError(long taskId, String errorMessage, String additionalInformation, String errorType, String resource) {
        int retries = 3;
        while (true) {
            try {
                taskErrorDAO.insertError(taskId, errorType, errorMessage, resource, additionalInformation);
            }
            catch (Exception e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while inserting Error to cassandra. Retries left: {}", (Object)retries);
                    try {
                        Thread.sleep(5000L);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                        LOGGER.error(e1.getMessage());
                    }
                    continue;
                }
                LOGGER.error("Error while inserting Error to cassandra.");
                throw e;
            }
            break;
        }
    }

    private boolean isError(NotificationTuple notificationTuple, NotificationCache nCache) {
        if (String.valueOf(notificationTuple.getParameters().get("state")).equalsIgnoreCase(States.ERROR.toString())) {
            nCache.inc(true);
            return true;
        }
        if (notificationTuple.getParameter("UNIFIED_ERROR_MESSAGE") != null) {
            nCache.inc(false);
            return true;
        }
        nCache.inc(false);
        return false;
    }

    public void prepare(Map stormConf, TopologyContext tc, OutputCollector oc) {
        cassandraConnectionProvider = CassandraConnectionProviderSingleton.getCassandraConnectionProvider(this.hosts, this.port, this.keyspaceName, this.userName, this.password);
        taskInfoDAO = CassandraTaskInfoDAO.getInstance(cassandraConnectionProvider);
        subTaskInfoDAO = CassandraSubTaskInfoDAO.getInstance(cassandraConnectionProvider);
        taskErrorDAO = CassandraTaskErrorsDAO.getInstance(cassandraConnectionProvider);
        this.topologyName = (String)stormConf.get("topology.name");
        this.stormConfig = stormConf;
        this.topologyContext = tc;
        this.outputCollector = oc;
    }

    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }

    private void updateTask(long taskId, Map<String, Object> parameters) throws DatabaseConnectionException {
        Validate.notNull(parameters);
        String state = String.valueOf(parameters.get("state"));
        String info = String.valueOf(parameters.get("info"));
        Date startDate = NotificationBolt.prepareDate(parameters.get("start_time"));
        this.updateTask(taskId, state, info, startDate);
    }

    private void updateTask(long taskId, String state, String info, Date startDate) {
        int retries = 3;
        while (true) {
            try {
                taskInfoDAO.updateTask(taskId, info, state, startDate);
            }
            catch (Exception e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while Updating the task info. Retries left: {}", (Object)retries);
                    try {
                        Thread.sleep(5000L);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                        LOGGER.error(e1.getMessage());
                    }
                    continue;
                }
                LOGGER.error("Error while Updating the task info.");
                throw e;
            }
            break;
        }
    }

    private static Date prepareDate(Object dateObject) {
        Date date = null;
        if (dateObject instanceof Date) {
            return (Date)dateObject;
        }
        return date;
    }

    private void storeFinishState(long taskId) throws TaskInfoDoesNotExistException, DatabaseConnectionException {
        int expectedSize;
        NotificationCache nCache;
        int count2;
        TaskInfo task = taskInfoDAO.searchById(taskId);
        if (task != null && (count2 = (nCache = cache.get(taskId)).getProcessed()) == (expectedSize = task.getExpectedSize())) {
            taskInfoDAO.endTask(taskId, count2, nCache.getErrors(), "Completely processed", String.valueOf((Object)TaskState.PROCESSED), new Date());
        }
    }

    private void storeNotification(int resourceNum, long taskId, Map<String, Object> parameters) throws DatabaseConnectionException {
        Validate.notNull(parameters);
        String resource = String.valueOf(parameters.get("resource"));
        String state = String.valueOf(parameters.get("state"));
        String infoText = String.valueOf(parameters.get("info_text"));
        String additionalInfo = String.valueOf(parameters.get("additionalInfo"));
        String resultResource = String.valueOf(parameters.get("resultResource"));
        this.insertRecordDetailedInformation(resourceNum, taskId, resource, state, infoText, additionalInfo, resultResource);
    }

    private void insertRecordDetailedInformation(int resourceNum, long taskId, String resource, String state, String infoText, String additionalInfo, String resultResource) {
        int retries = 3;
        while (true) {
            try {
                subTaskInfoDAO.insert(resourceNum, taskId, this.topologyName, resource, state, infoText, additionalInfo, resultResource);
            }
            catch (Exception e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while inserting detailed record information to cassandra. Retries left: {}", (Object)retries);
                    try {
                        Thread.sleep(5000L);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                        LOGGER.error(e1.getMessage());
                    }
                    continue;
                }
                LOGGER.error("Error while inserting detailed record information to cassandra.");
                throw e;
            }
            break;
        }
    }

    public static void clearCache() {
        cache.clear();
    }

    private static class NotificationCache {
        int processed = 0;
        int errors = 0;
        Map<String, String> errorTypes = new HashMap<String, String>();

        NotificationCache() {
        }

        public void inc(boolean error) {
            ++this.processed;
            if (error) {
                ++this.errors;
            }
        }

        public int getProcessed() {
            return this.processed;
        }

        public int getErrors() {
            return this.errors;
        }

        public String getErrorType(String infoText) {
            String errorType = this.errorTypes.get(infoText);
            if (errorType == null) {
                errorType = new UUID().toString();
                this.errorTypes.put(infoText, errorType);
            }
            return errorType;
        }
    }
}

