package eu.europeana.oaipmh.service;

import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoClient;
import com.mongodb.event.ConnectionAddedEvent;
import com.mongodb.event.ConnectionCheckedInEvent;
import com.mongodb.event.ConnectionCheckedOutEvent;
import com.mongodb.event.ConnectionPoolClosedEvent;
import com.mongodb.event.ConnectionPoolListener;
import com.mongodb.event.ConnectionPoolOpenedEvent;
import com.mongodb.event.ConnectionRemovedEvent;
import eu.europeana.corelib.definitions.edm.beans.FullBean;
import eu.europeana.corelib.edm.utils.EdmUtils;
import eu.europeana.corelib.record.api.WebMetaInfo;
import eu.europeana.corelib.solr.bean.impl.FullBeanImpl;
import eu.europeana.metis.mongo.connection.MongoClientProvider;
import eu.europeana.metis.mongo.dao.RecordDao;
import eu.europeana.metis.schema.jibx.DatasetName;
import eu.europeana.metis.schema.jibx.EuropeanaAggregationType;
import eu.europeana.metis.schema.jibx.RDF;
import eu.europeana.metis.utils.ExternalRequestUtil;
import eu.europeana.oaipmh.model.Header;
import eu.europeana.oaipmh.model.ListRecords;
import eu.europeana.oaipmh.model.RDFMetadata;
import eu.europeana.oaipmh.model.Record;
import eu.europeana.oaipmh.profile.TrackTime;
import eu.europeana.oaipmh.service.exception.IdDoesNotExistException;
import eu.europeana.oaipmh.service.exception.InternalServerErrorException;
import eu.europeana.oaipmh.service.exception.OaiPmhException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.zookeeper.server.quorum.QuorumStats;
import org.springframework.beans.factory.annotation.Value;

/* loaded from: input_file:WEB-INF/classes/eu/europeana/oaipmh/service/DBRecordProvider.class */
/**
 * {@link RecordProvider} implementation that loads records from a Mongo record database through a
 * {@link RecordDao} and converts them to EDM/RDF metadata.
 *
 * <p>Bulk retrieval ({@code listRecords}) is spread over a fixed-size thread pool whose size is
 * taken from configuration and sanitised in {@code initThreadPool()}.
 *
 * <p>The class also implements {@link ConnectionPoolListener} to log Mongo connection pool events.
 * NOTE(review): no listener registration is visible in this file — confirm where (or whether) this
 * instance is registered with the Mongo client.
 */
public class DBRecordProvider extends BaseProvider implements RecordProvider, ConnectionPoolListener {
    private static final Logger LOG = LogManager.getLogger((Class<?>) DBRecordProvider.class);

    /** Format prefix for log and error messages that mention a record id (note the trailing space). */
    private static final String RECORD_WITH_ID = "Record with id %s ";

    /** Thread-count level above which {@code initThreadPool()} logs a warning. */
    private static final int THREADS_THRESHOLD = 10;

    /** Fallback value for the maximum number of worker threads. */
    private static final int MAX_THREADS_THRESHOLD = 20;

    /** Mongo connection URL, injected from the {@code mongodb.connectionUrl} property. */
    @Value("${mongodb.connectionUrl}")
    private String connectionUrl;

    /** Name of the Mongo database holding the records, from {@code mongodb.record.dbname}. */
    @Value("${mongodb.record.dbname}")
    private String recordDBName;

    /** Whether technical (web resource) metadata is injected into each bean; defaults to true. */
    @Value("${enhanceWithTechnicalMetadata:true}")
    private boolean enhanceWithTechnicalMetadata;

    /** Injected from {@code expandWithFullText} (default false); not read anywhere in this file. */
    @Value("${expandWithFullText:false}")
    private boolean expandWithFullText;

    /** Requested worker thread count (default 1); adjusted in {@code initThreadPool()}. */
    @Value("${threadsCount:1}")
    private int threadsCount;

    /** Configured upper bound for {@link #threadsCount} (default 20). */
    @Value("${maxThreadsCount:20}")
    private int maxThreadsCount;

    // Counter updated by connectionAdded/connectionRemoved and used only for debug logging.
    // NOTE(review): the reason for the initial value of 2 is not visible in this file.
    private int nrConnections = 2;

    /** Pool that runs the per-slice record collection tasks of {@code listRecords}. */
    private ExecutorService threadPool;

    /** Mongo client created in {@code initMongo()} and closed in {@code close()}. */
    private MongoClient mongoClient;

