/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm;

import eu.europeana.cloud.cassandra.CassandraConnectionProvider;
import eu.europeana.cloud.cassandra.CassandraConnectionProviderSingleton;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.service.commons.utils.BatchExecutor;
import eu.europeana.cloud.service.commons.utils.RetryInterruptedException;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.dao.CassandraTaskErrorsDAO;
import eu.europeana.cloud.service.dps.storm.dao.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.NotificationsDAO;
import eu.europeana.cloud.service.dps.storm.dao.ProcessedRecordsDAO;
import eu.europeana.cloud.service.dps.storm.dao.TaskDiagnosticInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.TasksByStateDAO;
import eu.europeana.cloud.service.dps.storm.notification.NotificationCacheEntry;
import eu.europeana.cloud.service.dps.storm.notification.NotificationEntryCacheBuilder;
import eu.europeana.cloud.service.dps.storm.notification.handler.NotificationHandlerConfig;
import eu.europeana.cloud.service.dps.storm.notification.handler.NotificationHandlerConfigBuilder;
import eu.europeana.cloud.service.dps.storm.notification.handler.NotificationTupleHandler;
import eu.europeana.cloud.service.dps.storm.utils.DiagnosticContextWrapper;
import eu.europeana.cloud.service.dps.util.LRUCache;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationBolt
extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationBolt.class);
    private final String hosts;
    private final int port;
    private final String keyspaceName;
    private final String userName;
    private final String password;
    protected transient OutputCollector outputCollector;
    protected LRUCache<Long, NotificationCacheEntry> cache = new LRUCache(50);
    protected String topologyName;
    private transient NotificationTupleHandler notificationTupleHandler;
    private transient NotificationEntryCacheBuilder notificationEntryCacheBuilder;
    private transient BatchExecutor batchExecutor;

    public NotificationBolt(String hosts, int port, String keyspaceName, String userName, String password) {
        this.hosts = hosts;
        this.port = port;
        this.keyspaceName = keyspaceName;
        this.userName = userName;
        this.password = password;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute(Tuple tuple) {
        NotificationTuple notificationTuple = NotificationTuple.fromStormTuple(tuple);
        try {
            LOGGER.debug("{} Performing execute on tuple {}", (Object)((Object)((Object)this)).getClass().getName(), (Object)notificationTuple);
            this.prepareDiagnosticContext(notificationTuple);
            NotificationCacheEntry cachedCounters = this.readCachedCounters(notificationTuple);
            NotificationHandlerConfig notificationHandlerConfig = NotificationHandlerConfigBuilder.prepareNotificationHandlerConfig(notificationTuple, cachedCounters);
            this.notificationTupleHandler.handle(notificationTuple, notificationHandlerConfig);
            this.outputCollector.ack(tuple);
        }
        catch (RetryInterruptedException ex) {
            LOGGER.error("Notification interrupted: {}", (Object)ex.getMessage(), (Object)ex);
            this.outputCollector.fail(tuple);
        }
        catch (Exception ex) {
            LOGGER.error("Cannot store notification to Cassandra because: {}", (Object)ex.getMessage(), (Object)ex);
            this.batchExecutor.executeAll(this.notificationTupleHandler.prepareStatementsForTupleContainingLastRecord(notificationTuple, TaskState.DROPPED, ex.getMessage()));
            this.outputCollector.ack(tuple);
        }
        finally {
            this.clearDiagnosticContext();
        }
    }

    public void prepare(Map stormConf, TopologyContext tc, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
        CassandraConnectionProvider cassandraConnectionProvider = CassandraConnectionProviderSingleton.getCassandraConnectionProvider((String)this.hosts, (int)this.port, (String)this.keyspaceName, (String)this.userName, (String)this.password);
        CassandraTaskInfoDAO taskInfoDAO = CassandraTaskInfoDAO.getInstance(cassandraConnectionProvider);
        NotificationsDAO subTaskInfoDAO = NotificationsDAO.getInstance(cassandraConnectionProvider);
        ProcessedRecordsDAO processedRecordsDAO = ProcessedRecordsDAO.getInstance(cassandraConnectionProvider);
        CassandraTaskErrorsDAO taskErrorDAO = CassandraTaskErrorsDAO.getInstance(cassandraConnectionProvider);
        TasksByStateDAO tasksByStateDAO = TasksByStateDAO.getInstance(cassandraConnectionProvider);
        this.notificationEntryCacheBuilder = new NotificationEntryCacheBuilder(subTaskInfoDAO, taskInfoDAO, taskErrorDAO);
        this.batchExecutor = BatchExecutor.getInstance((CassandraConnectionProvider)cassandraConnectionProvider);
        this.topologyName = (String)stormConf.get("topology.name");
        this.notificationTupleHandler = new NotificationTupleHandler(processedRecordsDAO, TaskDiagnosticInfoDAO.getInstance(cassandraConnectionProvider), NotificationsDAO.getInstance(cassandraConnectionProvider), taskErrorDAO, taskInfoDAO, tasksByStateDAO, this.batchExecutor, this.topologyName);
    }

    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }

    private NotificationCacheEntry readCachedCounters(NotificationTuple notificationTuple) {
        NotificationCacheEntry cachedCounters = (NotificationCacheEntry)this.cache.get((Object)notificationTuple.getTaskId());
        if (cachedCounters == null) {
            cachedCounters = this.notificationEntryCacheBuilder.build(notificationTuple.getTaskId());
            this.cache.put((Object)notificationTuple.getTaskId(), (Object)cachedCounters);
        } else {
            cachedCounters = this.updateExpectedRecordsNumberIfNeeded(cachedCounters, notificationTuple.getTaskId());
            this.cache.put((Object)notificationTuple.getTaskId(), (Object)cachedCounters);
        }
        return cachedCounters;
    }

    private NotificationCacheEntry updateExpectedRecordsNumberIfNeeded(NotificationCacheEntry cachedCounters, long taskId) {
        if (cachedCounters.getExpectedRecordsNumber() == -1) {
            return this.notificationEntryCacheBuilder.build(taskId);
        }
        return cachedCounters;
    }

    private void prepareDiagnosticContext(NotificationTuple stormTaskTuple) {
        DiagnosticContextWrapper.putValuesFrom(stormTaskTuple);
    }

    private void clearDiagnosticContext() {
        DiagnosticContextWrapper.clear();
    }
}

