package org.apache.streams.elasticsearch;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
import java.io.Serializable;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.streams.config.ComponentConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.apache.streams.jackson.StreamsJacksonMapper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streams/elasticsearch/ElasticsearchPersistWriter.class */
public class ElasticsearchPersistWriter implements StreamsPersistWriter, Serializable {
    private static final int DEFAULT_BATCH_SIZE = 100;
    private static final long WAITING_DOCS_LIMIT = 2500;
    private static final long DEFAULT_MAX_WAIT = 10000;
    protected final List<String> affectedIndexes;
    protected final ElasticsearchClientManager manager;
    protected final ElasticsearchWriterConfiguration config;
    protected BulkRequestBuilder bulkRequest;
    private boolean veryLargeBulk;
    private long flushThresholdsRecords;
    private long flushThresholdBytes;
    private long flushThresholdTime;
    private long lastFlush;
    private Timer timer;
    private final AtomicInteger batchesSent;
    private final AtomicInteger batchesResponded;
    protected final AtomicLong currentBatchItems;
    protected final AtomicLong currentBatchBytes;
    private final AtomicLong totalSent;
    private final AtomicLong totalSeconds;
    private final AtomicLong totalOk;
    private final AtomicLong totalFailed;
    private final AtomicLong totalSizeInBytes;
    public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName();
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
    private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
    private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");
    private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5242880L;
    protected static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance();

