/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm;

import eu.europeana.cloud.cassandra.CassandraConnectionProvider;
import eu.europeana.cloud.cassandra.CassandraConnectionProviderSingleton;
import eu.europeana.cloud.common.model.dps.RecordState;
import eu.europeana.cloud.service.commons.urls.UrlParser;
import eu.europeana.cloud.service.commons.urls.UrlPart;
import eu.europeana.cloud.service.dps.metis.indexing.DataSetCleanerParameters;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.utils.DiagnosticContextWrapper;
import eu.europeana.cloud.service.dps.storm.utils.StormTaskTupleHelper;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusChecker;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractDpsBolt
extends BaseRichBolt {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDpsBolt.class);
    public static final String NOTIFICATION_STREAM_NAME = "NotificationStream";
    protected static final String AUTHORIZATION = "Authorization";
    public static final int DEFAULT_RETRIES = 3;
    public static final int SLEEP_TIME = 5000;
    protected transient TaskStatusChecker taskStatusChecker;
    protected transient Map<?, ?> stormConfig;
    protected transient TopologyContext topologyContext;
    protected transient OutputCollector outputCollector;
    protected String topologyName;

    public abstract void execute(Tuple var1, StormTaskTuple var2);

    public abstract void prepare();

    protected boolean ignoreDeleted() {
        return true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void execute(Tuple tuple) {
        StormTaskTuple stormTaskTuple = null;
        try {
            stormTaskTuple = StormTaskTuple.fromStormTuple(tuple);
            LOGGER.debug("{} Performing execute on tuple {}", (Object)((Object)((Object)this)).getClass().getName(), (Object)stormTaskTuple);
            this.prepareDiagnosticContext(stormTaskTuple);
            if (stormTaskTuple.getRecordAttemptNumber() > 1) {
                this.cleanInvalidData(stormTaskTuple);
            }
            if (this.taskStatusChecker.hasDroppedStatus(stormTaskTuple.getTaskId())) {
                this.outputCollector.fail(tuple);
                LOGGER.info("Interrupting execution cause task was dropped: {} recordId: {}", (Object)stormTaskTuple.getTaskId(), (Object)stormTaskTuple.getFileUrl());
                return;
            }
            if (this.ignoreDeleted() && stormTaskTuple.isMarkedAsDeleted()) {
                LOGGER.debug("Ingornigng and passing further delete record with taskId {} and parameters list : {}", (Object)stormTaskTuple.getTaskId(), (Object)stormTaskTuple.getParameters());
                this.outputCollector.emit(tuple, (List)stormTaskTuple.toStormTuple());
                this.outputCollector.ack(tuple);
                return;
            }
            LOGGER.debug("{} Mapped to StormTaskTuple with taskId {} and parameters list : {}", ((Object)((Object)this)).getClass().getName(), stormTaskTuple.getTaskId(), stormTaskTuple.getParameters());
            this.execute(tuple, stormTaskTuple);
        }
        catch (Exception e) {
            LOGGER.info("AbstractDpsBolt error: {}", (Object)e.getMessage(), (Object)e);
            if (stormTaskTuple != null) {
                StringWriter stack = new StringWriter();
                e.printStackTrace(new PrintWriter(stack));
                this.emitErrorNotification(tuple, stormTaskTuple.getTaskId(), stormTaskTuple.isMarkedAsDeleted(), stormTaskTuple.getFileUrl(), e.getMessage(), stack.toString(), StormTaskTupleHelper.getRecordProcessingStartTime(stormTaskTuple));
                this.outputCollector.ack(tuple);
            }
        }
        finally {
            this.clearDiagnosticContext();
        }
    }

    private void prepareDiagnosticContext(StormTaskTuple stormTaskTuple) {
        DiagnosticContextWrapper.putValuesFrom(stormTaskTuple);
    }

    private void clearDiagnosticContext() {
        DiagnosticContextWrapper.clear();
    }

    public void prepare(Map stormConfig, TopologyContext tc, OutputCollector oc) {
        this.stormConfig = stormConfig;
        this.topologyContext = tc;
        this.outputCollector = oc;
        this.topologyName = (String)stormConfig.get("topology.name");
        this.initTaskStatusChecker();
        this.prepare();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initTaskStatusChecker() {
        String hosts = (String)this.stormConfig.get("CASSANDRA_HOSTS");
        int port = Integer.parseInt((String)this.stormConfig.get("CASSANDRA_PORT"));
        String keyspaceName = (String)this.stormConfig.get("CASSANDRA_KEYSPACE_NAME");
        String userName = (String)this.stormConfig.get("CASSANDRA_USERNAME");
        String password = (String)this.stormConfig.get("CASSANDRA_PASSWORD");
        CassandraConnectionProvider cassandraConnectionProvider = CassandraConnectionProviderSingleton.getCassandraConnectionProvider(hosts, port, keyspaceName, userName, password);
        Class<AbstractDpsBolt> clazz = AbstractDpsBolt.class;
        synchronized (AbstractDpsBolt.class) {
            if (this.taskStatusChecker == null) {
                try {
                    TaskStatusChecker.init(cassandraConnectionProvider);
                }
                catch (IllegalStateException e) {
                    LOGGER.info("It was already initialized Before");
                }
                this.taskStatusChecker = TaskStatusChecker.getTaskStatusChecker();
            }
            // ** MonitorExit[var7_7] (shouldn't be in output)
            return;
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(StormTaskTuple.getFields());
        declarer.declareStream(NOTIFICATION_STREAM_NAME, NotificationTuple.getFields());
    }

    protected void emitErrorNotification(Tuple anchorTuple, long taskId, boolean markedAsDeleted, String resource, String message, String additionalInformations, long processingStartTime) {
        NotificationTuple nt = NotificationTuple.prepareNotification(taskId, markedAsDeleted, resource, RecordState.ERROR, message, additionalInformations, processingStartTime);
        this.outputCollector.emit(NOTIFICATION_STREAM_NAME, anchorTuple, (List)nt.toStormTuple());
    }

    protected void emitSuccessNotification(Tuple anchorTuple, long taskId, boolean markedAsDelete, String resource, String message, String additionalInformation, String resultResource, String unifiedErrorMessage, String detailedErrorMessage, long processingStartTime) {
        NotificationTuple nt = NotificationTuple.prepareNotification(taskId, markedAsDelete, resource, RecordState.SUCCESS, message, additionalInformation, resultResource, processingStartTime);
        nt.addParameter("UNIFIED_ERROR_MESSAGE", unifiedErrorMessage);
        nt.addParameter("EXCEPTION_ERROR_MESSAGE", detailedErrorMessage);
        this.outputCollector.emit(NOTIFICATION_STREAM_NAME, anchorTuple, (List)nt.toStormTuple());
    }

    protected void emitSuccessNotification(Tuple anchorTuple, long taskId, boolean markedAsDelete, String resource, String message, String additionalInformation, String resultResource, long processingStartTime) {
        NotificationTuple nt = NotificationTuple.prepareNotification(taskId, markedAsDelete, resource, RecordState.SUCCESS, message, additionalInformation, resultResource, processingStartTime);
        this.outputCollector.emit(NOTIFICATION_STREAM_NAME, anchorTuple, (List)nt.toStormTuple());
    }

    protected void emitIgnoredNotification(Tuple anchorTuple, long taskId, boolean markedAsDeleted, String resource, String message, String additionalInformation, long processingStartTime) {
        NotificationTuple tuple = NotificationTuple.prepareNotification(taskId, markedAsDeleted, resource, RecordState.SUCCESS, message, additionalInformation, "", processingStartTime);
        tuple.addParameter("IGNORED_RECORD", "true");
        this.outputCollector.emit(NOTIFICATION_STREAM_NAME, anchorTuple, (List)tuple.toStormTuple());
    }

    protected void emitSuccessNotificationForIndexing(Tuple anchorTuple, long taskId, boolean markedAsDeleted, DataSetCleanerParameters dataSetCleanerParameters, String authenticationHeader, String resource, String message, String additionalInformation, String resultResource, long processingStartTime) {
        NotificationTuple nt = NotificationTuple.prepareIndexingNotification(taskId, markedAsDeleted, dataSetCleanerParameters, authenticationHeader, resource, RecordState.SUCCESS, message, additionalInformation, resultResource, processingStartTime);
        this.outputCollector.emit(NOTIFICATION_STREAM_NAME, anchorTuple, (List)nt.toStormTuple());
    }

    protected void prepareStormTaskTupleForEmission(StormTaskTuple stormTaskTuple, String resultString) throws MalformedURLException {
        stormTaskTuple.setFileData(resultString.getBytes(StandardCharsets.UTF_8));
        UrlParser urlParser = new UrlParser(stormTaskTuple.getFileUrl());
        stormTaskTuple.addParameter("CLOUD_ID", urlParser.getPart(UrlPart.RECORDS));
        stormTaskTuple.addParameter("REPRESENTATION_NAME", urlParser.getPart(UrlPart.REPRESENTATIONS));
        stormTaskTuple.addParameter("REPRESENTATION_VERSION", urlParser.getPart(UrlPart.VERSIONS));
    }

    protected void waitForSpecificTime() {
        try {
            Thread.sleep(5000L);
        }
        catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            LOGGER.error(ie.getMessage());
        }
    }

    protected void cleanInvalidData(StormTaskTuple tuple) {
        int attemptNumber = tuple.getRecordAttemptNumber();
        LOGGER.info("Attempt number {} to process this message. No cleaning done here.", (Object)attemptNumber);
    }
}

