/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.service.dps.storm;

import eu.europeana.cloud.common.properties.CassandraProperties;
import eu.europeana.cloud.service.commons.md5.FileMd5GenerationService;
import eu.europeana.cloud.service.commons.utils.DateHelper;
import eu.europeana.cloud.service.dps.storm.AbstractDpsBolt;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.cloud.service.dps.storm.incremental.CategorizationParameters;
import eu.europeana.cloud.service.dps.storm.incremental.CategorizationResult;
import eu.europeana.cloud.service.dps.storm.service.HarvestedRecordCategorizationService;
import java.util.List;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base bolt that asks a {@link HarvestedRecordCategorizationService} whether an incoming
 * record should be processed further.
 *
 * <p>Records that should be processed are emitted unchanged, anchored on the incoming tuple.
 * For all other records an "ignored" notification is emitted instead. In both cases the
 * incoming tuple is acknowledged at the end of {@link #execute(Tuple, StormTaskTuple)}.
 */
public abstract class HarvestedRecordCategorizationBolt extends AbstractDpsBolt {
    private static final Logger LOGGER = LoggerFactory.getLogger(HarvestedRecordCategorizationBolt.class);

    // Names of the task-tuple parameters read by this bolt.
    private static final String DATASET_ID_PARAMETER = "METIS_DATASET_ID";
    private static final String RECORD_ID_PARAMETER = "CLOUD_LOCAL_IDENTIFIER";
    private static final String HARVEST_DATE_PARAMETER = "HARVEST_DATE";
    private static final String RECORD_DATESTAMP_PARAMETER = "RECORD_DATESTAMP";
    private static final String INCREMENTAL_HARVEST_PARAMETER = "INCREMENTAL_HARVEST";

    /**
     * Service used to categorize records. It is never assigned in this class.
     * NOTE(review): presumably initialised by concrete subclasses before
     * {@code execute} is called - confirm against the subclasses.
     */
    protected transient HarvestedRecordCategorizationService harvestedRecordCategorizationService;

    /**
     * Creates the bolt, passing the Cassandra properties to the parent bolt.
     *
     * @param cassandraProperties properties forwarded to {@link AbstractDpsBolt}
     */
    public HarvestedRecordCategorizationBolt(CassandraProperties cassandraProperties) {
        super(cassandraProperties);
    }

    /**
     * Categorizes the record carried by {@code t}, then either forwards the tuple or emits
     * an "ignored" notification, and finally acknowledges {@code anchorTuple}.
     *
     * @param anchorTuple tuple used as anchor for emitted tuples and acknowledged at the end
     * @param t           task tuple carrying the record data and its parameters
     */
    @Override
    public void execute(Tuple anchorTuple, StormTaskTuple t) {
        CategorizationParameters parameters = this.prepareCategorizationParameters(t);
        LOGGER.info("Starting categorization for {}", parameters);

        CategorizationResult result = this.harvestedRecordCategorizationService.categorize(parameters);
        if (!result.shouldBeProcessed()) {
            LOGGER.info("Further processing will be stopped for {} and {}",
                    result.getCategorizationParameters(), result.getHarvestedRecord());
            this.ignoreRecordAsNotChanged(anchorTuple, t, result);
        } else {
            LOGGER.info("Further processing will take place for {} and {}",
                    result.getCategorizationParameters(), result.getHarvestedRecord());
            this.pushRecordToNextBolt(anchorTuple, t);
        }

        this.outputCollector.ack(anchorTuple);
    }

    /**
     * Collects the categorization input from the tuple's parameters and file data.
     *
     * <p>The record datestamp is optional: when the parameter is absent, {@code null} is
     * passed to the builder. The harvest date parameter is always parsed.
     */
    private CategorizationParameters prepareCategorizationParameters(StormTaskTuple tuple) {
        boolean fullHarvest = !this.isIncrementalHarvesting(tuple);
        String datasetId = tuple.getParameter(DATASET_ID_PARAMETER);
        String recordId = tuple.getParameter(RECORD_ID_PARAMETER);
        String rawDateStamp = tuple.getParameter(RECORD_DATESTAMP_PARAMETER);

        return CategorizationParameters.builder()
                .fullHarvest(fullHarvest)
                .datasetId(datasetId)
                .recordId(recordId)
                .recordMd5(FileMd5GenerationService.generateUUID(tuple.getFileData()))
                .currentHarvestDate(DateHelper.parse(tuple.getParameter(HARVEST_DATE_PARAMETER)))
                .recordDateStamp(rawDateStamp == null ? null : DateHelper.parse(rawDateStamp))
                .build();
    }

    /** Returns {@code true} only when the incremental-harvest parameter is exactly {@code "true"}. */
    private boolean isIncrementalHarvesting(StormTaskTuple tuple) {
        String incrementalFlag = tuple.getParameter(INCREMENTAL_HARVEST_PARAMETER);
        return "true".equals(incrementalFlag);
    }

    /** Emits the task tuple unchanged, anchored on the incoming tuple. */
    private void pushRecordToNextBolt(Tuple anchorTuple, StormTaskTuple t) {
        this.outputCollector.emit(anchorTuple, t.toStormTuple());
    }

    /** Emits an "ignored" notification whose details include the record datestamp. */
    private void ignoreRecordAsNotChanged(Tuple anchorTuple, StormTaskTuple stormTaskTuple, CategorizationResult categorizationResult) {
        String details = "Record ignored in this incremental processing because it was already processed. Record datestamp: "
                + categorizationResult.getCategorizationParameters().getRecordDateStamp()
                + ".";
        this.emitIgnoredNotification(anchorTuple, stormTaskTuple, "Record ignored.", details);
    }
}

