/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm;

import eu.europeana.cloud.cassandra.CassandraConnectionProvider;
import eu.europeana.cloud.cassandra.CassandraConnectionProviderSingleton;
import eu.europeana.cloud.common.model.dps.RecordState;
import eu.europeana.cloud.service.commons.urls.UrlParser;
import eu.europeana.cloud.service.commons.urls.UrlPart;
import eu.europeana.cloud.service.commons.utils.RetryInterruptedException;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.utils.DiagnosticContextWrapper;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusChecker;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for DPS bolts.
 *
 * <p>It wraps the per-tuple processing of a concrete bolt with the steps every bolt shares:
 * converting the incoming {@link Tuple} into a {@link StormTaskTuple}, populating and clearing
 * the diagnostic context, skipping records of dropped tasks, passing deleted records through,
 * and translating exceptions into acks, fails and error notifications.
 *
 * <p>Subclasses implement {@link #execute(Tuple, StormTaskTuple)} for the actual work and
 * {@link #prepare()} for their own initialisation.
 */
public abstract class AbstractDpsBolt extends BaseRichBolt {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDpsBolt.class);

    /** Logger registered under the fixed name {@code STATISTICS_LOGGER}; used by {@link #logStatistics}. */
    protected static final Logger STATISTICS_LOGGER = LoggerFactory.getLogger("STATISTICS_LOGGER");

    /** Message pattern for statistics entries: position, operation name, operation id. */
    protected static final String STATISTICS_LOGGER_MESSAGE_PATTERN = "[{}],{},{}";

    /** Name of the stream on which notification tuples are emitted. */
    public static final String NOTIFICATION_STREAM_NAME = "NotificationStream";

    // Not referenced inside this class; presumably defaults for retry loops in subclasses
    // (SLEEP_TIME looks like milliseconds) - TODO confirm against the users of these constants.
    public static final int DEFAULT_RETRIES = 3;
    public static final int SLEEP_TIME = 5000;

    // The fields below are (re)initialised in prepare(Map, TopologyContext, OutputCollector).
    protected transient TaskStatusChecker taskStatusChecker;
    protected transient Map<?, ?> stormConfig;
    protected transient TopologyContext topologyContext;
    protected transient OutputCollector outputCollector;
    /** Value of the {@code topology.name} entry of the Storm configuration. */
    protected String topologyName;

    /**
     * Performs the bolt-specific processing of one record.
     *
     * <p>Called by {@link #execute(Tuple)} only when the task is not dropped and the record is
     * not a deleted record that {@link #ignoreDeleted()} asks to skip.
     *
     * @param anchorTuple    the original Storm tuple, to be used for anchoring, acking and failing
     * @param stormTaskTuple the same tuple converted by {@code StormTaskTuple.fromStormTuple}
     */
    public abstract void execute(Tuple anchorTuple, StormTaskTuple stormTaskTuple);

    /**
     * Bolt-specific initialisation hook, invoked as the last step of
     * {@link #prepare(Map, TopologyContext, OutputCollector)}.
     */
    public abstract void prepare();

    /**
     * Tells whether records marked as deleted bypass {@link #execute(Tuple, StormTaskTuple)}.
     *
     * @return {@code true} by default, in which case such records are re-emitted unchanged and acked
     */
    protected boolean ignoreDeleted() {
        return true;
    }

    /**
     * Entry point for every incoming tuple.
     *
     * <p>Processing order:
     * <ol>
     *   <li>convert the tuple and populate the diagnostic context;</li>
     *   <li>when the record attempt number is greater than 1, call {@link #cleanInvalidData};</li>
     *   <li>when the task has the dropped status, fail the tuple and stop;</li>
     *   <li>when {@link #ignoreDeleted()} holds and the record is marked as deleted, re-emit it
     *       anchored on the input tuple, ack it and stop;</li>
     *   <li>otherwise delegate to {@link #execute(Tuple, StormTaskTuple)}.</li>
     * </ol>
     *
     * <p>A {@link RetryInterruptedException} is routed to {@link #handleInterruption}. Any other
     * exception is routed to {@link #handleInterruptedFlag} when the current thread's interrupt
     * flag is set, and to the private error handler otherwise. The diagnostic context is cleared
     * in all cases.
     *
     * @param tuple the tuple delivered to this bolt
     */
    public void execute(Tuple tuple) {
        // Stays null when the conversion itself throws; the error handler checks for that.
        StormTaskTuple stormTaskTuple = null;
        try {
            stormTaskTuple = StormTaskTuple.fromStormTuple(tuple);
            LOGGER.debug("{} Performing execute on tuple {}", getClass().getName(), stormTaskTuple);
            prepareDiagnosticContext(stormTaskTuple);

            if (stormTaskTuple.getRecordAttemptNumber() > 1) {
                cleanInvalidData(stormTaskTuple);
            }

            if (taskStatusChecker.hasDroppedStatus(stormTaskTuple.getTaskId())) {
                outputCollector.fail(tuple);
                LOGGER.info("Interrupting execution cause task was dropped: {} recordId: {}",
                        stormTaskTuple.getTaskId(), stormTaskTuple.getFileUrl());
                return;
            }

            if (ignoreDeleted() && stormTaskTuple.isMarkedAsDeleted()) {
                LOGGER.debug("Ignoring and passing further delete record with taskId {} and parameters list : {}",
                        stormTaskTuple.getTaskId(), stormTaskTuple.getParameters());
                outputCollector.emit(tuple, stormTaskTuple.toStormTuple());
                outputCollector.ack(tuple);
                return;
            }

            LOGGER.debug("{} Mapped to StormTaskTuple with taskId {} and parameters list : {}",
                    getClass().getName(), stormTaskTuple.getTaskId(), stormTaskTuple.getParameters());
            execute(tuple, stormTaskTuple);
        } catch (RetryInterruptedException e) {
            handleInterruption(e, tuple);
        } catch (Exception e) {
            // isInterrupted() only reads the flag; it is left set for whoever checks it next.
            if (Thread.currentThread().isInterrupted()) {
                handleInterruptedFlag(e, tuple);
            } else {
                handleException(tuple, stormTaskTuple, e);
            }
        } finally {
            clearDiagnosticContext();
        }
    }

    /**
     * Handles an exception raised while the thread was not interrupted: logs it and, when the
     * tuple had already been converted, emits an error notification carrying the exception
     * message and stack trace and acks the tuple.
     */
    private void handleException(Tuple tuple, StormTaskTuple stormTaskTuple, Exception e) {
        // The trailing throwable is not bound to a placeholder, so SLF4J logs it with its stack trace.
        LOGGER.warn("{} error: {}", boltName(), e.getMessage(), e);
        if (stormTaskTuple != null) {
            StringWriter stackTrace = new StringWriter();
            e.printStackTrace(new PrintWriter(stackTrace));
            emitErrorNotification(tuple, stormTaskTuple, e.getMessage(), stackTrace.toString());
            outputCollector.ack(tuple);
        }
        // NOTE(review): when stormTaskTuple is null (conversion failed) the tuple is neither acked
        // nor failed here - confirm that leaving it to Storm's handling of unacked tuples is intended.
    }

    /**
     * Handles an exception caught while the current thread's interrupt flag was set:
     * logs it at error level and fails the tuple.
     *
     * @param e     the exception that was caught
     * @param tuple the tuple being processed
     */
    protected void handleInterruptedFlag(Exception e, Tuple tuple) {
        LOGGER.error("{} thread was interrupted, and an exception caught: {}", boltName(), e.getMessage(), e);
        outputCollector.fail(tuple);
    }

    /**
     * Handles a {@link RetryInterruptedException}: logs it at error level and fails the tuple.
     *
     * @param e     the exception that was caught
     * @param tuple the tuple being processed
     */
    protected void handleInterruption(RetryInterruptedException e, Tuple tuple) {
        LOGGER.error("{} execution interrupted: {}", boltName(), e.getMessage(), e);
        outputCollector.fail(tuple);
    }

    /** Copies values from the given tuple into the diagnostic context. */
    private void prepareDiagnosticContext(StormTaskTuple stormTaskTuple) {
        DiagnosticContextWrapper.putValuesFrom(stormTaskTuple);
    }

    /** Clears the diagnostic context populated by {@link #prepareDiagnosticContext}. */
    private void clearDiagnosticContext() {
        DiagnosticContextWrapper.clear();
    }

    /**
     * Stores the Storm-provided objects, reads the topology name from the configuration,
     * creates the task status checker and finally calls the subclass hook {@link #prepare()}.
     *
     * @param stormConfig the Storm configuration; must contain the {@code CASSANDRA_*} entries
     *                    read when the task status checker is created
     * @param tc          the topology context
     * @param oc          the collector used for emitting, acking and failing tuples
     */
    // The parameter stays a raw Map so that the method signature is exactly the original one.
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConfig, TopologyContext tc, OutputCollector oc) {
        this.stormConfig = stormConfig;
        this.topologyContext = tc;
        this.outputCollector = oc;
        this.topologyName = (String) stormConfig.get("topology.name");
        initTaskStatusChecker();
        prepare();
    }

    /**
     * Obtains the {@link TaskStatusChecker} for the Cassandra connection described by the
     * {@code CASSANDRA_*} entries of the Storm configuration. All entries are read as strings;
     * the port is parsed with {@link Integer#parseInt(String)}.
     */
    private void initTaskStatusChecker() {
        String hosts = (String) stormConfig.get("CASSANDRA_HOSTS");
        int port = Integer.parseInt((String) stormConfig.get("CASSANDRA_PORT"));
        String keyspaceName = (String) stormConfig.get("CASSANDRA_KEYSPACE_NAME");
        String userName = (String) stormConfig.get("CASSANDRA_USERNAME");
        String password = (String) stormConfig.get("CASSANDRA_PASSWORD");

        CassandraConnectionProvider connectionProvider =
                CassandraConnectionProviderSingleton.getCassandraConnectionProvider(
                        hosts, port, keyspaceName, userName, password);
        taskStatusChecker = TaskStatusChecker.getTaskStatusChecker(connectionProvider);
    }

    /**
     * Declares the default stream with the {@link StormTaskTuple} fields and the
     * {@link #NOTIFICATION_STREAM_NAME} stream with the {@link NotificationTuple} fields.
     *
     * @param declarer the declarer supplied by Storm
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(StormTaskTuple.getFields());
        declarer.declareStream(NOTIFICATION_STREAM_NAME, NotificationTuple.getFields());
    }

    /**
     * Emits a notification with state {@link RecordState#ERROR} on the notification stream,
     * anchored on {@code anchorTuple}.
     *
     * @param anchorTuple           the tuple the notification is anchored on
     * @param stormTaskTuple        the record the notification refers to
     * @param message               the notification message
     * @param additionalInformation further details, e.g. a stack trace
     */
    protected void emitErrorNotification(Tuple anchorTuple, StormTaskTuple stormTaskTuple, String message,
            String additionalInformation) {
        NotificationTuple notification = NotificationTuple.prepareNotificationWithResultResource(
                stormTaskTuple, RecordState.ERROR, message, additionalInformation);
        outputCollector.emit(NOTIFICATION_STREAM_NAME, anchorTuple, notification.toStormTuple());
    }

    /**
     * Emits a notification with state {@link RecordState#SUCCESS} that additionally carries
     * error messages, on the notification stream, anchored on {@code anchorTuple}.
     *
     * @param anchorTuple           the tuple the notification is anchored on
     * @param stormTaskTuple        the record the notification refers to
     * @param message               the notification message
     * @param additionalInformation further details
     * @param unifiedErrorMessage   the unified error message passed on to the notification
     * @param detailedErrorMessage  the detailed error message passed on to the notification
     */
    protected void emitSuccessNotification(Tuple anchorTuple, StormTaskTuple stormTaskTuple, String message,
            String additionalInformation, String unifiedErrorMessage, String detailedErrorMessage) {
        NotificationTuple notification = NotificationTuple.prepareNotificationWithResultResourceAndErrorMessage(
                stormTaskTuple, RecordState.SUCCESS, message, additionalInformation,
                unifiedErrorMessage, detailedErrorMessage);
        outputCollector.emit(NOTIFICATION_STREAM_NAME, anchorTuple, notification.toStormTuple());
    }

    /**
     * Emits a notification with state {@link RecordState#SUCCESS} on the notification stream,
     * anchored on {@code anchorTuple}.
     *
     * @param anchorTuple           the tuple the notification is anchored on
     * @param stormTaskTuple        the record the notification refers to
     * @param message               the notification message
     * @param additionalInformation further details
     */
    protected void emitSuccessNotification(Tuple anchorTuple, StormTaskTuple stormTaskTuple, String message,
            String additionalInformation) {
        NotificationTuple notification = NotificationTuple.prepareNotificationWithResultResource(
                stormTaskTuple, RecordState.SUCCESS, message, additionalInformation);
        outputCollector.emit(NOTIFICATION_STREAM_NAME, anchorTuple, notification.toStormTuple());
    }

    /**
     * Emits a {@link RecordState#SUCCESS} notification flagged with the parameter
     * {@code IGNORED_RECORD=true} on the notification stream, anchored on {@code anchorTuple}.
     *
     * @param anchorTuple           the tuple the notification is anchored on
     * @param stormTaskTuple        the record the notification refers to
     * @param message               the notification message
     * @param additionalInformation further details
     */
    protected void emitIgnoredNotification(Tuple anchorTuple, StormTaskTuple stormTaskTuple, String message,
            String additionalInformation) {
        NotificationTuple notification = NotificationTuple.prepareNotificationWithResultResource(
                stormTaskTuple, RecordState.SUCCESS, message, additionalInformation);
        notification.addParameter("IGNORED_RECORD", "true");
        outputCollector.emit(NOTIFICATION_STREAM_NAME, anchorTuple, notification.toStormTuple());
    }

    /**
     * Stores {@code resultString} (UTF-8 encoded) as the tuple's file data and adds the
     * {@code CLOUD_ID}, {@code REPRESENTATION_NAME} and {@code REPRESENTATION_VERSION}
     * parameters, taken from the records, representations and versions parts of the tuple's
     * file URL.
     *
     * @param stormTaskTuple the tuple to update in place
     * @param resultString   the new file content
     * @throws MalformedURLException declared by the original signature; presumably thrown by
     *                               {@link UrlParser} for an unparsable file URL
     */
    protected void prepareStormTaskTupleForEmission(StormTaskTuple stormTaskTuple, String resultString)
            throws MalformedURLException {
        stormTaskTuple.setFileData(resultString.getBytes(StandardCharsets.UTF_8));

        UrlParser fileUrl = new UrlParser(stormTaskTuple.getFileUrl());
        stormTaskTuple.addParameter("CLOUD_ID", fileUrl.getPart(UrlPart.RECORDS));
        stormTaskTuple.addParameter("REPRESENTATION_NAME", fileUrl.getPart(UrlPart.REPRESENTATIONS));
        stormTaskTuple.addParameter("REPRESENTATION_VERSION", fileUrl.getPart(UrlPart.VERSIONS));
    }

    /**
     * Hook called by {@link #execute(Tuple)} when a record is processed for the second or a
     * later time. This default implementation only logs the attempt number.
     *
     * @param tuple the record being re-processed
     */
    protected void cleanInvalidData(StormTaskTuple tuple) {
        int attemptNumber = tuple.getRecordAttemptNumber();
        LOGGER.info("Attempt number {} to process this message. No cleaning done here.", attemptNumber);
    }

    /**
     * Writes one entry to {@link #STATISTICS_LOGGER} at debug level.
     *
     * @param position whether the entry marks the beginning or the end of the operation
     * @param opName   the operation name
     * @param opId     the operation identifier
     */
    protected void logStatistics(LogStatisticsPosition position, String opName, String opId) {
        STATISTICS_LOGGER.debug(STATISTICS_LOGGER_MESSAGE_PATTERN, position, opName, opId);
    }

    /** Returns the simple class name of the concrete bolt, used as a prefix in log messages. */
    private String boltName() {
        return getClass().getSimpleName();
    }

    /** Position marker for the entries written by {@link #logStatistics}. */
    public enum LogStatisticsPosition {
        BEGIN,
        END
    }
}

