/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm.io;

import com.google.gson.Gson;
import com.rits.cloning.Cloner;
import eu.europeana.cloud.common.utils.Clock;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.io.ReadFileBolt;
import eu.europeana.cloud.service.dps.storm.utils.StormTaskTupleHelper;
import eu.europeana.metis.mediaprocessing.RdfConverterFactory;
import eu.europeana.metis.mediaprocessing.RdfDeserializer;
import eu.europeana.metis.mediaprocessing.exception.RdfDeserializationException;
import eu.europeana.metis.mediaprocessing.model.RdfResourceEntry;
import java.io.InputStream;
import java.time.Instant;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class ParseFileBolt
extends ReadFileBolt {
    private static final Logger LOGGER = LoggerFactory.getLogger(ParseFileBolt.class);
    private transient Gson gson;
    protected transient RdfDeserializer rdfDeserializer;

    public ParseFileBolt(String ecloudMcsAddress) {
        super(ecloudMcsAddress);
    }

    protected abstract List<RdfResourceEntry> getResourcesFromRDF(byte[] var1) throws RdfDeserializationException;

    protected StormTaskTuple createStormTuple(StormTaskTuple stormTaskTuple, RdfResourceEntry rdfResourceEntry, int linksCount) {
        StormTaskTuple tuple = new Cloner().deepClone(stormTaskTuple);
        LOGGER.debug("Sending this resource link {} to be processed ", (Object)rdfResourceEntry.getResourceUrl());
        tuple.addParameter("RESOURCE_LINK", this.gson.toJson((Object)rdfResourceEntry));
        tuple.addParameter("RESOURCE_LINKS_COUNT", String.valueOf(linksCount));
        tuple.addParameter("RESOURCE_URL", rdfResourceEntry.getResourceUrl());
        return tuple;
    }

    protected abstract int getLinksCount(StormTaskTuple var1, int var2) throws RdfDeserializationException;

    @Override
    public void execute(Tuple anchorTuple, StormTaskTuple stormTaskTuple) {
        Instant processingStartTime;
        block11: {
            LOGGER.debug("Starting file parsing");
            processingStartTime = Instant.now();
            try (InputStream stream = this.getFileStreamByStormTuple(stormTaskTuple);){
                byte[] fileContent = IOUtils.toByteArray(stream);
                List<RdfResourceEntry> rdfResourceEntries = this.getResourcesFromRDF(fileContent);
                int linksCount = this.getLinksCount(stormTaskTuple, rdfResourceEntries.size());
                if (linksCount == 0) {
                    StormTaskTuple tuple = new Cloner().deepClone(stormTaskTuple);
                    LOGGER.debug("The EDM file has no resource Links ");
                    this.outputCollector.emit(anchorTuple, (List)tuple.toStormTuple());
                    break block11;
                }
                LOGGER.debug("Found {} resources for {} : {}", rdfResourceEntries.size(), stormTaskTuple.getParameters().get("CLOUD_LOCAL_IDENTIFIER"), rdfResourceEntries);
                for (RdfResourceEntry rdfResourceEntry : rdfResourceEntries) {
                    if (this.taskStatusChecker.hasDroppedStatus(stormTaskTuple.getTaskId())) {
                        break;
                    }
                    StormTaskTuple tuple = this.createStormTuple(stormTaskTuple, rdfResourceEntry, Integer.parseInt(stormTaskTuple.getParameter("RESOURCE_LINKS_COUNT")));
                    this.outputCollector.emit(anchorTuple, (List)tuple.toStormTuple());
                }
            }
            catch (Exception e) {
                LOGGER.error("Unable to read and parse file ", e);
                this.emitErrorNotification(anchorTuple, stormTaskTuple.getTaskId(), stormTaskTuple.isMarkedAsDeleted(), stormTaskTuple.getFileUrl(), e.getMessage(), "Error while reading and parsing the EDM file. The full error is: " + ExceptionUtils.getStackTrace(e), StormTaskTupleHelper.getRecordProcessingStartTime(stormTaskTuple));
            }
        }
        this.outputCollector.ack(anchorTuple);
        LOGGER.info("File parsing finished in: {}ms", (Object)Clock.millisecondsSince(processingStartTime));
    }

    @Override
    public void prepare() {
        super.prepare();
        try {
            this.rdfDeserializer = new RdfConverterFactory().createRdfDeserializer();
            this.gson = new Gson();
        }
        catch (Exception e) {
            LOGGER.error("Unable to initialize RDF Deserializer ", e);
            throw new RuntimeException(e);
        }
    }
}

