package org.opennms.nephron;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.net.InetAddresses;
import com.swrve.ratelimitedlogger.RateLimitedLog;
import java.io.FileReader;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.beam.repackaged.core.org.apache.commons.lang3.StringUtils;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Instant;
import org.opennms.nephron.Aggregate;
import org.opennms.nephron.CompoundKey;
import org.opennms.nephron.CompoundKeyData;
import org.opennms.nephron.coders.FlowDocumentProtobufCoder;
import org.opennms.nephron.coders.KafkaInputFlowDeserializer;
import org.opennms.nephron.cortex.CortexIo;
import org.opennms.nephron.cortex.TimeSeriesBuilder;
import org.opennms.nephron.elastic.AggregationType;
import org.opennms.nephron.elastic.FlowSummary;
import org.opennms.nephron.elastic.IndexStrategy;
import org.opennms.nephron.network.IPAddress;
import org.opennms.nephron.network.IpValue;
import org.opennms.nephron.network.StringValue;
import org.opennms.nephron.util.PaneAccumulator;
import org.opennms.netmgt.flows.persistence.model.Direction;
import org.opennms.netmgt.flows.persistence.model.FlowDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/nephron/Pipeline.class */
public class Pipeline {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) Pipeline.class);
    /** Jackson mapper used by {@link #FLOW_SUMMARY_DATA_TO_JSON} to serialize flow summaries. */
    private static final ObjectMapper MAPPER = new ObjectMapper();
    /** Rate-limited view of {@link #LOG}: at most 5 messages every 10 seconds. */
    public static final RateLimitedLog RATE_LIMITED_LOG = RateLimitedLog.withRateLimit(LOG).maxRate(5).every(Duration.ofSeconds(10)).build();
    /**
     * Tag of the additional output of {@code ProjConvWithTos} that carries the per-host projections.
     * Declared as an anonymous subclass (presumably so that the element type is retained at runtime).
     */
    private static TupleTag<KV<CompoundKey, Aggregate>> BY_HOST = new TupleTag<KV<CompoundKey, Aggregate>>() { // from class: org.opennms.nephron.Pipeline.1
    };
    /**
     * Tag of the main output of {@code ProjConvWithTos} that carries the per-application projections.
     * Declared as an anonymous subclass (presumably so that the element type is retained at runtime).
     */
    private static TupleTag<KV<CompoundKey, Aggregate>> BY_APP = new TupleTag<KV<CompoundKey, Aggregate>>() { // from class: org.opennms.nephron.Pipeline.2
    };
    /**
     * Converts each keyed aggregate, together with the window it belongs to, into a flow summary
     * (via {@code Pipeline.toFlowSummary}) and emits that summary serialized as a JSON string.
     */
    private static ParDo.SingleOutput<KV<CompoundKey, Aggregate>, String> FLOW_SUMMARY_DATA_TO_JSON = ParDo.of(new DoFn<KV<CompoundKey, Aggregate>, String>() { // from class: org.opennms.nephron.Pipeline.3
        @DoFn.ProcessElement
        public void processElement(DoFn<KV<CompoundKey, Aggregate>, String>.ProcessContext processContext, IntervalWindow intervalWindow) throws JsonProcessingException {
            processContext.output(Pipeline.MAPPER.writeValueAsString(Pipeline.toFlowSummary(processContext.element(), intervalWindow)));
        }
    });

    /**
     * Composite transform that turns a stream of flow documents into the flattened set of
     * aggregates produced by this pipeline.
     *
     * <p>The flows are first passed through the configured windowing transform and keyed by
     * conversation (including TOS). From the conversation sums, per-application and per-host
     * projections are derived and aggregated in turn; the per-application sums are further rolled
     * up into "tos_" and "itf_" parent totals. The result is the flattened union of the two parent
     * summaries and of the top-K collections (with and without TOS) for applications, hosts and
     * conversations.
     */
    public static class CalculateFlowStatistics extends PTransform<PCollection<FlowDocument>, PCollection<KV<CompoundKey, Aggregate>>> {
        /** Number of entries requested from each top-K aggregation. */
        private final int topK;
        /** Transform applied to the incoming flows before any keying takes place. */
        private final PTransform<PCollection<FlowDocument>, PCollection<FlowDocument>> windowing;

        /**
         * @param i          the K passed to every top-K aggregation
         * @param pTransform the windowing transform applied to the incoming flows
         */
        public CalculateFlowStatistics(int i, PTransform<PCollection<FlowDocument>, PCollection<FlowDocument>> pTransform) {
            this.topK = i;
            this.windowing = pTransform;
        }

        /**
         * Derives K and the windowing transform ({@link WindowedFlows}) from the given options.
         *
         * @param nephronOptions the pipeline options
         */
        public CalculateFlowStatistics(NephronOptions nephronOptions) {
            this(nephronOptions.getTopK(), new WindowedFlows(nephronOptions));
        }

        /**
         * Builds the aggregation graph described on the class.
         *
         * @param pCollection the incoming flow documents
         * @return the flattened parent summaries and top-K collections
         */
        @Override // org.apache.beam.sdk.transforms.PTransform
        public PCollection<KV<CompoundKey, Aggregate>> expand(PCollection<FlowDocument> pCollection) {
            // Window the flows and key each one by its (TOS-aware) conversation.
            final PCollection<KV<CompoundKey, Aggregate>> keyedByConversation = pCollection
                    .apply("WindowedFlows", this.windowing)
                    .apply("key_by_conv", ParDo.of(new KeyByConvWithTos()));

            // Only complete conversation keys are eligible for the conversation top-K.
            final SumsAndTopKs conversations = Pipeline.aggregateSumsAndTopKs(
                    "conv_",
                    keyedByConversation,
                    CompoundKeyType.EXPORTER_INTERFACE_CONVERSATION,
                    this.topK,
                    key -> key.isCompleteConversationKey());

            // Project the conversation sums onto application keys (main output) and host keys.
            final PCollectionTuple projections = conversations.withTos.sum.apply(
                    "proj_conv",
                    ParDo.of(new ProjConvWithTos()).withOutputTags(Pipeline.BY_APP, TupleTagList.of(Pipeline.BY_HOST)));
            final PCollection<KV<CompoundKey, Aggregate>> byApplication = projections.get(Pipeline.BY_APP);
            final PCollection<KV<CompoundKey, Aggregate>> byHost = projections.get(Pipeline.BY_HOST);

            final SumsAndTopKs applications = Pipeline.aggregateSumsAndTopKs(
                    "app_", byApplication, CompoundKeyType.EXPORTER_INTERFACE_APPLICATION, this.topK, key -> true);
            final SumsAndTopKs hosts = Pipeline.aggregateSumsAndTopKs(
                    "host_", byHost, CompoundKeyType.EXPORTER_INTERFACE_HOST, this.topK, key -> true);

            // Parent totals are rolled up from the per-application sums: first "tos_", then "itf_".
            final TotalAndSummary tosTotals = Pipeline.aggregateParentTotal("tos_", applications.withTos.sum);
            final TotalAndSummary interfaceTotals = Pipeline.aggregateParentTotal("itf_", tosTotals.total);

            return PCollectionList.of(interfaceTotals.summary)
                    .and(tosTotals.summary)
                    .and(applications.withTos.topK)
                    .and(applications.withoutTos.topK)
                    .and(hosts.withTos.topK)
                    .and(hosts.withoutTos.topK)
                    .and(conversations.withTos.topK)
                    .and(conversations.withoutTos.topK)
                    .apply(Flatten.pCollections());
        }
    }

    /**
     * Serializable comparator for keyed aggregates.
     *
     * <p>Entries are ordered by the byte count of their aggregate. When two byte counts are equal
     * the tie is broken by comparing the {@code groupedByKey()} values of the keys with the
     * operands swapped, i.e. in reverse of their natural order.
     */
    public static class FlowBytesValueComparator implements Comparator<KV<CompoundKey, Aggregate>>, Serializable {
        FlowBytesValueComparator() {
        }

        /**
         * @param kv  the left operand
         * @param kv2 the right operand
         * @return a negative, zero or positive value following the ordering described on the class
         */
        @Override // java.util.Comparator
        public int compare(KV<CompoundKey, Aggregate> kv, KV<CompoundKey, Aggregate> kv2) {
            final int byBytes = Long.compare(kv.getValue().getBytes(), kv2.getValue().getBytes());
            if (byBytes != 0) {
                return byBytes;
            }
            // Equal byte counts: fall back to the keys, deliberately comparing right against left.
            return kv2.getKey().groupedByKey().compareTo(kv.getKey().groupedByKey());
        }
    }

    /**
     * Keys every flow by its {@code EXPORTER_INTERFACE_TOS_CONVERSATION} compound key and pairs it
     * with the aggregate computed for the current window.
     *
     * <p>Flows for which the key cannot be created because of missing fields are dropped and
     * counted in the {@code flowsWithMissingFields} metric.
     */
    public static class KeyByConvWithTos extends DoFn<FlowDocument, KV<CompoundKey, Aggregate>> {
        private final Counter flowsWithMissingFields = Metrics.counter((Class<?>) Pipeline.class, "flowsWithMissingFields");
        private final Counter flowsInWindow = Metrics.counter("flows", "in_window");

        /**
         * @param processContext the context providing the flow and receiving the keyed aggregate
         * @param intervalWindow the window the flow was assigned to
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<FlowDocument, KV<CompoundKey, Aggregate>>.ProcessContext processContext, IntervalWindow intervalWindow) {
            final FlowDocument flow = processContext.element();
            try {
                final CompoundKey conversationKey = CompoundKeyType.EXPORTER_INTERFACE_TOS_CONVERSATION.create(flow);

                // The hostnames are handed over ordered by their addresses (string comparison,
                // null treated as empty): the hostname of the smaller address comes first.
                final boolean srcAddressIsSmaller =
                        Strings.nullToEmpty(flow.getSrcAddress()).compareTo(Strings.nullToEmpty(flow.getDstAddress())) < 0;
                final String hostnameOfSmaller = srcAddressIsSmaller ? flow.getSrcHostname() : flow.getDstHostname();
                final String hostnameOfLarger = srcAddressIsSmaller ? flow.getDstHostname() : flow.getSrcHostname();

                final Aggregate aggregate = Pipeline.aggregatize(intervalWindow, flow, hostnameOfSmaller, hostnameOfLarger);
                this.flowsInWindow.inc();
                processContext.output(KV.of(conversationKey, aggregate));
            } catch (MissingFieldsException e) {
                // Incomplete flows are skipped on purpose; the counter keeps them visible.
                this.flowsWithMissingFields.inc();
            }
        }
    }

    /**
     * Projects a conversation aggregate onto one application entry and two host entries.
     *
     * <p>For every input element this emits:
     * <ul>
     *   <li>to {@code BY_APP}: the key cast to {@code EXPORTER_INTERFACE_TOS_APPLICATION}, with the
     *       hostname cleared;</li>
     *   <li>to {@code BY_HOST}: the key cast to {@code EXPORTER_INTERFACE_TOS_HOST}, with the
     *       aggregate's first hostname;</li>
     *   <li>to {@code BY_HOST}: a {@code EXPORTER_INTERFACE_TOS_HOST} key whose address is replaced
     *       by the key's {@code largerAddress}, with the aggregate's second hostname.</li>
     * </ul>
     */
    public static class ProjConvWithTos extends DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>> {
        /**
         * @param kv                  the conversation key and its aggregate
         * @param multiOutputReceiver the receiver giving access to the tagged outputs
         */
        @DoFn.ProcessElement
        public void processElement(@DoFn.Element KV<CompoundKey, Aggregate> kv, DoFn.MultiOutputReceiver multiOutputReceiver) {
            final CompoundKey conversationKey = kv.getKey();
            final Aggregate aggregate = kv.getValue();

            // Application projection: hostnames are irrelevant here.
            final KV<CompoundKey, Aggregate> applicationEntry = KV.of(
                    conversationKey.cast(CompoundKeyType.EXPORTER_INTERFACE_TOS_APPLICATION),
                    aggregate.withHostname(null));
            multiOutputReceiver.get(Pipeline.BY_APP).output(applicationEntry);

            // First host projection: key address as produced by the cast, first hostname.
            final KV<CompoundKey, Aggregate> firstHostEntry = KV.of(
                    conversationKey.cast(CompoundKeyType.EXPORTER_INTERFACE_TOS_HOST),
                    aggregate.withHostname(aggregate.getHostname()));
            multiOutputReceiver.get(Pipeline.BY_HOST).output(firstHostEntry);

            // Second host projection: address switched to the larger one, second hostname.
            final CompoundKeyData largerAddressData =
                    new CompoundKeyData.Builder(conversationKey.data).withAddress(conversationKey.data.largerAddress).build();
            final KV<CompoundKey, Aggregate> secondHostEntry = KV.of(
                    new CompoundKey(CompoundKeyType.EXPORTER_INTERFACE_TOS_HOST, largerAddressData),
                    aggregate.withHostname(aggregate.getHostname2()));
            multiOutputReceiver.get(Pipeline.BY_HOST).output(secondHostEntry);
        }
    }

    /**
     * Source transform that reads flow documents from a Kafka topic.
     *
     * <p>Records are read with byte-array keys and values deserialized by
     * {@link KafkaInputFlowDeserializer}. Each flow that has no {@code deltaSwitched} value gets it
     * set to its {@code firstSwitched} value before being emitted. Two metrics are maintained: the
     * number of flows read and the difference between the wall clock and the flow's
     * {@code lastSwitched} timestamp.
     */
    public static class ReadFromKafka extends PTransform<PBegin, PCollection<FlowDocument>> {
        private final String bootstrapServers;
        private final String topic;
        private final Map<String, Object> kafkaConsumerConfig;
        private final Counter flowsFromKafka = Metrics.counter("flows", "from_kafka");
        private final Gauge flowsFromKafkaDrift = Metrics.gauge("flows", "from_kafka_drift");
        private final TimestampPolicyFactory<byte[], FlowDocument> timestampPolicyFactory;

        /**
         * @param str                    the Kafka bootstrap servers; must not be null
         * @param str2                   the topic to read from; must not be null
         * @param map                    consumer configuration updates; must not be null
         * @param timestampPolicyFactory the factory for the timestamp policy used by the reader
         */
        public ReadFromKafka(String str, String str2, Map<String, Object> map, TimestampPolicyFactory<byte[], FlowDocument> timestampPolicyFactory) {
            this.bootstrapServers = Objects.requireNonNull(str);
            this.topic = Objects.requireNonNull(str2);
            this.kafkaConsumerConfig = Objects.requireNonNull(map);
            this.timestampPolicyFactory = timestampPolicyFactory;
        }

        /**
         * @param pBegin the pipeline start
         * @return the flow documents read from Kafka, each with {@code deltaSwitched} populated
         */
        @Override // org.apache.beam.sdk.transforms.PTransform
        public PCollection<FlowDocument> expand(PBegin pBegin) {
            final PCollection<KV<byte[], FlowDocument>> records = pBegin.apply(
                    KafkaIO.<byte[], FlowDocument>read()
                            .withTopic(this.topic)
                            .withKeyDeserializer(ByteArrayDeserializer.class)
                            .withValueDeserializer(KafkaInputFlowDeserializer.class)
                            .withConsumerConfigUpdates(this.kafkaConsumerConfig)
                            .withBootstrapServers(this.bootstrapServers)
                            .withTimestampPolicyFactory(this.timestampPolicyFactory)
                            .withoutMetadata());

            return records.apply("init", ParDo.of(new DoFn<KV<byte[], FlowDocument>, FlowDocument>() {
                @DoFn.ProcessElement
                public void processElement(DoFn<KV<byte[], FlowDocument>, FlowDocument>.ProcessContext processContext) {
                    final FlowDocument received = processContext.element().getValue();
                    // Flows without a delta-switched value fall back to their first-switched value.
                    final FlowDocument flow = received.hasDeltaSwitched()
                            ? received
                            : FlowDocument.newBuilder(received).setDeltaSwitched(received.getFirstSwitched()).build();

                    processContext.output(flow);
                    ReadFromKafka.this.flowsFromKafka.inc();
                    ReadFromKafka.this.flowsFromKafkaDrift.set(System.currentTimeMillis() - getTimestampMs(flow));
                }
            }));
        }

        /**
         * @param flowDocument the flow
         * @return the flow's {@code lastSwitched} value
         */
        public static long getTimestampMs(FlowDocument flowDocument) {
            return flowDocument.getLastSwitched().getValue();
        }

        /**
         * @param kafkaRecord a record whose value is a flow
         * @return the timestamp of the contained flow, see {@link #getTimestamp(FlowDocument)}
         */
        public static Instant getTimestamp(KafkaRecord<byte[], FlowDocument> kafkaRecord) {
            final FlowDocument flow = kafkaRecord.getKV().getValue();
            return getTimestamp(flow);
        }

        /**
         * @param flowDocument the flow
         * @return the flow's {@code lastSwitched} value as an instant (epoch milliseconds)
         */
        public static Instant getTimestamp(FlowDocument flowDocument) {
            final long epochMillis = getTimestampMs(flowDocument);
            return Instant.ofEpochMilli(epochMillis);
        }
    }

    /** Immutable pair of a sum collection and the top-K collection that belongs to it. */
    public static class SumAndTopK {
        /** The summed aggregates. */
        public final PCollection<KV<CompoundKey, Aggregate>> sum;
        /** The top-K aggregates. */
        public final PCollection<KV<CompoundKey, Aggregate>> topK;

        /**
         * @param pCollection  the collection stored as {@link #sum}
         * @param pCollection2 the collection stored as {@link #topK}
         */
        public SumAndTopK(PCollection<KV<CompoundKey, Aggregate>> pCollection, PCollection<KV<CompoundKey, Aggregate>> pCollection2) {
            sum = pCollection;
            topK = pCollection2;
        }
    }

    /** Binary combine function that folds two aggregates into one via {@code Aggregate.merge}. */
    public static class SumBytes extends Combine.BinaryCombineFn<Aggregate> {
        SumBytes() {
        }

        /**
         * @param aggregate  the left operand
         * @param aggregate2 the right operand
         * @return the result of merging both operands
         */
        @Override // org.apache.beam.sdk.transforms.Combine.BinaryCombineFn
        public Aggregate apply(Aggregate aggregate, Aggregate aggregate2) {
            final Aggregate merged = Aggregate.merge(aggregate, aggregate2);
            return merged;
        }
    }

    /** Immutable pair of {@link SumAndTopK} results: one with TOS and one without. */
    public static class SumsAndTopKs {
        /** Sum and top-K with TOS. */
        public final SumAndTopK withTos;
        /** Sum and top-K without TOS. */
        public final SumAndTopK withoutTos;

        /**
         * @param sumAndTopK  the result stored as {@link #withTos}
         * @param sumAndTopK2 the result stored as {@link #withoutTos}
         */
        public SumsAndTopKs(SumAndTopK sumAndTopK, SumAndTopK sumAndTopK2) {
            withTos = sumAndTopK;
            withoutTos = sumAndTopK2;
        }
    }

    /** Immutable pair of a total collection and the summary collection that belongs to it. */
    public static class TotalAndSummary {
        /** The totals. */
        public final PCollection<KV<CompoundKey, Aggregate>> total;
        /** The summaries. */
        public final PCollection<KV<CompoundKey, Aggregate>> summary;

        /**
         * @param pCollection  the collection stored as {@link #total}
         * @param pCollection2 the collection stored as {@link #summary}
         */
        public TotalAndSummary(PCollection<KV<CompoundKey, Aggregate>> pCollection, PCollection<KV<CompoundKey, Aggregate>> pCollection2) {
            total = pCollection;
            summary = pCollection2;
        }
    }

    /* loaded from: input_file:org/opennms/nephron/Pipeline$WindowedFlows.class */
    public static class WindowedFlows extends PTransform<PCollection<FlowDocument>, PCollection<FlowDocument>> {
        private final org.joda.time.Duration fixedWindowSize;
        private final org.joda.time.Duration maxFlowDuration;
        private final org.joda.time.Duration earlyProcessingDelay;
        private final org.joda.time.Duration lateProcessingDelay;
        private final org.joda.time.Duration allowedLateness;

        public WindowedFlows(org.joda.time.Duration duration, org.joda.time.Duration duration2, org.joda.time.Duration duration3, org.joda.time.Duration duration4, org.joda.time.Duration duration5) {
            this.fixedWindowSize = (org.joda.time.Duration) Objects.requireNonNull(duration);
            this.maxFlowDuration = (org.joda.time.Duration) Objects.requireNonNull(duration2);
            this.earlyProcessingDelay = (org.joda.time.Duration) Objects.requireNonNull(duration3);
            this.lateProcessingDelay = (org.joda.time.Duration) Objects.requireNonNull(duration4);
            this.allowedLateness = (org.joda.time.Duration) Objects.requireNonNull(duration5);
        }

        public WindowedFlows(NephronOptions nephronOptions) {
            this(org.joda.time.Duration.millis(nephronOptions.getFixedWindowSizeMs()), org.joda.time.Duration.millis(nephronOptions.getMaxFlowDurationMs()), org.joda.time.Duration.millis(nephronOptions.getEarlyProcessingDelayMs()), org.joda.time.Duration.millis(nephronOptions.getLateProcessingDelayMs()), org.joda.time.Duration.millis(nephronOptions.getAllowedLatenessMs()));
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        public PCollection<FlowDocument> expand(PCollection<FlowDocument> pCollection) {
            return (PCollection) ((PCollection) pCollection.apply("attach_timestamp", Pipeline.attachTimestamps(this.fixedWindowSize, this.maxFlowDuration))).apply("to_windows", Pipeline.toWindow(this.fixedWindowSize, this.earlyProcessingDelay, this.lateProcessingDelay, this.allowedLateness));
        }
    }

    /* loaded from: input_file:org/opennms/nephron/Pipeline$WriteToElasticsearch.class */
    public static class WriteToElasticsearch extends PTransform<PCollection<KV<CompoundKey, Aggregate>>, PDone> {
        private final String elasticIndex;
        private final IndexStrategy indexStrategy;
        private final ElasticsearchIO.ConnectionConfiguration esConfig;
        private final Counter flowsToEs;
        private final Gauge flowsToEsDrift;
        private int elasticRetryCount;
        private long elasticRetryDuration;

        public WriteToElasticsearch(String str, String str2, String str3, String str4, IndexStrategy indexStrategy, int i, int i2, int i3, long j) {
            this.flowsToEs = Metrics.counter("flows", "to_es");
            this.flowsToEsDrift = Metrics.gauge("flows", "to_es_drift");
            Objects.requireNonNull(str);
            this.elasticIndex = (String) Objects.requireNonNull(str4);
            this.indexStrategy = (IndexStrategy) Objects.requireNonNull(indexStrategy);
            ElasticsearchIO.ConnectionConfiguration create = ElasticsearchIO.ConnectionConfiguration.create(new String[]{str}, str4, "_doc");
            if (!Strings.isNullOrEmpty(str2) && !Strings.isNullOrEmpty(str3)) {
                create = create.withUsername(str2).withPassword(str3);
            }
            this.esConfig = create.withConnectTimeout(Integer.valueOf(i)).withSocketTimeout(Integer.valueOf(i2));
            this.elasticRetryCount = i3;
            this.elasticRetryDuration = j;
        }

        public WriteToElasticsearch(NephronOptions nephronOptions) {
            this(nephronOptions.getElasticUrl(), nephronOptions.getElasticUser(), nephronOptions.getElasticPassword(), nephronOptions.getElasticFlowIndex(), nephronOptions.getElasticIndexStrategy(), nephronOptions.getElasticConnectTimeout(), nephronOptions.getElasticSocketTimeout(), nephronOptions.getElasticRetryCount(), nephronOptions.getElasticRetryDuration());
        }

        @Override // org.apache.beam.sdk.transforms.PTransform
        public PDone expand(PCollection<KV<CompoundKey, Aggregate>> pCollection) {
            return (PDone) ((PCollection) pCollection.apply("SerializeToJson", Pipeline.FLOW_SUMMARY_DATA_TO_JSON)).apply("WriteToElasticsearch", ElasticsearchIO.write().withConnectionConfiguration(this.esConfig).withRetryConfiguration(ElasticsearchIO.RetryConfiguration.create(this.elasticRetryCount, org.joda.time.Duration.millis(this.elasticRetryDuration))).withIndexFn(new ElasticsearchIO.Write.FieldValueExtractFn() { // from class: org.opennms.nephron.Pipeline.WriteToElasticsearch.1
                @Override // org.apache.beam.sdk.transforms.SerializableFunction, org.apache.beam.sdk.transforms.ProcessFunction
                public String apply(JsonNode jsonNode) {
                    java.time.Instant ofEpochMilli = java.time.Instant.ofEpochMilli(jsonNode.get("@timestamp").asLong());
                    String index = WriteToElasticsearch.this.indexStrategy.getIndex(WriteToElasticsearch.this.elasticIndex, ofEpochMilli);
                    WriteToElasticsearch.this.flowsToEs.inc();
                    WriteToElasticsearch.this.flowsToEsDrift.set(System.currentTimeMillis() - ofEpochMilli.toEpochMilli());
                    return index;
                }
            }));
        }
    }

    /**
     * Sink transform that serializes keyed aggregates to JSON and publishes the JSON strings as
     * record values (no keys) to a Kafka topic.
     */
    public static class WriteToKafka extends PTransform<PCollection<KV<CompoundKey, Aggregate>>, PDone> {
        private final String bootstrapServers;
        private final String topic;
        private final Map<String, Object> kafkaProducerConfig;

        /**
         * @param str  the Kafka bootstrap servers; must not be null
         * @param str2 the destination topic; must not be null
         * @param map  producer configuration updates
         */
        public WriteToKafka(String str, String str2, Map<String, Object> map) {
            this.bootstrapServers = Objects.requireNonNull(str);
            this.topic = Objects.requireNonNull(str2);
            this.kafkaProducerConfig = map;
        }

        /**
         * @param pCollection the aggregates to publish
         * @return the terminal marker of the write
         */
        @Override // org.apache.beam.sdk.transforms.PTransform
        public PDone expand(PCollection<KV<CompoundKey, Aggregate>> pCollection) {
            final PCollection<String> json = pCollection.apply(Pipeline.FLOW_SUMMARY_DATA_TO_JSON);
            return json.apply(
                    KafkaIO.<Void, String>write()
                            .withProducerConfigUpdates(this.kafkaProducerConfig)
                            .withBootstrapServers(this.bootstrapServers)
                            .withTopic(this.topic)
                            .withValueSerializer(StringSerializer.class)
                            .values());
        }
    }

    /**
     * Creates the pipeline using the Kafka input timestamp policy derived from the default
     * maximum input delay given in the options.
     *
     * @param nephronOptions the pipeline options; must not be null
     * @return the assembled pipeline
     */
    public static org.apache.beam.sdk.Pipeline create(NephronOptions nephronOptions) {
        Objects.requireNonNull(nephronOptions);
        final org.joda.time.Duration maxInputDelay =
                org.joda.time.Duration.millis(nephronOptions.getDefaultMaxInputDelayMs());
        final TimestampPolicyFactory<byte[], FlowDocument> policyFactory =
                getKafkaInputTimestampPolicyFactory(maxInputDelay);
        return create(nephronOptions, policyFactory);
    }

    /**
     * Creates the complete pipeline: flows are read from Kafka, aggregated, optionally accumulated
     * and then handed to the Elasticsearch, Kafka and Cortex sinks (each sink decides on its own
     * whether it is attached).
     *
     * <p>The Kafka consumer configuration is the content of the client properties file referenced
     * by the options, extended by the group id and the auto-commit flag from the options.
     *
     * @param nephronOptions         the pipeline options; must not be null
     * @param timestampPolicyFactory the timestamp policy factory used by the Kafka reader
     * @return the assembled pipeline
     */
    public static org.apache.beam.sdk.Pipeline create(NephronOptions nephronOptions, TimestampPolicyFactory<byte[], FlowDocument> timestampPolicyFactory) {
        Objects.requireNonNull(nephronOptions);

        final org.apache.beam.sdk.Pipeline pipeline = org.apache.beam.sdk.Pipeline.create(nephronOptions);
        registerCoders(pipeline);

        final Map<String, Object> consumerConfig = loadKafkaClientProperties(nephronOptions);
        consumerConfig.put("group.id", nephronOptions.getGroupId());
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.valueOf(nephronOptions.getAutoCommit()));

        final PCollection<FlowDocument> flows = pipeline.apply(new ReadFromKafka(
                nephronOptions.getBootstrapServers(),
                nephronOptions.getFlowSourceTopic(),
                consumerConfig,
                timestampPolicyFactory));
        final PCollection<KV<CompoundKey, Aggregate>> statistics = flows.apply(new CalculateFlowStatistics(nephronOptions));
        final PCollection<KV<CompoundKey, Aggregate>> summaries = accumulateSummariesIfNecessary(nephronOptions, statistics);

        attachWriteToElastic(nephronOptions, summaries);
        attachWriteToKafka(nephronOptions, summaries);
        attachWriteToCortex(nephronOptions, summaries);
        return pipeline;
    }

    /**
     * Wraps the given summaries in an accumulation step if a non-zero summary accumulation delay
     * is configured; otherwise the input is returned unchanged.
     *
     * @param nephronOptions the pipeline options
     * @param pCollection    the flow summaries
     * @return the accumulated summaries, or {@code pCollection} itself if the delay is zero
     */
    public static PCollection<KV<CompoundKey, Aggregate>> accumulateSummariesIfNecessary(NephronOptions nephronOptions, PCollection<KV<CompoundKey, Aggregate>> pCollection) {
        if (nephronOptions.getSummaryAccumulationDelayMs() == 0) {
            return pCollection;
        }
        final org.joda.time.Duration accumulationDelay =
                org.joda.time.Duration.millis(nephronOptions.getSummaryAccumulationDelayMs());
        return accumulateFlowSummaries(pCollection, accumulationDelay);
    }

    /**
     * Applies a {@link PaneAccumulator} that combines aggregates with {@code Aggregate.merge},
     * using the given duration and the compound key / aggregate coders.
     *
     * @param pCollection the flow summaries
     * @param duration    the duration handed to the accumulator
     * @return the output of the accumulator
     */
    public static PCollection<KV<CompoundKey, Aggregate>> accumulateFlowSummaries(PCollection<KV<CompoundKey, Aggregate>> pCollection, org.joda.time.Duration duration) {
        final CompoundKey.CompoundKeyCoder keyCoder = new CompoundKey.CompoundKeyCoder();
        final Aggregate.AggregateCoder aggregateCoder = new Aggregate.AggregateCoder();
        return pCollection.apply(new PaneAccumulator<>(Aggregate::merge, duration, keyCoder, aggregateCoder));
    }

    /**
     * Loads the Kafka client properties file referenced by the options into a new, mutable map.
     *
     * <p>If no file is configured (null or empty path) an empty map is returned. The reader on the
     * file is closed before this method returns, whether loading succeeds or fails.
     *
     * @param nephronOptions the pipeline options providing the path of the properties file
     * @return a fresh mutable map holding every entry of the properties file, keyed by the
     *         string form of the property key
     * @throws RuntimeException if the file cannot be read; the I/O error is attached as cause
     */
    private static Map<String, Object> loadKafkaClientProperties(NephronOptions nephronOptions) {
        final Map<String, Object> clientProperties = new HashMap<>();
        final String propertiesFile = nephronOptions.getKafkaClientProperties();
        if (Strings.isNullOrEmpty(propertiesFile)) {
            return clientProperties;
        }

        final Properties properties = new Properties();
        // try-with-resources guarantees that the file handle is released.
        try (FileReader reader = new FileReader(propertiesFile)) {
            properties.load(reader);
        } catch (IOException e) {
            LOG.error("Error loading properties file", e);
            throw new RuntimeException("Error reading properties file", e);
        }

        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            clientProperties.put(entry.getKey().toString(), entry.getValue());
        }
        return clientProperties;
    }

    /**
     * Attaches the Elasticsearch sink to the given collection, provided that an Elasticsearch URL
     * is configured. Without a URL nothing is attached.
     *
     * @param nephronOptions the pipeline options
     * @param pCollection    the aggregates to persist
     */
    public static void attachWriteToElastic(NephronOptions nephronOptions, PCollection<KV<CompoundKey, Aggregate>> pCollection) {
        final boolean elasticConfigured = !Strings.isNullOrEmpty(nephronOptions.getElasticUrl());
        if (elasticConfigured) {
            pCollection.apply(new WriteToElasticsearch(nephronOptions));
        }
    }

    /**
     * Attaches the Kafka sink to the given collection, provided that a destination topic is
     * configured. Without a topic nothing is attached.
     *
     * @param nephronOptions the pipeline options
     * @param pCollection    the aggregates to publish
     */
    public static void attachWriteToKafka(NephronOptions nephronOptions, PCollection<KV<CompoundKey, Aggregate>> pCollection) {
        final boolean destinationConfigured = !Strings.isNullOrEmpty(nephronOptions.getFlowDestTopic());
        if (destinationConfigured) {
            final WriteToKafka sink = new WriteToKafka(
                    nephronOptions.getBootstrapServers(),
                    nephronOptions.getFlowDestTopic(),
                    loadKafkaClientProperties(nephronOptions));
            pCollection.apply(sink);
        }
    }

    /**
     * Attaches the Cortex sink without any additional customization of the write transform.
     *
     * @param nephronOptions the pipeline options
     * @param pCollection    the aggregates to write
     */
    public static void attachWriteToCortex(NephronOptions nephronOptions, PCollection<KV<CompoundKey, Aggregate>> pCollection) {
        // No-op customizer: the write transform is used exactly as configured from the options.
        final Consumer<CortexIo.Write<CompoundKey, Aggregate>> noCustomization = write -> {
        };
        attachWriteToCortex(nephronOptions, pCollection, noCustomization);
    }

    /**
     * Attaches the Cortex sink to the given collection if Cortex output is enabled.
     *
     * <p>The write transform is created with accumulation support when a non-zero Cortex
     * accumulation delay is configured, and without it otherwise. Batch limits are applied, the
     * customizer is invoked, the organisation id is set if one is configured, and finally the
     * collection is filtered with {@code includeInCortexOutput} before being written.
     *
     * @param nephronOptions the pipeline options
     * @param pCollection    the aggregates to write
     * @param consumer       callback that may further configure the write transform
     */
    public static void attachWriteToCortex(NephronOptions nephronOptions, PCollection<KV<CompoundKey, Aggregate>> pCollection, Consumer<CortexIo.Write<CompoundKey, Aggregate>> consumer) {
        if (!cortexOutputEnabled(nephronOptions)) {
            return;
        }

        final CortexIo.Write<CompoundKey, Aggregate> cortexWrite;
        if (nephronOptions.getCortexAccumulationDelayMs() != 0) {
            cortexWrite = CortexIo.of(
                    nephronOptions.getCortexWriteUrl(),
                    Pipeline::cortexOutput,
                    new CompoundKey.CompoundKeyCoder(),
                    new Aggregate.AggregateCoder(),
                    Aggregate::merge,
                    org.joda.time.Duration.millis(nephronOptions.getCortexAccumulationDelayMs()));
        } else {
            cortexWrite = CortexIo.of(nephronOptions.getCortexWriteUrl(), Pipeline::cortexOutput);
        }

        // NOTE(review): the results of the with...() calls are discarded, which presumably relies
        // on CortexIo.Write being configured in place - confirm against CortexIo.
        cortexWrite.withMaxBatchSize(nephronOptions.getCortexMaxBatchSize())
                .withMaxBatchBytes(nephronOptions.getCortexMaxBatchBytes());
        consumer.accept(cortexWrite);

        final boolean orgIdConfigured = !Strings.isNullOrEmpty(nephronOptions.getCortexOrgId());
        if (orgIdConfigured) {
            cortexWrite.withOrgId(nephronOptions.getCortexOrgId());
        }

        pCollection.apply(Filter.by(includeInCortexOutput(nephronOptions))).apply(cortexWrite);
    }

    /**
     * Validates the {@code cortexConsideredHosts} argument and converts it into an {@link IpValue}.
     *
     * <p>The argument is split into entries, each of which must be one of:
     * <ul>
     *   <li>a range (as reported by {@code StringValue.isRanged()}) consisting of exactly two
     *       plain IP addresses separated by "-", where the first is not greater than the second;
     *       CIDR notation is rejected inside ranges;</li>
     *   <li>a value containing "/" that {@code IpValue.parseCIDR} accepts;</li>
     *   <li>a plain IP address.</li>
     * </ul>
     *
     * @param str the raw argument value
     * @return the parsed value
     * @throws IllegalArgumentException if any entry is invalid; for CIDR entries the underlying
     *                                  parse failure is attached as cause
     */
    private static IpValue validateIpAddress(String str) throws IllegalArgumentException {
        final StringValue argument = new StringValue(str);
        final String errorPrefix = "invalid cortexConsideredHosts argument - value: " + str;

        // NOTE(review): the delimiter constant comes from Flink's CSV input format; presumably it
        // is "," and was substituted for a literal by the decompiler - confirm.
        for (StringValue entry : argument.splitBy(CsvInputFormat.DEFAULT_FIELD_DELIMITER)) {
            if (entry.isRanged()) {
                final List<StringValue> bounds = entry.splitBy("-");
                if (bounds.size() != 2) {
                    throw new IllegalArgumentException(errorPrefix + "; at range: " + entry.getValue());
                }
                for (StringValue bound : bounds) {
                    if (bound.contains("/")) {
                        throw new IllegalArgumentException(errorPrefix + "; CIDR notation not supported in address ranges: " + bound.getValue());
                    }
                    if (!InetAddresses.isInetAddress(bound.getValue())) {
                        throw new IllegalArgumentException(errorPrefix + "; not an ip address: " + bound.getValue());
                    }
                }
                final IPAddress begin = new IPAddress(bounds.get(0).getValue());
                final IPAddress end = new IPAddress(bounds.get(1).getValue());
                if (begin.isGreaterThan(end)) {
                    throw new IllegalArgumentException(errorPrefix + "; invalid address range: begin must not be after end - begin: " + begin + "; end: " + end);
                }
            } else if (entry.contains("/")) {
                try {
                    IpValue.parseCIDR(entry.getValue());
                } catch (Exception e) {
                    // Keep the parser's exception as cause so the actual reason is not lost.
                    throw new IllegalArgumentException(errorPrefix + "; not a valid CIDR value: " + entry.getValue(), e);
                }
            } else if (!InetAddresses.isInetAddress(entry.getValue())) {
                throw new IllegalArgumentException(errorPrefix + "; not an ip address: " + entry.getValue());
            }
        }
        return IpValue.of(argument);
    }

    /**
     * Builds the predicate that decides which aggregates are passed on to the Cortex output.
     *
     * <p>Conversation aggregates are always excluded. Host aggregates are excluded when the
     * {@code cortexConsideredHosts} option is blank; otherwise they are included only when the
     * key's address is in range of the validated option value. All other key types are included.
     *
     * @param nephronOptions the options supplying {@code cortexConsideredHosts}
     * @return a serializable predicate over keyed aggregates
     * @throws IllegalArgumentException if a non-blank {@code cortexConsideredHosts} value is invalid
     */
    private static SerializableFunction<KV<CompoundKey, Aggregate>, Boolean> includeInCortexOutput(NephronOptions nephronOptions) {
        final String consideredHosts = nephronOptions.getCortexConsideredHosts();
        if (!StringUtils.isBlank(consideredHosts)) {
            // Validation happens eagerly, i.e. while the predicate is built, not when it is applied.
            final IpValue hostFilter = validateIpAddress(consideredHosts);
            return entry -> {
                final CompoundKey key = entry.getKey();
                switch (key.type) {
                    case EXPORTER_INTERFACE_CONVERSATION:
                    case EXPORTER_INTERFACE_TOS_CONVERSATION:
                        return false;
                    case EXPORTER_INTERFACE_HOST:
                    case EXPORTER_INTERFACE_TOS_HOST:
                        return Boolean.valueOf(hostFilter.isInRange(key.data.address));
                    default:
                        return true;
                }
            };
        }
        // No hosts configured: drop host and conversation aggregates alike.
        return entry -> {
            switch (entry.getKey().type) {
                case EXPORTER_INTERFACE_HOST:
                case EXPORTER_INTERFACE_TOS_HOST:
                case EXPORTER_INTERFACE_CONVERSATION:
                case EXPORTER_INTERFACE_TOS_CONVERSATION:
                    return false;
                default:
                    return true;
            }
        };
    }

    /**
     * Tells whether Cortex output is configured.
     *
     * @param nephronOptions the options supplying the Cortex write URL
     * @return {@code true} if the Cortex write URL is neither {@code null} nor empty
     */
    private static boolean cortexOutputEnabled(NephronOptions nephronOptions) {
        final String writeUrl = nephronOptions.getCortexWriteUrl();
        final boolean missing = Strings.isNullOrEmpty(writeUrl);
        return !missing;
    }

    /**
     * Writes one aggregate into the given time series builder as two series: one for the
     * {@code "in"} direction and one for the {@code "out"} direction.
     *
     * @param compoundKey the key of the aggregate
     * @param aggregate the aggregate supplying the byte counts
     * @param instant the timestamp used for the samples
     * @param i the value of the {@code "pane"} label
     * @param timeSeriesBuilder the builder that receives labels and samples
     */
    private static void cortexOutput(CompoundKey compoundKey, Aggregate aggregate, Instant instant, int i, TimeSeriesBuilder timeSeriesBuilder) {
        if (LOG.isTraceEnabled()) {
            LOG.trace(
                    "cortex output - eventTimestamp: {}; keyType: {}; key: {}; index: {}; in: {}; out: {}; total: {}",
                    instant,
                    compoundKey.type,
                    compoundKey,
                    i,
                    aggregate.getBytesIn(),
                    aggregate.getBytesOut(),
                    aggregate.getBytesIn() + aggregate.getBytesOut());
        }

        // First series: ingress bytes.
        doCortexOutput(compoundKey, instant, i, "in", aggregate.getBytesIn(), timeSeriesBuilder);

        // Start a new series before adding the egress bytes.
        timeSeriesBuilder.nextSeries();
        doCortexOutput(compoundKey, instant, i, "out", aggregate.getBytesOut(), timeSeriesBuilder);
    }

    /**
     * Adds the labels and the sample of a single series to the given builder.
     *
     * @param compoundKey the key that populates the builder after labels and sample were added
     * @param instant the sample timestamp (its millisecond value is used)
     * @param i the value of the {@code "pane"} label
     * @param str the value of the {@code "direction"} label
     * @param j the sample value
     * @param timeSeriesBuilder the builder that receives labels and sample
     */
    private static void doCortexOutput(CompoundKey compoundKey, Instant instant, int i, String str, long j, TimeSeriesBuilder timeSeriesBuilder) {
        final int pane = i;
        final String direction = str;
        final long bytes = j;

        timeSeriesBuilder.addLabel("pane", pane);
        timeSeriesBuilder.addLabel("direction", direction);
        timeSeriesBuilder.addSample(instant.getMillis(), bytes);
        compoundKey.populate(timeSeriesBuilder);
    }

    /**
     * Registers the coders for {@code FlowDocument}, {@code CompoundKey} and {@code Aggregate}
     * with the coder registry of the given pipeline.
     *
     * @param pipeline the pipeline whose coder registry is updated
     */
    public static void registerCoders(org.apache.beam.sdk.Pipeline pipeline) {
        final CoderRegistry registry = pipeline.getCoderRegistry();

        registry.registerCoderForClass(FlowDocument.class, new FlowDocumentProtobufCoder());
        registry.registerCoderForClass(CompoundKey.class, new CompoundKey.CompoundKeyCoder());
        registry.registerCoderForClass(Aggregate.class, new Aggregate.AggregateCoder());
    }

    /**
     * Creates the timestamp policy factory used for the Kafka input.
     *
     * <p>Every policy created by the factory is a {@link CustomTimestampPolicyWithLimitedDelay}
     * that takes record timestamps from {@code ReadFromKafka::getTimestamp}.
     *
     * @param duration passed to the policy as its delay argument
     * @return a factory creating one policy per topic partition
     */
    public static TimestampPolicyFactory<byte[], FlowDocument> getKafkaInputTimestampPolicyFactory(org.joda.time.Duration duration) {
        // The diamond keeps the policy generic so that the method reference is checked against
        // the record type of the factory instead of a raw functional interface.
        return (topicPartition, previousWatermark) ->
                new CustomTimestampPolicyWithLimitedDelay<>(ReadFromKafka::getTimestamp, duration, previousWatermark);
    }

    /**
     * Creates a transform that re-emits every flow once per window it spans, each time with a
     * timestamp that falls into the corresponding window.
     *
     * <p>The first output carries the flow's delta-switched time; every following output advances
     * by one window size but never beyond the flow's last-switched time. Flows whose start is
     * smaller than the per-node window shift are skipped completely. Individual outputs whose
     * timestamp lies {@code duration2} or more before the current input timestamp are skipped.
     * Skipped outputs are reported through the rate limited log.
     *
     * @param duration the window size
     * @param duration2 the maximum flow duration; also returned as the allowed timestamp skew
     * @return the timestamp attaching transform
     */
    public static ParDo.SingleOutput<FlowDocument, FlowDocument> attachTimestamps(final org.joda.time.Duration duration, final org.joda.time.Duration duration2) {
        return ParDo.of(new DoFn<FlowDocument, FlowDocument>() {
            final long windowSizeMs = duration.getMillis();
            final long maxFlowDurationMs = duration2.getMillis();

            @DoFn.ProcessElement
            public void processElement(DoFn<FlowDocument, FlowDocument>.ProcessContext processContext) {
                final FlowDocument flow = processContext.element();
                final long deltaSwitched = flow.getDeltaSwitched().getValue();
                final long lastSwitched = flow.getLastSwitched().getValue();
                final int nodeId = flow.getExporterNode().getNodeId();

                if (deltaSwitched < UnalignedFixedWindows.perNodeShift(nodeId, this.windowSizeMs)) {
                    Pipeline.RATE_LIMITED_LOG.warn("Skipping output for flow whose start is too small w/ start: {}, end: {}, target timestamp: {}, current input timestamp: {}. Full flow: {}", Instant.ofEpochMilli(deltaSwitched), Instant.ofEpochMilli(lastSwitched), Instant.ofEpochMilli(deltaSwitched), processContext.timestamp(), flow);
                    return;
                }

                final long lastWindow = UnalignedFixedWindows.windowNumber(nodeId, this.windowSizeMs, lastSwitched);
                final long firstWindow = UnalignedFixedWindows.windowNumber(nodeId, this.windowSizeMs, deltaSwitched);
                final long windowCount = (lastWindow - firstWindow) + 1;

                long timestamp = deltaSwitched;
                for (long n = 0; n < windowCount; n++) {
                    if (timestamp <= processContext.timestamp().getMillis() - this.maxFlowDurationMs) {
                        Pipeline.RATE_LIMITED_LOG.warn("Skipping output for flow that reaches back too far w/ start: {}, end: {}, target timestamp: {}, current input timestamp: {}. Full flow: {}", Instant.ofEpochMilli(deltaSwitched), Instant.ofEpochMilli(lastSwitched), Instant.ofEpochMilli(timestamp), processContext.timestamp(), flow);
                    } else {
                        processContext.outputWithTimestamp(flow, Instant.ofEpochMilli(timestamp));
                    }
                    // Advance by one window, but never past the end of the flow.
                    timestamp = Math.min(timestamp + this.windowSizeMs, lastSwitched);
                }
            }

            @Override
            public org.joda.time.Duration getAllowedTimestampSkew() {
                return duration2;
            }
        });
    }

    /**
     * Converts a keyed aggregate of a window into a {@code FlowSummary}.
     *
     * <p>The key populates the summary; the window supplies range start and end, and the range
     * end is also used as the summary's timestamp. The host name is only set for the two host
     * key types, with an empty host name mapped to {@code null}.
     *
     * @param kv the aggregate together with its key
     * @param intervalWindow the window the aggregate belongs to
     * @return the populated summary
     */
    public static FlowSummary toFlowSummary(KV<CompoundKey, Aggregate> kv, IntervalWindow intervalWindow) {
        final FlowSummary summary = new FlowSummary();
        final CompoundKey key = kv.getKey();
        final Aggregate aggregate = kv.getValue();

        key.populate(summary);
        if (key.type.isTotalNotTopK()) {
            summary.setAggregationType(AggregationType.TOTAL);
        } else {
            summary.setAggregationType(AggregationType.TOPK);
        }

        summary.setRangeStartMs(intervalWindow.start().getMillis());
        summary.setRangeEndMs(intervalWindow.end().getMillis());
        summary.setTimestamp(summary.getRangeEndMs());

        summary.setBytesEgress(aggregate.getBytesOut());
        summary.setBytesIngress(aggregate.getBytesIn());
        summary.setBytesTotal(summary.getBytesIngress() + summary.getBytesEgress());

        summary.setCongestionEncountered(aggregate.isCongestionEncountered());
        summary.setNonEcnCapableTransport(aggregate.isNonEcnCapableTransport());

        final boolean hostAggregate = key.getType() == CompoundKeyType.EXPORTER_INTERFACE_HOST
                || key.getType() == CompoundKeyType.EXPORTER_INTERFACE_TOS_HOST;
        if (hostAggregate) {
            summary.setHostName(Strings.emptyToNull(aggregate.getHostname()));
        }
        return summary;
    }

    /**
     * Creates the windowing transform for flows.
     *
     * <p>Flows are assigned to {@code UnalignedFixedWindows} of the given size. The trigger fires
     * when the watermark passes the end of the window, with late firings delayed by
     * {@code duration3} of processing time after the first element in the pane. Early firings,
     * delayed by {@code duration2}, are only added when {@code duration2} is neither {@code null}
     * nor zero. Fired panes are discarded.
     *
     * @param duration the window size
     * @param duration2 the processing time delay of early firings; {@code null} or zero disables them
     * @param duration3 the processing time delay of late firings
     * @param duration4 the allowed lateness
     * @return the windowing transform
     */
    public static Window<FlowDocument> toWindow(org.joda.time.Duration duration, org.joda.time.Duration duration2, org.joda.time.Duration duration3, org.joda.time.Duration duration4) {
        AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(duration3));

        final boolean earlyFiringsRequested = duration2 != null && !duration2.isEqual(org.joda.time.Duration.ZERO);
        if (earlyFiringsRequested) {
            trigger = trigger.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(duration2));
        }

        final Window<FlowDocument> windowing = Window.<FlowDocument>into(UnalignedFixedWindows.of(duration));
        return windowing
                .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)
                .triggering(trigger)
                .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
                .withAllowedLateness(duration4)
                .discardingFiredPanes();
    }

    /**
     * Computes the share of a flow's bytes that falls into a window.
     *
     * <p>The bytes are spread evenly over the inclusive millisecond range {@code [j, j2]}. The
     * result is the difference of two truncated cumulative values: the bytes accumulated up to
     * the end of the overlap minus the bytes accumulated before the overlap starts. Because
     * cumulative values are truncated (instead of the difference), the shares of adjacent windows
     * add up to the truncated total.
     *
     * @param j the start of the flow in milliseconds (inclusive)
     * @param j2 the end of the flow in milliseconds (inclusive)
     * @param d the number of bytes of the complete flow
     * @param j3 the start of the window in milliseconds (inclusive)
     * @param j4 the end of the window in milliseconds (inclusive)
     * @return the number of bytes attributed to the window
     */
    public static long bytesInWindow(long j, long j2, double d, long j3, long j4) {
        final long flowDurationMs = (j2 - j) + 1;

        // Last millisecond of the flow that still lies in the window.
        final long overlapEnd = Math.min(j2, j4);
        final long millisUpToOverlapEnd = (overlapEnd - j) + 1;
        final long bytesUpToOverlapEnd = (long) ((millisUpToOverlapEnd * d) / flowDurationMs);

        // Last millisecond of the flow that lies before the window.
        final long beforeOverlap = Math.max(j, j3) - 1;
        final long millisBeforeOverlap = (beforeOverlap - j) + 1;
        final long bytesBeforeOverlap = (long) ((millisBeforeOverlap * d) / flowDurationMs);

        return bytesUpToOverlapEnd - bytesBeforeOverlap;
    }

    /**
     * Creates the aggregate for the part of a flow that falls into the given window.
     *
     * <p>The flow's byte count is multiplied by its sampling interval when one is present and
     * greater than zero, and then apportioned to the window by {@code bytesInWindow}. The
     * resulting bytes are counted as ingress bytes when the flow's direction is
     * {@code Direction.INGRESS} and as egress bytes otherwise.
     *
     * @param intervalWindow the window the aggregate is created for
     * @param flowDocument the flow
     * @param str passed through unchanged as the third {@code Aggregate} constructor argument
     * @param str2 passed through unchanged as the fourth {@code Aggregate} constructor argument
     * @return the aggregate holding the window's share of the flow's bytes
     */
    public static Aggregate aggregatize(IntervalWindow intervalWindow, FlowDocument flowDocument, String str, String str2) {
        double multiplier = 1.0d;
        if (flowDocument.hasSamplingInterval()) {
            final double samplingInterval = flowDocument.getSamplingInterval().getValue();
            // A sampling interval that is not positive is ignored, i.e. bytes are taken as is.
            if (samplingInterval > 0.0d) {
                multiplier = samplingInterval;
            }
        }

        final long bytes = bytesInWindow(
                flowDocument.getDeltaSwitched().getValue(),
                flowDocument.getLastSwitched().getValue(),
                flowDocument.getNumBytes().getValue() * multiplier,
                intervalWindow.start().getMillis(),
                intervalWindow.maxTimestamp().getMillis());

        final Integer ecn = flowDocument.hasEcn() ? Integer.valueOf(flowDocument.getEcn().getValue()) : null;

        if (Direction.INGRESS.equals(flowDocument.getDirection())) {
            return new Aggregate(bytes, 0L, str, str2, ecn);
        }
        return new Aggregate(0L, bytes, str, str2, ecn);
    }

    /**
     * Aggregates the given aggregates by the outer key of their keys.
     *
     * <p>Every element is re-keyed by {@code getOuterKey()} of its key, and the values of equal
     * keys are combined with {@code SumBytes}.
     *
     * @param str the prefix for the names of the applied transforms
     * @param pCollection the aggregates to combine
     * @return a {@code TotalAndSummary} that receives the combined collection for both of its
     *         constructor arguments
     */
    public static TotalAndSummary aggregateParentTotal(String str, PCollection<KV<CompoundKey, Aggregate>> pCollection) {
        final PCollection<KV<CompoundKey, Aggregate>> parentTotal = pCollection
                .apply(str + "group_by_outer_key", ParDo.of(new DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>>() {
                    @DoFn.ProcessElement
                    public void processElement(ProcessContext processContext) {
                        final KV<CompoundKey, Aggregate> element = processContext.element();
                        processContext.output(KV.of(element.getKey().getOuterKey(), element.getValue()));
                    }
                }))
                .apply(str + "sum_bytes_by_key", Combine.perKey(new SumBytes()));
        return new TotalAndSummary(parentTotal, parentTotal);
    }

    /**
     * Computes sums and top-K aggregates twice: once for the given keys and once for the keys
     * cast to the given key type.
     *
     * <p>The second aggregation starts from the sums of the first one; every key is converted
     * with {@code cast(compoundKeyType)} before sums and top-K are computed again.
     *
     * @param str the prefix for the names of the applied transforms
     * @param pCollection the aggregates to process
     * @param compoundKeyType the key type the keys are cast to for the second aggregation
     * @param i the number of top entries kept per outer key
     * @param serializableFunction selects the keys that take part in the top-K computation
     * @return the results of both aggregations
     */
    public static SumsAndTopKs aggregateSumsAndTopKs(String str, PCollection<KV<CompoundKey, Aggregate>> pCollection, final CompoundKeyType compoundKeyType, int i, SerializableFunction<CompoundKey, Boolean> serializableFunction) {
        final SumAndTopK withTos = aggregateSumAndTopK(str + "with_tos_", pCollection, i, serializableFunction);

        final PCollection<KV<CompoundKey, Aggregate>> castKeys = withTos.sum
                .apply(str + "group_without_tos_", ParDo.of(new DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, Aggregate>>() {
                    @DoFn.ProcessElement
                    public void processElement(ProcessContext processContext) {
                        final KV<CompoundKey, Aggregate> element = processContext.element();
                        processContext.output(KV.of(element.getKey().cast(compoundKeyType), element.getValue()));
                    }
                }));

        final SumAndTopK withoutTos = aggregateSumAndTopK(str + "without_tos_", castKeys, i, serializableFunction);
        return new SumsAndTopKs(withTos, withoutTos);
    }

    /**
     * Sums the given aggregates per key and determines the top entries per outer key.
     *
     * <p>Values of equal keys are combined with {@code SumBytes}. For the top-K part, the sums
     * whose key is accepted by {@code serializableFunction} are grouped by the outer key of their
     * key, the top {@code i} entries of each group are selected using
     * {@code FlowBytesValueComparator}, and the selected entries are emitted individually.
     *
     * @param str the prefix for the names of the applied transforms
     * @param pCollection the aggregates to process
     * @param i the number of top entries kept per outer key
     * @param serializableFunction selects the keys that take part in the top-K computation
     * @return the per-key sums together with the top-K entries
     */
    public static SumAndTopK aggregateSumAndTopK(String str, PCollection<KV<CompoundKey, Aggregate>> pCollection, int i, final SerializableFunction<CompoundKey, Boolean> serializableFunction) {
        final PCollection<KV<CompoundKey, Aggregate>> sum = pCollection
                .apply(str + "sum_bytes_by_key", Combine.perKey(new SumBytes()));

        final PCollection<KV<CompoundKey, Aggregate>> topK = sum
                .apply(str + "group_by_outer_key", ParDo.of(new DoFn<KV<CompoundKey, Aggregate>, KV<CompoundKey, KV<CompoundKey, Aggregate>>>() {
                    @DoFn.ProcessElement
                    public void processElement(ProcessContext processContext) {
                        final KV<CompoundKey, Aggregate> element = processContext.element();
                        if (serializableFunction.apply(element.getKey())) {
                            processContext.output(KV.of(element.getKey().getOuterKey(), element));
                        }
                    }
                }))
                .apply(str + "top_k_per_key", Top.perKey(i, new FlowBytesValueComparator()))
                .apply(str + "flatten", Values.<List<KV<CompoundKey, Aggregate>>>create())
                .apply(str + "top_k_summary", ParDo.of(new DoFn<List<KV<CompoundKey, Aggregate>>, KV<CompoundKey, Aggregate>>() {
                    @DoFn.ProcessElement
                    public void processElement(ProcessContext processContext) {
                        // Unpack the per-group list so that every top entry becomes its own element.
                        for (KV<CompoundKey, Aggregate> entry : processContext.element()) {
                            processContext.output(entry);
                        }
                    }
                }));

        return new SumAndTopK(sum, topK);
    }

    /**
     * Recreates a serializable lambda or method reference of this class from its serialized form.
     *
     * <p>The implementation method name of the serialized lambda selects a candidate; the
     * candidate is only returned when the functional interface, its method and the implementation
     * signature recorded in the serialized form match the expected values.
     *
     * <p>NOTE(review): the method is marked synthetic and its selector ({@code boolean z = -1},
     * repeated {@code case true} labels) does not look like hand-written code - presumably this
     * is decompiler output of a compiler-generated method; confirm before editing it.
     *
     * @param serializedLambda the serialized form of the lambda
     * @return the recreated lambda or method reference
     * @throws IllegalArgumentException if the serialized form matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First stage: map the implementation method name to a selector value.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1097691466:
                if (implMethodName.equals("lambda$includeInCortexOutput$d8b52c54$1")) {
                    z = false;
                    break;
                }
                break;
            case -708075198:
                if (implMethodName.equals("cortexOutput")) {
                    z = 4;
                    break;
                }
                break;
            case -695126420:
                if (implMethodName.equals("lambda$includeInCortexOutput$ca77f6ed$1")) {
                    z = 5;
                    break;
                }
                break;
            case -548544950:
                if (implMethodName.equals("lambda$getKafkaInputTimestampPolicyFactory$84917748$1")) {
                    z = 3;
                    break;
                }
                break;
            case 45521504:
                if (implMethodName.equals("getTimestamp")) {
                    z = true;
                    break;
                }
                break;
            case 103785528:
                if (implMethodName.equals("merge")) {
                    z = 2;
                    break;
                }
                break;
        }
        // Second stage: verify the recorded signatures and rebuild the matching lambda.
        switch (z) {
            case false:
                // Filter that captures an IpValue as its only argument.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/opennms/nephron/Pipeline") && serializedLambda.getImplMethodSignature().equals("(Lorg/opennms/nephron/network/IpValue;Lorg/apache/beam/sdk/values/KV;)Ljava/lang/Boolean;")) {
                    IpValue ipValue = (IpValue) serializedLambda.getCapturedArg(0);
                    return kv2 -> {
                        switch (((CompoundKey) kv2.getKey()).type) {
                            case EXPORTER_INTERFACE_HOST:
                            case EXPORTER_INTERFACE_TOS_HOST:
                                return Boolean.valueOf(ipValue.isInRange(((CompoundKey) kv2.getKey()).data.address));
                            case EXPORTER_INTERFACE_CONVERSATION:
                            case EXPORTER_INTERFACE_TOS_CONVERSATION:
                                return false;
                            default:
                                return true;
                        }
                    };
                }
                break;
            case true:
                // Method reference ReadFromKafka::getTimestamp.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/opennms/nephron/Pipeline$ReadFromKafka") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/io/kafka/KafkaRecord;)Lorg/joda/time/Instant;")) {
                    return ReadFromKafka::getTimestamp;
                }
                break;
            case true:
                // Method reference Aggregate::merge; both checks below are textually identical.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableBiFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/opennms/nephron/Aggregate") && serializedLambda.getImplMethodSignature().equals("(Lorg/opennms/nephron/Aggregate;Lorg/opennms/nephron/Aggregate;)Lorg/opennms/nephron/Aggregate;")) {
                    return Aggregate::merge;
                }
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableBiFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/opennms/nephron/Aggregate") && serializedLambda.getImplMethodSignature().equals("(Lorg/opennms/nephron/Aggregate;Lorg/opennms/nephron/Aggregate;)Lorg/opennms/nephron/Aggregate;")) {
                    return Aggregate::merge;
                }
                break;
            case true:
                // Timestamp policy factory lambda that captures a Duration as its only argument.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/kafka/TimestampPolicyFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("createTimestampPolicy") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/kafka/common/TopicPartition;Ljava/util/Optional;)Lorg/apache/beam/sdk/io/kafka/TimestampPolicy;") && serializedLambda.getImplClass().equals("org/opennms/nephron/Pipeline") && serializedLambda.getImplMethodSignature().equals("(Lorg/joda/time/Duration;Lorg/apache/kafka/common/TopicPartition;Ljava/util/Optional;)Lorg/apache/beam/sdk/io/kafka/TimestampPolicy;")) {
                    org.joda.time.Duration duration = (org.joda.time.Duration) serializedLambda.getCapturedArg(0);
                    return (topicPartition, optional) -> {
                        return new CustomTimestampPolicyWithLimitedDelay(ReadFromKafka::getTimestamp, duration, optional);
                    };
                }
                break;
            case true:
                // Method reference Pipeline::cortexOutput; both checks below are textually identical.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/opennms/nephron/cortex/CortexIo$BuildTimeSeries") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;Lorg/joda/time/Instant;ILorg/opennms/nephron/cortex/TimeSeriesBuilder;)V") && serializedLambda.getImplClass().equals("org/opennms/nephron/Pipeline") && serializedLambda.getImplMethodSignature().equals("(Lorg/opennms/nephron/CompoundKey;Lorg/opennms/nephron/Aggregate;Lorg/joda/time/Instant;ILorg/opennms/nephron/cortex/TimeSeriesBuilder;)V")) {
                    return Pipeline::cortexOutput;
                }
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/opennms/nephron/cortex/CortexIo$BuildTimeSeries") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;Lorg/joda/time/Instant;ILorg/opennms/nephron/cortex/TimeSeriesBuilder;)V") && serializedLambda.getImplClass().equals("org/opennms/nephron/Pipeline") && serializedLambda.getImplMethodSignature().equals("(Lorg/opennms/nephron/CompoundKey;Lorg/opennms/nephron/Aggregate;Lorg/joda/time/Instant;ILorg/opennms/nephron/cortex/TimeSeriesBuilder;)V")) {
                    return Pipeline::cortexOutput;
                }
                break;
            case true:
                // Filter without captured arguments.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/opennms/nephron/Pipeline") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/KV;)Ljava/lang/Boolean;")) {
                    return kv -> {
                        switch (((CompoundKey) kv.getKey()).type) {
                            case EXPORTER_INTERFACE_HOST:
                            case EXPORTER_INTERFACE_TOS_HOST:
                            case EXPORTER_INTERFACE_CONVERSATION:
                            case EXPORTER_INTERFACE_TOS_CONVERSATION:
                                return false;
                            default:
                                return true;
                        }
                    };
                }
                break;
        }
        // Nothing matched: the serialized form does not belong to a lambda known to this method.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
