package io.rtr.conduit.amqp.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.rtr.conduit.amqp.AMQPAsyncConsumerCallback;
import io.rtr.conduit.amqp.AMQPMessageBundle;
import io.rtr.conduit.amqp.ActionResponse;
import io.rtr.conduit.amqp.AsyncResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/rtr/conduit/amqp/impl/AMQPAsyncQueueConsumer.class */
/**
 * Queue consumer that hands every delivery to an {@link AMQPAsyncConsumerCallback} together with
 * an {@link AsyncResponse} handle (this object), so the callback can answer the delivery later
 * via {@link #respondSingle} or {@link #respondMultiple}.
 *
 * <p>Each delivery is recorded in an insertion-ordered map keyed by delivery tag until a response
 * removes it. "Multiple" responses walk that map from its oldest entry and stop at the first
 * entry whose delivery tag is greater than the tag being responded to.
 *
 * <p>NOTE(review): the tracking map is a plain {@link LinkedHashMap} and nothing in this class
 * synchronizes access to it. Confirm that deliveries and responses are confined to one thread,
 * or guard the map externally.
 */
public class AMQPAsyncQueueConsumer extends AMQPQueueConsumer implements AsyncResponse {
    private static final Logger log = LoggerFactory.getLogger(AMQPAsyncQueueConsumer.class);

    /** User-supplied handler that receives deliveries, shutdown notices and action failures. */
    private final AMQPAsyncConsumerCallback callback;

    /** Deliveries handed to the callback and not yet responded to, in arrival order. */
    private final Map<Long, AMQPMessageBundle> unacknowledgedMessages;

    /**
     * Creates the consumer. The superclass is given {@code null} in place of its own callback;
     * all callback interaction in this class goes through {@code aMQPAsyncConsumerCallback}.
     *
     * @param channel channel used for acknowledging and rejecting deliveries
     * @param aMQPAsyncConsumerCallback handler invoked for deliveries, shutdown and failures
     * @param i forwarded unchanged to the superclass constructor
     * @param str forwarded unchanged to the superclass constructor
     * @param z forwarded unchanged to the superclass constructor
     */
    public AMQPAsyncQueueConsumer(Channel channel, AMQPAsyncConsumerCallback aMQPAsyncConsumerCallback, int i, String str, boolean z) {
        super(channel, null, i, str, z);
        this.unacknowledgedMessages = new LinkedHashMap<>();
        this.callback = aMQPAsyncConsumerCallback;
    }

