package org.apache.pulsar.io.elasticsearch.client.elastic;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperationVariant;
import co.elastic.clients.elasticsearch.core.bulk.DeleteOperation;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.elasticsearch.ElasticSearchConfig;
import org.apache.pulsar.io.elasticsearch.RandomExponentialRetry;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
import org.opensearch.index.reindex.BulkByScrollTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor.class */
public class ElasticBulkProcessor implements BulkProcessor {
    private static final Logger log = LoggerFactory.getLogger(ElasticBulkProcessor.class);
    private final ElasticSearchConfig config;
    private final ElasticsearchClient client;
    private final int bulkActions;
    private final long bulkSize;
    private final BulkRequestHandler bulkRequestHandler;
    private final ExecutorService internalExecutorService;
    private ScheduledFuture<?> futureFlushTask;
    private final AtomicLong executionIdGen = new AtomicLong();
    private final List<BulkOperationWithPulsarRecord> pendingOperations = new ArrayList();
    private volatile boolean closed = false;
    private final ObjectMapper mapper = new ObjectMapper();
    private final ReentrantLock lock = new ReentrantLock();

    /* loaded from: input_file:org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor$BulkOperationWithPulsarRecord.class */
    public static class BulkOperationWithPulsarRecord extends BulkOperation {
        private static final int REQUEST_OVERHEAD = 50;
        private final Record pulsarRecord;
        private final long estimatedSizeInBytes;

        public static BulkOperationWithPulsarRecord indexOperation(IndexOperation indexOperation, Record record, long j) {
            return new BulkOperationWithPulsarRecord(indexOperation, record, 50 + j);
        }

        public static BulkOperationWithPulsarRecord deleteOperation(DeleteOperation deleteOperation, Record record) {
            return new BulkOperationWithPulsarRecord(deleteOperation, record, 50L);
        }

        public BulkOperationWithPulsarRecord(BulkOperationVariant bulkOperationVariant, Record record, long j) {
            super(bulkOperationVariant);
            this.pulsarRecord = record;
            this.estimatedSizeInBytes = j;
        }

        public Record getPulsarRecord() {
            return this.pulsarRecord;
        }

        public long getEstimatedSizeInBytes() {
            return this.estimatedSizeInBytes;
        }
    }

    /* loaded from: input_file:org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor$BulkRequestHandler.class */
    public final class BulkRequestHandler {
        private final BulkProcessor.Listener listener;
        private final Semaphore semaphore;
        private final RandomExponentialRetry retry;
        private final int concurrentRequests;
        static final /* synthetic */ boolean $assertionsDisabled;

        BulkRequestHandler(RandomExponentialRetry randomExponentialRetry, int i, BulkProcessor.Listener listener) {
            if (!$assertionsDisabled && i < 0) {
                throw new AssertionError();
            }
            this.concurrentRequests = i;
            this.retry = randomExponentialRetry;
            this.semaphore = new Semaphore(i > 0 ? i : 1);
            this.listener = listener;
        }

