package com.hazelcast.jet.impl.processor;

import com.hazelcast.function.ComparatorEx;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.util.Preconditions;
import com.hazelcast.internal.util.collection.Long2ObjectHashMap;
import com.hazelcast.internal.util.counters.Counter;
import com.hazelcast.internal.util.counters.SwCounter;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperation;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.BroadcastKey;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.SlidingWindowPolicy;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.function.KeyedWindowResultFunction;
import com.hazelcast.jet.impl.execution.init.JetInitDataSerializerHook;
import com.hazelcast.jet.impl.util.LoggingUtil;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.partition.PartitionAware;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.ToLongFunction;
import java.util.stream.LongStream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:BOOT-INF/lib/hazelcast-5.1.1.jar:com/hazelcast/jet/impl/processor/SlidingWindowP.class */
public class SlidingWindowP<K, A, R, OUT> extends AbstractProcessor {
    Map<K, A> slidingWindow;
    Map<K, A> slidingWindowBackup;

    @Nonnull
    private final SlidingWindowPolicy winPolicy;

    @Nonnull
    private final List<ToLongFunction<Object>> frameTimestampFns;

    @Nonnull
    private final List<Function<Object, ? extends K>> keyFns;

    @Nonnull
    private final AggregateOperation<A, ? extends R> aggrOp;

    @Nonnull
    private final A emptyAcc;

    @Nonnull
    private final KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> mapToOutputFn;

    @Nullable
    private final BiConsumer<? super A, ? super A> combineFn;
    private final boolean isLastStage;

    @Nonnull
    private final AbstractProcessor.FlatMapper<Watermark, ?> wmFlatMapper;
    private ProcessingGuarantee processingGuarantee;
    private final LongFunction<Map<K, A>> createMapPerTsFunction;
    private final Function<K, A> createAccFunction;
    private final long earlyResultsPeriod;
    private long lastTimeEarlyResultsEmitted;
    private Traverser<? extends OUT> earlyWinTraverser;
    private Traverser<Object> flushTraverser;
    private Traverser<Map.Entry> snapshotTraverser;
    private boolean badFrameRestored;
    static final /* synthetic */ boolean $assertionsDisabled;
    final Long2ObjectHashMap<Map<K, A>> tsToKeyToAcc = new Long2ObjectHashMap<>();
    long nextWinToEmit = Long.MIN_VALUE;

    @Probe(name = "lateEventsDropped")
    private final Counter lateEventsDropped = SwCounter.newSwCounter();

    @Probe(name = "totalFrames")
    private final Counter totalFrames = SwCounter.newSwCounter();

    @Probe(name = "totalKeysInFrames")
    private final Counter totalKeysInFrames = SwCounter.newSwCounter();
    private long topTs = Long.MIN_VALUE;
    private long minRestoredNextWinToEmit = Long.MAX_VALUE;
    private long minRestoredFrameTs = Long.MAX_VALUE;

    /* loaded from: input_file:BOOT-INF/lib/hazelcast-5.1.1.jar:com/hazelcast/jet/impl/processor/SlidingWindowP$Keys.class */
    enum Keys {
        NEXT_WIN_TO_EMIT
    }

    /* loaded from: input_file:BOOT-INF/lib/hazelcast-5.1.1.jar:com/hazelcast/jet/impl/processor/SlidingWindowP$SnapshotKey.class */
    public static final class SnapshotKey implements PartitionAware<Object>, IdentifiedDataSerializable {
        long timestamp;
        Object key;

        public SnapshotKey() {
        }

        SnapshotKey(long j, @Nonnull Object obj) {
            this.timestamp = j;
            this.key = obj;
        }

        @Override // com.hazelcast.partition.PartitionAware
        public Object getPartitionKey() {
            return this.key;
        }

        @Override // com.hazelcast.nio.serialization.IdentifiedDataSerializable
        public int getFactoryId() {
            return JetInitDataSerializerHook.FACTORY_ID;
        }

