package org.infinispan.reactive.publisher.impl;

import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.LongConsumer;
import io.reactivex.rxjava3.processors.FlowableProcessor;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.infinispan.commons.reactive.RxJavaInterop;
import org.infinispan.commons.util.IntSet;
import org.infinispan.reactive.publisher.impl.commands.batch.KeyPublisherResponse;
import org.infinispan.reactive.publisher.impl.commands.batch.PublisherResponse;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.reactivestreams.Publisher;

/* loaded from: input_file:META-INF/bundled-dependencies/infinispan-core-12.1.6.Final.jar:org/infinispan/reactive/publisher/impl/InnerPublisherSubscription.class */
public class InnerPublisherSubscription<K, I, R> implements LongConsumer, Action {
    protected static final Log log = LogFactory.getLog(MethodHandles.lookup().lookupClass());
    private final ClusterPublisherManagerImpl<K, ?>.SubscriberHandler<I, R> parent;
    private final int batchSize;
    private final Supplier<Map.Entry<Address, IntSet>> supplier;
    private final Map<Address, Set<K>> excludedKeys;
    private final int topologyId;
    private final FlowableProcessor<R> flowableProcessor;
    private final AtomicLong requestedAmount = new AtomicLong();
    private volatile Map.Entry<Address, IntSet> currentTarget;
    private volatile boolean cancelled;
    private volatile boolean alreadyCreated;