    /** DAO used to fetch full beans (and passed to the technical metadata injection). */
    private RecordDao recordDao;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/classes/eu/europeana/oaipmh/service/DBRecordProvider$CollectRecordsResult.class */
    public static class CollectRecordsResult {
        private int threadId;
        private List<Record> records;

        CollectRecordsResult(int i, List<Record> list) {
            this.threadId = i;
            this.records = list;
            DBRecordProvider.LOG.trace("Thread {} returning {} records", Integer.valueOf(i), Integer.valueOf(list.size()));
        }

        int getThreadId() {
            return this.threadId;
        }

        List<Record> getRecords() {
            return this.records;
        }
    }

    /* loaded from: input_file:WEB-INF/classes/eu/europeana/oaipmh/service/DBRecordProvider$CollectRecordsTask.class */
    /**
     * Unit of work for the thread pool: resolves a slice of headers into full {@link Record}s using
     * the enclosing provider's lookup and conversion methods.
     */
    private class CollectRecordsTask implements Callable<CollectRecordsResult> {
        private final int threadId;
        private final List<Header> identifiers;

        /**
         * @param list headers whose records this task must load
         * @param i id of this task, echoed back in the result
         */
        CollectRecordsTask(List<Header> list, int i) {
            identifiers = list;
            threadId = i;
            LOG.trace("Create thread {}", i);
        }

        /**
         * Loads and converts every record of the slice, in the order of the headers.
         *
         * @return the collected records together with this task's id
         * @throws Exception any failure raised while loading or converting a record
         */
        @Override
        public CollectRecordsResult call() throws Exception {
            List<Record> collected = new ArrayList<>();
            for (Header header : identifiers) {
                String recordId = prepareRecordId(header.getIdentifier());
                FullBeanImpl bean = (FullBeanImpl) getFullBean(recordId);
                RDFMetadata metadata = prepareRDFMetadata(recordId, bean);
                collected.add(new Record(header, metadata));
            }
            return new CollectRecordsResult(threadId, collected);
        }
    }

    /**
     * Post-construction hook: sets up the Mongo client/DAO first and then the worker thread pool.
     */
    @PostConstruct
    private void init() {
        initMongo();
        initThreadPool();
    }

    /**
     * Creates the Mongo client from the configured connection URL, builds the record DAO on top of
     * it and logs the database name together with the hosts parsed from the URL.
     */
    private void initMongo() {
        mongoClient = MongoClientProvider.create(connectionUrl).createMongoClient();
        recordDao = new RecordDao(mongoClient, recordDBName, false);

        // The URL is parsed a second time purely to report the hosts in the log line.
        List<String> hosts = new MongoClientURI(connectionUrl).getHosts();
        LOG.info("Connected to mongo database {} at {}", recordDBName, hosts);
    }

    /**
     * Sanitises the configured thread counts and creates the fixed-size worker pool.
     *
     * <ul>
     *   <li>A configured maximum below {@code THREADS_THRESHOLD} is replaced by
     *       {@code MAX_THREADS_THRESHOLD}.</li>
     *   <li>A thread count below 1 becomes 1.</li>
     *   <li>A thread count above the maximum is capped. The cap is the smaller of the configured
     *       maximum and {@code MAX_THREADS_THRESHOLD}, so the pool never exceeds the configured
     *       maximum (previously it was always set to 20, even when the maximum was lower).</li>
     *   <li>A thread count above {@code THREADS_THRESHOLD} but within the maximum is kept, with a
     *       warning.</li>
     * </ul>
     */
    private void initThreadPool() {
        if (maxThreadsCount < THREADS_THRESHOLD) {
            maxThreadsCount = MAX_THREADS_THRESHOLD;
        }

        if (threadsCount < 1) {
            threadsCount = 1;
        } else if (threadsCount > maxThreadsCount) {
            int capped = Math.min(maxThreadsCount, MAX_THREADS_THRESHOLD);
            LOG.warn("Number of threads exceeds {}, which may highly narrow the number of clients working in parallel. Changing to {}",
                    maxThreadsCount, capped);
            threadsCount = capped;
        } else if (threadsCount > THREADS_THRESHOLD) {
            LOG.warn("Number of threads exceeds {} which may narrow the number of clients working in parallel",
                    THREADS_THRESHOLD);
        }

        LOG.info("Creating new thread pool with {} threads.", threadsCount);
        threadPool = Executors.newFixedThreadPool(threadsCount);
    }