        @Override // com.hazelcast.nio.serialization.IdentifiedDataSerializable
        public int getClassId() {
            return 13;
        }

        @Override // com.hazelcast.nio.serialization.DataSerializable
        public void writeData(ObjectDataOutput objectDataOutput) throws IOException {
            objectDataOutput.writeLong(this.timestamp);
            objectDataOutput.writeObject(this.key);
        }

        @Override // com.hazelcast.nio.serialization.DataSerializable
        public void readData(ObjectDataInput objectDataInput) throws IOException {
            this.timestamp = objectDataInput.readLong();
            this.key = objectDataInput.readObject();
        }

        public boolean equals(Object obj) {
            if (this != obj) {
                if (obj instanceof SnapshotKey) {
                    SnapshotKey snapshotKey = (SnapshotKey) obj;
                    if (this.timestamp != snapshotKey.timestamp || !Objects.equals(this.key, snapshotKey.key)) {
                    }
                }
                return false;
            }
            return true;
        }

        public int hashCode() {
            return (73 * ((int) (this.timestamp ^ (this.timestamp >>> 32)))) + Objects.hashCode(this.key);
        }

        public String toString() {
            return "SnapshotKey{timestamp=" + this.timestamp + ", key=" + this.key + '}';
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public SlidingWindowP(@Nonnull List<? extends Function<?, ? extends K>> list, @Nonnull List<? extends ToLongFunction<?>> list2, @Nonnull SlidingWindowPolicy slidingWindowPolicy, long j, @Nonnull AggregateOperation<A, ? extends R> aggregateOperation, @Nonnull KeyedWindowResultFunction<? super K, ? super R, ? extends OUT> keyedWindowResultFunction, boolean z) {
        Preconditions.checkTrue(list.size() == aggregateOperation.arity(), list.size() + " key functions provided for " + aggregateOperation.arity() + "-arity aggregate operation");
        if (!slidingWindowPolicy.isTumbling()) {
            Objects.requireNonNull(aggregateOperation.combineFn(), "AggregateOperation.combineFn is required for sliding windows");
        }
        Preconditions.checkNotNegative(j, "earlyResultsPeriod must be zero or positive");
        this.winPolicy = slidingWindowPolicy;
        this.frameTimestampFns = list2;
        this.keyFns = list;
        this.earlyResultsPeriod = j;
        this.aggrOp = aggregateOperation;
        this.combineFn = aggregateOperation.combineFn();
        this.mapToOutputFn = keyedWindowResultFunction;
        this.isLastStage = z;
        this.wmFlatMapper = flatMapper(watermark -> {
            return windowTraverserAndEvictor(watermark.timestamp()).append(watermark).onFirstNull(() -> {
                this.nextWinToEmit = slidingWindowPolicy.higherFrameTs(watermark.timestamp());
            });
        });
        this.emptyAcc = aggregateOperation.createFn().get();
        this.createMapPerTsFunction = j2 -> {
            this.totalFrames.inc();
            return new HashMap();
        };
        this.createAccFunction = obj -> {
            this.totalKeysInFrames.inc();
            return aggregateOperation.createFn().get();
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    public void init(@Nonnull Processor.Context context) {
        this.processingGuarantee = context.processingGuarantee();
        this.lastTimeEarlyResultsEmitted = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean tryProcess() {
        if (this.earlyResultsPeriod == 0 || this.topTs == Long.MIN_VALUE) {
            return true;
        }
        if (this.earlyWinTraverser != null) {
            return emitFromTraverser(this.earlyWinTraverser);
        }
        long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
        if (millis < this.lastTimeEarlyResultsEmitted + this.earlyResultsPeriod) {
            return true;
        }
        long startingWindowTs = startingWindowTs(Long.MAX_VALUE);
        if (startingWindowTs == Long.MIN_VALUE) {
            return true;
        }
        this.lastTimeEarlyResultsEmitted = millis;
        this.slidingWindowBackup = this.slidingWindow;
        this.slidingWindow = null;
        this.earlyWinTraverser = Traversers.traverseStream(range(startingWindowTs, (this.topTs + this.winPolicy.windowSize()) - this.winPolicy.frameSize(), this.winPolicy.frameSize()).boxed()).flatMap(l -> {
            return Traversers.traverseIterable(computeWindow(l.longValue()).entrySet()).map(entry -> {
                return this.mapToOutputFn.apply(l.longValue() - this.winPolicy.windowSize(), l.longValue(), (Object) entry.getKey(), this.aggrOp.exportFn().apply((Object) entry.getValue()), true);
            }).onFirstNull(() -> {
                completeEarlyWindow(l.longValue());
            });
        }).onFirstNull(() -> {
            this.slidingWindow = this.slidingWindowBackup;
            this.slidingWindowBackup = null;
            this.earlyWinTraverser = null;
        });
        return emitFromTraverser(this.earlyWinTraverser);
    }

    @Override // com.hazelcast.jet.core.AbstractProcessor
    protected boolean tryProcess(int i, @Nonnull Object obj) {
        long applyAsLong = this.frameTimestampFns.get(i).applyAsLong(obj);
        if (!$assertionsDisabled && applyAsLong != this.winPolicy.floorFrameTs(applyAsLong)) {
            throw new AssertionError("getFrameTsFn returned an invalid frame timestamp");
        }
        if (applyAsLong < this.nextWinToEmit) {
            Util.logLateEvent(getLogger(), this.nextWinToEmit, obj);
            this.lateEventsDropped.inc();
            return true;
        }
        this.aggrOp.accumulateFn(i).accept(this.tsToKeyToAcc.computeIfAbsent(applyAsLong, this.createMapPerTsFunction).computeIfAbsent(this.keyFns.get(i).apply(obj), this.createAccFunction), obj);
        this.topTs = Math.max(this.topTs, applyAsLong);
        return true;
    }

    @Override // com.hazelcast.jet.core.AbstractProcessor, com.hazelcast.jet.core.Processor
    public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
        return this.wmFlatMapper.tryProcess(watermark);
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean complete() {
        return flushBuffers();
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean saveToSnapshot() {
        if (!this.isLastStage || this.flushTraverser != null) {
            return flushBuffers();
        }
        if (this.snapshotTraverser == null) {
            this.snapshotTraverser = Traversers.traverseIterable(this.tsToKeyToAcc.entrySet()).flatMap(entry -> {
                return Traversers.traverseIterable(((Map) entry.getValue()).entrySet()).map(entry -> {
                    return com.hazelcast.jet.Util.entry(new SnapshotKey(((Long) entry.getKey()).longValue(), entry.getKey()), entry.getValue());
                });
            }).append(com.hazelcast.jet.Util.entry(BroadcastKey.broadcastKey(Keys.NEXT_WIN_TO_EMIT), Long.valueOf(this.nextWinToEmit))).onFirstNull(() -> {
                LoggingUtil.logFinest(getLogger(), "Saved nextWinToEmit: %s", Long.valueOf(this.nextWinToEmit));
                this.snapshotTraverser = null;
            });
        }
        return emitFromTraverserToSnapshot(this.snapshotTraverser);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.hazelcast.jet.core.AbstractProcessor
    protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
        if (obj instanceof BroadcastKey) {
            BroadcastKey broadcastKey = (BroadcastKey) obj;
            if (!Keys.NEXT_WIN_TO_EMIT.equals(broadcastKey.key())) {
                throw new JetException("Unexpected broadcast key: " + broadcastKey.key());
            }
            long longValue = ((Long) obj2).longValue();
            if (!$assertionsDisabled && this.processingGuarantee == ProcessingGuarantee.EXACTLY_ONCE && this.minRestoredNextWinToEmit != Long.MAX_VALUE && this.minRestoredNextWinToEmit != longValue) {
                throw new AssertionError("different values for nextWinToEmit restored, before=" + this.minRestoredNextWinToEmit + ", new=" + longValue);
            }
            this.minRestoredNextWinToEmit = Math.min(longValue, this.minRestoredNextWinToEmit);
            return;
        }
        SnapshotKey snapshotKey = (SnapshotKey) obj;
        long higherFrameTs = this.winPolicy.higherFrameTs(snapshotKey.timestamp - 1);
        if (higherFrameTs != snapshotKey.timestamp && !this.badFrameRestored) {
            this.badFrameRestored = true;
            getLogger().warning("Frames in the state do not match the current frame size: they were likely saved for a different window slide step or a different offset. The window results will probably be incorrect until all restored frames are emitted.");
        }
        this.minRestoredFrameTs = Math.min(higherFrameTs, this.minRestoredFrameTs);
        this.tsToKeyToAcc.computeIfAbsent(higherFrameTs, this.createMapPerTsFunction).merge(snapshotKey.key, obj2, (obj3, obj4) -> {
            if (!this.badFrameRestored) {
                throw new JetException("Duplicate key in snapshot: " + snapshotKey);
            }
            if (this.combineFn == null) {
                throw new JetException("AggregateOperation.combineFn required for merging restored frames");
            }
            this.combineFn.accept(obj3, obj4);
            this.totalKeysInFrames.inc(-1L);
            return obj3;
        });
        this.totalKeysInFrames.inc();
        this.topTs = Math.max(this.topTs, higherFrameTs);
    }

    @Override // com.hazelcast.jet.core.Processor
    public boolean finishSnapshotRestore() {
        if (!this.isLastStage) {
            return true;
        }
        this.nextWinToEmit = this.minRestoredNextWinToEmit > Long.MIN_VALUE ? this.winPolicy.higherFrameTs(this.minRestoredNextWinToEmit - 1) : this.minRestoredNextWinToEmit;
        LoggingUtil.logFine(getLogger(), "Restored nextWinToEmit from snapshot to: %s", Long.valueOf(this.nextWinToEmit));
        if (this.nextWinToEmit <= Long.MIN_VALUE + this.winPolicy.windowSize()) {
            return true;
        }
        long j = this.minRestoredFrameTs;
        while (true) {
            long j2 = j;
            if (j2 > this.nextWinToEmit - this.winPolicy.windowSize()) {
                return true;
            }
            if (this.tsToKeyToAcc.remove(j2) != null) {
                this.totalFrames.inc(-1L);
                this.totalKeysInFrames.inc(-r0.size());
            }
            j = j2 + this.winPolicy.frameSize();
        }
    }

    private Traverser<Object> windowTraverserAndEvictor(long j) {
        long startingWindowTs = startingWindowTs(j);
        return startingWindowTs == Long.MIN_VALUE ? Traversers.empty() : Traversers.traverseStream(range(startingWindowTs, j, this.winPolicy.frameSize()).boxed()).flatMap(l -> {
            return Traversers.traverseIterable(computeWindow(l.longValue()).entrySet()).map(entry -> {
                return this.mapToOutputFn.apply(l.longValue() - this.winPolicy.windowSize(), l.longValue(), (Object) entry.getKey(), this.aggrOp.finishFn().apply((Object) entry.getValue()), false);
            }).onFirstNull(() -> {
                completeWindow(l.longValue());
            });
        });
    }

    /* JADX WARN: Type inference failed for: r0v8, types: [com.hazelcast.internal.util.collection.Long2ObjectHashMap$KeySet] */
    private long startingWindowTs(long j) {
        if (this.nextWinToEmit != Long.MIN_VALUE) {
            return this.nextWinToEmit;
        }
        if (this.tsToKeyToAcc.isEmpty()) {
            return Long.MIN_VALUE;
        }
        return Math.min(((Long) this.tsToKeyToAcc.keySet2().stream().min(ComparatorEx.naturalOrder()).orElseThrow(() -> {
            return new AssertionError("Failed to find the min key in a non-empty map");
        })).longValue(), this.winPolicy.floorFrameTs(j));
    }

    private Map<K, A> computeWindow(long j) {
        if (this.winPolicy.isTumbling()) {
            return this.tsToKeyToAcc.getOrDefault(Long.valueOf(j), Collections.emptyMap());
        }
        if (this.aggrOp.deductFn() == null) {
            return recomputeWindow(j);
        }
        if (this.slidingWindow == null) {
            this.slidingWindow = recomputeWindow(j);
        } else {
            patchSlidingWindow(this.aggrOp.combineFn(), this.tsToKeyToAcc.get(j));
        }
        return this.slidingWindow;
    }

    private Map<K, A> recomputeWindow(long j) {
        HashMap hashMap = new HashMap();
        long windowSize = j - this.winPolicy.windowSize();
        long frameSize = this.winPolicy.frameSize();
        while (true) {
            long j2 = windowSize + frameSize;
            if (j2 > j) {
                return hashMap;
            }
            if (!$assertionsDisabled && this.combineFn == null) {
                throw new AssertionError("combineFn == null");
            }
            for (Map.Entry<K, A> entry : this.tsToKeyToAcc.getOrDefault(Long.valueOf(j2), Collections.emptyMap()).entrySet()) {
                this.combineFn.accept((Object) hashMap.computeIfAbsent(entry.getKey(), obj -> {
                    return this.aggrOp.createFn().get();
                }), entry.getValue());
            }
            windowSize = j2;
            frameSize = this.winPolicy.frameSize();
        }
    }

    private void patchSlidingWindow(BiConsumer<? super A, ? super A> biConsumer, Map<K, A> map) {
        if (map == null) {
            return;
        }
        for (Map.Entry<K, A> entry : map.entrySet()) {
            this.slidingWindow.compute(entry.getKey(), (obj, obj2) -> {
                Object obj = obj2 != null ? obj2 : this.aggrOp.createFn().get();
                biConsumer.accept(obj, entry.getValue());
                if (obj.equals(this.emptyAcc)) {
                    return null;
                }
                return obj;
            });
        }
    }

    private void completeWindow(long j) {
        Map<K, A> remove = this.tsToKeyToAcc.remove((j - this.winPolicy.windowSize()) + this.winPolicy.frameSize());
        if (remove != null) {
            this.totalKeysInFrames.inc(-remove.size());
            this.totalFrames.inc(-1L);
            if (!this.winPolicy.isTumbling() && this.aggrOp.deductFn() != null) {
                patchSlidingWindow(this.aggrOp.deductFn(), remove);
            }
        }
        if (!$assertionsDisabled && this.tsToKeyToAcc.values().stream().mapToInt((v0) -> {
            return v0.size();
        }).sum() != this.totalKeysInFrames.get()) {
            throw new AssertionError("totalKeysInFrames mismatch, expected=" + this.tsToKeyToAcc.values().stream().mapToInt((v0) -> {
                return v0.size();
            }).sum() + ", actual=" + this.totalKeysInFrames.get());
        }
    }

    private void completeEarlyWindow(long j) {
        Map<K, A> map;
        if (this.winPolicy.isTumbling() || this.aggrOp.deductFn() == null || (map = this.tsToKeyToAcc.get((j - this.winPolicy.windowSize()) + this.winPolicy.frameSize())) == null) {
            return;
        }
        patchSlidingWindow(this.aggrOp.deductFn(), map);
    }

    private boolean flushBuffers() {
        if (this.flushTraverser == null) {
            if (this.tsToKeyToAcc.isEmpty()) {
                return true;
            }
            this.flushTraverser = windowTraverserAndEvictor((this.topTs + this.winPolicy.windowSize()) - this.winPolicy.frameSize()).onFirstNull(() -> {
                this.flushTraverser = null;
            });
        }
        return emitFromTraverser(this.flushTraverser);
    }

    private static LongStream range(long j, long j2, long j3) {
        return j > j2 ? LongStream.empty() : LongStream.iterate(j, j4 -> {
            return j4 + j3;
        }).limit(1 + ((j2 - j) / j3));
    }

    static {
        $assertionsDisabled = !SlidingWindowP.class.desiredAssertionStatus();
    }
}
