/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm;

import eu.europeana.cloud.cassandra.CassandraConnectionProvider;
import eu.europeana.cloud.cassandra.CassandraConnectionProviderSingleton;
import eu.europeana.cloud.common.model.dps.States;
import eu.europeana.cloud.service.commons.urls.UrlParser;
import eu.europeana.cloud.service.commons.urls.UrlPart;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusChecker;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.MalformedURLException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractDpsBolt
extends BaseRichBolt {
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDpsBolt.class);
    protected static volatile TaskStatusChecker taskStatusChecker;
    public static final String NOTIFICATION_STREAM_NAME = "NotificationStream";
    protected static final String AUTHORIZATION = "Authorization";
    public static final int DEFAULT_RETRIES = 3;
    public static final int SLEEP_TIME = 5000;
    protected Map stormConfig;
    protected TopologyContext topologyContext;
    protected OutputCollector outputCollector;
    protected String topologyName;

    public abstract void execute(StormTaskTuple var1);

    public abstract void prepare();

    public void execute(Tuple tuple) {
        block3: {
            StormTaskTuple t = null;
            try {
                t = StormTaskTuple.fromStormTuple(tuple);
                if (!taskStatusChecker.hasKillFlag(t.getTaskId())) {
                    LOGGER.info("Mapped to StormTaskTuple with taskId {} and parameters list : {}", (Object)t.getTaskId(), (Object)t.getParameters());
                    this.execute(t);
                }
            }
            catch (Exception e) {
                LOGGER.info("AbstractDpsBolt error: {} \nStackTrace: \n{}", (Object)e.getMessage(), (Object)e.getStackTrace());
                if (t == null) break block3;
                StringWriter stack = new StringWriter();
                e.printStackTrace(new PrintWriter(stack));
                this.emitErrorNotification(t.getTaskId(), t.getFileUrl(), e.getMessage(), stack.toString());
            }
        }
    }

    public void prepare(Map stormConfig, TopologyContext tc, OutputCollector oc) {
        this.stormConfig = stormConfig;
        this.topologyContext = tc;
        this.outputCollector = oc;
        this.topologyName = (String)stormConfig.get("topology.name");
        this.initTaskStatusChecker();
        this.prepare();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void initTaskStatusChecker() {
        String hosts = (String)this.stormConfig.get("CASSANDRA_HOSTS");
        int port = Integer.parseInt((String)this.stormConfig.get("CASSANDRA_PORT"));
        String keyspaceName = (String)this.stormConfig.get("CASSANDRA_KEYSPACE_NAME");
        String userName = (String)this.stormConfig.get("CASSANDRA_USERNAME");
        String password = (String)this.stormConfig.get("CASSANDRA_PASSWORD");
        CassandraConnectionProvider cassandraConnectionProvider = CassandraConnectionProviderSingleton.getCassandraConnectionProvider(hosts, port, keyspaceName, userName, password);
        Class<AbstractDpsBolt> clazz = AbstractDpsBolt.class;
        synchronized (AbstractDpsBolt.class) {
            if (taskStatusChecker == null) {
                try {
                    TaskStatusChecker.init(cassandraConnectionProvider);
                }
                catch (IllegalStateException e) {
                    LOGGER.info("It was already initialized Before");
                }
                taskStatusChecker = TaskStatusChecker.getTaskStatusChecker();
            }
            // ** MonitorExit[var7_7] (shouldn't be in output)
            return;
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(StormTaskTuple.getFields());
        declarer.declareStream(NOTIFICATION_STREAM_NAME, NotificationTuple.getFields());
    }

    protected void emitErrorNotification(long taskId, String resource, String message, String additionalInformations) {
        NotificationTuple nt = NotificationTuple.prepareNotification(taskId, resource, States.ERROR, message, additionalInformations);
        this.outputCollector.emit(NOTIFICATION_STREAM_NAME, (List)nt.toStormTuple());
    }

    protected void emitSuccessNotification(long taskId, String resource, String message, String additionalInformation, String resultResource, String unifiedErrorMessage, String detailedErrorMessage) {
        NotificationTuple nt = NotificationTuple.prepareNotification(taskId, resource, States.SUCCESS, message, additionalInformation, resultResource);
        nt.addParameter("UNIFIED_ERROR_MESSAGE", unifiedErrorMessage);
        nt.addParameter("EXCEPTION_ERROR_MESSAGE", detailedErrorMessage);
        this.outputCollector.emit(NOTIFICATION_STREAM_NAME, (List)nt.toStormTuple());
    }

    protected void logAndEmitError(StormTaskTuple t, String message) {
        LOGGER.error(message);
        this.emitErrorNotification(t.getTaskId(), t.getFileUrl(), message, t.getParameters().toString());
    }

    protected void emitSuccessNotification(long taskId, String resource, String message, String additionalInformation, String resultResource) {
        NotificationTuple nt = NotificationTuple.prepareNotification(taskId, resource, States.SUCCESS, message, additionalInformation, resultResource);
        this.outputCollector.emit(NOTIFICATION_STREAM_NAME, (List)nt.toStormTuple());
    }

    protected void prepareStormTaskTupleForEmission(StormTaskTuple stormTaskTuple, String resultString) throws MalformedURLException {
        stormTaskTuple.setFileData(resultString.getBytes(Charset.forName("UTF-8")));
        UrlParser urlParser = new UrlParser(stormTaskTuple.getFileUrl());
        stormTaskTuple.addParameter("CLOUD_ID", urlParser.getPart(UrlPart.RECORDS));
        stormTaskTuple.addParameter("REPRESENTATION_NAME", urlParser.getPart(UrlPart.REPRESENTATIONS));
        stormTaskTuple.addParameter("REPRESENTATION_VERSION", urlParser.getPart(UrlPart.VERSIONS));
    }

    protected void waitForSpecificTime() {
        try {
            Thread.sleep(5000L);
        }
        catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            LOGGER.error(e1.getMessage());
        }
    }
}

