/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.spouts.kafka.job;

import com.google.common.base.Throwables;
import eu.europeana.cloud.common.model.CloudIdAndTimestampResponse;
import eu.europeana.cloud.common.model.dps.States;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.common.response.CloudTagsResponse;
import eu.europeana.cloud.common.response.ResultSlice;
import eu.europeana.cloud.mcs.driver.DataSetServiceClient;
import eu.europeana.cloud.mcs.driver.FileServiceClient;
import eu.europeana.cloud.mcs.driver.RecordServiceClient;
import eu.europeana.cloud.mcs.driver.RepresentationIterator;
import eu.europeana.cloud.mcs.driver.exception.DriverException;
import eu.europeana.cloud.service.commons.urls.UrlParser;
import eu.europeana.cloud.service.commons.urls.UrlPart;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.InputDataType;
import eu.europeana.cloud.service.dps.OAIPMHHarvestingDetails;
import eu.europeana.cloud.service.dps.storm.NotificationTuple;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.QueueFiller;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.QueueFillerForLatestRevisionJob;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.QueueFillerForSpecificRevisionJob;
import eu.europeana.cloud.service.dps.storm.utils.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusChecker;
import eu.europeana.cloud.service.mcs.exception.MCSException;
import java.net.MalformedURLException;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.storm.spout.SpoutOutputCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskExecutor
implements Callable<Void> {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutor.class);
    private static final int INTERNAL_THREADS_NUMBER = 10;
    private static final int DEFAULT_RETRIES = 3;
    private static final int SLEEP_TIME = 5000;
    private static final int MAX_BATCH_SIZE = 100;
    private TaskStatusChecker taskStatusChecker;
    private SpoutOutputCollector collector;
    private CassandraTaskInfoDAO cassandraTaskInfoDAO;
    private ArrayBlockingQueue<StormTaskTuple> tuplesWithFileUrls;
    private String mcsClientURL;
    private String stream;
    private DpsTask dpsTask;

    public TaskExecutor(SpoutOutputCollector collector, TaskStatusChecker taskStatusChecker, CassandraTaskInfoDAO cassandraTaskInfoDAO, ArrayBlockingQueue<StormTaskTuple> tuplesWithFileUrls, String mcsClientURL, String stream, DpsTask dpsTask) {
        this.collector = collector;
        this.taskStatusChecker = taskStatusChecker;
        this.cassandraTaskInfoDAO = cassandraTaskInfoDAO;
        this.tuplesWithFileUrls = tuplesWithFileUrls;
        this.mcsClientURL = mcsClientURL;
        this.stream = stream;
        this.dpsTask = dpsTask;
    }

    @Override
    public Void call() {
        try {
            this.execute();
        }
        catch (Exception e) {
            this.cassandraTaskInfoDAO.dropTask(this.dpsTask.getTaskId(), "The task was dropped because of " + e.getMessage() + ". The full exception is" + Throwables.getStackTraceAsString(e), TaskState.DROPPED.toString());
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void execute() throws Exception {
        List<String> dataSets = this.dpsTask.getDataEntry(InputDataType.valueOf(this.stream));
        String representationName = this.dpsTask.getParameter("REPRESENTATION_NAME");
        this.dpsTask.getParameters().remove("REPRESENTATION_NAME");
        String revisionName = this.dpsTask.getParameter("REVISION_NAME");
        this.dpsTask.getParameters().remove("REVISION_NAME");
        String revisionProvider = this.dpsTask.getParameter("REVISION_PROVIDER");
        this.dpsTask.getParameters().remove("REVISION_PROVIDER");
        String authorizationHeader = this.dpsTask.getParameter("AUTHORIZATION_HEADER");
        DataSetServiceClient dataSetServiceClient = new DataSetServiceClient(this.mcsClientURL);
        dataSetServiceClient.useAuthorizationHeader(authorizationHeader);
        RecordServiceClient recordServiceClient = new RecordServiceClient(this.mcsClientURL);
        recordServiceClient.useAuthorizationHeader(authorizationHeader);
        FileServiceClient fileClient = new FileServiceClient(this.mcsClientURL);
        fileClient.useAuthorizationHeader(authorizationHeader);
        StormTaskTuple stormTaskTuple = new StormTaskTuple(this.dpsTask.getTaskId(), this.dpsTask.getTaskName(), null, null, this.dpsTask.getParameters(), this.dpsTask.getOutputRevision(), new OAIPMHHarvestingDetails());
        int expectedSize = 0;
        try {
            for (String dataSetUrl : dataSets) {
                try {
                    UrlParser urlParser = new UrlParser(dataSetUrl);
                    if (urlParser.isUrlToDataset()) {
                        if (revisionName != null && revisionProvider != null) {
                            String revisionTimestamp = this.dpsTask.getParameter("REVISION_TIMESTAMP");
                            if (revisionTimestamp != null) {
                                if (stormTaskTuple.getParameter("SAMPLE_SIZE") == null) {
                                    expectedSize += this.handleExactRevisions(stormTaskTuple, dataSetServiceClient, recordServiceClient, fileClient, representationName, revisionName, revisionProvider, revisionTimestamp, urlParser.getPart(UrlPart.DATA_PROVIDERS), urlParser.getPart(UrlPart.DATA_SETS));
                                    continue;
                                }
                                expectedSize += this.handlePartialSizeExactRevisions(stormTaskTuple, dataSetServiceClient, recordServiceClient, fileClient, representationName, revisionName, revisionProvider, revisionTimestamp, urlParser.getPart(UrlPart.DATA_PROVIDERS), urlParser.getPart(UrlPart.DATA_SETS), Integer.parseInt(stormTaskTuple.getParameter("SAMPLE_SIZE")));
                                continue;
                            }
                            if (stormTaskTuple.getParameter("SAMPLE_SIZE") == null) {
                                expectedSize += this.handleLatestRevisions(stormTaskTuple, dataSetServiceClient, recordServiceClient, fileClient, representationName, revisionName, revisionProvider, urlParser.getPart(UrlPart.DATA_SETS), urlParser.getPart(UrlPart.DATA_PROVIDERS));
                                continue;
                            }
                            expectedSize += this.handlePartialSizeForLatestRevisions(stormTaskTuple, dataSetServiceClient, recordServiceClient, fileClient, representationName, revisionName, revisionProvider, urlParser.getPart(UrlPart.DATA_SETS), urlParser.getPart(UrlPart.DATA_PROVIDERS), Integer.parseInt(stormTaskTuple.getParameter("SAMPLE_SIZE")));
                            continue;
                        }
                        QueueFiller queueFiller = new QueueFiller(this.taskStatusChecker, this.collector, this.tuplesWithFileUrls);
                        RepresentationIterator iterator = dataSetServiceClient.getRepresentationIterator(urlParser.getPart(UrlPart.DATA_PROVIDERS), urlParser.getPart(UrlPart.DATA_SETS));
                        while (iterator.hasNext() && !this.taskStatusChecker.hasKillFlag(this.dpsTask.getTaskId())) {
                            expectedSize += queueFiller.addTupleToQueue(stormTaskTuple, fileClient, iterator.next());
                        }
                        continue;
                    }
                    LOGGER.warn("dataSet url is not formulated correctly {}", (Object)dataSetUrl);
                    this.emitErrorNotification(this.dpsTask.getTaskId(), dataSetUrl, "dataSet url is not formulated correctly", "");
                }
                catch (MalformedURLException ex) {
                    LOGGER.error("MCSReaderSpout error, Error while parsing DataSet URL : {}", (Object)ex.getMessage());
                    this.emitErrorNotification(this.dpsTask.getTaskId(), dataSetUrl, ex.getMessage(), this.dpsTask.getParameters().toString());
                }
            }
            this.cassandraTaskInfoDAO.setUpdateExpectedSize(this.dpsTask.getTaskId(), expectedSize);
        }
        finally {
            fileClient.close();
            dataSetServiceClient.close();
            recordServiceClient.close();
            if (expectedSize == 0) {
                this.cassandraTaskInfoDAO.dropTask(this.dpsTask.getTaskId(), "The task was dropped because it is empty", TaskState.DROPPED.toString());
            }
        }
    }

    private ResultSlice<CloudIdAndTimestampResponse> getLatestDataSetCloudIdByRepresentationAndRevisionChunk(DataSetServiceClient dataSetServiceClient, String representationName, String revisionName, String revisionProvider, String datasetName, String datasetProvider, String startFrom) throws MCSException, DriverException {
        int retries = 3;
        while (true) {
            try {
                ResultSlice<CloudIdAndTimestampResponse> resultSlice = dataSetServiceClient.getLatestDataSetCloudIdByRepresentationAndRevisionChunk(datasetName, datasetProvider, revisionProvider, revisionName, representationName, false, startFrom);
                if (resultSlice == null || resultSlice.getResults() == null) {
                    throw new DriverException("Getting cloud ids and revision tags: result chunk obtained but is empty.");
                }
                return resultSlice;
            }
            catch (Exception e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while getting slice of  latest cloud Id from data set.Retries Left{} ", (Object)retries);
                    this.waitForSpecificTime(5000);
                    continue;
                }
                LOGGER.error("Error while getting slice of latest cloud Id from data set.");
                throw e;
            }
            break;
        }
    }

    private int handleLatestRevisions(StormTaskTuple stormTaskTuple, DataSetServiceClient dataSetServiceClient, RecordServiceClient recordServiceClient, FileServiceClient fileServiceClient, String representationName, String revisionName, String revisionProvider, String datasetName, String datasetProvider) throws MCSException, DriverException, InterruptedException, ConcurrentModificationException, ExecutionException {
        ResultSlice<CloudIdAndTimestampResponse> resultSlice;
        int count = 0;
        String startFrom = null;
        long taskId = stormTaskTuple.getTaskId();
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        HashSet<Future<Integer>> futures = new HashSet<Future<Integer>>(10);
        do {
            resultSlice = this.getLatestDataSetCloudIdByRepresentationAndRevisionChunk(dataSetServiceClient, representationName, revisionName, revisionProvider, datasetName, datasetProvider, startFrom);
            List<CloudIdAndTimestampResponse> cloudIdAndTimestampResponseList = resultSlice.getResults();
            Future<Integer> job = executorService.submit(new QueueFillerForLatestRevisionJob(fileServiceClient, recordServiceClient, this.collector, this.taskStatusChecker, this.tuplesWithFileUrls, stormTaskTuple, representationName, revisionName, revisionProvider, cloudIdAndTimestampResponseList));
            futures.add(job);
            if (futures.size() != 10) continue;
            count += this.getCountAndWait(futures);
        } while ((startFrom = resultSlice.getNextSlice()) != null && !this.taskStatusChecker.hasKillFlag(taskId));
        if (futures.size() > 0) {
            count += this.getCountAndWait(futures);
        }
        executorService.shutdown();
        return count;
    }

    private int handlePartialSizeForLatestRevisions(StormTaskTuple stormTaskTuple, DataSetServiceClient dataSetServiceClient, RecordServiceClient recordServiceClient, FileServiceClient fileServiceClient, String representationName, String revisionName, String revisionProvider, String datasetName, String datasetProvider, int maxRecordsCount) throws MCSException, DriverException, InterruptedException, ConcurrentModificationException, ExecutionException {
        ResultSlice<CloudIdAndTimestampResponse> resultSlice;
        int count = 0;
        String startFrom = null;
        long taskId = stormTaskTuple.getTaskId();
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        HashSet<Future<Integer>> futures = new HashSet<Future<Integer>>(10);
        int total = 0;
        do {
            List<CloudIdAndTimestampResponse> cloudIdAndTimestampResponseList;
            if ((total += (cloudIdAndTimestampResponseList = (resultSlice = this.getLatestDataSetCloudIdByRepresentationAndRevisionChunk(dataSetServiceClient, representationName, revisionName, revisionProvider, datasetName, datasetProvider, startFrom)).getResults()).size()) > maxRecordsCount) {
                if (total - maxRecordsCount >= 100) break;
                cloudIdAndTimestampResponseList = cloudIdAndTimestampResponseList.subList(0, maxRecordsCount % 100);
            }
            Future<Integer> job = executorService.submit(new QueueFillerForLatestRevisionJob(fileServiceClient, recordServiceClient, this.collector, this.taskStatusChecker, this.tuplesWithFileUrls, stormTaskTuple, representationName, revisionName, revisionProvider, cloudIdAndTimestampResponseList));
            futures.add(job);
            if (futures.size() != 10) continue;
            count += this.getCountAndWait(futures);
        } while ((startFrom = resultSlice.getNextSlice()) != null && !this.taskStatusChecker.hasKillFlag(taskId));
        if (futures.size() > 0) {
            count += this.getCountAndWait(futures);
        }
        executorService.shutdown();
        return count;
    }

    private int handleExactRevisions(StormTaskTuple stormTaskTuple, DataSetServiceClient dataSetServiceClient, RecordServiceClient recordServiceClient, FileServiceClient fileClient, String representationName, String revisionName, String revisionProvider, String revisionTimestamp, String datasetProvider, String datasetName) throws MCSException, DriverException, InterruptedException, ConcurrentModificationException, ExecutionException {
        ResultSlice<CloudTagsResponse> resultSlice;
        int count = 0;
        String startFrom = null;
        long taskId = stormTaskTuple.getTaskId();
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        HashSet<Future<Integer>> futures = new HashSet<Future<Integer>>(10);
        do {
            resultSlice = this.getDataSetRevisionsChunk(dataSetServiceClient, representationName, revisionName, revisionProvider, revisionTimestamp, datasetProvider, datasetName, startFrom);
            List<CloudTagsResponse> cloudTagsResponses = resultSlice.getResults();
            Future<Integer> job = executorService.submit(new QueueFillerForSpecificRevisionJob(fileClient, recordServiceClient, this.collector, this.taskStatusChecker, this.tuplesWithFileUrls, stormTaskTuple, representationName, revisionName, revisionProvider, revisionTimestamp, cloudTagsResponses));
            futures.add(job);
            if (futures.size() != 10) continue;
            count += this.getCountAndWait(futures);
        } while ((startFrom = resultSlice.getNextSlice()) != null && !this.taskStatusChecker.hasKillFlag(taskId));
        if (futures.size() > 0) {
            count += this.getCountAndWait(futures);
        }
        executorService.shutdown();
        return count;
    }

    private int handlePartialSizeExactRevisions(StormTaskTuple stormTaskTuple, DataSetServiceClient dataSetServiceClient, RecordServiceClient recordServiceClient, FileServiceClient fileClient, String representationName, String revisionName, String revisionProvider, String revisionTimestamp, String datasetProvider, String datasetName, int maxRecordsCount) throws MCSException, DriverException, InterruptedException, ConcurrentModificationException, ExecutionException {
        ResultSlice<CloudTagsResponse> resultSlice;
        int count = 0;
        String startFrom = null;
        long taskId = stormTaskTuple.getTaskId();
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        HashSet<Future<Integer>> futures = new HashSet<Future<Integer>>(10);
        int total = 0;
        do {
            List<CloudTagsResponse> cloudTagsResponses;
            if ((total += (cloudTagsResponses = (resultSlice = this.getDataSetRevisionsChunk(dataSetServiceClient, representationName, revisionName, revisionProvider, revisionTimestamp, datasetProvider, datasetName, startFrom)).getResults()).size()) > maxRecordsCount) {
                if (total - maxRecordsCount >= 100) break;
                cloudTagsResponses = cloudTagsResponses.subList(0, maxRecordsCount % 100);
            }
            Future<Integer> job = executorService.submit(new QueueFillerForSpecificRevisionJob(fileClient, recordServiceClient, this.collector, this.taskStatusChecker, this.tuplesWithFileUrls, stormTaskTuple, representationName, revisionName, revisionProvider, revisionTimestamp, cloudTagsResponses));
            futures.add(job);
            if (futures.size() != 10) continue;
            count += this.getCountAndWait(futures);
        } while ((startFrom = resultSlice.getNextSlice()) != null && !this.taskStatusChecker.hasKillFlag(taskId));
        if (futures.size() > 0) {
            count += this.getCountAndWait(futures);
        }
        executorService.shutdown();
        return count;
    }

    private int getCountAndWait(Set<Future<Integer>> futures) throws InterruptedException, ExecutionException {
        int count = 0;
        for (Future<Integer> future : futures) {
            count += future.get().intValue();
        }
        futures.clear();
        return count;
    }

    private ResultSlice<CloudTagsResponse> getDataSetRevisionsChunk(DataSetServiceClient dataSetServiceClient, String representationName, String revisionName, String revisionProvider, String revisionTimestamp, String datasetProvider, String datasetName, String startFrom) throws MCSException, DriverException {
        int retries = 3;
        while (true) {
            try {
                ResultSlice<CloudTagsResponse> resultSlice = dataSetServiceClient.getDataSetRevisionsChunk(datasetProvider, datasetName, representationName, revisionName, revisionProvider, revisionTimestamp, startFrom, null);
                if (resultSlice == null || resultSlice.getResults() == null) {
                    throw new DriverException("Getting cloud ids and revision tags: result chunk obtained but is empty.");
                }
                return resultSlice;
            }
            catch (Exception e) {
                if (retries-- > 0) {
                    LOGGER.warn("Error while getting Revisions from data set.Retries Left{} ", (Object)retries);
                    this.waitForSpecificTime(5000);
                    continue;
                }
                LOGGER.error("Error while getting Revisions from data set.");
                throw e;
            }
            break;
        }
    }

    private void waitForSpecificTime(int milliSecond) {
        try {
            Thread.sleep(milliSecond);
        }
        catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
            LOGGER.error(e1.getMessage());
        }
    }

    private void emitErrorNotification(long taskId, String resource, String message, String additionalInformations) {
        NotificationTuple nt = NotificationTuple.prepareNotification(taskId, resource, States.ERROR, message, additionalInformations);
        this.collector.emit("NotificationStream", (List)nt.toStormTuple());
    }
}

