package io.ebeanservice.elastic;

import com.fasterxml.jackson.core.JsonFactory;
import io.ebean.DocStoreQueueEntry;
import io.ebean.config.JsonConfig;
import io.ebean.plugin.BeanType;
import io.ebean.plugin.SpiServer;
import io.ebeanservice.docstore.api.DocStoreQueryUpdate;
import io.ebeanservice.docstore.api.DocStoreTransaction;
import io.ebeanservice.docstore.api.DocStoreUpdate;
import io.ebeanservice.docstore.api.DocStoreUpdateProcessor;
import io.ebeanservice.docstore.api.DocStoreUpdates;
import io.ebeanservice.elastic.bulk.BulkSender;
import io.ebeanservice.elastic.bulk.BulkTransaction;
import io.ebeanservice.elastic.bulk.BulkUpdate;
import io.ebeanservice.elastic.support.IndexMessageSender;
import io.ebeanservice.elastic.support.IndexQueueWriter;
import io.ebeanservice.elastic.update.ConvertToGroups;
import io.ebeanservice.elastic.update.ProcessGroup;
import io.ebeanservice.elastic.update.UpdateGroup;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import javax.persistence.PersistenceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/ebeanservice/elastic/ElasticUpdateProcessor.class */
/**
 * {@link DocStoreUpdateProcessor} implementation that writes document store
 * updates through {@link BulkUpdate} instances backed by a shared
 * {@link BulkSender}, and hands queue entries to an {@link IndexQueueWriter}.
 */
public class ElasticUpdateProcessor implements DocStoreUpdateProcessor {
    private static final Logger logger = LoggerFactory.getLogger(ElasticUpdateProcessor.class);

    /** Delay, in milliseconds, applied before queued updates are processed in the background. */
    private static final long QUEUE_DELAY_MILLIS = 1000L;

    private final SpiServer server;
    private final IndexQueueWriter queueWriter;
    private final int defaultBatchSize;
    private final BulkSender bulkSender;

    /**
     * Creates the processor.
     *
     * @param spiServer          server used for its background executor and for bean type lookup by queue id
     * @param indexQueueWriter   writer that receives queue entries and the startup notification
     * @param jsonFactory        passed through to the {@link BulkSender}
     * @param obj                passed through to the {@link BulkSender} unchanged
     * @param indexMessageSender passed through to the {@link BulkSender}
     * @param i                  default batch size, used whenever a caller supplies a non-positive batch size
     */
    public ElasticUpdateProcessor(SpiServer spiServer, IndexQueueWriter indexQueueWriter, JsonFactory jsonFactory, Object obj, IndexMessageSender indexMessageSender, int i) {
        this.server = spiServer;
        this.queueWriter = indexQueueWriter;
        this.defaultBatchSize = i;
        this.bulkSender = new BulkSender(jsonFactory, JsonConfig.Include.NON_EMPTY, obj, indexMessageSender);
    }

    /**
     * Creates a transaction wrapping a new bulk update.
     *
     * @param i requested batch size; a non-positive value selects the default batch size
     * @return a new {@link BulkTransaction}
     * @throws PersistenceException if creating the bulk update fails with an {@link IOException}
     */
    public DocStoreTransaction createTransaction(int i) {
        try {
            return new BulkTransaction(createBulkUpdate(i));
        } catch (IOException e) {
            throw new PersistenceException("Error creating bulk transaction", e);
        }
    }

    /**
     * Flushes the transaction and then schedules whatever its {@code queue()}
     * call returns for background processing.
     *
     * @param docStoreTransaction the transaction to commit
     */
    public void commit(DocStoreTransaction docStoreTransaction) {
        docStoreTransaction.flush();
        queue(docStoreTransaction.queue());
    }

