package net.reini.rabbitmq.cdi;

import com.rabbitmq.client.Delivery;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoverableChannel;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/reini/rabbitmq/cdi/ConsumerHolder.class */
public class ConsumerHolder implements RecoveryListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerHolder.class);
    private final int prefetchCount;
    private final boolean autoAck;
    private final String queueName;
    private final EventConsumer<?> consumer;
    private final ConsumerChannelFactory consumerChannelFactory;
    private final DeclarerRepository declarerRepository;
    private final List<Declaration> declarations;
    private RecoverableChannel channel;
    private volatile boolean active;
    private volatile boolean recoverRunning;
    private final ResourceCloser resourceCloser = new ResourceCloser();
    private final Queue<AckAction> pendingAckActions = new ArrayDeque();

    /* JADX INFO: Access modifiers changed from: package-private */
    @FunctionalInterface
    /* loaded from: input_file:net/reini/rabbitmq/cdi/ConsumerHolder$AckAction.class */
    public interface AckAction {
        void apply(RecoverableChannel recoverableChannel) throws IOException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ConsumerHolder(EventConsumer<?> eventConsumer, String str, boolean z, int i, ConsumerChannelFactory consumerChannelFactory, List<Declaration> list, DeclarerRepository declarerRepository) {
        this.consumer = eventConsumer;
        this.queueName = str;
        this.autoAck = z;
        this.prefetchCount = i;
        this.consumerChannelFactory = consumerChannelFactory;
        this.declarations = list;
        this.declarerRepository = declarerRepository;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void deactivate() {
        synchronized (this.pendingAckActions) {
            if (this.active) {
                LOGGER.debug("Deactivating consumer of class {}", this.consumer.getClass());
                LOGGER.debug("Closing channel for consumer of class {}", this.consumer.getClass());
                ensureCompleteShutdown();
                this.active = false;
            }
            LOGGER.info("Deactivated consumer of class {}", this.consumer.getClass());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void activate() throws IOException {
        synchronized (this.pendingAckActions) {
            if (!this.active) {
                LOGGER.debug("Activating consumer of class {}", this.consumer.getClass());
                try {
                    this.channel = this.consumerChannelFactory.createChannel();
                    this.channel.addRecoveryListener(this);
                    this.channel.basicQos(this.prefetchCount);
                    this.declarerRepository.declare(this.channel, this.declarations);
                    this.channel.basicConsume(this.queueName, this.autoAck, this.autoAck ? this::deliverNoAck : this::deliverWithAck, this::handleShutdownSignal);
                    LOGGER.info("Activated consumer of class {}", this.consumer.getClass());
                    this.active = true;
                } catch (Exception e) {
                    LOGGER.error("Failed to activate consumer of class {}", this.consumer.getClass(), e);
                    ensureCompleteShutdown();
                    throw e;
                }
            }
        }
    }

    void deliverNoAck(String str, Delivery delivery) throws IOException {
        Envelope envelope = delivery.getEnvelope();
        LOGGER.debug("Consuming message {} for consumer tag {}", envelope, str);
        this.consumer.consume(str, envelope, delivery.getProperties(), delivery.getBody());
    }

    void deliverWithAck(String str, Delivery delivery) throws IOException {
        Envelope envelope = delivery.getEnvelope();
        long deliveryTag = envelope.getDeliveryTag();
        LOGGER.debug("Consuming message {} for consumer tag {}", envelope, str);
        if (this.consumer.consume(str, envelope, delivery.getProperties(), delivery.getBody())) {
            invokeAckAction(recoverableChannel -> {
                recoverableChannel.basicAck(deliveryTag, false);
                LOGGER.debug("Acknowledged {}", delivery);
            });
        } else {
            invokeAckAction(recoverableChannel2 -> {
                recoverableChannel2.basicNack(deliveryTag, false, false);
                LOGGER.debug("Not acknowledged {}", envelope);
            });
        }
    }

    void invokeAckAction(AckAction ackAction) throws IOException {
        if (!this.recoverRunning) {
            ackAction.apply(this.channel);
            return;
        }
        synchronized (this.pendingAckActions) {
            LOGGER.debug("Queueing acknowledge action due to active recovery...");
            this.pendingAckActions.add(ackAction);
        }
    }

    void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
        LOGGER.info("Received shutdown signal {} for consumer tag {}", shutdownSignalException, str);
    }

    void ensureCompleteShutdown() {
        synchronized (this.pendingAckActions) {
            if (this.channel != null) {
                this.resourceCloser.closeResource(this.channel, "Closing channel failed");
                this.channel = null;
            }
        }
    }

    boolean isAutoAck() {
        return this.autoAck;
    }

    String getQueueName() {
        return this.queueName;
    }

    public void handleRecovery(Recoverable recoverable) {
        LOGGER.debug("Handle recovery");
        if (recoverable == null || !recoverable.equals(this.channel)) {
            return;
        }
        this.recoverRunning = false;
        synchronized (this.pendingAckActions) {
            this.pendingAckActions.removeIf(this::invokePendingAckAction);
        }
    }

    boolean invokePendingAckAction(AckAction ackAction) {
        if (this.recoverRunning) {
            return false;
        }
        try {
            ackAction.apply(this.channel);
            return true;
        } catch (IOException e) {
            LOGGER.warn("Unable to invoke pending acknowledge action", e);
            return false;
        }
    }

    public void handleRecoveryStarted(Recoverable recoverable) {
        LOGGER.debug("Handle recovery started");
        if (recoverable == null || !recoverable.equals(this.channel)) {
            return;
        }
        this.recoverRunning = true;
    }
}
