package org.opennms.nephron.util;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opennms/nephron/util/PaneAccumulator.class */
public class PaneAccumulator<K, V> extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) PaneAccumulator.class);
    private final SerializableBiFunction<V, V, V> combiner;
    private final Duration accumulationDelay;
    private final Coder<K> keyCoder;
    private final Coder<V> valueCoder;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opennms/nephron/util/PaneAccumulator$PaneCombinerFn.class */
    public class PaneCombinerFn extends DoFn<KV<K, V>, KV<K, V>> {
        private static final String VALUE_STATE_NAME = "value";
        private static final String OUTPUT_TIMER_NAME = "output";

        @DoFn.StateId("value")
        private final StateSpec<ValueState<V>> valueStateSpec;

        @DoFn.TimerId(OUTPUT_TIMER_NAME)
        private final TimerSpec outputTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        private PaneCombinerFn() {
            this.valueStateSpec = StateSpecs.value(PaneAccumulator.this.valueCoder);
        }

        @DoFn.ProcessElement
        public void process(DoFn<KV<K, V>, KV<K, V>>.ProcessContext processContext, BoundedWindow boundedWindow, @DoFn.StateId("value") @DoFn.AlwaysFetched ValueState<V> valueState, @DoFn.TimerId("output") Timer timer) {
            V apply;
            V read = valueState.read();
            if (read == null) {
                if (PaneAccumulator.LOG.isTraceEnabled()) {
                    PaneAccumulator.LOG.trace("create state - key: {}; ctx.timestamp: {}; window.maxTimestamp: {}", processContext.element().getKey(), processContext.timestamp(), boundedWindow.maxTimestamp());
                }
                apply = processContext.element().getValue();
                timer.withOutputTimestamp(boundedWindow.maxTimestamp()).offset(PaneAccumulator.this.accumulationDelay).setRelative();
            } else {
                apply = PaneAccumulator.this.combiner.apply(read, processContext.element().getValue());
            }
            if (PaneAccumulator.LOG.isTraceEnabled()) {
                PaneAccumulator.LOG.trace("write state - key: {}; ctx.timestamp: {}; window.maxTimestamp: {}; newValue: {}", processContext.element().getKey(), processContext.timestamp(), boundedWindow.maxTimestamp(), apply);
            }
            valueState.write(apply);
        }

        @DoFn.OnTimer(OUTPUT_TIMER_NAME)
        public void onOutput(DoFn<KV<K, V>, KV<K, V>>.OnTimerContext onTimerContext, BoundedWindow boundedWindow, @DoFn.Key K k, @DoFn.StateId("value") @DoFn.AlwaysFetched ValueState<V> valueState) {
            V read = valueState.read();
            if (PaneAccumulator.LOG.isTraceEnabled()) {
                PaneAccumulator.LOG.trace("output state - key: {}; ctx.timestamp: {}, window.maxTimestamp: {}", k, onTimerContext.timestamp(), boundedWindow.maxTimestamp());
            }
            onTimerContext.outputWithTimestamp(KV.of(k, read), boundedWindow.maxTimestamp());
            valueState.clear();
        }

        @DoFn.OnWindowExpiration
        public void onExpiration(DoFn.OutputReceiver<KV<K, V>> outputReceiver, BoundedWindow boundedWindow, @DoFn.Key K k, @DoFn.StateId("value") @DoFn.AlwaysFetched ValueState<V> valueState) {
            V read = valueState.read();
            if (read != null) {
                if (PaneAccumulator.LOG.isTraceEnabled()) {
                    PaneAccumulator.LOG.trace("output expired state - key: {}; window.maxTimestamp: {}", k, boundedWindow.maxTimestamp());
                }
                outputReceiver.outputWithTimestamp(KV.of(k, read), boundedWindow.maxTimestamp());
            }
        }
    }

    public PaneAccumulator(SerializableBiFunction<V, V, V> serializableBiFunction, Duration duration, Coder<K> coder, Coder<V> coder2) {
        super("paneAcc");
        this.combiner = serializableBiFunction;
        this.accumulationDelay = duration;
        this.keyCoder = coder;
        this.valueCoder = coder2;
    }

    @Override // org.apache.beam.sdk.transforms.PTransform
    public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> pCollection) {
        return ((PCollection) pCollection.apply(ParDo.of(new PaneCombinerFn()))).setCoder(KvCoder.of(this.keyCoder, this.valueCoder));
    }
}