    /** Logs the opening of a Mongo connection pool at debug level. */
    @Override // com.mongodb.event.ConnectionPoolListener
    public void connectionPoolOpened(ConnectionPoolOpenedEvent connectionPoolOpenedEvent) {
        LOG.debug("Connection pool opened {}", connectionPoolOpenedEvent);
    }

    /** Logs the closing of a Mongo connection pool at debug level. */
    @Override // com.mongodb.event.ConnectionPoolListener
    public void connectionPoolClosed(ConnectionPoolClosedEvent connectionPoolClosedEvent) {
        LOG.debug("Connection pool closed {}", connectionPoolClosedEvent);
    }

    /** Check-out events are ignored by this provider. */
    @Override // com.mongodb.event.ConnectionPoolListener
    public void connectionCheckedOut(ConnectionCheckedOutEvent connectionCheckedOutEvent) {
        // intentionally empty
    }

    /** Check-in events are ignored by this provider. */
    @Override // com.mongodb.event.ConnectionPoolListener
    public void connectionCheckedIn(ConnectionCheckedInEvent connectionCheckedInEvent) {
        // intentionally empty
    }

    /**
     * Increments the tracked connection count and logs the event, this provider's hash code and the
     * new total at debug level. Synchronized so the counter update and the logged value agree.
     */
    @Override
    public synchronized void connectionAdded(ConnectionAddedEvent connectionAddedEvent) {
        nrConnections += 1;
        final int providerHash = hashCode();
        LOG.debug("{} for dbProvider {}, total Mongo connections = {}", connectionAddedEvent, providerHash, nrConnections);
    }

    /**
     * Decrements the tracked connection count and logs the event, this provider's hash code and the
     * new total at debug level. Synchronized so the counter update and the logged value agree.
     */
    @Override
    public synchronized void connectionRemoved(ConnectionRemovedEvent connectionRemovedEvent) {
        nrConnections -= 1;
        final int providerHash = hashCode();
        LOG.debug("{} for dbProvider {}, total Mongo connections = {}", connectionRemovedEvent, providerHash, nrConnections);
    }

    /**
     * Retrieves a single record: looks up the full bean for the (prepared) identifier, builds the
     * OAI header from it and converts the bean to RDF metadata.
     *
     * @param str record identifier as received from the caller
     * @return the record consisting of header and metadata
     * @throws OaiPmhException if the record does not exist or cannot be retrieved/converted
     */
    @Override
    @TrackTime
    public Record getRecord(String str) throws OaiPmhException {
        final String recordId = prepareRecordId(str);
        final FullBean bean = getFullBean(recordId);

        // Header first: it is the step that reports a missing record using the caller's identifier.
        final Header header = getHeader(str, bean);
        final RDFMetadata metadata = prepareRDFMetadata(recordId, (FullBeanImpl) bean);
        return new Record(header, metadata);
    }

    /**
     * Verifies that a record with the given identifier can be found.
     *
     * @param str record identifier as received from the caller
     * @throws OaiPmhException {@link IdDoesNotExistException} when no bean is found, or the error
     *     raised by the lookup itself
     */
    @Override
    public void checkRecordExists(String str) throws OaiPmhException {
        final FullBean bean = getFullBean(prepareRecordId(str));
        if (bean != null) {
            return;
        }
        throw new IdDoesNotExistException("Record with identifier " + str + " not found!");
    }

    /**
     * Fetches the full bean for a record id through {@code ExternalRequestUtil.retryableExternalRequest}.
     *
     * @param str prepared record id
     * @return whatever the DAO returns for that id (callers in this class treat {@code null} as
     *     "record not found")
     * @throws InternalServerErrorException if the request ultimately fails; the underlying
     *     exception is logged but not attached to the thrown exception
     */
    @TrackTime
    private FullBean getFullBean(String str) throws InternalServerErrorException {
        try {
            return (FullBean) ExternalRequestUtil.retryableExternalRequest(() -> {
                try {
                    return this.recordDao.getFullBean(str);
                } catch (Exception daoFailure) {
                    // Wrapped as unchecked so it can leave the lambda.
                    throw new RuntimeException("Error retrieving fullbean for record " + str, daoFailure);
                }
            });
        } catch (Exception failure) {
            final String recordLabel = String.format(RECORD_WITH_ID, str);
            LOG.error(recordLabel + " could not be retrieved.", failure);
            throw new InternalServerErrorException(recordLabel + " could not be retrieved due to database problems.");
        }
    }