    private InnerPublisherSubscription(ClusterPublisherManagerImpl<K, ?>.SubscriberHandler<I, R> subscriberHandler, int i, Supplier<Map.Entry<Address, IntSet>> supplier, Map<Address, Set<K>> map, int i2, FlowableProcessor<R> flowableProcessor, Map.Entry<Address, IntSet> entry) {
        this.parent = subscriberHandler;
        this.batchSize = i;
        this.supplier = supplier;
        this.excludedKeys = map;
        this.topologyId = i2;
        this.flowableProcessor = flowableProcessor;
        this.currentTarget = entry;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K, I, R> Publisher<R> createPublisher(ClusterPublisherManagerImpl<K, ?>.SubscriberHandler<I, R> subscriberHandler, int i, Supplier<Map.Entry<Address, IntSet>> supplier, Map<Address, Set<K>> map, int i2) {
        return createPublisher(subscriberHandler, i, supplier, map, i2, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static <K, I, R> Publisher<R> createPublisher(ClusterPublisherManagerImpl<K, ?>.SubscriberHandler<I, R> subscriberHandler, int i, Supplier<Map.Entry<Address, IntSet>> supplier, Map<Address, Set<K>> map, int i2, Map.Entry<Address, IntSet> entry) {
        UnicastProcessor create = UnicastProcessor.create(i);
        InnerPublisherSubscription innerPublisherSubscription = new InnerPublisherSubscription(subscriberHandler, i, supplier, map, i2, create, entry);
        return create.doOnLifecycle(RxJavaInterop.emptyConsumer(), innerPublisherSubscription, innerPublisherSubscription);
    }

    @Override // io.reactivex.rxjava3.functions.Action
    public void run() {
        Map.Entry<Address, IntSet> entry;
        this.cancelled = true;
        if (!this.alreadyCreated || (entry = this.currentTarget) == null) {
            return;
        }
        this.parent.sendCancelCommand(entry.getKey());
    }

    @Override // io.reactivex.rxjava3.functions.LongConsumer
    public void accept(long j) {
        CompletionStage<PublisherResponse> sendInitialCommand;
        if (!shouldSubmit(j) || checkCancelled()) {
            return;
        }
        Map.Entry<Address, IntSet> entry = this.currentTarget;
        if (entry == null) {
            this.alreadyCreated = false;
            entry = this.supplier.get();
            if (entry == null) {
                if (log.isTraceEnabled()) {
                    log.tracef("Completing processor %s", this.flowableProcessor);
                }
                this.flowableProcessor.onComplete();
                return;
            }
            this.currentTarget = entry;
        }
        Address key = entry.getKey();
        IntSet value = entry.getValue();
        if (this.alreadyCreated) {
            sendInitialCommand = this.parent.sendNextCommand(key, this.topologyId);
        } else {
            this.alreadyCreated = true;
            sendInitialCommand = this.parent.sendInitialCommand(key, value, this.batchSize, this.excludedKeys.remove(key), this.topologyId);
        }
        sendInitialCommand.whenComplete((publisherResponse, th) -> {
            Object obj;
            KeyPublisherResponse keyPublisherResponse;
            int extraSize;
            if (th != null) {
                handleThrowableInResponse(th, key, value);
                return;
            }
            try {
                if (log.isTraceEnabled()) {
                    log.tracef("Received %s for id %s from %s", publisherResponse, this.parent.requestId, key);
                }
                IntSet completedSegments = publisherResponse.getCompletedSegments();
                if (completedSegments != null) {
                    if (log.isTraceEnabled()) {
                        log.tracef("Completed segments %s for id %s from %s", completedSegments, this.parent.requestId, key);
                    }
                    ClusterPublisherManagerImpl<K, ?>.SubscriberHandler<I, R> subscriberHandler = this.parent;
                    Objects.requireNonNull(subscriberHandler);
                    completedSegments.forEach(subscriberHandler::completeSegment);
                    Objects.requireNonNull(value);
                    completedSegments.forEach(value::remove);
                }
                IntSet lostSegments = publisherResponse.getLostSegments();
                if (lostSegments != null) {
                    if (log.isTraceEnabled()) {
                        log.tracef("Lost segments %s for id %s from %s", completedSegments, this.parent.requestId, key);
                    }
                    Objects.requireNonNull(value);
                    lostSegments.forEach(value::remove);
                }
                if (publisherResponse.isComplete()) {
                    this.currentTarget = null;
                } else {
                    publisherResponse.forEachSegmentValue(this.parent, value.iterator().nextInt());
                }
                long j2 = 0;
                Object obj2 = null;
                Object[] results = publisherResponse.getResults();
                if ((publisherResponse instanceof KeyPublisherResponse) && (extraSize = (keyPublisherResponse = (KeyPublisherResponse) publisherResponse).getExtraSize()) > 0) {
                    int length = results.length;
                    Object[] objArr = new Object[length + extraSize];
                    System.arraycopy(results, 0, objArr, 0, length);
                    System.arraycopy(keyPublisherResponse.getExtraObjects(), 0, objArr, length, extraSize);
                    results = objArr;
                }
                Object[] objArr2 = results;
                int length2 = objArr2.length;
                for (int i = 0; i < length2 && (obj = objArr2[i]) != null; i++) {
                    if (checkCancelled()) {
                        return;
                    }
                    this.flowableProcessor.onNext(obj);
                    j2++;
                    obj2 = obj;
                }
                if (completedSegments != null) {
                    this.parent.notifySegmentsComplete(completedSegments, obj2);
                }
                accept(-j2);
            } catch (Throwable th) {
                handleThrowableInResponse(th, key, value);
            }
        });
    }

    private boolean shouldSubmit(long j) {
        long j2;
        long j3;
        do {
            j2 = this.requestedAmount.get();
            j3 = j2 + j;
        } while (!this.requestedAmount.compareAndSet(j2, j3));
        return j3 > 0 && (j2 <= 0 || j <= 0);
    }

    private void handleThrowableInResponse(Throwable th, Address address, IntSet intSet) {
        if (this.cancelled) {
            log.tracef("Encountered exception after subscription was cancelled, this can most likely ignored, message is %s", th.getMessage());
        } else if (!this.parent.handleThrowable(th, address, intSet)) {
            this.flowableProcessor.onError(th);
        } else {
            this.currentTarget = null;
            accept(0L);
        }
    }

    private boolean checkCancelled() {
        if (!this.cancelled) {
            return false;
        }
        if (!log.isTraceEnabled()) {
            return true;
        }
        log.tracef("Subscription %s was cancelled, terminating early", this);
        return true;
    }
}
