package io.axonif.queuebacca;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.axonif.queuebacca.WorkExecutor;
import io.axonif.queuebacca.events.TimingEventListener;
import io.axonif.queuebacca.events.TimingEventSupport;
import io.axonif.queuebacca.retries.StaggeredExponentialBackoff;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:io/axonif/queuebacca/Subscriber.class */
/**
 * Starts and cancels subscriptions against a {@link Client}.
 *
 * <p>Each call to {@link #subscribe(SubscriptionConfiguration)} creates a dedicated scheduler
 * thread that repeatedly asks the subscription's {@link WorkExecutor} to pull messages from the
 * configured {@link MessageBin}; every retrieved message is handed to the configured
 * {@link MessageConsumer} and then disposed of or returned to the bin depending on the resulting
 * {@link MessageResponse}.
 *
 * <p>The list of active subscriptions is a plain, unsynchronized {@link ArrayList}.
 * NOTE(review): presumably {@code subscribe} and {@code cancelAll} are only called from a single
 * thread - confirm against callers.
 */
public final class Subscriber {
    /** Delay before the first check of a new subscription, in milliseconds. */
    private static final int SUBSCRIPTION_INITIAL_DELAY = 1000;
    /** Period between successive checks of a subscription, in milliseconds. */
    private static final int SUBSCRIPTION_PERIODIC_DELAY = 1;
    /** Number of threads in each subscription's scheduler. */
    private static final int SUBSCRIPTION_SCHEDULER_THREADS = 1;

    private final List<ActiveSubscription<?>> activeSubscriptions = new ArrayList<>();
    private final TimingEventSupport timingEventSupport = new TimingEventSupport();
    private final Client client;
    private final WorkExecutorFactory workExecutorFactory;
    private final ExceptionResolver exceptionResolver;

    /**
     * A running subscription to a single {@link MessageBin}: owns the scheduler that triggers
     * checks and the {@link WorkExecutor} that processes the retrieved messages.
     *
     * @param <M> the type of message consumed by this subscription
     */
    private static final class ActiveSubscription<M extends Message> {
        private final Logger logger;
        private final ScheduledExecutorService subscriptionScheduler;
        private final Client client;
        private final MessageBin messageBin;
        private final WorkExecutor workExecutor;
        private final RetryDelayGenerator retryDelayGenerator;
        private final MessageConsumer<M> consumer;
        private final ExceptionResolver exceptionResolver;
        private final TimingEventSupport timingEventSupport;

        ActiveSubscription(ScheduledExecutorService scheduledExecutorService, Client client, MessageBin messageBin, WorkExecutor workExecutor, RetryDelayGenerator retryDelayGenerator, MessageConsumer<M> messageConsumer, ExceptionResolver exceptionResolver, TimingEventSupport timingEventSupport, Logger logger) {
            this.subscriptionScheduler = scheduledExecutorService;
            this.client = client;
            this.messageBin = messageBin;
            this.workExecutor = workExecutor;
            this.retryDelayGenerator = retryDelayGenerator;
            this.consumer = messageConsumer;
            this.exceptionResolver = exceptionResolver;
            this.timingEventSupport = timingEventSupport;
            this.logger = logger;
        }