    /**
     * Turns a full bean into the RDF metadata payload of a record: optionally injects technical
     * metadata, converts the bean to RDF, renames the collection name to a dataset name, serialises
     * to EDM and strips the XML declaration.
     *
     * @param str prepared record id, used in error messages
     * @param fullBeanImpl bean to convert; {@code null} means the record was not found
     * @return metadata wrapping the EDM XML without its XML header
     * @throws OaiPmhException {@link IdDoesNotExistException} for a {@code null} bean,
     *     {@link InternalServerErrorException} when the bean cannot be converted to RDF
     */
    private RDFMetadata prepareRDFMetadata(String str, FullBeanImpl fullBeanImpl) throws OaiPmhException {
        if (fullBeanImpl == null) {
            throw new IdDoesNotExistException(str);
        }

        enhanceWithTechnicalMetadata(fullBeanImpl);
        final RDF rdf = getRDF(fullBeanImpl);
        if (rdf != null) {
            updateDatasetName(rdf);
            final String edm = getEDM(rdf);
            return new RDFMetadata(removeXMLHeader(edm));
        }
        throw new InternalServerErrorException(String.format(RECORD_WITH_ID, str) + " could not be converted to EDM.");
    }

    /**
     * Serialises an RDF object to EDM XML.
     *
     * <p>If the conversion fails, the id of the affected record (the {@code about} of the first
     * Europeana aggregation, or {@code "unknown"} when none is available) is logged and the original
     * exception is rethrown unchanged.
     *
     * @param rdf RDF object to serialise
     * @return the EDM XML produced by {@code EdmUtils.toEDM}
     * @throws RuntimeException whatever {@code EdmUtils.toEDM} threw
     */
    @TrackTime
    public String getEDM(RDF rdf) {
        try {
            return EdmUtils.toEDM(rdf);
        } catch (RuntimeException e) {
            // Plain literal instead of a ZooKeeper-internal constant that merely has the same value.
            String recordId = "unknown";
            // Null checks keep the diagnostic code from raising an NPE that would hide 'e'.
            List<EuropeanaAggregationType> aggregations = rdf == null ? null : rdf.getEuropeanaAggregationList();
            if (aggregations != null && !aggregations.isEmpty()) {
                recordId = aggregations.get(0).getAbout();
            }
            LOG.error("Error converting RDF to EDM for record {}", recordId);
            throw e;
        }
    }

    /**
     * Converts a full bean to its RDF representation by delegating to {@code EdmUtils.toRDF}.
     *
     * @param fullBeanImpl bean to convert
     * @return the result of {@code EdmUtils.toRDF}; {@code prepareRDFMetadata} treats {@code null}
     *     as a conversion failure
     */
    @TrackTime
    public RDF getRDF(FullBeanImpl fullBeanImpl) {
        return EdmUtils.toRDF(fullBeanImpl);
    }