    /**
     * Logs the shutdown and forwards both arguments to the callback.
     *
     * @param str passed through to {@code notifyOfShutdown}
     * @param shutdownSignalException passed through to {@code notifyOfShutdown}
     */
    @Override // io.rtr.conduit.amqp.impl.AMQPQueueConsumer
    public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
        log.info("Shutdown handler invoked");
        this.callback.notifyOfShutdown(str, shutdownSignalException);
    }

    /**
     * Wraps the delivery in an {@link AMQPMessageBundle}, records it as unacknowledged and passes
     * it to the callback along with this object as the response handle.
     *
     * @throws RuntimeException whatever the tracking step or the callback throws; it is logged
     *     and rethrown unchanged
     */
    @Override // io.rtr.conduit.amqp.impl.AMQPQueueConsumer
    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
        AMQPMessageBundle bundle = new AMQPMessageBundle(str, envelope, basicProperties, bArr);
        try {
            // Track before invoking the callback so a response issued from inside handle()
            // already finds the entry.
            long deliveryTag = bundle.getEnvelope().getDeliveryTag();
            this.unacknowledgedMessages.put(Long.valueOf(deliveryTag), bundle);
            this.callback.handle(bundle, this);
        } catch (RuntimeException e) {
            log.error("The user-supplied callback allowed an exception to propagate.");
            log.error("Catastrophic - all listeners have stopped! Exception: ", e);
            throw e;
        }
    }

    /**
     * Carries out the action requested for a delivery.
     *
     * <ul>
     *   <li>{@code Acknowledge}: stop tracking and ack.</li>
     *   <li>{@code RejectAndDiscard}: publish to the poison queue, then reject.</li>
     *   <li>{@code RejectAndRequeue}: attempt a retry; if the retry step reports failure, publish
     *       to the poison queue; then reject.</li>
     * </ul>
     * Any other action value matches no branch and has no effect. Exceptions thrown while
     * performing the action are not propagated; they are passed to the callback's
     * {@code notifyOfActionFailure}.
     *
     * @param aMQPMessageBundle the delivery being answered
     * @param actionResponse the requested action and its reason
     * @param z {@code true} to apply the action to every tracked delivery up to and including
     *     this one, {@code false} to apply it to this delivery only
     */
    protected void respond(AMQPMessageBundle aMQPMessageBundle, ActionResponse actionResponse, boolean z) {
        Envelope envelope = aMQPMessageBundle.getEnvelope();
        long deliveryTag = envelope.getDeliveryTag();
        byte[] body = aMQPMessageBundle.getBody();
        AMQP.BasicProperties properties = aMQPMessageBundle.getBasicProperties();
        try {
            switch (actionResponse.getAction()) {
                case Acknowledge:
                    ack(deliveryTag, z);
                    break;
                case RejectAndDiscard:
                    // Decode explicitly as UTF-8 so the logged text does not depend on the
                    // platform default charset.
                    log.warn("Discarding message, body = {}", new String(body, StandardCharsets.UTF_8));
                    publishToPoisonQueue(envelope, properties, actionResponse.getReason(), body, z);
                    reject(deliveryTag, z);
                    break;
                case RejectAndRequeue:
                    log.warn("Received an unknown message, body = {}", new String(body, StandardCharsets.UTF_8));
                    log.warn("\tAdjusting headers for retry.");
                    if (!retry(envelope, properties, body, z)) {
                        publishToPoisonQueue(envelope, properties, actionResponse.getReason(), body, z);
                    }
                    // The original delivery is rejected whether or not the retry succeeded.
                    reject(deliveryTag, z);
                    break;
            }
        } catch (Exception e) {
            this.callback.notifyOfActionFailure(e);
        }
    }

    /**
     * Stops tracking one delivery, or every tracked delivery up to and including the given tag.
     *
     * @param deliveryTag tag of the delivery being answered
     * @param multiple whether to drop all leading entries with a tag not greater than
     *     {@code deliveryTag}
     */
    private void removeUnacknowledgedMessages(long deliveryTag, boolean multiple) {
        if (!multiple) {
            this.unacknowledgedMessages.remove(Long.valueOf(deliveryTag));
            return;
        }
        // Entries are visited in insertion order; the walk ends at the first larger tag.
        Iterator<Map.Entry<Long, AMQPMessageBundle>> entries = this.unacknowledgedMessages.entrySet().iterator();
        while (entries.hasNext()) {
            if (entries.next().getKey().longValue() > deliveryTag) {
                break;
            }
            entries.remove();
        }
    }

    /** Stops tracking the delivery (or deliveries) and acknowledges on the channel. */
    private void ack(long deliveryTag, boolean multiple) throws IOException {
        removeUnacknowledgedMessages(deliveryTag, multiple);
        this.channel.basicAck(deliveryTag, multiple);
    }

    /**
     * Stops tracking the delivery (or deliveries) and rejects on the channel without requeueing:
     * {@code basicNack} with the multiple flag set when {@code multiple} is true, otherwise
     * {@code basicReject}.
     */
    private void reject(long deliveryTag, boolean multiple) throws IOException {
        removeUnacknowledgedMessages(deliveryTag, multiple);
        if (multiple) {
            this.channel.basicNack(deliveryTag, true, false);
        } else {
            this.channel.basicReject(deliveryTag, false);
        }
    }

    /**
     * Publishes to the poison queue through the superclass. In single mode only the given message
     * is published; in multiple mode every still-tracked delivery up to and including the given
     * envelope's tag is published with the same reason.
     */
    private void publishToPoisonQueue(Envelope envelope, AMQP.BasicProperties properties, String reason, byte[] body, boolean multiple) throws IOException {
        if (!multiple) {
            super.publishToPoisonQueue(envelope, properties, reason, body);
            return;
        }
        long upToTag = envelope.getDeliveryTag();
        for (AMQPMessageBundle tracked : this.unacknowledgedMessages.values()) {
            Envelope trackedEnvelope = tracked.getEnvelope();
            if (trackedEnvelope.getDeliveryTag() > upToTag) {
                return;
            }
            super.publishToPoisonQueue(trackedEnvelope, tracked.getBasicProperties(), reason, tracked.getBody());
        }
    }

    /**
     * Retries through the superclass and stops tracking each delivery whose retry succeeded.
     * Deliveries whose retry failed stay tracked, so a following multiple-mode poison-queue
     * publish picks up exactly those.
     *
     * @return in single mode, the superclass result for the given message; in multiple mode,
     *     {@code true} only if every visited delivery was retried successfully (also
     *     {@code true} when none was visited)
     */
    private boolean retry(Envelope envelope, AMQP.BasicProperties properties, byte[] body, boolean multiple) throws IOException {
        if (!multiple) {
            boolean retried = super.retry(envelope, properties, body);
            if (retried) {
                this.unacknowledgedMessages.remove(Long.valueOf(envelope.getDeliveryTag()));
            }
            return retried;
        }
        boolean allRetried = true;
        long upToTag = envelope.getDeliveryTag();
        Iterator<AMQPMessageBundle> tracked = this.unacknowledgedMessages.values().iterator();
        while (tracked.hasNext()) {
            AMQPMessageBundle bundle = tracked.next();
            if (bundle.getEnvelope().getDeliveryTag() > upToTag) {
                break;
            }
            boolean retried = super.retry(bundle.getEnvelope(), bundle.getBasicProperties(), bundle.getBody());
            allRetried &= retried;
            if (retried) {
                tracked.remove();
            }
        }
        return allRetried;
    }

    /** Applies the action to every tracked delivery up to and including the given one. */
    @Override // io.rtr.conduit.amqp.AsyncResponse
    public void respondMultiple(AMQPMessageBundle aMQPMessageBundle, ActionResponse actionResponse) {
        respond(aMQPMessageBundle, actionResponse, true);
    }

    /** Applies the action to the given delivery only. */
    @Override // io.rtr.conduit.amqp.AsyncResponse
    public void respondSingle(AMQPMessageBundle aMQPMessageBundle, ActionResponse actionResponse) {
        respond(aMQPMessageBundle, actionResponse, false);
    }
}