        /**
         * Creates a subscription and schedules its periodic check.
         *
         * <p>The scheduler thread and the logger are both named after the message bin. The first
         * check runs after {@code SUBSCRIPTION_INITIAL_DELAY} ms and is then repeated at a fixed
         * rate of {@code SUBSCRIPTION_PERIODIC_DELAY} ms.
         *
         * @return the started subscription
         */
        static <M extends Message> ActiveSubscription<M> start(SubscriptionConfiguration<M> subscriptionConfiguration, Client client, WorkExecutor workExecutor, ExceptionResolver exceptionResolver, TimingEventSupport timingEventSupport) {
            MessageBin messageBin = subscriptionConfiguration.getMessageBin();
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(
                    SUBSCRIPTION_SCHEDULER_THREADS,
                    new ThreadFactoryBuilder().setNameFormat(messageBin.getName() + "-subscription-%d").build());
            Logger logger = LoggerFactory.getLogger(messageBin.getName());
            ActiveSubscription<M> activeSubscription = new ActiveSubscription<>(
                    scheduler,
                    client,
                    messageBin,
                    workExecutor,
                    subscriptionConfiguration.getRetryDelayGenerator(),
                    subscriptionConfiguration.getMessageConsumer(),
                    exceptionResolver,
                    timingEventSupport,
                    logger);

            scheduler.scheduleAtFixedRate(() -> {
                try {
                    activeSubscription.check();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of silently dropping the interruption.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // Logged and swallowed so that the periodic task keeps being rescheduled.
                    logger.error("Exception occurred while checking a subscription", e);
                } catch (Error e) {
                    logger.error("Error occurred while checking a subscription", e);
                    throw e;
                }
            }, SUBSCRIPTION_INITIAL_DELAY, SUBSCRIPTION_PERIODIC_DELAY, TimeUnit.MILLISECONDS);
            return activeSubscription;
        }

        /** Stops the scheduler and the work executor immediately via {@code shutdownNow()}. */
        void cancel() {
            this.logger.info("Cancelling subscription to '{}'", this.messageBin.getName());
            this.subscriptionScheduler.shutdownNow();
            this.workExecutor.shutdownNow();
        }

        /**
         * Asks the work executor to submit work orders; the executor supplies an int that is
         * passed through to {@link Client#retrieveMessages} and each retrieved envelope becomes
         * one work order.
         *
         * @throws InterruptedException propagated from {@code submitWorkOrders}
         */
        private void check() throws InterruptedException {
            // The raw Collection cast is kept as-is: the generic signatures of retrieveMessages
            // and submitWorkOrders are not visible from this file.
            this.workExecutor.submitWorkOrders(limit -> {
                return (Collection) this.client.retrieveMessages(this.messageBin, limit).stream().map(this::newWorkOrder).collect(Collectors.toList());
            });
        }

        /**
         * Builds the work order that processes a single envelope: populates the MDC, consumes the
         * message (resolving any {@link Exception} through the {@link ExceptionResolver}), acts
         * on the response, and finally clears the MDC.
         */
        private WorkExecutor.WorkOrder newWorkOrder(IncomingEnvelope<M> incomingEnvelope) {
            return () -> {
                MessageContext messageContext = new MessageContext(
                        incomingEnvelope.getMessageId(),
                        incomingEnvelope.getReadCount(),
                        incomingEnvelope.getFirstReceived(),
                        incomingEnvelope.getRawMessage());
                MDC.put("queuebaccaMessageId", messageContext.getMessageId());
                MDC.put("queuebaccaMessageReadCount", String.valueOf(messageContext.getReadCount()));
                try {
                    MessageResponse response;
                    try {
                        response = consumeTimed(incomingEnvelope, messageContext);
                    } catch (Exception e) {
                        response = this.exceptionResolver.resolve(e, messageContext);
                    }
                    handleResponse(response, incomingEnvelope);
                } catch (Error e) {
                    // SLF4J treats the trailing Throwable argument as the exception to log.
                    this.logger.error("Error occurred while processing message: '{}' with body '{}'", incomingEnvelope.getMessageId(), incomingEnvelope.getRawMessage(), e);
                    throw e;
                } finally {
                    MDC.clear();
                }
            };
        }

        /**
         * Invokes the consumer and fires exactly one timing event with the elapsed wall-clock
         * milliseconds, whether the consumer returns or throws.
         */
        private MessageResponse consumeTimed(IncomingEnvelope<M> incomingEnvelope, MessageContext messageContext) throws Exception {
            long startMillis = System.currentTimeMillis();
            try {
                return this.consumer.consume(incomingEnvelope.getMessage(), messageContext);
            } finally {
                this.timingEventSupport.fireEvent(this.messageBin, incomingEnvelope.getMessage().getClass(), incomingEnvelope.getMessageId(), System.currentTimeMillis() - startMillis);
            }
        }

