package com.hazelcast.jet.impl.execution;

import com.hazelcast.function.ComparatorEx;
import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.Pipe;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.util.PrefixedLogger;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.jet.impl.util.ProgressTracker;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Comparator;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.ToIntFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/* loaded from: input_file:BOOT-INF/lib/hazelcast-5.0.jar:com/hazelcast/jet/impl/execution/ConcurrentInboundEdgeStream.class */
public final class ConcurrentInboundEdgeStream {

    /* loaded from: input_file:BOOT-INF/lib/hazelcast-5.0.jar:com/hazelcast/jet/impl/execution/ConcurrentInboundEdgeStream$InboundEdgeStreamBase.class */
    /**
     * State and accessors shared by the inbound edge stream implementations in this file.
     *
     * <p>Holds the conveyor whose queues are drained, the {@code ordinal} and
     * {@code priority} values reported through the {@link InboundEdgeStream} accessors,
     * a {@link ProgressTracker} for subclasses to use while draining, and a logger
     * prefixed with the string supplied at construction.
     */
    private static abstract class InboundEdgeStreamBase implements InboundEdgeStream {
        final ProgressTracker tracker = new ProgressTracker();
        final ConcurrentConveyor<Object> conveyor;
        final int ordinal;
        final int priority;
        final ILogger logger;

        /**
         * @param conveyor  the conveyor whose queues feed this stream
         * @param ordinal   value returned by {@link #ordinal()}
         * @param priority  value returned by {@link #priority()}
         * @param logPrefix prefix applied to every message of this instance's logger
         */
        private InboundEdgeStreamBase(@Nonnull ConcurrentConveyor<Object> conveyor, int ordinal, int priority, @Nonnull String logPrefix) {
            this.conveyor = conveyor;
            this.ordinal = ordinal;
            this.priority = priority;
            // getClass() is used so the logger is named after the concrete subclass.
            this.logger = PrefixedLogger.prefixedLogger(Logger.getLogger(getClass()), logPrefix);
            logger.finest("Coalescing " + conveyor.queueCount() + " input queues");
        }

        /** Returns the ordinal given at construction. */
        @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
        public int ordinal() {
            return ordinal;
        }

        /** Returns the priority given at construction. */
        @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
        public int priority() {
            return priority;
        }

        /** Returns {@code true} when the conveyor reports zero live queues. */
        @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
        public boolean isDone() {
            int liveQueues = conveyor.liveQueueCount();
            return liveQueues == 0;
        }

        /** Returns the sum of {@code size()} over all queues currently present in the conveyor. */
        @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
        public int sizes() {
            return conveyorSum(QueuedPipe::size);
        }

        /** Returns the sum of {@code capacity()} over all queues currently present in the conveyor. */
        @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
        public int capacities() {
            return conveyorSum(QueuedPipe::capacity);
        }

