package org.neo4j.fabric.stream;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.neo4j.fabric.config.FabricConfig;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;

/* loaded from: input_file:org/neo4j/fabric/stream/Prefetcher.class */
public class Prefetcher {
    private static final RecordOrError END = new RecordOrError(null, null);
    private final FabricConfig.DataStream streamConfig;
    private final List<PrefetchOperator> prefetchOperators = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/fabric/stream/Prefetcher$PrefetchOperator.class */
    public class PrefetchOperator extends FluxOperator<Record, Record> {
        private final Queue<RecordOrError> buffer;
        private final RecordSubscriber upstreamSubscriber;
        private final AtomicBoolean producing;
        private final AtomicLong pendingRequested;
        private volatile int bufferLowWatermark;
        private volatile int bufferHighWatermark;
        private volatile boolean finished;
        private volatile Subscriber<Record> downstreamSubscriber;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/neo4j/fabric/stream/Prefetcher$PrefetchOperator$RecordSubscriber.class */
        public class RecordSubscriber implements Subscriber<Record> {
            private volatile Subscription subscription;
            private final AtomicLong pendingRequested = new AtomicLong(0);

            private RecordSubscriber() {
            }

            public void onSubscribe(Subscription subscription) {
                this.subscription = subscription;
                PrefetchOperator.this.maybeRequest();
            }

            public void onNext(Record record) {
                this.pendingRequested.decrementAndGet();
                enqueue(new RecordOrError(record, null));
            }

            public void onError(Throwable th) {
                enqueue(new RecordOrError(null, th));
            }

            public void onComplete() {
                enqueue(Prefetcher.END);
            }

            void request(long j) {
                this.pendingRequested.addAndGet(j);
                this.subscription.request(j);
            }

            private void enqueue(RecordOrError recordOrError) {
                PrefetchOperator.this.buffer.add(recordOrError);
                PrefetchOperator.this.maybeProduce();
            }

            void close() {
                this.subscription.cancel();
            }
        }

        PrefetchOperator(Flux<Record> flux, int i, int i2) {
            super(flux);
            this.producing = new AtomicBoolean(false);
            this.pendingRequested = new AtomicLong(0L);
            this.bufferHighWatermark = i2;
            this.bufferLowWatermark = i;
            this.buffer = new ArrayBlockingQueue(Prefetcher.this.streamConfig.getBufferSize() + 1);
            this.upstreamSubscriber = new RecordSubscriber();
            flux.subscribeWith(this.upstreamSubscriber);
        }

        private void maybeRequest() {
            int size = this.buffer.size();
            long j = this.upstreamSubscriber.pendingRequested.get();
            long j2 = (this.bufferHighWatermark - size) - j;
            if (size + j > this.bufferLowWatermark || j2 == 0) {
                return;
            }
            this.upstreamSubscriber.request(j2);
        }

        public void subscribe(CoreSubscriber coreSubscriber) {
            this.downstreamSubscriber = coreSubscriber;
            coreSubscriber.onSubscribe(new Subscription() { // from class: org.neo4j.fabric.stream.Prefetcher.PrefetchOperator.1
                public void request(long j) {
                    PrefetchOperator.this.pendingRequested.addAndGet(j);
                    PrefetchOperator.this.maybeProduce();
                }

                public void cancel() {
                    PrefetchOperator.this.finish();
                    PrefetchOperator.this.upstreamSubscriber.close();
                }
            });
        }

        private void maybeProduce() {
            RecordOrError poll;
            if (this.buffer.peek() == null || this.downstreamSubscriber == null || this.pendingRequested.get() == 0 || this.finished || !this.producing.compareAndSet(false, true)) {
                return;
            }
            while (true) {
                if (this.finished || this.pendingRequested.get() <= 0 || (poll = this.buffer.poll()) == null) {
                    break;
                }
                if (poll == Prefetcher.END) {
                    this.downstreamSubscriber.onComplete();
                    finish();
                    break;
                } else if (poll.error != null) {
                    this.downstreamSubscriber.onError(poll.error);
                    finish();
                    break;
                } else {
                    this.pendingRequested.decrementAndGet();
                    this.downstreamSubscriber.onNext(poll.record);
                }
            }
            maybeRequest();
            this.producing.set(false);
            maybeProduce();
        }

        private void finish() {
            this.finished = true;
            Prefetcher.this.removeOperator(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/neo4j/fabric/stream/Prefetcher$RecordOrError.class */
    public static final class RecordOrError extends java.lang.Record {
        private final Record record;
        private final Throwable error;

        private RecordOrError(Record record, Throwable th) {
            this.record = record;
            this.error = th;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, RecordOrError.class), RecordOrError.class, "record;error", "FIELD:Lorg/neo4j/fabric/stream/Prefetcher$RecordOrError;->record:Lorg/neo4j/fabric/stream/Record;", "FIELD:Lorg/neo4j/fabric/stream/Prefetcher$RecordOrError;->error:Ljava/lang/Throwable;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, RecordOrError.class), RecordOrError.class, "record;error", "FIELD:Lorg/neo4j/fabric/stream/Prefetcher$RecordOrError;->record:Lorg/neo4j/fabric/stream/Record;", "FIELD:Lorg/neo4j/fabric/stream/Prefetcher$RecordOrError;->error:Ljava/lang/Throwable;").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, RecordOrError.class, Object.class), RecordOrError.class, "record;error", "FIELD:Lorg/neo4j/fabric/stream/Prefetcher$RecordOrError;->record:Lorg/neo4j/fabric/stream/Record;", "FIELD:Lorg/neo4j/fabric/stream/Prefetcher$RecordOrError;->error:Ljava/lang/Throwable;").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public Record record() {
            return this.record;
        }

        public Throwable error() {
            return this.error;
        }
    }

    public Prefetcher(FabricConfig.DataStream dataStream) {
        this.streamConfig = dataStream;
    }

    public synchronized Flux<Record> addPrefetch(Flux<Record> flux) {
        int size = this.prefetchOperators.size() + 1;
        int computeLowWatermark = computeLowWatermark(size);
        int computeHighWatermark = computeHighWatermark(size);
        updateWatermarks(computeLowWatermark, computeHighWatermark);
        PrefetchOperator prefetchOperator = new PrefetchOperator(flux, computeLowWatermark, computeHighWatermark);
        this.prefetchOperators.add(prefetchOperator);
        return prefetchOperator;
    }

    private int computeHighWatermark(int i) {
        return Math.max(1, this.streamConfig.getBufferSize() / i);
    }

    private int computeLowWatermark(int i) {
        return this.streamConfig.getBufferLowWatermark() / i;
    }

    private void updateWatermarks(int i, int i2) {
        this.prefetchOperators.forEach(prefetchOperator -> {
            prefetchOperator.bufferLowWatermark = i;
            prefetchOperator.bufferHighWatermark = i2;
        });
    }

    private synchronized void removeOperator(PrefetchOperator prefetchOperator) {
        this.prefetchOperators.remove(prefetchOperator);
        if (this.prefetchOperators.isEmpty()) {
            return;
        }
        int size = this.prefetchOperators.size();
        updateWatermarks(computeLowWatermark(size), computeHighWatermark(size));
    }
}