    /**
     * Submits a task to the server's background executor that waits
     * {@link #QUEUE_DELAY_MILLIS} and then processes the given updates with the
     * default batch size. Does nothing when {@code docStoreUpdates} is null.
     *
     * @param docStoreUpdates the updates to process later, may be null
     */
    private void queue(DocStoreUpdates docStoreUpdates) {
        if (docStoreUpdates == null) {
            return;
        }
        this.server.backgroundExecutor().execute(() -> {
            try {
                logger.debug("queue wait for changes...");
                Thread.sleep(QUEUE_DELAY_MILLIS);
                // Batch size 0 makes createBulkUpdate fall back to the default batch size.
                process(docStoreUpdates, 0);
            } catch (InterruptedException e) {
                // Restore the interrupt status so the executor's thread can observe it;
                // as before, the updates are not processed when the wait is interrupted.
                Thread.currentThread().interrupt();
                logger.error("Error processing queued changes ", e);
            } catch (Exception e) {
                logger.error("Error processing queued changes ", e);
            }
        });
    }

    /**
     * Forwards the startup notification to the queue writer.
     */
    public void onStartup() {
        this.queueWriter.onStartup();
    }

    /**
     * Creates a query update for the given bean type, backed by a new bulk update.
     *
     * @param beanType the bean type the query update is for
     * @param i        requested batch size; a non-positive value selects the default batch size
     * @return a new {@link ElasticQueryUpdate}
     * @throws IOException if creating the bulk update fails
     */
    public <T> DocStoreQueryUpdate<T> createQueryUpdate(BeanType<T> beanType, int i) throws IOException {
        return new ElasticQueryUpdate(createBulkUpdate(i), beanType);
    }

    /**
     * Creates a bulk update that uses the shared bulk sender.
     *
     * @param i requested batch size; when not greater than zero the default batch size is used
     * @return a new {@link BulkUpdate}
     * @throws IOException declared by the signature; propagated from {@link BulkUpdate} construction
     */
    public BulkUpdate createBulkUpdate(int i) throws IOException {
        int batchSize = i > 0 ? i : this.defaultBatchSize;
        return new BulkUpdate(batchSize, this.bulkSender);
    }

    /**
     * Applies the persist events, then the delete events, then the nested
     * events of the given updates to a single bulk update, flushes it, and
     * finally passes the queue entries to the queue writer.
     *
     * <p>An {@link IOException} raised along the way is logged and not
     * rethrown; in that case the remaining steps (including sending the queue
     * entries) are skipped.
     *
     * @param docStoreUpdates the updates to process
     * @param i               requested batch size; a non-positive value selects the default batch size
     */
    public void process(DocStoreUpdates docStoreUpdates, int i) {
        try {
            BulkUpdate bulk = createBulkUpdate(i);
            for (DocStoreUpdate persistEvent : docStoreUpdates.getPersistEvents()) {
                persistEvent.docStoreUpdate(bulk.obtain());
            }
            for (DocStoreUpdate deleteEvent : docStoreUpdates.getDeleteEvents()) {
                deleteEvent.docStoreUpdate(bulk.obtain());
            }
            processQueue(bulk, docStoreUpdates.getNestedEvents());
            bulk.flush();
            sendQueueEvents(docStoreUpdates);
        } catch (IOException e) {
            logger.error("Failed to send bulk updates", e);
        }
    }

    /**
     * Groups the entries by queue id and processes each group against the
     * given bulk update.
     *
     * @param bulkUpdate the bulk update the groups are processed into
     * @param list       the queue entries to group and process
     * @return the sum of the values returned by {@link ProcessGroup#process} for each group
     * @throws IOException if processing a group fails
     */
    public long processQueue(BulkUpdate bulkUpdate, List<DocStoreQueueEntry> list) throws IOException {
        long total = 0;
        for (UpdateGroup group : ConvertToGroups.groupByQueueId(list)) {
            total += ProcessGroup.process(this.server, this.server.beanTypeForQueueId(group.getQueueId()), group, bulkUpdate);
        }
        return total;
    }

    /**
     * Passes the queue entries of the given updates to the queue writer.
     *
     * @param docStoreUpdates the updates whose queue entries are sent
     */
    private void sendQueueEvents(DocStoreUpdates docStoreUpdates) {
        this.queueWriter.queue(docStoreUpdates.getQueueEntries());
    }
}