        /**
         * Applies {@code metric} to every non-null queue of the conveyor and adds up the results.
         * Slots for which {@code conveyor.queue(index)} returns {@code null} are skipped.
         */
        private int conveyorSum(ToIntFunction<QueuedPipe<Object>> metric) {
            int total = 0;
            for (int index = 0; index < conveyor.queueCount(); index++) {
                QueuedPipe<Object> pipe = conveyor.queue(index);
                if (pipe == null) {
                    continue;
                }
                total += metric.applyAsInt(pipe);
            }
            return total;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/hazelcast-5.0.jar:com/hazelcast/jet/impl/execution/ConcurrentInboundEdgeStream$OrderedDrain.class */
    /**
     * Inbound edge stream variant that is constructed with a comparator (the {@code create}
     * factory of the enclosing class selects it when a non-null comparator is supplied).
     *
     * <p>For every queue of the conveyor it keeps a companion {@link ArrayDeque} sized to that
     * queue's capacity. This variant does not track watermarks: both watermark accessors
     * return {@link Long#MIN_VALUE}.
     *
     * <p>NOTE(review): the body of {@link #drainTo} was not recovered by the decompiler, so
     * the way {@code comparator}, {@code drainedItems}, {@code lastItem} and
     * {@code lastMinIndex} are used cannot be seen in this file.
     */
    public static final class OrderedDrain extends InboundEdgeStreamBase {
        private final Comparator<Object> comparator;
        private final List<QueuedPipe<Object>> queues;
        private final List<ArrayDeque<Object>> drainedItems;
        // Not referenced by any decompiled code; presumably state of the missing drainTo body.
        private Object lastItem;
        private int lastMinIndex;
        // Compiler-generated assertion flag; presumably read by the missing drainTo body.
        static final /* synthetic */ boolean $assertionsDisabled;

        /**
         * @param conveyor   the conveyor whose queues feed this stream; every slot must hold a queue
         * @param ordinal    forwarded to the base class
         * @param priority   forwarded to the base class
         * @param logPrefix  forwarded to the base class as the logger prefix
         * @param comparator comparator stored for use on the items of this stream
         */
        OrderedDrain(@Nonnull ConcurrentConveyor<Object> conveyor, int ordinal, int priority, @Nonnull String logPrefix, @Nullable ComparatorEx<?> comparator) {
            super(conveyor, ordinal, priority, logPrefix);
            // The wildcard comparator has to be viewed as a Comparator<Object> because the
            // queues are typed <Object>; the cast is unchecked but required for the
            // assignment to type-check at all.
            @SuppressWarnings("unchecked")
            Comparator<Object> objectComparator = (Comparator<Object>) comparator;
            this.comparator = objectComparator;
            this.drainedItems = new ArrayList<>(conveyor.queueCount());
            this.queues = new ArrayList<>(conveyor.queueCount());
            for (int queueIndex = 0; queueIndex < conveyor.queueCount(); queueIndex++) {
                QueuedPipe<Object> queue = conveyor.queue(queueIndex);
                // One buffer per queue, pre-sized to the queue's capacity.
                this.drainedItems.add(new ArrayDeque<>(queue.capacity()));
                this.queues.add(queue);
            }
        }

        // NOTE(review): the decompiler could not reconstruct this method. The stub below always
        // throws UnsupportedOperationException and must be replaced with the real implementation
        // before this class can be used.
        /* JADX WARN: Code restructure failed: missing block: B:32:0x00f3, code lost:
        
            throw new com.hazelcast.jet.JetException("Unexpected item observed: " + r0);
         */
        @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
        @javax.annotation.Nonnull
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public com.hazelcast.jet.impl.util.ProgressState drainTo(@javax.annotation.Nonnull java.util.function.Predicate<java.lang.Object> r6) {
            /*
                Method dump skipped, instructions count: 376
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: com.hazelcast.jet.impl.execution.ConcurrentInboundEdgeStream.OrderedDrain.drainTo(java.util.function.Predicate):com.hazelcast.jet.impl.util.ProgressState");
        }

        /** Always returns {@link Long#MIN_VALUE}. */
        @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
        public long topObservedWm() {
            return Long.MIN_VALUE;
        }

        /** Always returns {@link Long#MIN_VALUE}. */
        @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
        public long coalescedWm() {
            return Long.MIN_VALUE;
        }

        /**
         * Returns {@code true} when the local {@code queues} list is empty.
         * NOTE(review): nothing in the decompiled code removes entries from that list;
         * presumably the missing drainTo body does - confirm against the original source.
         */
        @Override // com.hazelcast.jet.impl.execution.ConcurrentInboundEdgeStream.InboundEdgeStreamBase, com.hazelcast.jet.impl.execution.InboundEdgeStream
        public boolean isDone() {
            return this.queues.isEmpty();
        }

        static {
            $assertionsDisabled = !ConcurrentInboundEdgeStream.class.desiredAssertionStatus();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/hazelcast-5.0.jar:com/hazelcast/jet/impl/execution/ConcurrentInboundEdgeStream$RoundRobinDrain.class */
    /**
     * Inbound edge stream variant that, on every {@link #drainTo} call, visits the conveyor's
     * queues in index order. Ordinary items are handed to the destination predicate; the
     * special items ({@link Watermark}, {@link SnapshotBarrier} and
     * {@code DoneItem.DONE_ITEM}) are intercepted by an {@link ItemDetector} and handled here.
     *
     * <p>The {@code create} factory of the enclosing class selects this variant when no
     * comparator is supplied.
     */
    public static final class RoundRobinDrain extends InboundEdgeStreamBase {
        private final ItemDetector itemDetector;
        private final WatermarkCoalescer watermarkCoalescer;
        /** Bit {@code i} is set once a snapshot barrier has been observed on queue {@code i}. */
        private final BitSet receivedBarriers;
        /** When {@code true}, queues whose barrier bit is set are skipped by {@link #drainTo}. */
        private boolean waitForAllBarriers;
        /** The barrier currently being collected from the queues, or {@code null} if none. */
        private SnapshotBarrier currentBarrier;

        /**
         * Predicate passed to {@code Pipe.drain}. It forwards ordinary items to {@link #dest}
         * and, on the first special item, stores it in {@link #item} and returns
         * {@code false} without forwarding it.
         */
        public static final class ItemDetector implements Predicate<Object> {
            Predicate<Object> dest;
            BroadcastItem item;

            private ItemDetector() {
            }

            /** Sets the destination for the next drain and clears any remembered special item. */
            void reset(Predicate<Object> predicate) {
                this.dest = predicate;
                this.item = null;
            }

            @Override // java.util.function.Predicate
            public boolean test(Object obj) {
                boolean special = obj instanceof Watermark
                        || obj instanceof SnapshotBarrier
                        || obj == DoneItem.DONE_ITEM;
                if (!special) {
                    return this.dest.test(obj);
                }
                assert this.item == null
                        : "Received multiple special items without a call to reset(): " + this.item;
                this.item = (BroadcastItem) obj;
                return false;
            }
        }

        /**
         * @param conveyor           the conveyor whose queues feed this stream
         * @param ordinal            forwarded to the base class
         * @param priority           forwarded to the base class
         * @param logPrefix          forwarded to the base class as the logger prefix
         * @param waitForAllBarriers initial value of the flag that makes {@link #drainTo} skip
         *                           queues on which a barrier was already observed
         */
        RoundRobinDrain(@Nonnull ConcurrentConveyor<Object> conveyor, int ordinal, int priority, @Nonnull String logPrefix, boolean waitForAllBarriers) {
            super(conveyor, ordinal, priority, logPrefix);
            this.itemDetector = new ItemDetector();
            this.waitForAllBarriers = waitForAllBarriers;
            this.watermarkCoalescer = WatermarkCoalescer.create(conveyor.queueCount());
            this.receivedBarriers = new BitSet(conveyor.queueCount());
        }

        /**
         * Drains the queues into {@code dest}, one queue after another.
         *
         * <p>The method returns early, right after the queue that triggered it, whenever a
         * watermark or the current snapshot barrier is forwarded to {@code dest}, or when
         * the last live queue has been removed.
         *
         * @param dest receiver of ordinary items, forwarded watermarks and snapshot barriers
         * @return the progress made by this call
         */
        @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
        @Nonnull
        public ProgressState drainTo(@Nonnull Predicate<Object> dest) {
            this.tracker.reset();
            for (int queueIndex = 0; queueIndex < this.conveyor.queueCount(); queueIndex++) {
                QueuedPipe<Object> queue = this.conveyor.queue(queueIndex);
                if (queue == null) {
                    // Empty slot; presumably a queue removed earlier via removeQueue().
                    continue;
                }
                if (this.waitForAllBarriers && this.receivedBarriers.get(queueIndex)) {
                    // A barrier was already seen on this queue: do not read past it.
                    continue;
                }

                ProgressState queueResult = drainQueue(queue, dest);
                this.tracker.mergeWith(queueResult);
                // Nothing below goes through the detector again, so the field is stable here.
                BroadcastItem specialItem = this.itemDetector.item;

                if (specialItem == DoneItem.DONE_ITEM) {
                    this.conveyor.removeQueue(queueIndex);
                    this.receivedBarriers.clear(queueIndex);
                    long wmTimestamp = this.watermarkCoalescer.queueDone(queueIndex);
                    if (maybeEmitWm(wmTimestamp, dest)) {
                        if (this.logger.isFinestEnabled()) {
                            this.logger.finest("Queue " + queueIndex + " is done, forwarding " + new Watermark(wmTimestamp));
                        }
                        return this.conveyor.liveQueueCount() == 0 ? ProgressState.DONE : ProgressState.MADE_PROGRESS;
                    }
                } else if (specialItem instanceof Watermark) {
                    long observedTimestamp = ((Watermark) specialItem).timestamp();
                    boolean forwarded = maybeEmitWm(this.watermarkCoalescer.observeWm(queueIndex, observedTimestamp), dest);
                    if (this.logger.isFinestEnabled()) {
                        // Fixed: the "forwarded" branch used to append ", forwarded=" with no value.
                        this.logger.finest("Received " + specialItem + " from queue " + queueIndex
                                + (forwarded ? ", forwarded" : ", not forwarded")
                                + ", coalescedWm=" + Util.toLocalTime(this.watermarkCoalescer.coalescedWm())
                                + ", topObservedWm=" + Util.toLocalTime(topObservedWm()));
                    }
                    if (forwarded) {
                        return ProgressState.MADE_PROGRESS;
                    }
                } else if (specialItem instanceof SnapshotBarrier) {
                    observeBarrier(queueIndex, (SnapshotBarrier) specialItem);
                } else if (queueResult.isMadeProgress()) {
                    // Only ordinary items were drained from this queue.
                    this.watermarkCoalescer.observeEvent(queueIndex);
                }

                int liveQueueCount = this.conveyor.liveQueueCount();
                if (liveQueueCount == 0) {
                    return this.tracker.toProgressState();
                }
                // Every live queue has delivered the barrier: forward it and start over.
                if (specialItem != null && this.receivedBarriers.cardinality() == liveQueueCount) {
                    assert this.currentBarrier != null : "currentBarrier == null";
                    boolean accepted = dest.test(this.currentBarrier);
                    assert accepted : "test result expected to be true";
                    this.currentBarrier = null;
                    this.receivedBarriers.clear();
                    return ProgressState.MADE_PROGRESS;
                }
            }

            // No early return happened; let the coalescer decide whether a watermark is due.
            if (maybeEmitWm(this.watermarkCoalescer.checkWmHistory(), dest)) {
                return ProgressState.MADE_PROGRESS;
            }
            if (this.conveyor.liveQueueCount() > 0) {
                this.tracker.notDone();
            }
            return this.tracker.toProgressState();
        }

        /**
         * Drains a single queue through the {@link ItemDetector}. After the call
         * {@code itemDetector.item} holds the special item that stopped the drain, or
         * {@code null} if none was met.
         *
         * @return progress state built from "at least one item drained" and "DONE_ITEM seen"
         */
        private ProgressState drainQueue(Pipe<Object> pipe, Predicate<Object> dest) {
            this.itemDetector.reset(dest);
            int drainedCount = pipe.drain(this.itemDetector);
            // Drop the reference to the destination once the drain is over.
            this.itemDetector.dest = null;
            boolean madeProgress = drainedCount > 0;
            boolean done = this.itemDetector.item == DoneItem.DONE_ITEM;
            return ProgressState.valueOf(madeProgress, done);
        }

        /**
         * Records that {@code barrier} arrived on queue {@code queueIndex}. The first barrier
         * becomes {@link #currentBarrier}; later ones are asserted to be equal to it. A
         * terminal barrier switches {@link #waitForAllBarriers} on.
         */
        private void observeBarrier(int queueIndex, SnapshotBarrier barrier) {
            if (this.currentBarrier == null) {
                this.currentBarrier = barrier;
            } else {
                assert this.currentBarrier.equals(barrier) : this.currentBarrier + " != " + barrier;
            }
            if (barrier.isTerminal()) {
                this.waitForAllBarriers = true;
            }
            this.receivedBarriers.set(queueIndex);
        }

        /**
         * Forwards a {@link Watermark} with the given timestamp to {@code dest}, unless the
         * timestamp is {@link Long#MIN_VALUE}, which is treated as "nothing to emit".
         *
         * @return {@code true} if a watermark was passed to {@code dest}
         */
        private boolean maybeEmitWm(long timestamp, Predicate<Object> dest) {
            if (timestamp == Long.MIN_VALUE) {
                return false;
            }
            boolean accepted = dest.test(new Watermark(timestamp));
            assert accepted : "test result expected to be true";
            return true;
        }

        /** Delegates to the watermark coalescer. */
        @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
        public long topObservedWm() {
            return this.watermarkCoalescer.topObservedWm();
        }

        /** Delegates to the watermark coalescer. */
        @Override // com.hazelcast.jet.impl.execution.InboundEdgeStream
        public long coalescedWm() {
            return this.watermarkCoalescer.coalescedWm();
        }
    }

    /** Private so the class cannot be instantiated from outside; it exposes only the static {@code create} factory. */
    private ConcurrentInboundEdgeStream() {
    }

    /**
     * Creates the inbound edge stream for the given conveyor.
     *
     * <p>With a non-null {@code comparatorEx} an {@code OrderedDrain} is returned and
     * {@code z} is ignored; otherwise a {@code RoundRobinDrain} is returned.
     *
     * @param concurrentConveyor the conveyor whose queues feed the stream
     * @param i                  the stream's ordinal
     * @param i2                 the stream's priority
     * @param z                  initial "wait for all barriers" flag of the round-robin variant
     * @param str                prefix for the stream's logger
     * @param comparatorEx       comparator selecting the ordered variant, or {@code null}
     * @return the newly created stream
     */
    public static InboundEdgeStream create(@Nonnull ConcurrentConveyor<Object> concurrentConveyor, int i, int i2, boolean z, @Nonnull String str, @Nullable ComparatorEx<?> comparatorEx) {
        if (comparatorEx != null) {
            return new OrderedDrain(concurrentConveyor, i, i2, str, comparatorEx);
        }
        return new RoundRobinDrain(concurrentConveyor, i, i2, str, z);
    }
}
