/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.topologies.oaipmh.spout;

import com.rits.cloning.Cloner;
import eu.europeana.cloud.common.model.dps.States;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.InputDataType;
import eu.europeana.cloud.service.dps.OAIPMHHarvestingDetails;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.CustomKafkaSpout;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.utils.TaskSpoutInfo;
import eu.europeana.cloud.service.dps.storm.topologies.oaipmh.helpers.SourceProvider;
import eu.europeana.cloud.service.dps.storm.topologies.oaipmh.spout.schema.SchemaFactory;
import eu.europeana.cloud.service.dps.storm.topologies.oaipmh.spout.schema.SchemaHandler;
import java.io.IOException;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.codehaus.jackson.map.ObjectMapper;
import org.dspace.xoai.model.oaipmh.Header;
import org.dspace.xoai.serviceprovider.exceptions.BadArgumentException;
import org.dspace.xoai.serviceprovider.parameters.ListIdentifiersParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OAISpout
extends CustomKafkaSpout {
    private SpoutOutputCollector collector;
    private static final Logger LOGGER = LoggerFactory.getLogger(OAISpout.class);
    private SourceProvider sourceProvider;
    public static final int DEFAULT_RETRIES = 10;
    public static final int SLEEP_TIME = 5000;
    private transient ConcurrentHashMap<Long, TaskSpoutInfo> cache;

    public OAISpout(SpoutConfig spoutConf, String hosts, int port, String keyspaceName, String userName, String password) {
        super(spoutConf, hosts, port, keyspaceName, userName, password);
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.cache = new ConcurrentHashMap(50);
        this.sourceProvider = new SourceProvider();
        super.open(conf, context, new CollectorWrapper((ISpoutOutputCollector)collector));
    }

    @Override
    public void nextTuple() {
        block4: {
            DpsTask dpsTask = null;
            try {
                super.nextTuple();
                Iterator iterator2 = ((ConcurrentHashMap.KeySetView)this.cache.keySet()).iterator();
                while (iterator2.hasNext()) {
                    long taskId = (Long)iterator2.next();
                    TaskSpoutInfo currentTask = this.cache.get(taskId);
                    if (currentTask.isStarted()) continue;
                    LOGGER.info("Start progressing for Task with id {}", (Object)currentTask.getDpsTask().getTaskId());
                    this.startProgress(currentTask);
                    dpsTask = currentTask.getDpsTask();
                    OAIPMHHarvestingDetails oaipmhHarvestingDetails = dpsTask.getHarvestingDetails();
                    if (oaipmhHarvestingDetails == null) {
                        oaipmhHarvestingDetails = new OAIPMHHarvestingDetails();
                    }
                    StormTaskTuple stormTaskTuple = new StormTaskTuple(dpsTask.getTaskId(), dpsTask.getTaskName(), dpsTask.getDataEntry(InputDataType.REPOSITORY_URLS).get(0), null, dpsTask.getParameters(), dpsTask.getOutputRevision(), oaipmhHarvestingDetails);
                    this.execute(stormTaskTuple);
                    this.cache.remove(taskId);
                }
            }
            catch (Exception e) {
                LOGGER.error("StaticDpsTaskSpout error: {}", (Object)e.getMessage());
                if (dpsTask == null) break block4;
                this.cassandraTaskInfoDAO.dropTask(dpsTask.getTaskId(), "The task was dropped because " + e.getMessage(), TaskState.DROPPED.toString());
            }
        }
    }

    private void startProgress(TaskSpoutInfo taskInfo) {
        taskInfo.startTheTask();
        DpsTask task = taskInfo.getDpsTask();
        this.cassandraTaskInfoDAO.updateTask(task.getTaskId(), "", String.valueOf((Object)TaskState.CURRENTLY_PROCESSING), new Date());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(StormTaskTuple.getFields());
        declarer.declareStream("NotificationStream", NotificationTuple.getFields());
    }

    public void execute(StormTaskTuple stormTaskTuple) {
        try {
            SchemaHandler schemaHandler = SchemaFactory.getSchemaHandler(stormTaskTuple);
            Set<String> schemas = schemaHandler.getSchemas(stormTaskTuple);
            OAIPMHHarvestingDetails oaipmhHarvestingDetails = stormTaskTuple.getSourceDetails();
            int count2 = 0;
            Date fromDate = oaipmhHarvestingDetails.getDateFrom();
            Date untilDate = oaipmhHarvestingDetails.getDateUntil();
            Set<String> sets = oaipmhHarvestingDetails.getSets();
            for (String schema : schemas) {
                if (sets == null || sets.isEmpty()) {
                    count2 += this.harvestIdentifiers(schema, null, fromDate, untilDate, stormTaskTuple);
                    continue;
                }
                for (String set : sets) {
                    count2 += this.harvestIdentifiers(schema, set, fromDate, untilDate, stormTaskTuple);
                }
            }
            LOGGER.debug("Harvested {} identifiers for source ( {} )", (Object)count2, (Object)stormTaskTuple.getSourceDetails());
            this.cache.get(stormTaskTuple.getTaskId()).setFileCount(count2);
            this.cassandraTaskInfoDAO.setUpdateExpectedSize(stormTaskTuple.getTaskId(), count2);
        }
        catch (BadArgumentException e) {
            LOGGER.error("OAI Harvesting Spout error: {} ", (Object)e.getMessage());
            this.emitErrorNotification(stormTaskTuple.getTaskId(), stormTaskTuple.getFileUrl(), "Error while Harvesting identifiers " + e.getMessage(), "");
        }
    }

    private int harvestIdentifiers(String schema, String dataset, Date fromDate, Date untilDate, StormTaskTuple stormTaskTuple) throws BadArgumentException {
        OAIPMHHarvestingDetails sourceDetails = stormTaskTuple.getSourceDetails();
        String url = stormTaskTuple.getFileUrl();
        ListIdentifiersParameters parameters = this.configureParameters(schema, dataset, fromDate, untilDate);
        return this.parseHeaders(this.sourceProvider.provide(url).listIdentifiers(parameters), sourceDetails.getExcludedSets(), stormTaskTuple, schema);
    }

    private ListIdentifiersParameters configureParameters(String schema, String dataset, Date fromDate, Date untilDate) {
        ListIdentifiersParameters parameters = ListIdentifiersParameters.request().withMetadataPrefix(schema);
        if (fromDate != null) {
            parameters.withFrom(fromDate);
        }
        if (untilDate != null) {
            parameters.withUntil(untilDate);
        }
        if (dataset != null) {
            parameters.withSetSpec(dataset);
        }
        return parameters;
    }

    private void emitIdentifier(StormTaskTuple stormTaskTuple, String identifier, String schema) {
        StormTaskTuple tuple = new Cloner().deepClone(stormTaskTuple);
        tuple.addParameter("CLOUD_LOCAL_IDENTIFIER", identifier);
        tuple.addParameter("SCHEMA_NAME", schema);
        tuple.addParameter("DPS_TASK_INPUT_DATA", stormTaskTuple.getFileUrl());
        tuple.setFileUrl(identifier);
        this.collector.emit((List)tuple.toStormTuple());
    }

    private int parseHeaders(Iterator<Header> headerIterator, Set<String> excludedSets, StormTaskTuple stormTaskTuple, String schema) {
        if (headerIterator == null) {
            throw new IllegalArgumentException("Header iterator is null");
        }
        int count2 = 0;
        while (this.hasNext(headerIterator) && !taskStatusChecker.hasKillFlag(stormTaskTuple.getTaskId())) {
            Header header = headerIterator.next();
            if (!this.filterHeader(header, excludedSets)) continue;
            this.emitIdentifier(stormTaskTuple, header.getIdentifier(), schema);
            ++count2;
        }
        return count2;
    }

    private boolean hasNext(Iterator<Header> headerIterator) {
        int retries = 10;
        while (true) {
            try {
                return headerIterator.hasNext();
            }
            catch (Exception e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while getting the next batch: {}", (Object)retries);
                    this.waitForSpecificTime();
                    continue;
                }
                LOGGER.error("Error while getting the next batch");
                throw e;
            }
            break;
        }
    }

    protected void waitForSpecificTime() {
        try {
            Thread.sleep(5000L);
        }
        catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            LOGGER.error(e1.getMessage());
        }
    }

    private boolean filterHeader(Header header, Set<String> excludedSets) {
        if (excludedSets != null && !excludedSets.isEmpty()) {
            for (String set : excludedSets) {
                if (!header.getSetSpecs().contains(set)) continue;
                return false;
            }
        }
        return true;
    }

    private void emitErrorNotification(long taskId, String resource, String message, String additionalInformations) {
        NotificationTuple nt = NotificationTuple.prepareNotification(taskId, resource, States.ERROR, message, additionalInformations);
        this.collector.emit("NotificationStream", (List)nt.toStormTuple());
    }

    private class CollectorWrapper
    extends SpoutOutputCollector {
        CollectorWrapper(ISpoutOutputCollector delegate) {
            super(delegate);
        }

        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            try {
                DpsTask dpsTask = new ObjectMapper().readValue((String)tuple.get(0), DpsTask.class);
                if (dpsTask != null) {
                    long taskId = dpsTask.getTaskId();
                    OAISpout.this.cache.putIfAbsent(taskId, new TaskSpoutInfo(dpsTask));
                }
            }
            catch (IOException e) {
                LOGGER.error(e.getMessage());
            }
            return Collections.emptyList();
        }
    }
}

