/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.europeana.cloud.cassandra.CassandraConnectionProvider;
import eu.europeana.cloud.cassandra.CassandraConnectionProviderSingleton;
import eu.europeana.cloud.common.model.dps.TaskInfo;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.exception.TaskInfoDoesNotExistException;
import eu.europeana.cloud.service.dps.storm.BatchExecutor;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.dao.CassandraSubTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.CassandraTaskErrorsDAO;
import eu.europeana.cloud.service.dps.storm.dao.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.ProcessedRecordsDAO;
import eu.europeana.cloud.service.dps.storm.dao.TaskDiagnosticInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.TasksByStateDAO;
import eu.europeana.cloud.service.dps.storm.notification.NotificationCacheEntry;
import eu.europeana.cloud.service.dps.storm.notification.NotificationEntryCacheBuilder;
import eu.europeana.cloud.service.dps.storm.notification.handler.NotificationHandlerConfig;
import eu.europeana.cloud.service.dps.storm.notification.handler.NotificationHandlerConfigBuilder;
import eu.europeana.cloud.service.dps.storm.notification.handler.NotificationTupleHandler;
import eu.europeana.cloud.service.dps.util.LRUCache;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationBolt
extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationBolt.class);
    private final String hosts;
    private final int port;
    private final String keyspaceName;
    private final String userName;
    private final String password;
    protected transient OutputCollector outputCollector;
    protected LRUCache<Long, NotificationCacheEntry> cache = new LRUCache(50);
    protected String topologyName;
    private transient CassandraTaskInfoDAO taskInfoDAO;
    private transient NotificationTupleHandler notificationTupleHandler;
    private transient NotificationEntryCacheBuilder notificationEntryCacheBuilder;
    private transient BatchExecutor batchExecutor;

    public NotificationBolt(String hosts, int port, String keyspaceName, String userName, String password) {
        this.hosts = hosts;
        this.port = port;
        this.keyspaceName = keyspaceName;
        this.userName = userName;
        this.password = password;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute(Tuple tuple) {
        NotificationTuple notificationTuple = NotificationTuple.fromStormTuple(tuple);
        try {
            NotificationCacheEntry cachedCounters = this.readCachedCounters(notificationTuple);
            NotificationHandlerConfig notificationHandlerConfig = NotificationHandlerConfigBuilder.prepareNotificationHandlerConfig(notificationTuple, cachedCounters, this.needsPostProcessing(notificationTuple));
            this.notificationTupleHandler.handle(notificationTuple, notificationHandlerConfig);
        }
        catch (Exception ex) {
            LOGGER.error("Cannot store notification to Cassandra because: {}", (Object)ex.getMessage(), (Object)ex);
            this.batchExecutor.executeAll(this.notificationTupleHandler.prepareStatementsForTupleContainingLastRecord(notificationTuple, TaskState.DROPPED, ex.getMessage()));
        }
        finally {
            this.outputCollector.ack(tuple);
        }
    }

    public void prepare(Map stormConf, TopologyContext tc, OutputCollector outputCollector) {
        this.outputCollector = outputCollector;
        CassandraConnectionProvider cassandraConnectionProvider = CassandraConnectionProviderSingleton.getCassandraConnectionProvider(this.hosts, this.port, this.keyspaceName, this.userName, this.password);
        this.taskInfoDAO = CassandraTaskInfoDAO.getInstance(cassandraConnectionProvider);
        CassandraSubTaskInfoDAO subTaskInfoDAO = CassandraSubTaskInfoDAO.getInstance(cassandraConnectionProvider);
        ProcessedRecordsDAO processedRecordsDAO = ProcessedRecordsDAO.getInstance(cassandraConnectionProvider);
        CassandraTaskErrorsDAO taskErrorDAO = CassandraTaskErrorsDAO.getInstance(cassandraConnectionProvider);
        TasksByStateDAO tasksByStateDAO = TasksByStateDAO.getInstance(cassandraConnectionProvider);
        this.notificationEntryCacheBuilder = new NotificationEntryCacheBuilder(subTaskInfoDAO, this.taskInfoDAO, taskErrorDAO);
        this.batchExecutor = BatchExecutor.getInstance(cassandraConnectionProvider);
        this.topologyName = (String)stormConf.get("topology.name");
        this.notificationTupleHandler = new NotificationTupleHandler(processedRecordsDAO, TaskDiagnosticInfoDAO.getInstance(cassandraConnectionProvider), CassandraSubTaskInfoDAO.getInstance(cassandraConnectionProvider), taskErrorDAO, this.taskInfoDAO, tasksByStateDAO, this.batchExecutor, this.topologyName);
    }

    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }

    protected boolean needsPostProcessing(NotificationTuple tuple) throws TaskInfoDoesNotExistException, IOException {
        return false;
    }

    protected DpsTask loadDpsTask(NotificationTuple tuple) throws TaskInfoDoesNotExistException, IOException {
        Optional<TaskInfo> taskInfo = this.taskInfoDAO.findById(tuple.getTaskId());
        String taskDefinition = taskInfo.orElseThrow(TaskInfoDoesNotExistException::new).getDefinition();
        return new ObjectMapper().readValue(taskDefinition, DpsTask.class);
    }

    private NotificationCacheEntry readCachedCounters(NotificationTuple notificationTuple) {
        NotificationCacheEntry cachedCounters = this.cache.get(notificationTuple.getTaskId());
        if (cachedCounters == null) {
            cachedCounters = this.notificationEntryCacheBuilder.build(notificationTuple.getTaskId());
            this.cache.put(notificationTuple.getTaskId(), cachedCounters);
        } else {
            cachedCounters = this.updateExpectedRecordsNumberIfNeeded(cachedCounters, notificationTuple.getTaskId());
            this.cache.put(notificationTuple.getTaskId(), cachedCounters);
        }
        return cachedCounters;
    }

    private NotificationCacheEntry updateExpectedRecordsNumberIfNeeded(NotificationCacheEntry cachedCounters, long taskId) {
        if (cachedCounters.getExpectedRecordsNumber() == -1) {
            return this.notificationEntryCacheBuilder.build(taskId);
        }
        return cachedCounters;
    }
}