        /**
         * Acts on the outcome of processing a message: {@code CONSUMED} and {@code TERMINATE}
         * dispose of the message, {@code RETRY} returns it to the bin with the delay produced by
         * the {@link RetryDelayGenerator}; any other value is logged as an error and disposed.
         */
        private void handleResponse(MessageResponse messageResponse, IncomingEnvelope<M> incomingEnvelope) {
            switch (messageResponse) {
                case CONSUMED:
                    this.logger.info("Consumed '{}'; disposing", incomingEnvelope.getMessageId());
                    this.client.disposeMessage(this.messageBin, incomingEnvelope);
                    break;
                case RETRY:
                    int nextRetryDelay = this.retryDelayGenerator.nextRetryDelay(incomingEnvelope.getReadCount());
                    this.logger.warn("Retrying Message '{}' with body {} after {} seconds", incomingEnvelope.getMessageId(), incomingEnvelope.getRawMessage(), nextRetryDelay);
                    this.client.returnMessage(this.messageBin, incomingEnvelope, nextRetryDelay);
                    break;
                case TERMINATE:
                    this.logger.warn("Terminated '{}' with body {}; disposing", incomingEnvelope.getMessageId(), incomingEnvelope.getRawMessage());
                    this.client.disposeMessage(this.messageBin, incomingEnvelope);
                    break;
                default:
                    this.logger.error("Unknown exception resolution, {}, for '{}'  with body {}; disposing", messageResponse, incomingEnvelope.getMessageId(), incomingEnvelope.getRawMessage());
                    this.client.disposeMessage(this.messageBin, incomingEnvelope);
                    break;
            }
        }
    }

    /**
     * Creates a subscriber.
     *
     * @param client the client used to retrieve, dispose of and return messages
     * @param workExecutorFactory creates the {@link WorkExecutor} used by each subscription
     * @param exceptionResolver maps exceptions thrown by consumers to a {@link MessageResponse}
     * @throws NullPointerException if any argument is {@code null}
     */
    public Subscriber(Client client, WorkExecutorFactory workExecutorFactory, ExceptionResolver exceptionResolver) {
        this.client = Objects.requireNonNull(client, "client");
        this.workExecutorFactory = Objects.requireNonNull(workExecutorFactory, "workExecutorFactory");
        this.exceptionResolver = Objects.requireNonNull(exceptionResolver, "exceptionResolver");
    }

    /**
     * Starts a new subscription for the given configuration and records it so that
     * {@link #cancelAll()} can cancel it later. The subscription's work executor is created with
     * the configured message capacity and threads named after the message bin.
     *
     * @param subscriptionConfiguration the subscription to start
     * @throws NullPointerException if {@code subscriptionConfiguration} is {@code null}
     */
    public void subscribe(SubscriptionConfiguration<?> subscriptionConfiguration) {
        Objects.requireNonNull(subscriptionConfiguration, "subscriptionConfiguration");
        WorkExecutor workExecutor = this.workExecutorFactory.newWorkExecutor(
                subscriptionConfiguration.getMessageCapacity(),
                new ThreadFactoryBuilder().setNameFormat(subscriptionConfiguration.getMessageBin().getName() + "-processor-%d").build());
        this.activeSubscriptions.add(ActiveSubscription.start(subscriptionConfiguration, this.client, workExecutor, this.exceptionResolver, this.timingEventSupport));
    }

    /** Cancels every subscription started through this subscriber. */
    public void cancelAll() {
        this.activeSubscriptions.forEach(ActiveSubscription::cancel);
    }

    /**
     * Registers a listener for the timing events fired after each message is consumed.
     *
     * @param timingEventListener the listener to add
     */
    public void addTimingEventListener(TimingEventListener timingEventListener) {
        this.timingEventSupport.addListener(timingEventListener);
    }

    /**
     * Unregisters a previously added timing event listener.
     *
     * @param timingEventListener the listener to remove
     */
    public void removeTimingEventListener(TimingEventListener timingEventListener) {
        this.timingEventSupport.removeListener(timingEventListener);
    }
}
