/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.notification.handler;

import com.datastax.driver.core.BoundStatement;
import eu.europeana.cloud.common.model.dps.ErrorNotification;
import eu.europeana.cloud.common.model.dps.Notification;
import eu.europeana.cloud.common.model.dps.ProcessedRecord;
import eu.europeana.cloud.common.model.dps.RecordState;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.service.commons.utils.BatchExecutor;
import eu.europeana.cloud.service.dps.storm.ErrorType;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.dao.CassandraTaskErrorsDAO;
import eu.europeana.cloud.service.dps.storm.dao.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.NotificationsDAO;
import eu.europeana.cloud.service.dps.storm.dao.ProcessedRecordsDAO;
import eu.europeana.cloud.service.dps.storm.dao.TaskDiagnosticInfoDAO;
import eu.europeana.cloud.service.dps.storm.dao.TasksByStateDAO;
import eu.europeana.cloud.service.dps.storm.notification.NotificationCacheEntry;
import eu.europeana.cloud.service.dps.storm.notification.handler.NotificationHandlerConfig;
import eu.europeana.enrichment.rest.client.report.Report;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Translates a {@link NotificationTuple} into database statements and executes them in one batch.
 *
 * <p>For a tuple that should still be processed, {@link #handle} collects statements that:
 * <ul>
 *   <li>insert the notification and update the task counters taken from the
 *       {@link NotificationCacheEntry},</li>
 *   <li>store the error carried by the tuple (if any) and the errors derived from its reports,</li>
 *   <li>update the state of the processed record,</li>
 * </ul>
 * and passes them to the configured {@link BatchExecutor}.
 *
 * <p>Tuple parameters are read with {@link String#valueOf(Object)}, so a missing parameter is
 * stored as the literal text {@code "null"}.
 */
public class NotificationTupleHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(NotificationTupleHandler.class);

    /** Maximum number of characters of additional information kept in an error notification. */
    public static final int MAX_STACKTRACE_LENGTH = 6000;

    /**
     * Once the counter of an error type exceeds this value, only the counter is updated and the
     * individual error is no longer inserted.
     */
    private static final int MAX_ERRORS_STORED_PER_TYPE = 100;

    // Keys of the tuple parameters read by this handler.
    private static final String PARAM_RESOURCE = "RESOURCE";
    private static final String PARAM_STATE = "STATE";
    private static final String PARAM_INFO_TEXT = "INFO_TEXT";
    private static final String PARAM_RESULT_RESOURCE = "RESULT_RESOURCE";
    private static final String PARAM_STATE_DESCRIPTION = "STATE_DESCRIPTION";
    private static final String PARAM_UNIFIED_ERROR_MESSAGE = "UNIFIED_ERROR_MESSAGE";
    private static final String PARAM_EXCEPTION_ERROR_MESSAGE = "EXCEPTION_ERROR_MESSAGE";
    private static final String PARAM_START_TIME = "START_TIME";
    private static final String PARAM_EUROPEANA_ID = "EUROPEANA_ID";

    protected final ProcessedRecordsDAO processedRecordsDAO;
    protected final TaskDiagnosticInfoDAO taskDiagnosticInfoDAO;
    protected final NotificationsDAO subTaskInfoDAO;
    protected final CassandraTaskErrorsDAO taskErrorDAO;
    protected final CassandraTaskInfoDAO taskInfoDAO;
    protected final TasksByStateDAO tasksByStateDAO;
    protected final String topologyName;
    protected BatchExecutor batchExecutor;

    /**
     * Creates a handler working on the given DAOs.
     *
     * @param processedRecordsDAO used to look up and update the state of processed records
     * @param taskDiagnosticInfoDAO used to record when the last record was finished
     * @param subTaskInfoDAO used to insert notifications
     * @param taskErrorDAO used to insert errors and error counters
     * @param taskInfoDAO used to update task counters and task state
     * @param tasksByStateDAO used to move a task between states
     * @param batchExecutor executes the collected statements
     * @param topologyName name of the topology stored with notifications and task states
     */
    public NotificationTupleHandler(ProcessedRecordsDAO processedRecordsDAO, TaskDiagnosticInfoDAO taskDiagnosticInfoDAO, NotificationsDAO subTaskInfoDAO, CassandraTaskErrorsDAO taskErrorDAO, CassandraTaskInfoDAO taskInfoDAO, TasksByStateDAO tasksByStateDAO, BatchExecutor batchExecutor, String topologyName) {
        this.processedRecordsDAO = processedRecordsDAO;
        this.taskDiagnosticInfoDAO = taskDiagnosticInfoDAO;
        this.subTaskInfoDAO = subTaskInfoDAO;
        this.taskErrorDAO = taskErrorDAO;
        this.taskInfoDAO = taskInfoDAO;
        this.tasksByStateDAO = tasksByStateDAO;
        this.batchExecutor = batchExecutor;
        this.topologyName = topologyName;
    }

    /**
     * Handles a single notification tuple.
     *
     * <p>When the record identified by the tuple's task id and {@code RESOURCE} parameter has no
     * processed-record entry, or that entry is not yet in state {@code SUCCESS} or {@code ERROR},
     * the counters of the cache entry are incremented and all statements for the tuple are executed
     * through the batch executor. In every case the last-record-finished time of the task is
     * updated afterwards.
     *
     * @param notificationTuple tuple to be handled
     * @param config supplies the notification cache entry and the record state to be set
     */
    public void handle(NotificationTuple notificationTuple, NotificationHandlerConfig config) {
        LOGGER.debug("Executing notification handler");
        long taskId = notificationTuple.getTaskId();
        String resource = parameterAsString(notificationTuple, PARAM_RESOURCE);
        if (this.tupleShouldBeProcessed(taskId, resource)) {
            NotificationCacheEntry nCache = config.getNotificationCacheEntry();
            // Counters must be incremented before the notification is built: the notification's
            // resource number is read from the already updated cache entry.
            nCache.incrementCounters(notificationTuple);
            Notification notification = this.prepareNotification(notificationTuple, nCache.getProcessed());

            List<BoundStatement> statementsToBeExecutedInBatch = new ArrayList<>();
            statementsToBeExecutedInBatch.addAll(this.prepareCommonStatementsForAllTuples(notification, nCache));
            statementsToBeExecutedInBatch.addAll(this.prepareStatementsForErrors(notificationTuple, nCache));
            statementsToBeExecutedInBatch.addAll(this.prepareStatementsForReports(notificationTuple, nCache));
            statementsToBeExecutedInBatch.addAll(this.prepareStatementsForRecordState(notificationTuple, config));
            this.batchExecutor.executeAll(statementsToBeExecutedInBatch);
        }
        this.taskDiagnosticInfoDAO.updateLastRecordFinishedOnStormTime(taskId, Instant.now());
    }

    /**
     * Prepares the statements that move a task to a new state.
     *
     * <p>The task is looked up by the tuple's task id; if a matching task-by-state entry exists for
     * its current state, statements deleting that entry and inserting one for {@code newState}
     * (with the same application id, topic name and start time) are added. A statement updating
     * the task's state and message is always added.
     *
     * @param notificationTuple tuple providing the task id
     * @param newState state the task should be moved to
     * @param message message stored together with the new state
     * @return statements to be executed by the caller
     */
    public List<BoundStatement> prepareStatementsForTupleContainingLastRecord(NotificationTuple notificationTuple, TaskState newState, String message) {
        List<BoundStatement> statementsToBeExecuted = new ArrayList<>();
        this.taskInfoDAO.findById(notificationTuple.getTaskId())
                .flatMap(task -> this.tasksByStateDAO.findTask(task.getState(), this.topologyName, notificationTuple.getTaskId()))
                .ifPresent(oldTaskState -> {
                    statementsToBeExecuted.add(this.tasksByStateDAO.deleteStatement(oldTaskState.getState(), this.topologyName, notificationTuple.getTaskId()));
                    statementsToBeExecuted.add(this.tasksByStateDAO.insertStatement(newState, this.topologyName, notificationTuple.getTaskId(), oldTaskState.getApplicationId(), oldTaskState.getTopicName(), oldTaskState.getStartTime()));
                });
        statementsToBeExecuted.add(this.taskInfoDAO.updateStateStatement(notificationTuple.getTaskId(), newState, message));
        return statementsToBeExecuted;
    }

    /** Returns the textual form of a tuple parameter; a missing parameter yields {@code "null"}. */
    private static String parameterAsString(NotificationTuple notificationTuple, String key) {
        return String.valueOf(notificationTuple.getParameters().get(key));
    }

    /** A record is finished once it reached the {@code SUCCESS} or {@code ERROR} state. */
    private boolean isFinished(ProcessedRecord theRecord) {
        RecordState state = theRecord.getState();
        return state == RecordState.SUCCESS || state == RecordState.ERROR;
    }

    /** A tuple is processed when its record is unknown or has not been finished yet. */
    private boolean tupleShouldBeProcessed(long taskId, String recordId) {
        Optional<ProcessedRecord> theRecord = this.processedRecordsDAO.selectByPrimaryKey(taskId, recordId);
        return theRecord.isEmpty() || !this.isFinished(theRecord.get());
    }

    /** Builds the notification stored for the tuple, numbered with {@code resourceNum}. */
    private Notification prepareNotification(NotificationTuple notificationTuple, int resourceNum) {
        Map<String, Object> parameters = notificationTuple.getParameters();
        return Notification.builder()
                .taskId(notificationTuple.getTaskId())
                .resourceNum(resourceNum)
                .topologyName(this.topologyName)
                .resource(String.valueOf(parameters.get(PARAM_RESOURCE)))
                .state(String.valueOf(parameters.get(PARAM_STATE)))
                .infoText(String.valueOf(parameters.get(PARAM_INFO_TEXT)))
                .additionalInformation(this.prepareAdditionalInfo(parameters))
                .resultResource(String.valueOf(parameters.get(PARAM_RESULT_RESOURCE)))
                .build();
    }

    /**
     * Builds the additional-information map of a notification.
     *
     * <p>The {@code START_TIME} parameter is cast to {@link Long} and unboxed, so the tuple must
     * carry it as a non-null {@code Long}; otherwise this method throws.
     */
    private Map<String, String> prepareAdditionalInfo(Map<String, Object> parameters) {
        long processingTime = Instant.now().toEpochMilli() - (Long) parameters.get(PARAM_START_TIME);
        return Map.of(
                "stateDescription", String.valueOf(parameters.get(PARAM_STATE_DESCRIPTION)),
                "processingTime", String.valueOf(processingTime),
                "europeanaId", String.valueOf(parameters.get(PARAM_EUROPEANA_ID)));
    }

    /** Statements needed for every processed tuple: notification, task counters, finish time. */
    private List<BoundStatement> prepareCommonStatementsForAllTuples(Notification notification, NotificationCacheEntry nCache) {
        List<BoundStatement> statementsToBeExecuted = new ArrayList<>();
        statementsToBeExecuted.add(this.subTaskInfoDAO.insertNotificationStatement(notification));
        statementsToBeExecuted.add(this.taskInfoDAO.updateProcessedFilesStatement(notification.getTaskId(), nCache.getProcessedRecordsCount(), nCache.getIgnoredRecordsCount(), nCache.getDeletedRecordsCount(), nCache.getProcessedErrorsCount(), nCache.getDeletedErrorsCount()));
        statementsToBeExecuted.add(this.taskDiagnosticInfoDAO.updateLastRecordFinishedOnStormTimeStatement(notification.getTaskId(), Instant.now()));
        return statementsToBeExecuted;
    }

    /** True when the tuple's {@code STATE} parameter equals {@code ERROR}, ignoring case. */
    private boolean isErrorTuple(NotificationTuple notificationTuple) {
        return parameterAsString(notificationTuple, PARAM_STATE).equalsIgnoreCase(RecordState.ERROR.toString());
    }

    /** True for error tuples and for tuples carrying a unified error message. */
    private boolean isError(NotificationTuple notificationTuple) {
        return this.isErrorTuple(notificationTuple) || notificationTuple.getParameter(PARAM_UNIFIED_ERROR_MESSAGE) != null;
    }

    /** True when the tuple carries at least one report. */
    private boolean isReportPresent(NotificationTuple notificationTuple) {
        return !notificationTuple.getReportSet().isEmpty();
    }

    /** Statements storing the error carried by the tuple itself; empty when there is none. */
    private List<BoundStatement> prepareStatementsForErrors(NotificationTuple notificationTuple, NotificationCacheEntry nCache) {
        if (!this.isError(notificationTuple)) {
            return Collections.emptyList();
        }
        ErrorNotification errorNotification = this.prepareErrorNotificationFromTuple(notificationTuple, nCache);
        return this.getStatementsToBeExecutedFromErrorNotification(notificationTuple, nCache, errorNotification);
    }

    /** Statements storing one error per report attached to the tuple. */
    private List<BoundStatement> prepareStatementsForReports(NotificationTuple notificationTuple, NotificationCacheEntry nCache) {
        List<BoundStatement> statementsToBeExecuted = new ArrayList<>();
        if (this.isReportPresent(notificationTuple)) {
            // All error notifications are built first, then turned into statements, so the error
            // counters are only incremented after every notification has been prepared.
            for (ErrorNotification errorNotification : this.prepareErrorNotificationsFromTupleReports(notificationTuple, nCache)) {
                statementsToBeExecuted.addAll(this.getStatementsToBeExecutedFromErrorNotification(notificationTuple, nCache, errorNotification));
            }
        }
        return statementsToBeExecuted;
    }

    /**
     * Increments the counter of the error's type and prepares the statements storing it.
     *
     * <p>The counter statement is always added; the error itself is only added while the counter
     * has not exceeded {@link #MAX_ERRORS_STORED_PER_TYPE}.
     */
    private List<BoundStatement> getStatementsToBeExecutedFromErrorNotification(NotificationTuple notificationTuple, NotificationCacheEntry nCache, ErrorNotification errorNotification) {
        ErrorType errorType = nCache.getErrorType(errorNotification.getErrorMessage());
        List<BoundStatement> statementsToBeExecuted = new ArrayList<>();
        errorType.incrementCounter();
        statementsToBeExecuted.add(this.taskErrorDAO.insertErrorCounterStatement(notificationTuple.getTaskId(), errorType));
        if (this.maximumNumberOfErrorsReached(errorType)) {
            LOGGER.warn("Will not store the error message because threshold reached for taskId={}. ", notificationTuple.getTaskId());
        } else {
            statementsToBeExecuted.add(this.taskErrorDAO.insertErrorStatement(errorNotification));
        }
        return statementsToBeExecuted;
    }

    /** Builds one error notification for each report of the tuple, in iteration order. */
    private List<ErrorNotification> prepareErrorNotificationsFromTupleReports(NotificationTuple notificationTuple, NotificationCacheEntry nCache) {
        List<ErrorNotification> errorNotifications = new ArrayList<>();
        for (Report report : notificationTuple.getReportSet()) {
            errorNotifications.add(this.prepareErrorNotificationFromReport(notificationTuple, report, nCache));
        }
        return errorNotifications;
    }

    /** Builds an error notification whose message and details are taken from a report. */
    private ErrorNotification prepareErrorNotificationFromReport(NotificationTuple notificationTuple, Report report, NotificationCacheEntry nCache) {
        String errorMessage = String.valueOf(report.getMessage());
        String additionalInformation = String.format("MessageType:%s; ProcessingMode:%s; HTTPStatus:%s; Value:%s; StackTrace:%s", report.getMessageType(), report.getMode(), report.getStatus(), report.getValue(), report.getStackTrace());
        return this.buildErrorNotification(notificationTuple, nCache, errorMessage, additionalInformation);
    }

    /**
     * Builds an error notification from the tuple parameters.
     *
     * <p>For a non-error tuple carrying a unified error message, the message and details come from
     * {@code UNIFIED_ERROR_MESSAGE} and {@code EXCEPTION_ERROR_MESSAGE}; otherwise from
     * {@code INFO_TEXT} and {@code STATE_DESCRIPTION}.
     */
    private ErrorNotification prepareErrorNotificationFromTuple(NotificationTuple notificationTuple, NotificationCacheEntry nCache) {
        boolean useUnifiedMessage = !this.isErrorTuple(notificationTuple)
                && notificationTuple.getParameters().get(PARAM_UNIFIED_ERROR_MESSAGE) != null;
        String errorMessage;
        String additionalInformation;
        if (useUnifiedMessage) {
            errorMessage = parameterAsString(notificationTuple, PARAM_UNIFIED_ERROR_MESSAGE);
            additionalInformation = parameterAsString(notificationTuple, PARAM_EXCEPTION_ERROR_MESSAGE);
        } else {
            errorMessage = parameterAsString(notificationTuple, PARAM_INFO_TEXT);
            additionalInformation = parameterAsString(notificationTuple, PARAM_STATE_DESCRIPTION);
        }
        return this.buildErrorNotification(notificationTuple, nCache, errorMessage, additionalInformation);
    }

    /** Assembles an error notification for the tuple's task and resource. */
    private ErrorNotification buildErrorNotification(NotificationTuple notificationTuple, NotificationCacheEntry nCache, String errorMessage, String additionalInformation) {
        return ErrorNotification.builder()
                .taskId(notificationTuple.getTaskId())
                .errorType(nCache.getErrorType(errorMessage).getUuid())
                .errorMessage(errorMessage)
                .resource(parameterAsString(notificationTuple, PARAM_RESOURCE))
                .additionalInformations(this.prepareAdditionalInformation(additionalInformation))
                .build();
    }

    /** Truncates the text to {@link #MAX_STACKTRACE_LENGTH} characters; {@code null} becomes "". */
    private String prepareAdditionalInformation(String additionalInformation) {
        if (additionalInformation == null) {
            return "";
        }
        return additionalInformation.substring(0, Math.min(MAX_STACKTRACE_LENGTH, additionalInformation.length()));
    }

    /** Statement setting the record to the state requested by the handler configuration. */
    private List<BoundStatement> prepareStatementsForRecordState(NotificationTuple notificationTuple, NotificationHandlerConfig config) {
        return this.prepareStatementsForRecordState(notificationTuple, config.getRecordStateToBeSet());
    }

    /** Statement setting the record of the tuple's task and resource to {@code recordState}. */
    private List<BoundStatement> prepareStatementsForRecordState(NotificationTuple notificationTuple, RecordState recordState) {
        return Collections.singletonList(this.processedRecordsDAO.updateProcessedRecordStateStatement(notificationTuple.getTaskId(), notificationTuple.getResource(), recordState));
    }

    /** True once the error type's counter is above {@link #MAX_ERRORS_STORED_PER_TYPE}. */
    private boolean maximumNumberOfErrorsReached(ErrorType errorType) {
        return errorType.getCount() > MAX_ERRORS_STORED_PER_TYPE;
    }
}

