/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.notification.handler;

import com.datastax.driver.core.BoundStatement;
import eu.europeana.cloud.common.model.dps.ErrorNotification;
import eu.europeana.cloud.common.model.dps.Notification;
import eu.europeana.cloud.common.model.dps.ProcessedRecord;
import eu.europeana.cloud.common.model.dps.RecordState;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.service.dps.storm.BatchExecutor;
import eu.europeana.cloud.service.dps.storm.ErrorType;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.dao.CassandraSubTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.CassandraTaskErrorsDAO;
import eu.europeana.cloud.service.dps.storm.dao.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.ProcessedRecordsDAO;
import eu.europeana.cloud.service.dps.storm.dao.TaskDiagnosticInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.TasksByStateDAO;
import eu.europeana.cloud.service.dps.storm.notification.NotificationCacheEntry;
import eu.europeana.cloud.service.dps.storm.notification.handler.NotificationHandlerConfig;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Turns a single {@link NotificationTuple} into the set of database statements that record it
 * (notification row, task counters, error rows, task state, record state) and executes them
 * through the configured {@link BatchExecutor}.
 *
 * <p>A tuple is only processed when its record is not yet stored in a finished state
 * ({@link RecordState#SUCCESS} or {@link RecordState#ERROR}); see
 * {@link #handle(NotificationTuple, NotificationHandlerConfig)}.
 */
public class NotificationTupleHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationTupleHandler.class);

    // Keys read from the notification tuple's parameter map.
    private static final String RESOURCE_KEY = "resource";
    private static final String STATE_KEY = "state";
    private static final String INFO_TEXT_KEY = "info_text";
    private static final String ADDITIONAL_INFO_KEY = "additionalInfo";
    private static final String RESULT_RESOURCE_KEY = "resultResource";
    private static final String UNIFIED_ERROR_MESSAGE_KEY = "UNIFIED_ERROR_MESSAGE";
    private static final String EXCEPTION_ERROR_MESSAGE_KEY = "EXCEPTION_ERROR_MESSAGE";
    private static final String START_TIME_KEY = "START_TIME";

    /**
     * Once the counter of an {@link ErrorType} exceeds this value, only the counter is updated and
     * the individual error message is no longer stored.
     */
    private static final int MAX_STORED_ERRORS_PER_TYPE = 100;

    protected final ProcessedRecordsDAO processedRecordsDAO;
    protected final TaskDiagnosticInfoDAO taskDiagnosticInfoDAO;
    protected final CassandraSubTaskInfoDAO subTaskInfoDAO;
    protected final CassandraTaskErrorsDAO taskErrorDAO;
    protected final CassandraTaskInfoDAO taskInfoDAO;
    protected final TasksByStateDAO tasksByStateDAO;
    protected final String topologyName;
    protected BatchExecutor batchExecutor;

    /**
     * Creates a handler backed by the given DAOs.
     *
     * @param processedRecordsDAO   used to look up and update the state of individual records
     * @param taskDiagnosticInfoDAO used to store the time the last record finished
     * @param subTaskInfoDAO        used to insert notification rows
     * @param taskErrorDAO          used to insert error counters and error messages
     * @param taskInfoDAO           used to update task counters and task state
     * @param tasksByStateDAO       used to move the task between state rows
     * @param batchExecutor         executes the prepared statements
     * @param topologyName          name of the topology written into notifications and state rows
     */
    public NotificationTupleHandler(ProcessedRecordsDAO processedRecordsDAO, TaskDiagnosticInfoDAO taskDiagnosticInfoDAO, CassandraSubTaskInfoDAO subTaskInfoDAO, CassandraTaskErrorsDAO taskErrorDAO, CassandraTaskInfoDAO taskInfoDAO, TasksByStateDAO tasksByStateDAO, BatchExecutor batchExecutor, String topologyName) {
        this.processedRecordsDAO = processedRecordsDAO;
        this.taskDiagnosticInfoDAO = taskDiagnosticInfoDAO;
        this.subTaskInfoDAO = subTaskInfoDAO;
        this.taskErrorDAO = taskErrorDAO;
        this.taskInfoDAO = taskInfoDAO;
        this.tasksByStateDAO = tasksByStateDAO;
        this.batchExecutor = batchExecutor;
        this.topologyName = topologyName;
    }

    /**
     * Processes one notification tuple.
     *
     * <p>When the tuple's record is not yet stored as finished, the cache counters are incremented
     * and all statements for the tuple are executed together through the batch executor.
     * Regardless of that, the "last record finished" time of the task is updated afterwards.
     *
     * @param notificationTuple tuple to be handled
     * @param config            supplies the notification cache entry, the record state to set and
     *                          the optional new task state
     */
    public void handle(NotificationTuple notificationTuple, NotificationHandlerConfig config) {
        LOGGER.debug("Executing notification handler");
        long taskId = notificationTuple.getTaskId();
        String recordId = String.valueOf(notificationTuple.getParameters().get(RESOURCE_KEY));
        if (tupleShouldBeProcessed(taskId, recordId)) {
            NotificationCacheEntry cacheEntry = config.getNotificationCacheEntry();
            // Counters must be incremented first: the notification number and the statements
            // below are derived from the updated cache entry.
            cacheEntry.incrementCounters(notificationTuple);
            Notification notification = prepareNotification(notificationTuple, cacheEntry.getProcessed());
            List<BoundStatement> statementsToBeExecutedInBatch = new ArrayList<>();
            statementsToBeExecutedInBatch.addAll(prepareCommonStatementsForAllTuples(notification, cacheEntry));
            statementsToBeExecutedInBatch.addAll(prepareStatementsForTupleContainingLastRecord(notificationTuple, config));
            statementsToBeExecutedInBatch.addAll(prepareStatementsForErrors(notificationTuple, cacheEntry));
            statementsToBeExecutedInBatch.addAll(prepareStatementsForRecordState(notificationTuple, config));
            batchExecutor.executeAll(statementsToBeExecutedInBatch);
        }
        // NOTE(review): for processed tuples the batch above already contains the same update;
        // this direct write is kept so that skipped (already finished) records refresh the time too.
        taskDiagnosticInfoDAO.updateLastRecordFinishedOnStormTime(taskId, Instant.now());
    }

    /** Returns {@code true} when the record is in state SUCCESS or ERROR. */
    private boolean isFinished(ProcessedRecord theRecord) {
        RecordState state = theRecord.getState();
        return state == RecordState.SUCCESS || state == RecordState.ERROR;
    }

    /** Builds the notification row for the tuple, numbered with {@code resourceNum}. */
    private Notification prepareNotification(NotificationTuple notificationTuple, int resourceNum) {
        Map<String, Object> parameters = notificationTuple.getParameters();
        return Notification.builder()
                .taskId(notificationTuple.getTaskId())
                .resourceNum(resourceNum)
                .topologyName(topologyName)
                .resource(String.valueOf(parameters.get(RESOURCE_KEY)))
                .state(String.valueOf(parameters.get(STATE_KEY)))
                .infoText(String.valueOf(parameters.get(INFO_TEXT_KEY)))
                .additionalInformation(prepareAdditionalInfo(parameters))
                .resultResource(String.valueOf(parameters.get(RESULT_RESOURCE_KEY)))
                .build();
    }

    /**
     * A tuple is processed when no record is stored for it yet, or the stored record is not
     * finished.
     */
    private boolean tupleShouldBeProcessed(long taskId, String recordId) {
        return processedRecordsDAO.selectByPrimaryKey(taskId, recordId)
                .map(theRecord -> !isFinished(theRecord))
                .orElse(true);
    }

    /**
     * Statements issued for every processed tuple: the notification insert, the task counters
     * update and the "last record finished" time update.
     */
    private List<BoundStatement> prepareCommonStatementsForAllTuples(Notification notification, NotificationCacheEntry nCache) {
        List<BoundStatement> statementsToBeExecuted = new ArrayList<>();
        statementsToBeExecuted.add(subTaskInfoDAO.insertNotificationStatement(notification));
        statementsToBeExecuted.add(taskInfoDAO.updateProcessedFilesStatement(notification.getTaskId(), nCache.getProcessedRecordsCount(), nCache.getIgnoredRecordsCount(), nCache.getDeletedRecordsCount(), nCache.getProcessedErrorsCount(), nCache.getDeletedErrorsCount()));
        statementsToBeExecuted.add(taskDiagnosticInfoDAO.updateLastRecordFinishedOnStormTimeStatement(notification.getTaskId(), Instant.now()));
        return statementsToBeExecuted;
    }

    /**
     * A tuple carries an error when its state is ERROR or when it has a unified error message
     * attached.
     */
    private boolean isError(NotificationTuple notificationTuple) {
        return isErrorTuple(notificationTuple) || notificationTuple.getParameter(UNIFIED_ERROR_MESSAGE_KEY) != null;
    }

    /**
     * Prepares the error-counter statement and, while the per-type threshold is not exceeded, the
     * statement storing the error itself. Returns an empty list for tuples without an error.
     */
    private List<BoundStatement> prepareStatementsForErrors(NotificationTuple notificationTuple, NotificationCacheEntry nCache) {
        if (!isError(notificationTuple)) {
            return Collections.emptyList();
        }
        List<BoundStatement> statementsToBeExecuted = new ArrayList<>();
        ErrorNotification errorNotification = prepareErrorNotification(notificationTuple, nCache);
        ErrorType errorType = nCache.getErrorType(errorNotification.getErrorMessage());
        errorType.incrementCounter();
        statementsToBeExecuted.add(taskErrorDAO.insertErrorCounterStatement(notificationTuple.getTaskId(), errorType));
        if (maximumNumberOfErrorsReached(errorType)) {
            LOGGER.warn("Will not store the error message because threshold reached for taskId={}. ", notificationTuple.getTaskId());
        } else {
            // Reuse the notification built above instead of constructing an identical one again.
            statementsToBeExecuted.add(taskErrorDAO.insertErrorStatement(errorNotification));
        }
        return statementsToBeExecuted;
    }

    /**
     * Builds the error notification for the tuple. For tuples that are not in ERROR state but
     * carry a unified error message, the unified message and the exception message replace the
     * info text and the additional information.
     */
    private ErrorNotification prepareErrorNotification(NotificationTuple notificationTuple, NotificationCacheEntry nCache) {
        Map<String, Object> parameters = notificationTuple.getParameters();
        String resource = String.valueOf(parameters.get(RESOURCE_KEY));
        String errorMessage = String.valueOf(parameters.get(INFO_TEXT_KEY));
        String additionalInformation = String.valueOf(parameters.get(ADDITIONAL_INFO_KEY));
        if (!isErrorTuple(notificationTuple) && parameters.get(UNIFIED_ERROR_MESSAGE_KEY) != null) {
            errorMessage = String.valueOf(parameters.get(UNIFIED_ERROR_MESSAGE_KEY));
            additionalInformation = String.valueOf(parameters.get(EXCEPTION_ERROR_MESSAGE_KEY));
        }
        return ErrorNotification.builder()
                .taskId(notificationTuple.getTaskId())
                .errorType(nCache.getErrorType(errorMessage).getUuid())
                .errorMessage(errorMessage)
                .resource(resource)
                .additionalInformations(additionalInformation)
                .build();
    }

    /**
     * Prepares the task state change when the config requests one; otherwise returns an empty
     * list.
     */
    private List<BoundStatement> prepareStatementsForTupleContainingLastRecord(NotificationTuple notificationTuple, NotificationHandlerConfig config) {
        return config.getTaskStateToBeSet()
                .map(newState -> prepareStatementsForTupleContainingLastRecord(notificationTuple, newState))
                .orElse(Collections.emptyList());
    }

    /** Prepares the record state update using the record state requested by the config. */
    private List<BoundStatement> prepareStatementsForRecordState(NotificationTuple notificationTuple, NotificationHandlerConfig config) {
        return prepareStatementsForRecordState(notificationTuple, config.getRecordStateToBeSet());
    }

    /** Prepares the single statement that sets the tuple's record to {@code recordState}. */
    private List<BoundStatement> prepareStatementsForRecordState(NotificationTuple notificationTuple, RecordState recordState) {
        String recordId = String.valueOf(notificationTuple.getParameters().get(RESOURCE_KEY));
        return Collections.singletonList(processedRecordsDAO.updateProcessedRecordStateStatement(notificationTuple.getTaskId(), recordId, recordState));
    }

    /**
     * Prepares the statements that move the tuple's task to {@code newState}.
     *
     * <p>When the task and its current tasks-by-state row are found, the old row is deleted and a
     * new one is inserted for {@code newState}, carrying over the application id, topic name and
     * start time of the old row. The task-info state update is always added.
     *
     * @param notificationTuple tuple identifying the task
     * @param newState          state the task should be moved to
     * @param message           message stored together with the new state
     * @return statements to be executed, never {@code null}
     */
    public List<BoundStatement> prepareStatementsForTupleContainingLastRecord(NotificationTuple notificationTuple, TaskState newState, String message) {
        long taskId = notificationTuple.getTaskId();
        List<BoundStatement> statementsToBeExecuted = new ArrayList<>();
        taskInfoDAO.findById(taskId)
                .flatMap(task -> tasksByStateDAO.findTask(task.getState(), topologyName, taskId))
                .ifPresent(oldTaskState -> {
                    statementsToBeExecuted.add(tasksByStateDAO.deleteStatement(oldTaskState.getState(), topologyName, taskId));
                    statementsToBeExecuted.add(tasksByStateDAO.insertStatement(newState, topologyName, taskId, oldTaskState.getApplicationId(), oldTaskState.getTopicName(), oldTaskState.getStartTime()));
                });
        statementsToBeExecuted.add(taskInfoDAO.updateStateStatement(taskId, newState, message));
        return statementsToBeExecuted;
    }

    /** Same as the three-argument variant, using the default message of {@code newState}. */
    private List<BoundStatement> prepareStatementsForTupleContainingLastRecord(NotificationTuple notificationTuple, TaskState newState) {
        return prepareStatementsForTupleContainingLastRecord(notificationTuple, newState, newState.getDefaultMessage());
    }

    /**
     * Returns the tuple's additional information followed by the processing time, computed as the
     * current epoch milliseconds minus the tuple's start time.
     *
     * <p>The start time is accepted as any {@link Number}. When it is missing or not numeric the
     * processing-time suffix is omitted instead of failing the whole notification.
     */
    private String prepareAdditionalInfo(Map<String, Object> parameters) {
        String additionalInfo = String.valueOf(parameters.get(ADDITIONAL_INFO_KEY));
        Object startTime = parameters.get(START_TIME_KEY);
        if (!(startTime instanceof Number)) {
            LOGGER.debug("No numeric start time in notification parameters; processing time omitted");
            return additionalInfo;
        }
        long processingTime = Instant.now().toEpochMilli() - ((Number) startTime).longValue();
        return additionalInfo + " Processing time: " + processingTime;
    }

    /** Returns {@code true} once the error type's counter exceeds the storage threshold. */
    private boolean maximumNumberOfErrorsReached(ErrorType errorType) {
        return errorType.getCount() > MAX_STORED_ERRORS_PER_TYPE;
    }

    /** Returns {@code true} when the tuple's state parameter equals ERROR, ignoring case. */
    private boolean isErrorTuple(NotificationTuple notificationTuple) {
        return String.valueOf(notificationTuple.getParameters().get(STATE_KEY)).equalsIgnoreCase(RecordState.ERROR.toString());
    }
}

