/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.spouts.kafka;

import eu.europeana.cloud.cassandra.CassandraConnectionProvider;
import eu.europeana.cloud.cassandra.CassandraConnectionProviderSingleton;
import eu.europeana.cloud.service.dps.storm.utils.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusChecker;
import java.util.Map;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomKafkaSpout
extends KafkaSpout {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomKafkaSpout.class);
    protected static volatile TaskStatusChecker taskStatusChecker;
    private String hosts;
    private int port;
    private String keyspaceName;
    private String userName;
    private String password;
    protected CassandraTaskInfoDAO cassandraTaskInfoDAO;

    protected CustomKafkaSpout(SpoutConfig spoutConf) {
        super(spoutConf);
    }

    public CustomKafkaSpout(SpoutConfig spoutConf, String hosts, int port, String keyspaceName, String userName, String password) {
        super(spoutConf);
        this.hosts = hosts;
        this.port = port;
        this.keyspaceName = keyspaceName;
        this.userName = userName;
        this.password = password;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        LOGGER.info("Custom spout opened");
        super.open(conf, context, collector);
        CassandraConnectionProvider cassandraConnectionProvider = CassandraConnectionProviderSingleton.getCassandraConnectionProvider((String)this.hosts, (int)this.port, (String)this.keyspaceName, (String)this.userName, (String)this.password);
        this.cassandraTaskInfoDAO = CassandraTaskInfoDAO.getInstance(cassandraConnectionProvider);
        Class<CustomKafkaSpout> clazz = CustomKafkaSpout.class;
        synchronized (CustomKafkaSpout.class) {
            if (taskStatusChecker == null) {
                try {
                    TaskStatusChecker.init(cassandraConnectionProvider);
                }
                catch (IllegalStateException e) {
                    LOGGER.info("It was already initialized");
                }
                taskStatusChecker = TaskStatusChecker.getTaskStatusChecker();
            }
            // ** MonitorExit[var5_5] (shouldn't be in output)
            return;
        }
    }

    public void ack(Object msgId) {
        super.ack(msgId);
    }

    public void fail(Object msgId) {
        super.ack(msgId);
    }
}