        public void execute(BulkRequest bulkRequest, long j) {
            try {
                this.semaphore.acquire();
                CompletableFuture completableFuture = new CompletableFuture();
                ElasticBulkProcessor.this.internalExecutorService.submit(() -> {
                    Callable callable = () -> {
                        return ElasticBulkProcessor.this.client.bulk(bulkRequest);
                    };
                    try {
                        if (ElasticBulkProcessor.log.isDebugEnabled()) {
                            ElasticBulkProcessor.log.debug("Sending bulk {}", Long.valueOf(j));
                        }
                        BulkResponse bulkResponse = (BulkResponse) this.retry.retry(callable, ElasticBulkProcessor.this.config.getMaxRetries(), ElasticBulkProcessor.this.config.getRetryBackoffInMs(), BulkByScrollTask.Status.RETRIES_BULK_FIELD);
                        if (ElasticBulkProcessor.log.isDebugEnabled()) {
                            ElasticBulkProcessor.log.debug("Sending bulk {} completed", Long.valueOf(j));
                        }
                        completableFuture.complete(bulkResponse);
                    } catch (Throwable th) {
                        ElasticBulkProcessor.log.warn("Failed to execute bulk request {}", Long.valueOf(j), th);
                        completableFuture.completeExceptionally(th);
                    }
                });
                CompletableFuture completableFuture2 = new CompletableFuture();
                completableFuture.thenApply(bulkResponse -> {
                    this.semaphore.release();
                    this.listener.afterBulk(j, convertBulkRequest(bulkRequest), convertBulkResponse(bulkResponse));
                    completableFuture2.complete(null);
                    return null;
                }).exceptionally(th -> {
                    this.semaphore.release();
                    this.listener.afterBulk(j, convertBulkRequest(bulkRequest), th);
                    ElasticBulkProcessor.log.warn("Failed to execute bulk request " + j, th);
                    completableFuture2.complete(null);
                    return null;
                });
                if (ElasticBulkProcessor.this.config.getBulkConcurrentRequests() == 0) {
                    completableFuture2.join();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                this.listener.afterBulk(j, convertBulkRequest(bulkRequest), e);
            }
        }

        boolean awaitClose(long j, TimeUnit timeUnit) throws InterruptedException {
            if (!this.semaphore.tryAcquire(this.concurrentRequests, j, timeUnit)) {
                return false;
            }
            this.semaphore.release(this.concurrentRequests);
            return true;
        }

        private List<BulkProcessor.BulkOperationRequest> convertBulkRequest(BulkRequest bulkRequest) {
            return (List) bulkRequest.operations().stream().map(bulkOperation -> {
                return BulkProcessor.BulkOperationRequest.builder().pulsarRecord(((BulkOperationWithPulsarRecord) bulkOperation).getPulsarRecord()).build();
            }).collect(Collectors.toList());
        }

        private List<BulkProcessor.BulkOperationResult> convertBulkResponse(BulkResponse bulkResponse) {
            return (List) bulkResponse.items().stream().map(bulkResponseItem -> {
                return BulkProcessor.BulkOperationResult.builder().error(bulkResponseItem.error() != null ? bulkResponseItem.error().type() : null).index(bulkResponseItem.index()).documentId(bulkResponseItem.id()).build();
            }).collect(Collectors.toList());
        }

        static {
            $assertionsDisabled = !ElasticBulkProcessor.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:org/apache/pulsar/io/elasticsearch/client/elastic/ElasticBulkProcessor$Flush.class */
    class Flush implements Runnable {
        Flush() {
        }

        @Override // java.lang.Runnable
        public void run() {
            if (ElasticBulkProcessor.this.closed) {
                return;
            }
            ElasticBulkProcessor.this.flush();
        }
    }

    public ElasticBulkProcessor(ElasticSearchConfig elasticSearchConfig, ElasticsearchClient elasticsearchClient, BulkProcessor.Listener listener) {
        this.config = elasticSearchConfig;
        this.client = elasticsearchClient;
        this.bulkActions = elasticSearchConfig.getBulkActions();
        this.bulkSize = elasticSearchConfig.getBulkSizeInMb() * 1024 * 1024;
        this.internalExecutorService = Executors.newFixedThreadPool(Math.max(1, elasticSearchConfig.getBulkConcurrentRequests()), new ThreadFactoryBuilder().setNameFormat("elastic-bulk-executor-%d").build());
        this.bulkRequestHandler = new BulkRequestHandler(new RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec()), elasticSearchConfig.getBulkConcurrentRequests(), listener);
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("elastic-flush-task-%d").build());
        if (elasticSearchConfig.getBulkFlushIntervalInMs() > 0) {
            this.futureFlushTask = newSingleThreadScheduledExecutor.scheduleWithFixedDelay(new Flush(), elasticSearchConfig.getBulkFlushIntervalInMs(), elasticSearchConfig.getBulkFlushIntervalInMs(), TimeUnit.MILLISECONDS);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.pulsar.io.elasticsearch.client.BulkProcessor
    public void appendIndexRequest(BulkProcessor.BulkIndexRequest bulkIndexRequest) throws IOException {
        IndexOperation build2 = ((IndexOperation.Builder) ((IndexOperation.Builder) new IndexOperation.Builder().index(bulkIndexRequest.getIndex())).id(bulkIndexRequest.getDocumentId())).document((Map) this.mapper.readValue(bulkIndexRequest.getDocumentSource(), Map.class)).build2();
        long j = 0;
        if (this.config.getBulkSizeInMb() > 0) {
            j = bulkIndexRequest.getDocumentSource().getBytes(StandardCharsets.UTF_8).length;
        }
        add(BulkOperationWithPulsarRecord.indexOperation(build2, bulkIndexRequest.getRecord(), j));
    }

    @Override // org.apache.pulsar.io.elasticsearch.client.BulkProcessor
    public void appendDeleteRequest(BulkProcessor.BulkDeleteRequest bulkDeleteRequest) {
        add(BulkOperationWithPulsarRecord.deleteOperation(new DeleteOperation.Builder().index(bulkDeleteRequest.getIndex()).id(bulkDeleteRequest.getDocumentId()).build2(), bulkDeleteRequest.getRecord()));
    }

    protected void ensureOpen() {
        if (this.closed) {
            throw new IllegalStateException("bulk process already closed");
        }
    }

    private BulkRequest createBulkRequestAndResetPendingOps() {
        BulkRequest build2 = new BulkRequest.Builder().operations(new ArrayList(this.pendingOperations)).build2();
        this.pendingOperations.clear();
        return build2;
    }

    private void execute(boolean z) {
        this.lock.lock();
        try {
            ensureOpen();
            if (this.pendingOperations.isEmpty()) {
                return;
            }
            if (!z && !isOverTheLimit()) {
                this.lock.unlock();
                return;
            }
            BulkRequest createBulkRequestAndResetPendingOps = createBulkRequestAndResetPendingOps();
            long incrementAndGet = this.executionIdGen.incrementAndGet();
            this.lock.unlock();
            execute(createBulkRequestAndResetPendingOps, incrementAndGet);
        } finally {
            this.lock.unlock();
        }
    }

    private boolean isOverTheLimit() {
        if (this.pendingOperations.isEmpty()) {
            return false;
        }
        if (this.bulkActions <= 0 || this.pendingOperations.size() < this.bulkActions) {
            return this.bulkSize > 0 && this.pendingOperations.stream().mapToLong(bulkOperationWithPulsarRecord -> {
                return bulkOperationWithPulsarRecord.getEstimatedSizeInBytes();
            }).sum() >= this.bulkSize;
        }
        return true;
    }

    @Override // org.apache.pulsar.io.elasticsearch.client.BulkProcessor
    public void flush() {
        execute(true);
    }

    private void execute(BulkRequest bulkRequest, long j) {
        this.bulkRequestHandler.execute(bulkRequest, j);
    }

    private void executeIfNeeded() {
        execute(false);
    }

    public void add(BulkOperationWithPulsarRecord bulkOperationWithPulsarRecord) {
        this.lock.lock();
        try {
            ensureOpen();
            this.pendingOperations.add(bulkOperationWithPulsarRecord);
            executeIfNeeded();
        } finally {
            this.lock.unlock();
        }
    }

    @Override // org.apache.pulsar.io.elasticsearch.client.BulkProcessor, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        try {
            this.lock.lock();
            try {
                if (this.closed) {
                    return;
                }
                if (this.futureFlushTask != null) {
                    this.futureFlushTask.cancel(false);
                }
                flush();
                this.bulkRequestHandler.awaitClose(5000L, TimeUnit.MILLISECONDS);
                this.closed = true;
                this.lock.unlock();
            } finally {
                this.lock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