    public ElasticsearchPersistWriter() {
        this((ElasticsearchWriterConfiguration) new ComponentConfigurator(ElasticsearchWriterConfiguration.class).detectConfiguration());
    }

    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration elasticsearchWriterConfiguration) {
        this(elasticsearchWriterConfiguration, ElasticsearchClientManager.getInstance(elasticsearchWriterConfiguration));
    }

    public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration elasticsearchWriterConfiguration, ElasticsearchClientManager elasticsearchClientManager) {
        this.affectedIndexes = new ArrayList();
        this.veryLargeBulk = false;
        this.flushThresholdsRecords = 100L;
        this.flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD.longValue();
        this.flushThresholdTime = DEFAULT_MAX_WAIT;
        this.lastFlush = new Date().getTime();
        this.timer = new Timer();
        this.batchesSent = new AtomicInteger(0);
        this.batchesResponded = new AtomicInteger(0);
        this.currentBatchItems = new AtomicLong(0L);
        this.currentBatchBytes = new AtomicLong(0L);
        this.totalSent = new AtomicLong(0L);
        this.totalSeconds = new AtomicLong(0L);
        this.totalOk = new AtomicLong(0L);
        this.totalFailed = new AtomicLong(0L);
        this.totalSizeInBytes = new AtomicLong(0L);
        this.config = elasticsearchWriterConfiguration;
        this.manager = elasticsearchClientManager;
        this.bulkRequest = this.manager.client().prepareBulk();
    }

    public long getBatchesSent() {
        return this.batchesSent.get();
    }

    public long getBatchesResponded() {
        return this.batchesResponded.get();
    }

    public long getFlushThresholdsRecords() {
        return this.flushThresholdsRecords;
    }

    public long getFlushThresholdBytes() {
        return this.flushThresholdBytes;
    }

    public long getFlushThreasholdMaxTime() {
        return this.flushThresholdTime;
    }

    public void setFlushThresholdRecords(long j) {
        this.flushThresholdsRecords = j;
    }

    public void setFlushThresholdBytes(long j) {
        this.flushThresholdBytes = j;
    }

    public void setFlushThreasholdMaxTime(long j) {
        this.flushThresholdTime = j;
    }

    public void setVeryLargeBulk(boolean z) {
        this.veryLargeBulk = z;
    }

    private long getLastFlush() {
        return this.lastFlush;
    }

    public long getTotalOutstanding() {
        return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get());
    }

    public long getTotalSent() {
        return this.totalSent.get();
    }

    public long getTotalOk() {
        return this.totalOk.get();
    }

    public long getTotalFailed() {
        return this.totalFailed.get();
    }

    public long getTotalSizeInBytes() {
        return this.totalSizeInBytes.get();
    }

    public long getTotalSeconds() {
        return this.totalSeconds.get();
    }

    public List<String> getAffectedIndexes() {
        return this.affectedIndexes;
    }

    public boolean isConnected() {
        return this.manager.client() != null;
    }

    public String getId() {
        return STREAMS_ID;
    }

    public void write(StreamsDatum streamsDatum) {
        if (streamsDatum == null || streamsDatum.getDocument() == null) {
            return;
        }
        checkForBackOff();
        LOGGER.debug("Write Document: {}", streamsDatum.getDocument());
        Map metadata = streamsDatum.getMetadata();
        LOGGER.debug("Write Metadata: {}", metadata);
        String index = ElasticsearchMetadataUtil.getIndex((Map<String, Object>) metadata, this.config);
        String type = ElasticsearchMetadataUtil.getType((Map<String, Object>) metadata, this.config);
        String id = ElasticsearchMetadataUtil.getId(streamsDatum);
        String parent = ElasticsearchMetadataUtil.getParent(streamsDatum);
        String routing = ElasticsearchMetadataUtil.getRouting(streamsDatum);
        try {
            StreamsDatum appendMetadata = appendMetadata(streamsDatum);
            add(index, type, id, parent, routing, appendMetadata.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(appendMetadata.getTimestamp().getMillis()), docAsJson(appendMetadata.getDocument()));
        } catch (Throwable th) {
            LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", th.getMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String docAsJson(Object obj) throws IOException {
        return obj instanceof String ? obj.toString() : OBJECT_MAPPER.writeValueAsString(obj);
    }

    protected StreamsDatum appendMetadata(StreamsDatum streamsDatum) throws IOException {
        String obj = streamsDatum.getDocument() instanceof String ? streamsDatum.getDocument().toString() : OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument());
        if (streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0) {
            return streamsDatum;
        }
        ObjectNode readTree = OBJECT_MAPPER.readTree(obj);
        readTree.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata())));
        streamsDatum.setDocument(OBJECT_MAPPER.writeValueAsString(readTree));
        return streamsDatum;
    }

    public void cleanUp() {
        try {
            try {
                LOGGER.debug("cleanUp started");
                flushInternal();
                LOGGER.debug("flushInternal completed");
                waitToCatchUp(0, 300000);
                LOGGER.debug("waitToCatchUp completed");
                if (this.veryLargeBulk) {
                    resetRefreshInterval();
                }
                if (this.config.getRefresh().booleanValue()) {
                    refreshIndexes();
                    LOGGER.debug("refreshIndexes completed");
                }
                LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", new Object[]{Long.valueOf(this.totalOk.get()), Long.valueOf(this.totalFailed.get()), Long.valueOf(getTotalOutstanding())});
                this.timer.cancel();
                LOGGER.debug("cleanUp completed");
            } catch (Throwable th) {
                LOGGER.warn("This is unexpected: {}", th);
                if (this.veryLargeBulk) {
                    resetRefreshInterval();
                }
                if (this.config.getRefresh().booleanValue()) {
                    refreshIndexes();
                    LOGGER.debug("refreshIndexes completed");
                }
                LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", new Object[]{Long.valueOf(this.totalOk.get()), Long.valueOf(this.totalFailed.get()), Long.valueOf(getTotalOutstanding())});
                this.timer.cancel();
                LOGGER.debug("cleanUp completed");
            }
        } catch (Throwable th2) {
            if (this.veryLargeBulk) {
                resetRefreshInterval();
            }
            if (this.config.getRefresh().booleanValue()) {
                refreshIndexes();
                LOGGER.debug("refreshIndexes completed");
            }
            LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", new Object[]{Long.valueOf(this.totalOk.get()), Long.valueOf(this.totalFailed.get()), Long.valueOf(getTotalOutstanding())});
            this.timer.cancel();
            LOGGER.debug("cleanUp completed");
            throw th2;
        }
    }

    private void resetRefreshInterval() {
        for (String str : this.affectedIndexes) {
            if (this.veryLargeBulk) {
                LOGGER.debug("Resetting our Refresh Interval: {}", str);
                UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(new String[]{str});
                updateSettingsRequest.settings(Settings.settingsBuilder().put("refresh_interval", "5s"));
                this.manager.client().admin().indices().updateSettings(updateSettingsRequest).actionGet();
            }
        }
    }

    private void refreshIndexes() {
        for (String str : this.affectedIndexes) {
            if (this.config.getRefresh().booleanValue()) {
                LOGGER.debug("Refreshing ElasticSearch index: {}", str);
                this.manager.client().admin().indices().prepareRefresh(new String[]{str}).execute().actionGet();
            }
        }
    }

    private synchronized void flushInternal() {
        if (this.bulkRequest == null || this.currentBatchItems.get() == 0) {
            return;
        }
        waitToCatchUp(5, 60000);
        flush(this.bulkRequest, Long.valueOf(this.currentBatchItems.get()), Long.valueOf(this.currentBatchBytes.get()));
        this.currentBatchItems.set(0L);
        this.currentBatchBytes.set(0L);
        this.bulkRequest = this.manager.client().prepareBulk();
    }

    private synchronized void waitToCatchUp(int i, int i2) {
        for (int i3 = 0; getBatchesSent() - getBatchesResponded() > i && i3 < i2; i3++) {
            try {
                Thread.yield();
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                LOGGER.warn("Catchup was interrupted.  Data may be lost");
                return;
            }
        }
    }

    private void checkForBackOff() {
        try {
            if (getTotalOutstanding() > WAITING_DOCS_LIMIT) {
                int i = 0;
                while (getTotalOutstanding() > WAITING_DOCS_LIMIT) {
                    int i2 = i;
                    i++;
                    if (i2 >= 500) {
                        break;
                    } else {
                        Thread.sleep(10L);
                    }
                }
                if (getTotalOutstanding() > WAITING_DOCS_LIMIT) {
                    LOGGER.warn("Even after back-off there are {} items still in queue.", Long.valueOf(getTotalOutstanding()));
                }
            }
        } catch (Exception e) {
            LOGGER.warn("We were broken from our loop: {}", e.getMessage());
        }
    }

    public void add(String str, String str2, String str3, String str4, String str5) {
        add(str, str2, str3, null, null, str4, str5);
    }

    public void add(String str, String str2, String str3, String str4, String str5, String str6, String str7) {
        Objects.requireNonNull(str);
        Objects.requireNonNull(str2);
        Objects.requireNonNull(str7);
        IndexRequestBuilder source = this.manager.client().prepareIndex(str, str2).setSource(str7);
        if (str3 != null) {
            source.setId(str3);
        }
        if (str6 != null) {
            source.setTimestamp(str6);
        }
        if (str4 != null) {
            source.setParent(str4);
        }
        if (str5 != null) {
            source.setRouting(str5);
        }
        add((IndexRequest) source.request());
    }

    protected void add(IndexRequest indexRequest) {
        Objects.requireNonNull(indexRequest);
        Objects.requireNonNull(indexRequest.index());
        synchronized (this) {
            checkIndexImplications(indexRequest.index());
            this.bulkRequest.add(indexRequest);
            this.currentBatchBytes.addAndGet(indexRequest.source().length());
            this.currentBatchItems.incrementAndGet();
            checkForFlush();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkForFlush() {
        synchronized (this) {
            if (this.currentBatchBytes.get() >= this.flushThresholdBytes || this.currentBatchItems.get() >= this.flushThresholdsRecords || new Date().getTime() - this.lastFlush >= this.flushThresholdTime) {
                flushInternal();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkIndexImplications(String str) {
        synchronized (ElasticsearchPersistWriter.class) {
            if (this.affectedIndexes.contains(str)) {
                return;
            }
            createIndexIfMissing(str);
            this.affectedIndexes.add(str);
        }
    }

    protected void disableRefresh() {
        Iterator<String> it = this.affectedIndexes.iterator();
        while (it.hasNext()) {
            UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(new String[]{it.next()});
            updateSettingsRequest.settings(Settings.settingsBuilder().put("refresh_interval", -1));
            this.manager.client().admin().indices().updateSettings(updateSettingsRequest).actionGet();
        }
    }

    public void createIndexIfMissing(String str) {
        if (((IndicesExistsResponse) this.manager.client().admin().indices().exists(new IndicesExistsRequest(new String[]{str})).actionGet()).isExists()) {
            return;
        }
        CreateIndexResponse createIndexResponse = (CreateIndexResponse) this.manager.client().admin().indices().create(new CreateIndexRequest(str)).actionGet();
        if (createIndexResponse.isAcknowledged()) {
            LOGGER.info("Index Created: {}", str);
        } else {
            LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", str);
            LOGGER.error("Error Message: {}", createIndexResponse.toString());
            throw new RuntimeException("Unable to create index " + str);
        }
    }

    public void prepare(Object obj) {
        this.veryLargeBulk = (this.config.getBulk() == null ? Boolean.FALSE : this.config.getBulk()).booleanValue();
        this.flushThresholdsRecords = this.config.getBatchSize() == null ? 100L : (int) this.config.getBatchSize().longValue();
        this.flushThresholdTime = (this.config.getMaxTimeBetweenFlushMs() == null || this.config.getMaxTimeBetweenFlushMs().longValue() <= 0) ? DEFAULT_MAX_WAIT : this.config.getMaxTimeBetweenFlushMs().longValue();
        this.flushThresholdBytes = (this.config.getBatchBytes() == null ? DEFAULT_BULK_FLUSH_THRESHOLD : this.config.getBatchBytes()).longValue();
        this.timer.scheduleAtFixedRate(new TimerTask() { // from class: org.apache.streams.elasticsearch.ElasticsearchPersistWriter.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                ElasticsearchPersistWriter.this.checkForFlush();
            }
        }, this.flushThresholdTime, this.flushThresholdTime);
        if (this.veryLargeBulk) {
            disableRefresh();
        }
    }

    private void flush(BulkRequestBuilder bulkRequestBuilder, final Long l, final Long l2) {
        LOGGER.debug("Writing to ElasticSearch: Items[{}] Size[{} mb]", l, MEGABYTE_FORMAT.format(l2.longValue() / 1048576.0d));
        this.lastFlush = new Date().getTime();
        this.totalSent.addAndGet(l.longValue());
        this.batchesSent.incrementAndGet();
        try {
            bulkRequestBuilder.execute().addListener(new ActionListener<BulkResponse>() { // from class: org.apache.streams.elasticsearch.ElasticsearchPersistWriter.2
                public void onResponse(BulkResponse bulkResponse) {
                    ElasticsearchPersistWriter.this.batchesResponded.incrementAndGet();
                    ElasticsearchPersistWriter.this.updateTotals(bulkResponse, l, l2);
                }

                public void onFailure(Throwable th) {
                    ElasticsearchPersistWriter.this.batchesResponded.incrementAndGet();
                    th.printStackTrace();
                }
            });
        } catch (Throwable th) {
            LOGGER.error("There was an error sending the batch: {}", th.getMessage());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void updateTotals(BulkResponse bulkResponse, Long l, Long l2) {
        long j = 0;
        long j2 = 0;
        long tookInMillis = bulkResponse.getTookInMillis();
        for (BulkItemResponse bulkItemResponse : bulkResponse.getItems()) {
            if (bulkItemResponse == null || bulkItemResponse.isFailed()) {
                j++;
                LOGGER.debug("{} ({},{},{}) failed: {}", new Object[]{bulkItemResponse.getOpType(), bulkItemResponse.getIndex(), bulkItemResponse.getType(), bulkItemResponse.getId(), bulkItemResponse.getFailureMessage()});
            } else {
                j2++;
            }
        }
        if (j > 0) {
            LOGGER.warn("Bulk Uploading had {} failures of {}", Long.valueOf(j), l);
        }
        this.totalOk.addAndGet(j2);
        this.totalFailed.addAndGet(j);
        this.totalSeconds.addAndGet(tookInMillis / 1000);
        this.totalSizeInBytes.addAndGet(l2.longValue());
        if (l.longValue() != j2 + j) {
            LOGGER.error("Count MisMatch: Sent[{}] Passed[{}] Failed[{}]", new Object[]{l, Long.valueOf(j2), Long.valueOf(j)});
        }
        LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]", new Object[]{MEGABYTE_FORMAT.format(l2.longValue() / 1048576.0d), NUMBER_FORMAT.format(j2), NUMBER_FORMAT.format(j), NUMBER_FORMAT.format(tookInMillis), MEGABYTE_FORMAT.format(this.totalSizeInBytes.get() / 1048576.0d), NUMBER_FORMAT.format(this.totalOk), NUMBER_FORMAT.format(this.totalFailed), NUMBER_FORMAT.format(this.totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding())});
    }
}
