/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.spouts.kafka;

import com.rits.cloning.Cloner;
import eu.europeana.cloud.common.model.CloudIdAndTimestampResponse;
import eu.europeana.cloud.common.model.File;
import eu.europeana.cloud.common.model.Representation;
import eu.europeana.cloud.common.model.dps.States;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.common.response.CloudTagsResponse;
import eu.europeana.cloud.common.response.RepresentationRevisionResponse;
import eu.europeana.cloud.common.response.ResultSlice;
import eu.europeana.cloud.mcs.driver.DataSetServiceClient;
import eu.europeana.cloud.mcs.driver.FileServiceClient;
import eu.europeana.cloud.mcs.driver.RecordServiceClient;
import eu.europeana.cloud.mcs.driver.RepresentationIterator;
import eu.europeana.cloud.mcs.driver.exception.DriverException;
import eu.europeana.cloud.service.commons.urls.UrlParser;
import eu.europeana.cloud.service.commons.urls.UrlPart;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.InputDataType;
import eu.europeana.cloud.service.dps.OAIPMHHarvestingDetails;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.CustomKafkaSpout;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.utils.TaskSpoutInfo;
import eu.europeana.cloud.service.dps.storm.utils.DateHelper;
import eu.europeana.cloud.service.mcs.exception.MCSException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.spout.ISpoutOutputCollector;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spout that receives {@link DpsTask} messages through the parent Kafka spout and turns
 * every task into one {@link StormTaskTuple} per file.
 *
 * <p>Messages read by the parent spout are intercepted by {@link CollectorWrapper}, which
 * parses them into {@link DpsTask} objects and registers them in {@link #cache}. On each
 * {@link #nextTuple()} call every not-yet-started cached task is processed: tasks carrying
 * {@code FILE_URLS} input are emitted directly, all other tasks are treated as
 * {@code DATASET_URLS} and resolved through the MCS clients in
 * {@link #execute(String, DpsTask)}. Problems with single resources are reported on the
 * {@value #NOTIFICATION_STREAM_NAME} stream.
 */
public class MCSReaderSpout extends CustomKafkaSpout {
    private static final Logger LOGGER = LoggerFactory.getLogger(MCSReaderSpout.class);

    /** Number of additional attempts made after a failed MCS call. */
    private static final int DEFAULT_RETRIES = 10;

    /** Pause between two attempts of an MCS call, in milliseconds. */
    private static final int SLEEP_TIME = 5000;

    private static final String NOTIFICATION_STREAM_NAME = "NotificationStream";

    /** Shared JSON mapper; building one per incoming message is needlessly expensive. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** The real collector supplied by Storm; all tuples produced by this class go through it. */
    private SpoutOutputCollector collector;

    /** Tasks received from Kafka that still have to be processed, keyed by task id. */
    private transient ConcurrentHashMap<Long, TaskSpoutInfo> cache;

    /** Base URL passed to every MCS client created in {@link #execute(String, DpsTask)}. */
    private String mcsClientURL;

    /**
     * Creates a spout from the Kafka configuration only; {@code mcsClientURL} stays unset.
     *
     * @param spoutConf Kafka spout configuration forwarded to the parent class
     */
    MCSReaderSpout(SpoutConfig spoutConf) {
        super(spoutConf);
    }

    /**
     * Creates a fully configured spout.
     *
     * @param spoutConf    Kafka spout configuration forwarded to the parent class
     * @param hosts        forwarded to the parent class
     * @param port         forwarded to the parent class
     * @param keyspaceName forwarded to the parent class
     * @param userName     forwarded to the parent class
     * @param password     forwarded to the parent class
     * @param mcsClientURL base URL used when creating the MCS clients
     */
    public MCSReaderSpout(SpoutConfig spoutConf, String hosts, int port, String keyspaceName, String userName, String password, String mcsClientURL) {
        super(spoutConf, hosts, port, keyspaceName, userName, password);
        this.mcsClientURL = mcsClientURL;
    }

    /**
     * Stores the real collector, creates the task cache and opens the parent spout with a
     * {@link CollectorWrapper} so that incoming messages land in the cache instead of being
     * emitted.
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        this.cache = new ConcurrentHashMap<>(50);
        super.open(conf, context, new CollectorWrapper(collector));
    }

    /**
     * Lets the parent spout read from Kafka and then processes every cached task that has
     * not been started yet.
     *
     * <p>Each task is handled in isolation: a failure drops only the failing task and does
     * not prevent the remaining cached tasks from being processed in the same call.
     */
    @Override
    public void nextTuple() {
        try {
            super.nextTuple();
        } catch (Exception e) {
            LOGGER.error("MCSReaderSpout error: {}", e.getMessage(), e);
            return;
        }
        for (Map.Entry<Long, TaskSpoutInfo> entry : cache.entrySet()) {
            TaskSpoutInfo taskInfo = entry.getValue();
            if (!taskInfo.isStarted()) {
                processTask(entry.getKey(), taskInfo);
            }
        }
    }

    /**
     * Marks one cached task as started, emits its tuples and finally removes it from the cache.
     * If anything fails, the task identified by {@code taskId} is dropped in the task info DAO.
     *
     * @param taskId   cache key of the task (the id it was registered under)
     * @param taskInfo cached state of the task
     */
    private void processTask(long taskId, TaskSpoutInfo taskInfo) {
        try {
            DpsTask dpsTask = taskInfo.getDpsTask();
            LOGGER.info("Start progressing for Task with id {}", dpsTask.getTaskId());
            startProgress(taskInfo);
            String stream = getStream(dpsTask);
            if (InputDataType.FILE_URLS.name().equals(stream)) {
                emitFileUrls(dpsTask);
            } else {
                execute(stream, dpsTask);
            }
        } catch (Exception e) {
            LOGGER.error("MCSReaderSpout error: {}", e.getMessage(), e);
            // Drop exactly the task that failed, never one handled earlier in the same call.
            this.cassandraTaskInfoDAO.dropTask(taskId, "The task was dropped because " + e.getMessage(), TaskState.DROPPED.toString());
        } finally {
            // Also on failure: a started entry is never picked up again, so keeping it
            // would only leak memory and block re-registration of the same id.
            cache.remove(taskId);
        }
    }

    /**
     * Emits one tuple per entry of the task's {@code FILE_URLS} input.
     *
     * @param dpsTask task whose file URLs are emitted
     */
    private void emitFileUrls(DpsTask dpsTask) {
        StormTaskTuple template = new StormTaskTuple(dpsTask.getTaskId(), dpsTask.getTaskName(), null, null, dpsTask.getParameters(), dpsTask.getOutputRevision(), new OAIPMHHarvestingDetails());
        List<String> files = dpsTask.getDataEntry(InputDataType.FILE_URLS);
        for (String file : files) {
            this.collector.emit(buildNextStormTuple(template, file).toStormTuple());
        }
    }

    /**
     * Chooses the input type of a task.
     *
     * @param task task to inspect
     * @return the name of {@code FILE_URLS} when the task has such input data,
     *         otherwise the name of {@code DATASET_URLS}
     */
    private String getStream(DpsTask task) {
        if (task.getInputData().get(InputDataType.FILE_URLS) != null) {
            return InputDataType.FILE_URLS.name();
        }
        return InputDataType.DATASET_URLS.name();
    }

    /**
     * Flags the cached task as started and records the {@code CURRENTLY_PROCESSING} state
     * together with the current time in the task info DAO.
     *
     * @param taskInfo cached state of the task being started
     */
    private void startProgress(TaskSpoutInfo taskInfo) {
        taskInfo.startTheTask();
        DpsTask task = taskInfo.getDpsTask();
        this.cassandraTaskInfoDAO.updateTask(task.getTaskId(), "", TaskState.CURRENTLY_PROCESSING.toString(), new Date());
    }

    /** Declares the default tuple stream and the notification stream. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(StormTaskTuple.getFields());
        declarer.declareStream(NOTIFICATION_STREAM_NAME, NotificationTuple.getFields());
    }

    /**
     * Emits a tuple for every file reachable from the data set URLs of the task.
     *
     * <p>The parameters {@code REPRESENTATION_NAME}, {@code REVISION_NAME} and
     * {@code REVISION_PROVIDER} are read and then removed from the task's parameters.
     * When both a revision name and a revision provider are given, files are looked up by
     * revision (the exact one if {@code REVISION_TIMESTAMP} is set, the latest otherwise);
     * if not, all representations of the data set are iterated. URLs that cannot be parsed
     * or do not point to a data set produce an error notification and are skipped.
     * Afterwards the file count collected in the task's cache entry is stored as the
     * expected size of the task, so the task must be present in the cache.
     *
     * @param stream  name of the {@link InputDataType} holding the data set URLs
     * @param dpsTask task to process
     * @throws MCSException    when an MCS call keeps failing after all retries
     * @throws DriverException when an MCS call keeps failing after all retries
     */
    public void execute(String stream, DpsTask dpsTask) throws MCSException, DriverException {
        List<String> dataSets = dpsTask.getDataEntry(InputDataType.valueOf(stream));
        String representationName = dpsTask.getParameter("REPRESENTATION_NAME");
        dpsTask.getParameters().remove("REPRESENTATION_NAME");
        String revisionName = dpsTask.getParameter("REVISION_NAME");
        dpsTask.getParameters().remove("REVISION_NAME");
        String revisionProvider = dpsTask.getParameter("REVISION_PROVIDER");
        dpsTask.getParameters().remove("REVISION_PROVIDER");
        String authorizationHeader = dpsTask.getParameter("AUTHORIZATION_HEADER");

        DataSetServiceClient dataSetServiceClient = new DataSetServiceClient(this.mcsClientURL);
        dataSetServiceClient.useAuthorizationHeader(authorizationHeader);
        RecordServiceClient recordServiceClient = new RecordServiceClient(this.mcsClientURL);
        recordServiceClient.useAuthorizationHeader(authorizationHeader);
        FileServiceClient fileClient = new FileServiceClient(this.mcsClientURL);
        fileClient.useAuthorizationHeader(authorizationHeader);

        for (String dataSetUrl : dataSets) {
            try {
                UrlParser urlParser = new UrlParser(dataSetUrl);
                if (!urlParser.isUrlToDataset()) {
                    LOGGER.warn("dataset url is not formulated correctly {}", dataSetUrl);
                    emitErrorNotification(dpsTask.getTaskId(), dataSetUrl, "dataset url is not formulated correctly", "");
                    continue;
                }
                String datasetProvider = urlParser.getPart(UrlPart.DATA_PROVIDERS);
                String datasetName = urlParser.getPart(UrlPart.DATA_SETS);
                if (revisionName != null && revisionProvider != null) {
                    String revisionTimestamp = dpsTask.getParameter("REVISION_TIMESTAMP");
                    if (revisionTimestamp != null) {
                        handleExactRevisions(dpsTask, dataSetServiceClient, recordServiceClient, fileClient, representationName, revisionName, revisionProvider, revisionTimestamp, datasetProvider, datasetName);
                    } else {
                        handleLatestRevisions(dpsTask, dataSetServiceClient, recordServiceClient, fileClient, representationName, revisionName, revisionProvider, datasetName, datasetProvider);
                    }
                } else {
                    RepresentationIterator representations = dataSetServiceClient.getRepresentationIterator(datasetProvider, datasetName);
                    while (representations.hasNext() && !taskStatusChecker.hasKillFlag(dpsTask.getTaskId())) {
                        emitFilesFromRepresentation(dpsTask, fileClient, representations.next());
                    }
                }
            } catch (MalformedURLException ex) {
                LOGGER.error("MCSReaderSpout error, Error while parsing DataSet URL : {}", ex.getMessage());
                emitErrorNotification(dpsTask.getTaskId(), dataSetUrl, ex.getMessage(), dpsTask.getParameters().toString());
            }
        }
        this.cassandraTaskInfoDAO.setUpdateExpectedSize(dpsTask.getTaskId(), this.cache.get(dpsTask.getTaskId()).getFileCount());
    }

    /**
     * Pages through the latest revisions of a data set and emits the files of every
     * representation found. Stops as soon as the task has its kill flag set.
     *
     * @throws MCSException    when an MCS call keeps failing after all retries
     * @throws DriverException when an MCS call keeps failing after all retries
     */
    private void handleLatestRevisions(DpsTask dpsTask, DataSetServiceClient dataSetServiceClient, RecordServiceClient recordServiceClient, FileServiceClient fileServiceClient, String representationName, String revisionName, String revisionProvider, String datasetName, String datasetProvider) throws MCSException, DriverException {
        long taskId = dpsTask.getTaskId();
        String startFrom = null;
        do {
            ResultSlice<CloudIdAndTimestampResponse> resultSlice = getLatestDataSetCloudIdByRepresentationAndRevisionChunk(dataSetServiceClient, representationName, revisionName, revisionProvider, datasetName, datasetProvider, startFrom);
            for (CloudIdAndTimestampResponse response : resultSlice.getResults()) {
                if (taskStatusChecker.hasKillFlag(taskId)) {
                    break;
                }
                String responseCloudId = response.getCloudId();
                String revisionTimestamp = DateHelper.getUTCDateString(response.getRevisionTimestamp());
                RepresentationRevisionResponse revision = getRepresentationRevision(recordServiceClient, representationName, revisionName, revisionProvider, revisionTimestamp, responseCloudId);
                Representation representation = getRepresentation(recordServiceClient, representationName, responseCloudId, revision);
                emitFilesFromRepresentation(dpsTask, fileServiceClient, representation);
            }
            startFrom = resultSlice.getNextSlice();
        } while (startFrom != null && !taskStatusChecker.hasKillFlag(taskId));
    }

    /**
     * Emits one tuple per file of the given representation and increments the file counter
     * of the task's cache entry for each of them. A file whose URI cannot be obtained (or
     * whose emission fails) results in an error notification instead. Stops early when the
     * task has its kill flag set.
     *
     * @param dpsTask           task the files belong to
     * @param fileServiceClient client used to build the file URIs
     * @param representation    representation to read; {@code null} is only logged
     */
    private void emitFilesFromRepresentation(DpsTask dpsTask, FileServiceClient fileServiceClient, Representation representation) {
        StormTaskTuple stormTaskTuple = new StormTaskTuple(dpsTask.getTaskId(), dpsTask.getTaskName(), null, null, dpsTask.getParameters(), dpsTask.getOutputRevision(), new OAIPMHHarvestingDetails());
        if (representation == null) {
            LOGGER.warn("Problem while reading representation");
            return;
        }
        for (File file : representation.getFiles()) {
            if (taskStatusChecker.hasKillFlag(dpsTask.getTaskId())) {
                break;
            }
            String fileUrl = "";
            try {
                fileUrl = fileServiceClient.getFileUri(representation.getCloudId(), representation.getRepresentationName(), representation.getVersion(), file.getFileName()).toString();
                StormTaskTuple fileTuple = buildNextStormTuple(stormTaskTuple, fileUrl);
                this.cache.get(stormTaskTuple.getTaskId()).inc();
                this.collector.emit(fileTuple.toStormTuple());
            } catch (Exception e) {
                LOGGER.warn("Error while getting File URI from MCS {}", e.getMessage(), e);
                emitErrorNotification(dpsTask.getTaskId(), fileUrl, "Error while getting File URI from MCS " + e.getMessage(), "");
            }
        }
    }

    /**
     * Fetches one slice of the latest cloud ids of a data set, retrying up to
     * {@link #DEFAULT_RETRIES} times with a pause in between. A {@code null} slice or a
     * slice without results counts as a failure.
     *
     * @return a slice whose results are not {@code null}
     * @throws MCSException    when the call still fails after the last retry
     * @throws DriverException when the call still fails after the last retry
     */
    private ResultSlice<CloudIdAndTimestampResponse> getLatestDataSetCloudIdByRepresentationAndRevisionChunk(DataSetServiceClient dataSetServiceClient, String representationName, String revisionName, String revisionProvider, String datasetName, String datasetProvider, String startFrom) throws MCSException, DriverException {
        int retries = DEFAULT_RETRIES;
        while (true) {
            try {
                ResultSlice<CloudIdAndTimestampResponse> resultSlice = dataSetServiceClient.getLatestDataSetCloudIdByRepresentationAndRevisionChunk(datasetName, datasetProvider, revisionProvider, revisionName, representationName, false, startFrom);
                if (resultSlice == null || resultSlice.getResults() == null) {
                    throw new DriverException("Getting cloud ids and revision tags: result chunk obtained but is empty.");
                }
                return resultSlice;
            } catch (DriverException | MCSException e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while getting slice of  latest cloud Id from data set.Retries Left{} ", retries);
                    waitForSpecificTime();
                } else {
                    LOGGER.error("Error while getting slice of latest cloud Id from data set.", e);
                    throw e;
                }
            }
        }
    }

    /**
     * Pages through the cloud ids tagged with one exact revision of a data set and emits
     * the files of every representation found. Stops as soon as the task has its kill flag set.
     *
     * @throws MCSException    when an MCS call keeps failing after all retries
     * @throws DriverException when an MCS call keeps failing after all retries
     */
    private void handleExactRevisions(DpsTask dpsTask, DataSetServiceClient dataSetServiceClient, RecordServiceClient recordServiceClient, FileServiceClient fileClient, String representationName, String revisionName, String revisionProvider, String revisionTimestamp, String datasetProvider, String datasetName) throws MCSException, DriverException {
        long taskId = dpsTask.getTaskId();
        String startFrom = null;
        do {
            ResultSlice<CloudTagsResponse> resultSlice = getDataSetRevisionsChunk(dataSetServiceClient, representationName, revisionName, revisionProvider, revisionTimestamp, datasetProvider, datasetName, startFrom);
            for (CloudTagsResponse response : resultSlice.getResults()) {
                if (taskStatusChecker.hasKillFlag(taskId)) {
                    break;
                }
                String responseCloudId = response.getCloudId();
                RepresentationRevisionResponse revision = getRepresentationRevision(recordServiceClient, representationName, revisionName, revisionProvider, revisionTimestamp, responseCloudId);
                Representation representation = getRepresentation(recordServiceClient, representationName, responseCloudId, revision);
                emitFilesFromRepresentation(dpsTask, fileClient, representation);
            }
            startFrom = resultSlice.getNextSlice();
        } while (startFrom != null && !taskStatusChecker.hasKillFlag(taskId));
    }

    /**
     * Loads the representation version referenced by a revision response, retrying up to
     * {@link #DEFAULT_RETRIES} times with a pause in between.
     *
     * @throws MCSException when the call still fails after the last retry
     */
    private Representation getRepresentation(RecordServiceClient recordServiceClient, String representationName, String responseCloudId, RepresentationRevisionResponse representationRevisionResponse) throws MCSException {
        int retries = DEFAULT_RETRIES;
        while (true) {
            try {
                return recordServiceClient.getRepresentation(responseCloudId, representationName, representationRevisionResponse.getVersion());
            } catch (DriverException | MCSException e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while getting Representation. Retries left{}", retries);
                    waitForSpecificTime();
                } else {
                    LOGGER.error("Error while getting Representation.", e);
                    throw e;
                }
            }
        }
    }

    /**
     * Loads the representation revision of a cloud id, retrying up to
     * {@link #DEFAULT_RETRIES} times with a pause in between.
     *
     * @throws MCSException when the call still fails after the last retry
     */
    private RepresentationRevisionResponse getRepresentationRevision(RecordServiceClient recordServiceClient, String representationName, String revisionName, String revisionProvider, String revisionTimestamp, String responseCloudId) throws MCSException {
        int retries = DEFAULT_RETRIES;
        while (true) {
            try {
                return recordServiceClient.getRepresentationRevision(responseCloudId, representationName, revisionName, revisionProvider, revisionTimestamp);
            } catch (DriverException | MCSException e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while getting representation revision. Retries Left{} ", retries);
                    waitForSpecificTime();
                } else {
                    LOGGER.error("Error while getting representation revision.", e);
                    throw e;
                }
            }
        }
    }

    /**
     * Fetches one slice of the cloud ids tagged with an exact revision, retrying up to
     * {@link #DEFAULT_RETRIES} times with a pause in between. A {@code null} slice or a
     * slice without results counts as a failure.
     *
     * @return a slice whose results are not {@code null}
     * @throws MCSException    when the call still fails after the last retry
     * @throws DriverException when the call still fails after the last retry
     */
    private ResultSlice<CloudTagsResponse> getDataSetRevisionsChunk(DataSetServiceClient dataSetServiceClient, String representationName, String revisionName, String revisionProvider, String revisionTimestamp, String datasetProvider, String datasetName, String startFrom) throws MCSException, DriverException {
        int retries = DEFAULT_RETRIES;
        while (true) {
            try {
                ResultSlice<CloudTagsResponse> resultSlice = dataSetServiceClient.getDataSetRevisionsChunk(datasetProvider, datasetName, representationName, revisionName, revisionProvider, revisionTimestamp, startFrom, null);
                if (resultSlice == null || resultSlice.getResults() == null) {
                    throw new DriverException("Getting cloud ids and revision tags: result chunk obtained but is empty.");
                }
                return resultSlice;
            } catch (DriverException | MCSException e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while getting Revisions from data set.Retries Left{} ", retries);
                    waitForSpecificTime();
                } else {
                    LOGGER.error("Error while getting Revisions from data set.", e);
                    throw e;
                }
            }
        }
    }

    /**
     * Deep-clones the template tuple and stores the file URL in the clone under the
     * {@code DPS_TASK_INPUT_DATA} parameter.
     *
     * @param stormTaskTuple template shared by all files of a task; left untouched
     * @param fileUrl        URL of the file the new tuple stands for
     * @return the new tuple
     */
    private StormTaskTuple buildNextStormTuple(StormTaskTuple stormTaskTuple, String fileUrl) {
        StormTaskTuple fileTuple = new Cloner().deepClone(stormTaskTuple);
        fileTuple.addParameter("DPS_TASK_INPUT_DATA", fileUrl);
        return fileTuple;
    }

    /** Sleeps for {@link #SLEEP_TIME} milliseconds; an interrupt is logged and the flag restored. */
    private void waitForSpecificTime() {
        try {
            Thread.sleep(SLEEP_TIME);
        } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            LOGGER.error(e1.getMessage());
        }
    }

    /**
     * Emits an {@code ERROR} notification on the notification stream.
     *
     * @param taskId                 id of the task the problem belongs to
     * @param resource               resource that could not be handled
     * @param message                description of the problem
     * @param additionalInformations extra details, may be empty
     */
    private void emitErrorNotification(long taskId, String resource, String message, String additionalInformations) {
        NotificationTuple nt = NotificationTuple.prepareNotification(taskId, resource, States.ERROR, message, additionalInformations);
        this.collector.emit(NOTIFICATION_STREAM_NAME, nt.toStormTuple());
    }

    /**
     * Collector handed to the parent spout in {@link MCSReaderSpout#open}. Tuples given to
     * the three-argument {@code emit} are not forwarded to the delegate; their first element
     * is parsed as a JSON {@link DpsTask} and registered in the cache instead.
     */
    private class CollectorWrapper extends SpoutOutputCollector {
        CollectorWrapper(ISpoutOutputCollector delegate) {
            super(delegate);
        }

        /**
         * Parses the first tuple element into a {@link DpsTask} and caches it unless a task
         * with the same id is already cached. Unparsable messages are logged and ignored.
         *
         * @return always an empty list
         */
        @Override
        public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
            try {
                DpsTask dpsTask = OBJECT_MAPPER.readValue((String) tuple.get(0), DpsTask.class);
                if (dpsTask != null) {
                    long taskId = dpsTask.getTaskId();
                    MCSReaderSpout.this.cache.putIfAbsent(taskId, new TaskSpoutInfo(dpsTask));
                }
            } catch (IOException e) {
                LOGGER.error(e.getMessage(), e);
            }
            return Collections.emptyList();
        }
    }
}

