package org.opennms.nephron.cortex;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.util.JsonFormat;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.avro.file.DataFileConstants;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.util.ExceptionUtils;
import org.joda.time.Instant;
import org.opennms.nephron.cortex.EventTimestampIndexer;
import org.opennms.nephron.cortex.Heap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xerial.snappy.Snappy;
import prometheus.PrometheusRemote;
import prometheus.PrometheusTypes;

@Experimental(Experimental.Kind.SOURCE_SINK)
/* loaded from: input_file:org/opennms/nephron/cortex/CortexIo.class */
public class CortexIo {
    private static final String METRIC_NAME_LABEL = "__name__";
    private static final String X_SCOPE_ORG_ID_HEADER = "X-Scope-OrgID";
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CortexIo.class);
    public static final RateLimitedLog RATE_LIMITED_LOG = RateLimitedLog.withRateLimit(LOG).maxRate(5).every(Duration.ofSeconds(10)).build();
    private static final Logger LOG_WRITE = LoggerFactory.getLogger(CortexIo.class.getName() + ".write");
    private static final MediaType PROTOBUF_MEDIA_TYPE = MediaType.parse("application/x-protobuf");
    public static final String CORTEX_METRIC_NAMESPACE = "cortex";
    public static final String CORTEX_WRITE_FAILURE_METRIC_NAME = "write_failure";
    private static Counter WRITE_FAILURE = Metrics.counter(CORTEX_METRIC_NAMESPACE, CORTEX_WRITE_FAILURE_METRIC_NAME);
    public static final String CORTEX_RESPONSE_FAILURE_METRIC_NAME = "response_failure";
    private static Counter RESPONSE_FAILURE = Metrics.counter(CORTEX_METRIC_NAMESPACE, CORTEX_RESPONSE_FAILURE_METRIC_NAME);
    public static final String CORTEX_WRITE_METRIC_NAME = "write";
    private static Counter WRITE = Metrics.counter(CORTEX_METRIC_NAMESPACE, CORTEX_WRITE_METRIC_NAME);
    public static final String CORTEX_SAMPLE_METRIC_NAME = "sample";
    private static Counter SAMPLE = Metrics.counter(CORTEX_METRIC_NAMESPACE, CORTEX_SAMPLE_METRIC_NAME);
    private static String[] KNOWN_CORTEX_ERRORS = {"out of order sample", "duplicate sample for timestamp", "per-metric series limit", "per-metric metadata limit", "per-user series limit", "per-user metric metadata limit"};
    private static final JsonFormat.Printer PROTOBUF_JSON_PRINTER = JsonFormat.printer().omittingInsignificantWhitespace();

    @FunctionalInterface
    /* loaded from: input_file:org/opennms/nephron/cortex/CortexIo$BuildTimeSeries.class */
    public interface BuildTimeSeries<K, V> extends Serializable {
        void accept(K k, V v, Instant instant, int i, TimeSeriesBuilder timeSeriesBuilder);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opennms/nephron/cortex/CortexIo$TimeSeriesBuilderImpl.class */
    public static class TimeSeriesBuilderImpl implements TimeSeriesBuilder {
        private final ArrayList<PrometheusTypes.TimeSeries.Builder> builders = new ArrayList<>();
        private Set<Map.Entry<String, String>> fixedLabels;
        private PrometheusTypes.TimeSeries.Builder builder;

        private TimeSeriesBuilderImpl(Set<Map.Entry<String, String>> set) {
            this.fixedLabels = set;
        }

        private PrometheusTypes.TimeSeries.Builder builder() {
            if (this.builder == null) {
                this.builder = PrometheusTypes.TimeSeries.newBuilder();
                for (Map.Entry<String, String> entry : this.fixedLabels) {
                    this.builder.addLabels(PrometheusTypes.Label.newBuilder().setName(entry.getKey()).setValue(entry.getValue()));
                }
                this.builders.add(this.builder);
            }
            return this.builder;
        }

        @Override // org.opennms.nephron.cortex.TimeSeriesBuilder
        public TimeSeriesBuilder setMetricName(String str) {
            return addLabel(CortexIo.METRIC_NAME_LABEL, str);
        }

        @Override // org.opennms.nephron.cortex.TimeSeriesBuilder
        public TimeSeriesBuilder addLabel(String str, String str2) {
            CortexIo.sanitize(str, str2, (str3, str4) -> {
                builder().addLabels(PrometheusTypes.Label.newBuilder().setName(str3).setValue(str4));
            });
            return this;
        }

        @Override // org.opennms.nephron.cortex.TimeSeriesBuilder
        public TimeSeriesBuilder addSample(long j, double d) {
            CortexIo.SAMPLE.inc();
            builder().addSamples(PrometheusTypes.Sample.newBuilder().setTimestamp(j).setValue(d));
            return this;
        }

        @Override // org.opennms.nephron.cortex.TimeSeriesBuilder
        public TimeSeriesBuilder nextSeries() {
            this.builder = null;
            return this;
        }
    }

    /* loaded from: input_file:org/opennms/nephron/cortex/CortexIo$Write.class */
    public static abstract class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
        protected final String writeUrl;
        protected final BuildTimeSeries<K, V> build;
        protected String orgId;
        protected long maxBatchSize;
        protected long maxBatchBytes;
        protected long readTimeoutMs;
        protected long writeTimeoutMs;
        protected final Map<String, String> fixedLabels;
        protected final List<BiConsumer<Call, Response>> responseHandlers;
        protected final List<BiConsumer<Call, Exception>> failureHandlers;

        public Write(String str, BuildTimeSeries<K, V> buildTimeSeries) {
            super("CortexWrite");
            this.maxBatchSize = 10000L;
            this.maxBatchBytes = 524288L;
            this.readTimeoutMs = 10000L;
            this.writeTimeoutMs = 10000L;
            this.fixedLabels = new HashMap();
            this.responseHandlers = new ArrayList();
            this.failureHandlers = new ArrayList();
            this.writeUrl = str;
            this.build = buildTimeSeries;
        }

        public Write<K, V> withOrgId(String str) {
            this.orgId = str;
            return this;
        }

        public Write<K, V> withMaxBatchSize(long j) {
            this.maxBatchSize = j;
            return this;
        }

        public Write<K, V> withMaxBatchBytes(long j) {
            this.maxBatchBytes = j;
            return this;
        }

        public Write<K, V> withReadTimeoutMs(long j) {
            this.readTimeoutMs = j;
            return this;
        }

        public Write<K, V> withWriteTimeoutMs(long j) {
            this.writeTimeoutMs = j;
            return this;
        }

        public Write<K, V> withFixedLabel(String str, String str2) {
            Map<String, String> map = this.fixedLabels;
            Objects.requireNonNull(map);
            CortexIo.sanitize(str, str2, (v1, v2) -> {
                r2.put(v1, v2);
            });
            return this;
        }

        public Write<K, V> withMetricName(String str) {
            return withFixedLabel(CortexIo.METRIC_NAME_LABEL, str);
        }

        public Write<K, V> withResponseHandler(BiConsumer<Call, Response> biConsumer) {
            this.responseHandlers.add(biConsumer);
            return this;
        }

        public Write<K, V> withFailureHandler(BiConsumer<Call, Exception> biConsumer) {
            this.failureHandlers.add(biConsumer);
            return this;
        }

        @VisibleForTesting
        abstract WriteFn<K, V, ?> createWriteFn(org.joda.time.Duration duration);

        @Override // org.apache.beam.sdk.transforms.PTransform
        public PDone expand(PCollection<KV<K, V>> pCollection) {
            ((PCollection) pCollection.apply(Window.into(new GlobalWindows()))).apply(ParDo.of(createWriteFn(pCollection.getWindowingStrategy().getAllowedLateness())));
            return PDone.in(pCollection.getPipeline());
        }
    }

    /* loaded from: input_file:org/opennms/nephron/cortex/CortexIo$WriteFn.class */
    public static abstract class WriteFn<K, V, W extends Write<K, V>> extends DoFn<KV<K, V>, Void> {
        protected final W spec;
        protected final org.joda.time.Duration allowedLateness;
        private transient OkHttpClient okHttpClient;
        private transient PrometheusRemote.WriteRequest.Builder writeRequestBuilder;
        private transient long batchSize;
        private transient long batchBytes;
        private transient AtomicLong writes;
        private transient AtomicLong writeFailures;
        private transient AtomicLong responseFailures;
        private transient Map<String, Pair<AtomicLong, Counter>> detailedResponseFailures;
        private transient Phaser phaser;

        public WriteFn(W w, org.joda.time.Duration duration) {
            this.spec = w;
            this.allowedLateness = duration;
            CortexIo.LOG.debug("allowed lateness: {}", duration);
        }

        protected void setGcTimer(Timer timer, Instant instant) {
            timer.set(instant.plus(this.allowedLateness).plus(1L));
        }

        @DoFn.Setup
        public void setup() {
            CortexIo.LOG.debug("setup - instance: {}", this);
            ConnectionPool connectionPool = new ConnectionPool(1, 5L, TimeUnit.MINUTES);
            Dispatcher dispatcher = new Dispatcher();
            dispatcher.setMaxRequests(1);
            dispatcher.setMaxRequestsPerHost(1);
            this.okHttpClient = new OkHttpClient.Builder().readTimeout(this.spec.readTimeoutMs, TimeUnit.MILLISECONDS).writeTimeout(this.spec.writeTimeoutMs, TimeUnit.MILLISECONDS).dispatcher(dispatcher).connectionPool(connectionPool).build();
            this.phaser = new Phaser(1);
            this.writes = new AtomicLong();
            this.writeFailures = new AtomicLong();
            this.responseFailures = new AtomicLong();
            this.detailedResponseFailures = new HashMap();
        }

        private static void incrementCounter(AtomicLong atomicLong, Counter counter) {
            long andSet = atomicLong.getAndSet(0L);
            if (andSet != 0) {
                if (CortexIo.LOG.isTraceEnabled()) {
                    CortexIo.LOG.trace("increment counter - name: {};  count: {}", counter.getName(), Long.valueOf(andSet));
                }
                counter.inc(andSet);
            }
        }

        @DoFn.StartBundle
        public void startBundle() {
            CortexIo.LOG.debug("startBundle - instance: {}", this);
            startBatch();
        }

        @DoFn.FinishBundle
        public void finishBundle() throws IOException, InterruptedException {
            CortexIo.LOG.debug("finishBundle - instance: {}", this);
            flushBatch();
            this.phaser.arriveAndAwaitAdvance();
            incrementCounter(this.writes, CortexIo.WRITE);
            incrementCounter(this.writeFailures, CortexIo.WRITE_FAILURE);
            incrementCounter(this.responseFailures, CortexIo.RESPONSE_FAILURE);
            synchronized (this.detailedResponseFailures) {
                for (Pair<AtomicLong, Counter> pair : this.detailedResponseFailures.values()) {
                    incrementCounter(pair.getLeft(), pair.getRight());
                }
            }
        }

        @DoFn.Teardown
        public void closeClient() throws IOException {
            CortexIo.LOG.debug("teardown - instance: {}", this);
            this.okHttpClient.dispatcher().executorService().shutdown();
            this.okHttpClient.connectionPool().evictAll();
        }

        protected void outputTimeSeries(Consumer<TimeSeriesBuilder> consumer) throws IOException {
            TimeSeriesBuilderImpl timeSeriesBuilderImpl = new TimeSeriesBuilderImpl(this.spec.fixedLabels.entrySet());
            consumer.accept(timeSeriesBuilderImpl);
            Iterator<PrometheusTypes.TimeSeries.Builder> it2 = timeSeriesBuilderImpl.builders.iterator();
            while (it2.hasNext()) {
                PrometheusTypes.TimeSeries build = it2.next().build();
                int serializedSize = build.getSerializedSize();
                if (this.batchSize >= this.spec.maxBatchSize || this.batchBytes + serializedSize > this.spec.maxBatchBytes) {
                    flushBatch();
                    startBatch();
                }
                this.batchSize++;
                this.batchBytes += serializedSize;
                this.writeRequestBuilder.addTimeseries(build);
            }
        }

        public void startBatch() {
            this.writeRequestBuilder = PrometheusRemote.WriteRequest.newBuilder();
            this.batchSize = 0L;
            this.batchBytes = 0L;
        }

        private void flushBatch() throws IOException {
            CortexIo.LOG.trace("flushBatch - instance: {}; batchSize: {}", this, Long.valueOf(this.batchSize));
            if (this.batchSize == 0) {
                return;
            }
            final PrometheusRemote.WriteRequest build = this.writeRequestBuilder.build();
            Request.Builder post = new Request.Builder().url(this.spec.writeUrl).addHeader("X-Prometheus-Remote-Write-Version", "0.1.0").addHeader("Content-Encoding", DataFileConstants.SNAPPY_CODEC).addHeader("User-Agent", CortexIo.class.getCanonicalName()).post(RequestBody.create(CortexIo.PROTOBUF_MEDIA_TYPE, Snappy.compress(build.toByteArray())));
            if (this.spec.orgId != null) {
                post.addHeader(CortexIo.X_SCOPE_ORG_ID_HEADER, this.spec.orgId);
            }
            Request build2 = post.build();
            if (CortexIo.LOG_WRITE.isTraceEnabled()) {
                String print = CortexIo.PROTOBUF_JSON_PRINTER.print(build);
                CortexIo.LOG_WRITE.trace((String) Pattern.compile(",\\{\"labels\"").splitAsStream(print.substring(print.indexOf(91) + 1, print.length() - 2)).filter(charSequence -> {
                    return StringUtils.isNoneBlank(charSequence);
                }).collect(Collectors.joining("\n" + "{\"index\":{}}" + "\n{\"labels\"", "{\"index\":{}}" + "\n", "")));
            }
            this.phaser.register();
            this.writes.incrementAndGet();
            this.okHttpClient.newCall(build2).enqueue(new Callback() { // from class: org.opennms.nephron.cortex.CortexIo.WriteFn.1
                @Override // okhttp3.Callback
                public void onFailure(Call call, IOException iOException) {
                    try {
                        CortexIo.RATE_LIMITED_LOG.error("Write to Cortex failed", (Throwable) iOException);
                        WriteFn.this.writeFailures.incrementAndGet();
                        WriteFn.this.spec.failureHandlers.forEach(biConsumer -> {
                            biConsumer.accept(call, iOException);
                        });
                    } finally {
                        WriteFn.this.phaser.arriveAndDeregister();
                    }
                }

                @Override // okhttp3.Callback
                public void onResponse(Call call, Response response) {
                    try {
                        if (CortexIo.LOG.isTraceEnabled()) {
                            CortexIo.LOG.trace("got response - code: {}, writeRequest: {}", Integer.valueOf(response.code()), build);
                        }
                        if (!response.isSuccessful()) {
                            WriteFn.this.responseFailures.incrementAndGet();
                            handleUnsuccessfulResponse(response);
                        }
                        WriteFn.this.spec.responseHandlers.forEach(biConsumer -> {
                            biConsumer.accept(call, response);
                        });
                    } finally {
                        WriteFn.this.phaser.arriveAndDeregister();
                    }
                }

                private void handleUnsuccessfulResponse(Response response) {
                    String str;
                    ResponseBody body = response.body();
                    try {
                        if (body != null) {
                            try {
                                str = body.string();
                                incrementDetailedResponseFailure(response.code(), CortexIo.determineErrorKind(str));
                            } catch (IOException e) {
                                str = "(error reading body)";
                                incrementDetailedResponseFailure(response.code(), "body read error");
                            }
                        } else {
                            str = ExceptionUtils.STRINGIFIED_NULL_EXCEPTION;
                            incrementDetailedResponseFailure(response.code(), "body empty");
                        }
                        CortexIo.RATE_LIMITED_LOG.error("Write to Cortex failed - code: " + response.code() + "; message: " + response.message() + "; body: " + str.trim() + "; request: " + build);
                        if (body != null) {
                            body.close();
                        }
                    } catch (Throwable th) {
                        if (body != null) {
                            try {
                                body.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        }
                        throw th;
                    }
                }

                private void incrementDetailedResponseFailure(int i, String str) {
                    Pair<AtomicLong, Counter> pair;
                    String str2 = String.valueOf(i) + " " + str;
                    synchronized (WriteFn.this.detailedResponseFailures) {
                        pair = WriteFn.this.detailedResponseFailures.get(str2);
                        if (pair == null) {
                            pair = Pair.of(new AtomicLong(), Metrics.counter(CortexIo.CORTEX_METRIC_NAMESPACE, "response_failure_" + str2));
                            WriteFn.this.detailedResponseFailures.put(str2, pair);
                        }
                    }
                    pair.getKey().incrementAndGet();
                }
            });
        }
    }

    /* loaded from: input_file:org/opennms/nephron/cortex/CortexIo$WriteFnWithAccumulation.class */
    public static class WriteFnWithAccumulation<K, V> extends WriteFn<K, V, WriteWithAccumulation<K, V>> {
        private static final String OUTPUT_TIMER_NAME = "output";
        private static final String GC_TIMER_NAME = "gc";
        private static final String HEAP_STATE_NAME = "heap";

        @DoFn.StateId(HEAP_STATE_NAME)
        private final StateSpec<ValueState<Heap.HeapImpl<V>>> heapStateSpec;

        @DoFn.TimerId(OUTPUT_TIMER_NAME)
        private final TimerSpec outputTimerSpec;

        @DoFn.TimerId(GC_TIMER_NAME)
        private final TimerSpec gcTimerSpec;

        public WriteFnWithAccumulation(WriteWithAccumulation<K, V> writeWithAccumulation, org.joda.time.Duration duration) {
            super(writeWithAccumulation, duration);
            this.outputTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
            this.gcTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
            this.heapStateSpec = StateSpecs.value(new Heap.HeapImpl.HeapImplCoder(((WriteWithAccumulation) writeWithAccumulation).valueCoder));
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, V>, Void>.ProcessContext processContext, @DoFn.StateId("heap") @DoFn.AlwaysFetched ValueState<Heap.HeapImpl<V>> valueState, @DoFn.TimerId("output") Timer timer, @DoFn.TimerId("gc") Timer timer2) throws Exception {
            Heap.HeapImpl<V> read = valueState.read();
            if (read == null) {
                CortexIo.LOG.debug("create heap - key: {}", processContext.element().getKey());
                read = new Heap.HeapImpl<>(new EventTimestampIndexer(), new HashMap());
            }
            if (read.isEmpty()) {
                timer.offset(((WriteWithAccumulation) this.spec).accumulationDelay).setRelative();
            }
            if (CortexIo.LOG.isTraceEnabled()) {
                CortexIo.LOG.trace("add value to heap - ctx.timestamp: {}; key: {}; value: {}", processContext.timestamp(), processContext.element().getKey(), processContext.element().getValue());
            }
            read.add(processContext.element().getValue(), processContext.timestamp(), Instant.now(), ((WriteWithAccumulation) this.spec).combiner);
            valueState.write(read);
            setGcTimer(timer2, read.newestEventTimestamp().get());
        }

        @DoFn.OnTimer(OUTPUT_TIMER_NAME)
        public void onOutput(DoFn<KV<K, V>, Void>.OnTimerContext onTimerContext, @DoFn.Key K k, @DoFn.StateId("heap") @DoFn.AlwaysFetched ValueState<Heap.HeapImpl<V>> valueState, @DoFn.TimerId("output") Timer timer) throws IOException {
            Heap.HeapImpl<V> read = valueState.read();
            if (read != null) {
                Instant now = Instant.now();
                if (flushHeap(read, k, now, ((WriteWithAccumulation) this.spec).accumulationDelay, "after accumulation")) {
                    valueState.write(read);
                }
                Optional<Instant> oldestProcessingTimestamp = read.oldestProcessingTimestamp();
                if (oldestProcessingTimestamp.isPresent()) {
                    org.joda.time.Duration duration = new org.joda.time.Duration(oldestProcessingTimestamp.get().plus(((WriteWithAccumulation) this.spec).accumulationDelay), now);
                    CortexIo.LOG.trace("schedule next heap check - duration: {}", duration);
                    timer.offset(duration).setRelative();
                }
            }
        }

        @DoFn.OnTimer(GC_TIMER_NAME)
        public void onGc(@DoFn.Key K k, @DoFn.StateId("heap") @DoFn.AlwaysFetched ValueState<Heap.HeapImpl<V>> valueState) throws IOException {
            Heap.HeapImpl<V> read = valueState.read();
            if (read != null) {
                flushHeap(read, k, Instant.now(), org.joda.time.Duration.ZERO, "on garbage collection");
                valueState.clear();
            }
        }

        private boolean flushHeap(Heap<V> heap, K k, Instant instant, org.joda.time.Duration duration, String str) throws IOException {
            List<Heap.Flushed<V>> flush = heap.flush(instant, duration);
            if (CortexIo.LOG.isTraceEnabled()) {
                CortexIo.LOG.trace("flush values from heap {} - key: {}; size: {}", str, k, Integer.valueOf(flush.size()));
            }
            if (!flush.isEmpty()) {
                for (Heap.Flushed<V> flushed : flush) {
                    outputTimeSeries(timeSeriesBuilder -> {
                        ((WriteWithAccumulation) this.spec).build.accept(k, flushed.value, flushed.eventTimestamp, flushed.index, timeSeriesBuilder);
                    });
                }
            }
            return !flush.isEmpty();
        }
    }

    /* loaded from: input_file:org/opennms/nephron/cortex/CortexIo$WriteFnWithoutAccumulation.class */
    public static class WriteFnWithoutAccumulation<K, V> extends WriteFn<K, V, WriteWithoutAccumulation<K, V>> {
        private static final String GC_TIMER_NAME = "gc";
        private static final String FLUSHED_STATE_NAME = "flushed";

        @DoFn.StateId(FLUSHED_STATE_NAME)
        private final StateSpec<ValueState<EventTimestampIndexer>> flushedStateSpec;

        @DoFn.TimerId(GC_TIMER_NAME)
        private final TimerSpec gcTimerSpec;

        public WriteFnWithoutAccumulation(WriteWithoutAccumulation<K, V> writeWithoutAccumulation, org.joda.time.Duration duration) {
            super(writeWithoutAccumulation, duration);
            this.gcTimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
            this.flushedStateSpec = StateSpecs.value(EventTimestampIndexer.EventTimestampIndexerCoder.of());
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<K, V>, Void>.ProcessContext processContext, @DoFn.StateId("flushed") @DoFn.AlwaysFetched ValueState<EventTimestampIndexer> valueState, @DoFn.TimerId("gc") Timer timer) throws Exception {
            EventTimestampIndexer read = valueState.read();
            if (read == null) {
                read = new EventTimestampIndexer();
            }
            int findIndex = read.findIndex(processContext.timestamp());
            valueState.write(read);
            if (CortexIo.LOG.isTraceEnabled()) {
                CortexIo.LOG.trace("add value to heap - ctx.timestamp: {}; key: {}; value: {}", processContext.timestamp(), processContext.element().getKey(), processContext.element().getValue());
            }
            outputTimeSeries(timeSeriesBuilder -> {
                ((WriteWithoutAccumulation) this.spec).build.accept(((KV) processContext.element()).getKey(), ((KV) processContext.element()).getValue(), processContext.timestamp(), findIndex, timeSeriesBuilder);
            });
            setGcTimer(timer, read.newestEventTimestamp());
        }

        @DoFn.OnTimer(GC_TIMER_NAME)
        public void onGc(@DoFn.StateId("flushed") ValueState<EventTimestampIndexer> valueState) throws IOException {
            valueState.clear();
        }
    }

    /* loaded from: input_file:org/opennms/nephron/cortex/CortexIo$WriteWithAccumulation.class */
    private static class WriteWithAccumulation<K, V> extends Write<K, V> {
        private final Coder<K> keyCoder;
        private final Coder<V> valueCoder;
        private final SerializableBiFunction<V, V, V> combiner;
        private final org.joda.time.Duration accumulationDelay;

        public WriteWithAccumulation(String str, BuildTimeSeries<K, V> buildTimeSeries, Coder<K> coder, Coder<V> coder2, SerializableBiFunction<V, V, V> serializableBiFunction, org.joda.time.Duration duration) {
            super(str, buildTimeSeries);
            this.keyCoder = coder;
            this.valueCoder = coder2;
            this.combiner = serializableBiFunction;
            this.accumulationDelay = duration;
        }

        @Override // org.opennms.nephron.cortex.CortexIo.Write
        WriteFn<K, V, ?> createWriteFn(org.joda.time.Duration duration) {
            return new WriteFnWithAccumulation(this, duration);
        }
    }

    /* loaded from: input_file:org/opennms/nephron/cortex/CortexIo$WriteWithoutAccumulation.class */
    private static class WriteWithoutAccumulation<K, V> extends Write<K, V> {
        public WriteWithoutAccumulation(String str, BuildTimeSeries<K, V> buildTimeSeries) {
            super(str, buildTimeSeries);
        }

        @Override // org.opennms.nephron.cortex.CortexIo.Write
        WriteFn<K, V, ?> createWriteFn(org.joda.time.Duration duration) {
            return new WriteFnWithoutAccumulation(this, duration);
        }
    }

    public static <K, V> Write<K, V> of(String str, BuildTimeSeries<K, V> buildTimeSeries) {
        return new WriteWithoutAccumulation(str, buildTimeSeries);
    }

    public static <K, V> Write<K, V> of(String str, BuildTimeSeries<K, V> buildTimeSeries, Coder<K> coder, Coder<V> coder2, SerializableBiFunction<V, V, V> serializableBiFunction, org.joda.time.Duration duration) {
        return new WriteWithAccumulation(str, buildTimeSeries, coder, coder2, serializableBiFunction, duration);
    }

    private static String determineErrorKind(String str) {
        for (String str2 : KNOWN_CORTEX_ERRORS) {
            if (str.contains(str2)) {
                return str2;
            }
        }
        RATE_LIMITED_LOG.warn("could not extract error kind - body: {}", str);
        return "body unknown";
    }

    public static void sanitize(String str, String str2, BiConsumer<String, String> biConsumer) {
        String sanitizeLabelName;
        String str3;
        if (METRIC_NAME_LABEL.equals(str)) {
            sanitizeLabelName = str;
            str3 = sanitizeMetricName(str2);
        } else {
            sanitizeLabelName = sanitizeLabelName(str);
            str3 = str2;
        }
        biConsumer.accept(sanitizeLabelName, str3);
    }

    public static String sanitizeMetricName(String str) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < str.length(); i++) {
            char charAt = str.charAt(i);
            if ((charAt < 'a' || charAt > 'z') && ((charAt < 'A' || charAt > 'Z') && charAt != '_' && charAt != ':' && (charAt < '0' || charAt > '9' || i <= 0))) {
                sb.append("_");
            } else {
                sb.append(charAt);
            }
        }
        return sb.toString();
    }

    public static String sanitizeLabelName(String str) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < str.length(); i++) {
            char charAt = str.charAt(i);
            if ((charAt < 'a' || charAt > 'z') && ((charAt < 'A' || charAt > 'Z') && charAt != '_' && (charAt < '0' || charAt > '9' || i <= 0))) {
                sb.append("_");
            } else {
                sb.append(charAt);
            }
        }
        return sb.toString();
    }
}
