/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.spouts.kafka;

import eu.europeana.cloud.common.model.CloudIdAndTimestampResponse;
import eu.europeana.cloud.common.model.File;
import eu.europeana.cloud.common.model.Representation;
import eu.europeana.cloud.common.model.dps.TaskState;
import eu.europeana.cloud.common.response.CloudTagsResponse;
import eu.europeana.cloud.common.response.ResultSlice;
import eu.europeana.cloud.mcs.driver.RepresentationIterator;
import eu.europeana.cloud.mcs.driver.exception.DriverException;
import eu.europeana.cloud.service.commons.urls.UrlParser;
import eu.europeana.cloud.service.commons.urls.UrlPart;
import eu.europeana.cloud.service.dps.DpsRecord;
import eu.europeana.cloud.service.dps.DpsTask;
import eu.europeana.cloud.service.dps.InputDataType;
import eu.europeana.cloud.service.dps.RecordExecutionSubmitService;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.MCSReader;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.SubmitTaskParameters;
import eu.europeana.cloud.service.dps.storm.spouts.kafka.SubmitingTaskWasKilled;
import eu.europeana.cloud.service.dps.storm.utils.DateHelper;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusChecker;
import eu.europeana.cloud.service.dps.storm.utils.TaskStatusUpdater;
import java.net.MalformedURLException;
import java.time.Instant;
import java.util.ConcurrentModificationException;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MCSTaskSubmiter {
    private static final int INTERNAL_THREADS_NUMBER = 10;
    private static final int MAX_BATCH_SIZE = 100;
    private static final Logger LOGGER = LoggerFactory.getLogger(MCSTaskSubmiter.class);
    private final TaskStatusChecker taskStatusChecker;
    private final TaskStatusUpdater taskStatusUpdater;
    private final RecordExecutionSubmitService recordSubmitService;
    private final String mcsClientURL;

    public MCSTaskSubmiter(TaskStatusChecker taskStatusChecker, TaskStatusUpdater taskStatusUpdater, RecordExecutionSubmitService recordSubmitService, String mcsClientURL) {
        this.taskStatusChecker = taskStatusChecker;
        this.taskStatusUpdater = taskStatusUpdater;
        this.recordSubmitService = recordSubmitService;
        this.mcsClientURL = mcsClientURL;
    }

    public void execute(SubmitTaskParameters submitParameters) {
        DpsTask task = submitParameters.getTask();
        try {
            LOGGER.info("Sending task id={} to topology {} by kafka topic {}. Parameters:\n{}", task.getTaskId(), submitParameters.getTopologyName(), submitParameters.getTopicName(), submitParameters);
            this.checkIfTaskIsKilled(task);
            this.logProgress(submitParameters, 0);
            int expectedSize = this.taskContainsFileUrls(task) ? this.executeForFilesList(submitParameters) : this.executeForDatasetList(submitParameters);
            if (expectedSize != 0) {
                this.taskStatusUpdater.updateStatusExpectedSize(task.getTaskId(), TaskState.QUEUED.toString(), expectedSize);
                LOGGER.info("Submitting {} records of task id={} to Kafka succeeded.", (Object)expectedSize, (Object)task.getTaskId());
            } else {
                this.taskStatusUpdater.setTaskDropped(task.getTaskId(), "The task was dropped because it is empty");
                LOGGER.warn("The task id={} was dropped because it is empty.", (Object)task.getTaskId());
            }
        }
        catch (SubmitingTaskWasKilled e) {
            LOGGER.warn(e.getMessage(), e);
        }
        catch (Exception e) {
            LOGGER.error("MCSTaskSubmiter error for taskId={}", (Object)task.getTaskId(), (Object)e);
            this.taskStatusUpdater.setTaskDropped(task.getTaskId(), "The task was dropped because " + e.getMessage());
        }
    }

    private MCSReader createMcsReader(SubmitTaskParameters submitParameters) {
        String authorizationHeader = submitParameters.getTask().getParameter("AUTHORIZATION_HEADER");
        return new MCSReader(this.mcsClientURL, authorizationHeader);
    }

    private int executeForFilesList(SubmitTaskParameters submitParameters) {
        List<String> filesList = submitParameters.getTask().getDataEntry(InputDataType.FILE_URLS);
        for (String file : filesList) {
            this.submitRecord(file, submitParameters);
        }
        return filesList.size();
    }

    private int executeForDatasetList(SubmitTaskParameters submitParameters) throws Exception {
        int expectedSize = 0;
        for (String dataSetUrl : submitParameters.getTask().getDataEntry(InputDataType.DATASET_URLS)) {
            expectedSize += this.executeForOneDataSet(dataSetUrl, submitParameters);
        }
        return expectedSize;
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private int executeForOneDataSet(String dataSetUrl, SubmitTaskParameters submitParameters) throws InterruptedException, ExecutionException {
        try (MCSReader reader = this.createMcsReader(submitParameters);){
            UrlParser urlParser = new UrlParser(dataSetUrl);
            if (!urlParser.isUrlToDataset()) {
                throw new RuntimeException("DataSet URL is not formulated correctly: " + dataSetUrl);
            }
            int expectedSize = 0;
            expectedSize = this.getRevisionName(submitParameters.getTask()) != null && this.getRevisionProvider(submitParameters.getTask()) != null ? (expectedSize += this.executeForRevision(urlParser.getPart(UrlPart.DATA_SETS), urlParser.getPart(UrlPart.DATA_PROVIDERS), submitParameters, reader)) : (expectedSize += this.executeForEntireDataset(urlParser, submitParameters, reader));
            int n = expectedSize;
            return n;
        }
        catch (MalformedURLException e) {
            throw new RuntimeException("MCSTaskSubmiter error, Error while parsing DataSet URL : \"" + dataSetUrl + "\"", e);
        }
    }

    private int executeForEntireDataset(UrlParser urlParser, SubmitTaskParameters submitParameters, MCSReader reader) {
        int expectedSize = 0;
        RepresentationIterator iterator = reader.getRepresentationsOfEntireDataset(urlParser);
        while (iterator.hasNext()) {
            this.checkIfTaskIsKilled(submitParameters.getTask());
            expectedSize += this.submitRecordsForAllFilesOfRepresentation(iterator.next(), submitParameters);
        }
        return expectedSize;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private int executeForRevision(String datasetName, String datasetProvider, SubmitTaskParameters submitParameters, MCSReader reader) throws DriverException, InterruptedException, ConcurrentModificationException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            ResultSlice<CloudIdAndTimestampResponse> slice;
            DpsTask task = submitParameters.getTask();
            int maxRecordsCount = this.getMaxRecordsCount(task);
            int count = 0;
            String startFrom = null;
            HashSet<Future<Integer>> futures = new HashSet<Future<Integer>>(10);
            int total = 0;
            do {
                this.checkIfTaskIsKilled(task);
                slice = this.getCloudIdsChunk(datasetName, datasetProvider, startFrom, task, reader);
                List<CloudIdAndTimestampResponse> cloudIdAndTimestampResponseList = slice.getResults();
                int maxRecordsLeft = maxRecordsCount - total;
                if (cloudIdAndTimestampResponseList.size() > maxRecordsLeft) {
                    cloudIdAndTimestampResponseList = cloudIdAndTimestampResponseList.subList(0, maxRecordsLeft);
                }
                total += cloudIdAndTimestampResponseList.size();
                List<CloudIdAndTimestampResponse> finalCloudIdAndTimestampResponseList = cloudIdAndTimestampResponseList;
                futures.add(executor.submit(() -> this.executeGettingFileUrlsForCloudIdList(finalCloudIdAndTimestampResponseList, submitParameters, reader)));
                if (futures.size() < 1000) continue;
                count += this.getCountAndWait(futures);
            } while ((startFrom = slice.getNextSlice()) != null && total < maxRecordsCount);
            if (futures.size() > 0) {
                count += this.getCountAndWait(futures);
            }
            int n = count;
            return n;
        }
        finally {
            executor.shutdown();
        }
    }

    private ResultSlice<CloudIdAndTimestampResponse> getCloudIdsChunk(String datasetName, String datasetProvider, String startFrom, DpsTask task, MCSReader reader) {
        if (this.getRevisionTimestamp(task) != null) {
            ResultSlice<CloudTagsResponse> chunk = reader.getDataSetRevisionsChunk(this.getRepresentationName(task), this.getRevisionName(task), this.getRevisionProvider(task), this.getRevisionTimestamp(task), datasetProvider, datasetName, startFrom);
            return this.toCloudAndTimestampResponse(chunk, this.getRevisionTimestamp(task));
        }
        return reader.getLatestDataSetCloudIdByRepresentationAndRevisionChunk(this.getRepresentationName(task), this.getRevisionName(task), this.getRevisionProvider(task), datasetName, datasetProvider, startFrom);
    }

    private Integer executeGettingFileUrlsForCloudIdList(List<CloudIdAndTimestampResponse> responseList, SubmitTaskParameters submitParameters, MCSReader reader) {
        int count = 0;
        for (CloudIdAndTimestampResponse response : responseList) {
            count += this.executeGettingFileUrlsForOneCloudId(response, submitParameters, reader);
        }
        return count;
    }

    private int executeGettingFileUrlsForOneCloudId(CloudIdAndTimestampResponse response, SubmitTaskParameters submitParameters, MCSReader reader) {
        DpsTask task = submitParameters.getTask();
        String revisionTimestamp = DateHelper.getUTCDateString(response.getRevisionTimestamp());
        int count = 0;
        List<Representation> representations = reader.getRepresentationsByRevision(this.getRepresentationName(task), this.getRevisionName(task), this.getRevisionProvider(task), revisionTimestamp, response.getCloudId());
        for (Representation representation : representations) {
            count += this.submitRecordsForAllFilesOfRepresentation(representation, submitParameters);
        }
        return count;
    }

    private int submitRecordsForAllFilesOfRepresentation(Representation representation, SubmitTaskParameters submitParameters) {
        int count = 0;
        if (representation == null) {
            throw new RuntimeException("Problem while reading representation - representation is null.");
        }
        for (File file : representation.getFiles()) {
            this.checkIfTaskIsKilled(submitParameters.getTask());
            String fileUrl = file.getContentUri().toString();
            this.submitRecord(fileUrl, submitParameters);
            ++count;
        }
        return count;
    }

    private ResultSlice<CloudIdAndTimestampResponse> toCloudAndTimestampResponse(ResultSlice<CloudTagsResponse> chunk, String revisionTimestamp) {
        return new ResultSlice<CloudIdAndTimestampResponse>(chunk.getNextSlice(), chunk.getResults().stream().map(response -> new CloudIdAndTimestampResponse(response.getCloudId(), Date.from(Instant.parse(revisionTimestamp)))).collect(Collectors.toList()));
    }

    private void submitRecord(String fileUrl, SubmitTaskParameters submitParameters) {
        DpsTask task = submitParameters.getTask();
        DpsRecord record = DpsRecord.builder().taskId(task.getTaskId()).metadataPrefix(this.getSchemaName(task)).recordId(fileUrl).build();
        this.recordSubmitService.submitRecord(record, submitParameters.getTopicName());
        this.logProgress(submitParameters, submitParameters.incrementAndGetSentRecordCounter());
    }

    private void logProgress(SubmitTaskParameters submitParameters, int submitedCount) {
        if (submitedCount % 1000 == 0) {
            LOGGER.info("Task id={} records submiting is progressing. Already submited: {} records", (Object)submitParameters.getTask().getTaskId(), (Object)submitedCount);
        }
    }

    private boolean taskContainsFileUrls(DpsTask task) {
        return task.getInputData().get((Object)InputDataType.FILE_URLS) != null;
    }

    private int getCountAndWait(Set<Future<Integer>> futures) throws InterruptedException, ExecutionException {
        int count = 0;
        for (Future<Integer> future : futures) {
            count += future.get().intValue();
        }
        futures.clear();
        return count;
    }

    private void checkIfTaskIsKilled(DpsTask task) {
        if (this.taskStatusChecker.hasKillFlag(task.getTaskId())) {
            throw new SubmitingTaskWasKilled(task);
        }
    }

    private Integer getMaxRecordsCount(DpsTask task) {
        return Optional.ofNullable(task.getParameter("SAMPLE_SIZE")).map(Integer::parseInt).orElse(Integer.MAX_VALUE);
    }

    private String getRevisionTimestamp(DpsTask task) {
        return task.getParameter("REVISION_TIMESTAMP");
    }

    private String getRevisionProvider(DpsTask task) {
        return task.getParameter("REVISION_PROVIDER");
    }

    private String getRevisionName(DpsTask task) {
        return task.getParameter("REVISION_NAME");
    }

    private String getRepresentationName(DpsTask task) {
        return task.getParameter("REPRESENTATION_NAME");
    }

    private String getSchemaName(DpsTask task) {
        return task.getParameter("SCHEMA_NAME");
    }
}