    /**
     * Retrieves the records for all given headers, splitting the work into {@code threadsCount}
     * contiguous slices that are processed in parallel on the thread pool.
     *
     * <p>Slice boundaries are computed with exact integer arithmetic
     * ({@code i * total / threadsCount}) so that the slices always cover the whole list. The former
     * {@code size() / threadsCount} was an integer division and silently dropped the trailing
     * identifiers whenever the list size was not a multiple of the thread count (for example all of
     * them when there were fewer identifiers than threads).
     *
     * @param list headers of the records to retrieve
     * @return the records, in the same order as the headers
     * @throws OaiPmhException {@link InternalServerErrorException} if any task fails or the calling
     *     thread is interrupted while waiting for the tasks
     */
    @Override
    public ListRecords listRecords(List<Header> list) throws OaiPmhException {
        final long start = System.currentTimeMillis();
        final int total = list.size();
        final List<Record> records = new ArrayList<>(total);
        final List<CollectRecordsTask> tasks = new ArrayList<>(threadsCount);

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} identifiers and {} threads, so {} records per thread", total, threadsCount,
                    (double) total / threadsCount);
        }
        for (int i = 0; i < threadsCount; i++) {
            // long arithmetic avoids overflow of i * total; the last slice always ends at 'total'
            int from = (int) ((long) i * total / threadsCount);
            int to = (int) ((long) (i + 1) * total / threadsCount);
            LOG.debug("Creating task {} to retrieve records {} to {}", i, from, to);
            tasks.add(new CollectRecordsTask(list.subList(from, to), i));
        }

        try {
            // invokeAll returns the futures in task order, so appending keeps the header order.
            for (Future<CollectRecordsResult> future : threadPool.invokeAll(tasks)) {
                CollectRecordsResult result = future.get();
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Thread {} collected {} records.", result.getThreadId(), result.getRecords().size());
                }
                records.addAll(result.getRecords());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.error("Thread interrupted.", e);
            // Do not hand out a partial list as if it were the complete answer.
            throw new InternalServerErrorException("Interrupted while retrieving data");
        } catch (ExecutionException e) {
            LOG.error("Error retrieving data", e);
            throw new InternalServerErrorException("Error retrieving data");
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("ListRecords using {} threads finished in {} ms.", threadsCount, System.currentTimeMillis() - start);
        }
        ListRecords listRecords = new ListRecords();
        listRecords.setRecords(records);
        return listRecords;
    }

    /**
     * Moves the collection name of the first Europeana aggregation into its dataset name and clears
     * the collection name. Does nothing when the RDF has no Europeana aggregation or the aggregation
     * has no collection name.
     *
     * @param rdf RDF object to adjust in place
     */
    @TrackTime
    private void updateDatasetName(RDF rdf) {
        List<EuropeanaAggregationType> aggregations = rdf.getEuropeanaAggregationList();
        // Guard against a missing aggregation instead of failing with an unchecked exception.
        if (aggregations == null || aggregations.isEmpty()) {
            return;
        }

        EuropeanaAggregationType aggregation = aggregations.get(0);
        if (aggregation.getCollectionName() != null) {
            DatasetName datasetName = new DatasetName();
            datasetName.setString(aggregation.getCollectionName().getString());
            aggregation.setDatasetName(datasetName);
            aggregation.setCollectionName(null);
        }
    }

    /**
     * Injects technical metadata into the bean via {@code WebMetaInfo.injectWebMetaInfoBatch} when
     * the feature is enabled and a bean is present; otherwise does nothing. The elapsed time is
     * logged at debug level.
     *
     * @param fullBean bean to enrich in place; may be {@code null}
     */
    @TrackTime
    private void enhanceWithTechnicalMetadata(FullBean fullBean) {
        final long start = System.currentTimeMillis();
        if (this.enhanceWithTechnicalMetadata && fullBean != null) {
            WebMetaInfo.injectWebMetaInfoBatch(fullBean, this.recordDao, null);
            if (LOG.isDebugEnabled()) {
                final long elapsed = System.currentTimeMillis() - start;
                LOG.debug("Technical metadata injected in {} ms.", String.valueOf(elapsed));
            }
        }
    }

    /**
     * Drops everything up to and including the first {@code ?>} of the given XML text, i.e. the XML
     * declaration when there is one.
     *
     * @param str XML text
     * @return the text after the first {@code ?>}, or the input unchanged if it contains none
     */
    @TrackTime
    private String removeXMLHeader(String str) {
        final String declarationEnd = "?>";
        final int position = str.indexOf(declarationEnd);
        if (position == -1) {
            return str;
        }
        return str.substring(position + declarationEnd.length());
    }

    /**
     * Builds the OAI header of a record from its full bean: identifier, creation timestamp as
     * datestamp, and one set identifier per Europeana collection name of the bean.
     *
     * @param str record identifier to put in the header
     * @param fullBean bean of the record; {@code null} means the record was not found
     * @return the populated header
     * @throws IdDoesNotExistException if {@code fullBean} is {@code null}
     */
    private Header getHeader(String str, FullBean fullBean) throws IdDoesNotExistException {
        if (fullBean == null) {
            throw new IdDoesNotExistException(str);
        }

        final Header result = new Header();
        result.setIdentifier(str);
        result.setDatestamp(fullBean.getTimestampCreated());

        final List<String> setSpecs = new ArrayList<>();
        for (String collectionName : fullBean.getEuropeanaCollectionName()) {
            setSpecs.add(getSetIdentifier(collectionName));
        }
        result.setSetSpec(setSpecs);
        return result;
    }

    /**
     * Releases the resources held by this provider: closes the Mongo client and then asks the
     * thread pool to shut down. Either step is skipped when the resource was never created.
     */
    @Override
    @PreDestroy
    public void close() {
        LOG.info("Shutting down Mongo connections...");

        final MongoClient client = this.mongoClient;
        if (client != null) {
            client.close();
        }

        final ExecutorService pool = this.threadPool;
        if (pool != null) {
            pool.shutdown();
        }
    }
}
