/*
 * Decompiled with CFR 0.152.
 */
package eu.europeana.cloud.http.bolts;

import eu.europeana.cloud.harvesting.commons.IdentifierSupplier;
import eu.europeana.cloud.service.commons.utils.RetryInterruptedException;
import eu.europeana.cloud.service.commons.utils.RetryableMethodExecutor;
import eu.europeana.cloud.service.dps.storm.AbstractDpsBolt;
import eu.europeana.cloud.service.dps.storm.StormTaskTuple;
import eu.europeana.metis.utils.TempFileUtils;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.util.List;
import org.apache.commons.io.FilenameUtils;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bolt that downloads the file referenced by a tuple's file URL over HTTP, stores the downloaded
 * bytes in the tuple, records a probed MIME type under the {@code OUTPUT_MIME_TYPE} parameter and
 * then lets the {@link IdentifierSupplier} prepare identifiers before the tuple is emitted.
 *
 * <p>The collaborators are {@code transient} and are (re)created in {@link #prepare()}.
 */
public class HttpHarvestingBolt extends AbstractDpsBolt {
    private static final Logger LOGGER = LoggerFactory.getLogger(HttpHarvestingBolt.class);

    /**
     * Second argument handed to {@code RetryableMethodExecutor.execute}.
     * NOTE(review): presumably the number of attempts/retries - confirm against the executor.
     */
    private static final int RETRY_ATTEMPTS = 6;

    /**
     * Third argument handed to {@code RetryableMethodExecutor.execute}; by its name, the pause
     * between consecutive attempts in milliseconds.
     */
    private static final int SLEEP_TIME_BETWEEN_RETRIES_MS = 30000;

    private transient IdentifierSupplier identifierSupplier;
    private transient HttpClient httpClient;

    /**
     * Creates the identifier supplier and a default-configured HTTP client used by this bolt.
     */
    public void prepare() {
        this.identifierSupplier = new IdentifierSupplier();
        this.httpClient = HttpClient.newBuilder().build();
    }

    /**
     * Harvests the file referenced by {@code tuple} and emits the enriched tuple.
     *
     * <ul>
     *   <li>On success the tuple is emitted anchored to {@code anchorTuple}, which is then acked.</li>
     *   <li>A {@link RetryInterruptedException} is delegated to {@code handleInterruption}.</li>
     *   <li>An {@link InterruptedException} restores the thread's interrupt flag and fails the
     *       anchor tuple.</li>
     *   <li>Any other exception is logged, reported through {@code emitErrorNotification}, and the
     *       anchor tuple is acked.</li>
     * </ul>
     *
     * @param anchorTuple the Storm tuple used for anchoring, acking and failing
     * @param tuple       the task tuple carrying the file URL; it is mutated with the harvested data
     */
    public void execute(Tuple anchorTuple, StormTaskTuple tuple) {
        try {
            LOGGER.info("Starting http harvesting for url: {}", tuple.getFileUrl());
            this.harvestRecord(tuple);
            // The raw List cast is kept because the declared type of toStormTuple() is not visible here.
            this.outputCollector.emit(anchorTuple, (List)tuple.toStormTuple());
            this.outputCollector.ack(anchorTuple);
        }
        catch (RetryInterruptedException e) {
            this.handleInterruption(e, anchorTuple);
        }
        catch (InterruptedException e) {
            LOGGER.error("Bolt thread is being interrupted!", e);
            // Re-assert the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            this.outputCollector.fail(anchorTuple);
        }
        catch (Exception e) {
            LOGGER.error(e.getMessage(), e);
            this.emitErrorNotification(anchorTuple, tuple, "Error while reading a file", "Can't read file: " + tuple.getFileUrl() + " because of " + e.getMessage());
            this.outputCollector.ack(anchorTuple);
        }
    }

    /**
     * Downloads the file, stores its bytes and probed MIME type in the tuple, then prepares
     * identifiers for it.
     *
     * @param tuple the task tuple to enrich
     * @throws Exception if the download, the MIME type probing or the identifier preparation fails
     */
    private void harvestRecord(StormTaskTuple tuple) throws Exception {
        HttpResponse<byte[]> response = this.tryLoadHttpFileCoupleOfTimes(tuple);
        byte[] fileContent = response.body();
        tuple.setFileData(fileContent);
        tuple.addParameter("OUTPUT_MIME_TYPE", this.probeMimeType(tuple.getFileUrl(), fileContent));
        this.identifierSupplier.prepareIdentifiers(tuple);
    }

    /**
     * Probes the MIME type of the content by writing it to a temporary file that carries the
     * extension taken from the URL and asking {@link Files#probeContentType(Path)}.
     *
     * <p>The temporary file is removed in a {@code finally} block, so it does not stay behind when
     * writing or probing throws.
     *
     * @param fileUrl     the URL whose extension is used as the temporary file suffix
     * @param fileContent the bytes to probe
     * @return the probed content type; {@code Files.probeContentType} returns {@code null} when the
     *         type cannot be determined
     * @throws IOException if the temporary file cannot be created, written, probed or deleted
     */
    private String probeMimeType(String fileUrl, byte[] fileContent) throws IOException {
        String extension = FilenameUtils.getExtension(fileUrl);
        Path tempFile = TempFileUtils.createSecureTempFile("http_harvest", "." + extension);
        try {
            Files.write(tempFile, fileContent);
            return Files.probeContentType(tempFile);
        }
        finally {
            Files.deleteIfExists(tempFile);
        }
    }

    /**
     * Loads the file through {@code RetryableMethodExecutor}, using {@link #RETRY_ATTEMPTS} and
     * {@link #SLEEP_TIME_BETWEEN_RETRIES_MS} as its retry settings.
     *
     * @param tuple the task tuple carrying the file URL
     * @return the HTTP response whose body holds the file bytes
     * @throws Exception whatever the executor propagates once it gives up
     */
    private HttpResponse<byte[]> tryLoadHttpFileCoupleOfTimes(StormTaskTuple tuple) throws Exception {
        // The raw cast is kept because the executor's return type is not visible from this file.
        return (HttpResponse)RetryableMethodExecutor.execute("Loading file by http failed!", RETRY_ATTEMPTS, SLEEP_TIME_BETWEEN_RETRIES_MS, () -> this.loadHttpFile(tuple));
    }

    /**
     * Performs a single HTTP GET for the tuple's file URL and reads the whole body into memory.
     *
     * @param tuple the task tuple carrying the file URL
     * @return the response; its status code is always 200
     * @throws IOException          if the request fails or the status code is not 200
     * @throws InterruptedException if the thread is interrupted while sending the request
     */
    private HttpResponse<byte[]> loadHttpFile(StormTaskTuple tuple) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(tuple.getFileUrl())).GET().build();
        HttpResponse<byte[]> response = this.httpClient.send(request, HttpResponse.BodyHandlers.ofByteArray());
        if (response.statusCode() != 200) {
            throw new IOException("Bad return status code: " + response.statusCode());
        }
        return response;
    }
}

