package org.elder.sourcerer.subscription;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.elder.sourcerer.EventRecord;
import org.elder.sourcerer.EventRepository;
import org.elder.sourcerer.EventSubscriptionHandler;
import org.elder.sourcerer.EventSubscriptionPositionSource;
import org.elder.sourcerer.SubscriptionToken;
import org.elder.sourcerer.SubscriptionWorkerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.subscriber.LambdaSubscriber;
import reactor.core.subscriber.Subscribers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/elder/sourcerer/subscription/SubscriptionWorker.class */
public class SubscriptionWorker<T> implements Runnable, SubscriptionToken {
    private static final Logger logger = LoggerFactory.getLogger(SubscriptionWorker.class);
    private final EventRepository<T> repository;
    private final EventSubscriptionPositionSource positionSource;
    private final EventSubscriptionHandler<T> handler;
    private final SubscriptionWorkerConfig config;
    private int subscriberCount;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);
    private final AtomicInteger retryCount = new AtomicInteger(0);
    private final Semaphore sleeper = new Semaphore(0);

    public SubscriptionWorker(EventRepository<T> eventRepository, EventSubscriptionPositionSource eventSubscriptionPositionSource, EventSubscriptionHandler<T> eventSubscriptionHandler, SubscriptionWorkerConfig subscriptionWorkerConfig) {
        this.repository = eventRepository;
        this.positionSource = eventSubscriptionPositionSource;
        this.handler = eventSubscriptionHandler;
        this.config = subscriptionWorkerConfig;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.handler.subscriptionStarted(this);
        while (true) {
            try {
                try {
                    runOneSession();
                    logger.info("Subscription stop acknowledge, thread terminating");
                    this.handler.subscriptionStopped();
                    return;
                } catch (Exception e) {
                    logger.warn("Exception in subscription, retry logic will apply", e);
                    if (!this.handler.handleError(unwrapException(e), this.retryCount.get())) {
                        logger.warn("Subscription failed with terminal error", e);
                        this.handler.subscriptionFailed(e);
                        return;
                    } else if (!sleepForRetry(this.retryCount.getAndIncrement())) {
                        logger.debug("Asked to stop by sleeper, terminating thread");
                        return;
                    } else {
                        logger.info("Subscription restarting after error");
                        this.handler.subscriptionRestarting();
                    }
                }
            } catch (InterruptedException e2) {
                logger.warn("Interrupted processing subscription", e2);
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override // org.elder.sourcerer.SubscriptionToken
    public void stop() {
        logger.info("Stopping subscription");
        this.cancelled.set(true);
        this.sleeper.release(Integer.MAX_VALUE);
    }

    private static Throwable unwrapException(Exception exc) {
        return exc instanceof DownstreamSubscriptionException ? exc.getCause() : exc;
    }

    private void runOneSession() throws InterruptedException {
        this.subscriberCount++;
        Integer subscriptionPosition = this.positionSource.getSubscriptionPosition();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(this.config.getBatchSize());
        SessionSubscriber sessionSubscriber = new SessionSubscriber(arrayBlockingQueue, "" + this.subscriberCount);
        int batchSize = this.config.getBatchSize();
        sessionSubscriber.getClass();
        Consumer consumer = sessionSubscriber::onNext;
        sessionSubscriber.getClass();
        Consumer consumer2 = sessionSubscriber::onError;
        sessionSubscriber.getClass();
        LambdaSubscriber bounded = Subscribers.bounded(batchSize, consumer, consumer2, sessionSubscriber::onComplete);
        try {
            logger.info("Subscribing to event store ...");
            this.repository.getPublisher(subscriptionPosition).subscribe(bounded);
            while (processUpdates(arrayBlockingQueue)) {
                logger.debug("Processed updates, will do more");
                this.retryCount.set(0);
            }
            logger.info("Subscription worker finishing cleanly");
            logger.info("Subscription worker exiting");
            sessionSubscriber.kill();
            bounded.dispose();
        } catch (Throwable th) {
            logger.info("Subscription worker exiting");
            sessionSubscriber.kill();
            bounded.dispose();
            throw th;
        }
    }

    private boolean processUpdates(BlockingQueue<Update<T>> blockingQueue) throws InterruptedException {
        List<Update<T>> list = null;
        while (true) {
            List<Update<T>> list2 = list;
            if (list2 != null) {
                ArrayList arrayList = null;
                for (Update<T> update : list2) {
                    switch (update.getUpdateType()) {
                        case COMPLETED:
                            logger.debug("Subscription completed, processing pending updates");
                            processEventsIfAny(arrayList);
                            logger.debug("Subscription completed, completing stream");
                            return false;
                        case CAUGHT_UP:
                            logger.debug("Subscription caught up, processing pending events");
                            processEventsIfAny(arrayList);
                            arrayList = null;
                            logger.debug("Subscription caught up, signalling");
                            this.handler.subscriptionCaughtUp();
                            break;
                        case EVENT:
                            if (arrayList == null) {
                                arrayList = new ArrayList();
                            }
                            arrayList.add(update.getEvent());
                            break;
                        case ERROR:
                            logger.debug("Subscription error, processing pending updates");
                            processEventsIfAny(arrayList);
                            logger.debug("Subscription error, signalling", update.getError());
                            throw new DownstreamSubscriptionException(update.getError());
                        default:
                            throw new IllegalArgumentException("Unknown update type");
                    }
                }
                processEventsIfAny(arrayList);
                return true;
            }
            if (this.cancelled.get()) {
                logger.info("Seen cancelled flag, bailing out");
                return false;
            }
            list = getUpdateBatch(blockingQueue);
        }
    }

    private void processEventsIfAny(List<EventRecord<T>> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        this.handler.processEvents(list);
    }

    private List<Update<T>> getUpdateBatch(BlockingQueue<Update<T>> blockingQueue) throws InterruptedException {
        Update<T> poll = blockingQueue.poll(1000L, TimeUnit.MILLISECONDS);
        if (poll == null) {
            logger.trace("No update (yet)");
            return null;
        }
        ArrayList arrayList = new ArrayList();
        arrayList.add(poll);
        if (blockingQueue.peek() != null) {
            logger.debug("Subscription received update, queue not empty, draining ...");
            blockingQueue.drainTo(arrayList);
        } else {
            logger.debug("Subscription received single update");
        }
        return arrayList;
    }

    private boolean sleepForRetry(int i) throws InterruptedException {
        long currentRetryInterval = getCurrentRetryInterval(i);
        logger.info("Sleeping for {} millis before retrying subscription", Long.valueOf(currentRetryInterval));
        this.sleeper.tryAcquire(currentRetryInterval, TimeUnit.MILLISECONDS);
        return !this.cancelled.get();
    }

    private long getCurrentRetryInterval(int i) {
        long initialRetryDelayMillis = this.config.getInitialRetryDelayMillis();
        for (int i2 = 0; i2 < i; i2++) {
            initialRetryDelayMillis <<= 1;
            if (initialRetryDelayMillis > this.config.getMaxRetryDelayMillis()) {
                return this.config.getMaxRetryDelayMillis();
            }
        }
        return initialRetryDelayMillis;
    }
}
