package org.elder.sourcerer.subscription;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.elder.sourcerer.EventSubscriptionUpdate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/elder/sourcerer/subscription/SessionSubscriber.class */
/**
 * Forwards subscription notifications (events, caught-up markers, errors and completion)
 * onto a {@link BlockingQueue} of {@code Update}s.
 *
 * <p>Each notification is offered to the queue with a bounded wait and retried until the
 * queue accepts it or the subscriber has been cancelled via {@link #kill()}. Once cancelled,
 * notifications are dropped rather than enqueued.
 *
 * @param <T> the event payload type carried by the updates
 */
final class SessionSubscriber<T> {
    private static final Logger logger = LoggerFactory.getLogger(SessionSubscriber.class);

    /** How long a single offer attempt waits for queue capacity before warning and retrying. */
    private static final long OFFER_TIMEOUT_MILLIS = 1000L;

    private final BlockingQueue<Update<T>> updateQueue;
    private final String label;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    /**
     * Creates a subscriber that publishes onto the given queue.
     *
     * @param blockingQueue the queue that receives the converted updates
     * @param str           a label used to identify this subscriber in log output
     */
    public SessionSubscriber(BlockingQueue<Update<T>> blockingQueue, String str) {
        this.updateQueue = blockingQueue;
        this.label = str;
    }

    /**
     * Cancels this subscriber. Later notifications are dropped, and an offer that is currently
     * retrying gives up the next time it re-checks the cancellation flag (i.e. after its
     * current bounded wait returns).
     */
    public void kill() {
        logger.info("[{}] Cancelling session subscriber", this.label);
        this.cancelled.set(true);
    }

    /**
     * Converts a subscription update into an {@code Update} and offers it to the queue.
     *
     * <p>If the calling thread is interrupted while waiting, the update is dropped, a warning
     * is logged and the thread's interrupt flag is restored.
     *
     * @param eventSubscriptionUpdate the incoming update; its type selects which kind of
     *                                {@code Update} is enqueued
     * @throws IllegalArgumentException if the update type is neither {@code EVENT} nor
     *                                  {@code CAUGHT_UP}
     */
    public void onNext(EventSubscriptionUpdate<T> eventSubscriptionUpdate) {
        try {
            switch (eventSubscriptionUpdate.getUpdateType()) {
                case EVENT:
                    logger.debug("Offering event {}", eventSubscriptionUpdate.getEvent());
                    tryOffer(Update.createEvent(eventSubscriptionUpdate.getEvent()));
                    break;
                case CAUGHT_UP:
                    logger.debug("Offering caught up");
                    tryOffer(Update.createCaughtUp());
                    break;
                default:
                    throw new IllegalArgumentException("Unrecognized event update type: " + eventSubscriptionUpdate.getUpdateType());
            }
        } catch (InterruptedException e) {
            logger.warn("Session subscriber thread interrupted", e);
            // Restore the flag so code further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Offers an error update wrapping the given throwable to the queue.
     *
     * <p>If the calling thread is interrupted while waiting, the error update is dropped, a
     * warning is logged and the thread's interrupt flag is restored.
     *
     * @param th the failure reported by the subscription
     */
    public void onError(Throwable th) {
        try {
            // String.valueOf keeps the log statement itself from throwing if th is null.
            logger.debug("Offering error: {}", String.valueOf(th));
            tryOffer(Update.createError(th));
        } catch (InterruptedException e) {
            // Logged like onNext does: a dropped terminal signal should not go unnoticed.
            logger.warn("[{}] Session subscriber thread interrupted while offering error", this.label, e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Offers a completion update to the queue.
     *
     * <p>If the calling thread is interrupted while waiting, the completion update is dropped,
     * a warning is logged and the thread's interrupt flag is restored.
     */
    public void onComplete() {
        try {
            logger.debug("Offering completion");
            tryOffer(Update.createCompleted());
        } catch (InterruptedException e) {
            // Logged like onNext does: a dropped terminal signal should not go unnoticed.
            logger.warn("[{}] Session subscriber thread interrupted while offering completion", this.label, e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Repeatedly offers the update to the queue until it is accepted or this subscriber is
     * cancelled. Each attempt waits at most {@link #OFFER_TIMEOUT_MILLIS} milliseconds; a
     * timed-out attempt logs a slow-subscriber warning and is retried.
     *
     * @param update the update to enqueue
     * @throws InterruptedException if the calling thread is interrupted while waiting for
     *                              queue capacity
     */
    private void tryOffer(Update<T> update) throws InterruptedException {
        while (!this.cancelled.get()) {
            if (this.updateQueue.offer(update, OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                return;
            }
            logger.warn("[{}] SLOW SUBSCRIBER! Time out offering new items, will retry", this.label);
        }
        // Only reached once cancelled: the update is intentionally discarded.
        logger.debug("[{}] Session subscriber killed, ignoring incoming", this.label);
    }
}
