package io.rtr.conduit.amqp.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.rtr.conduit.amqp.AMQPConsumerCallback;
import io.rtr.conduit.amqp.AMQPMessageBundle;
import io.rtr.conduit.amqp.ActionResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/rtr/conduit/amqp/impl/AMQPQueueConsumer.class */
public class AMQPQueueConsumer extends DefaultConsumer {
    private static final Logger log = LoggerFactory.getLogger(AMQPQueueConsumer.class);
    private static final String HEADER_RETRY_COUNT = "conduit-retry-count";
    private AMQPConsumerCallback callback;
    private int threshold;
    protected final Channel channel;
    private String poisonPrefix;
    private boolean poisonQueueEnabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AMQPQueueConsumer(Channel channel, AMQPConsumerCallback aMQPConsumerCallback, int i, String str, boolean z) {
        super(channel);
        this.callback = aMQPConsumerCallback;
        this.threshold = i;
        this.channel = channel;
        this.poisonPrefix = str;
        this.poisonQueueEnabled = z;
    }

    public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
        log.info("Shutdown handler invoked");
        this.callback.notifyOfShutdown(str, shutdownSignalException);
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
        AMQPMessageBundle aMQPMessageBundle = new AMQPMessageBundle(str, envelope, basicProperties, bArr);
        try {
            respond(aMQPMessageBundle, this.callback.handle(aMQPMessageBundle));
        } catch (RuntimeException e) {
            log.error("The user-supplied callback allowed an exception to propagate.");
            log.error("Catastrophic - all listeners have stopped! Exception: ", e);
            throw e;
        }
    }

    private void respond(AMQPMessageBundle aMQPMessageBundle, ActionResponse actionResponse) {
        Envelope envelope = aMQPMessageBundle.getEnvelope();
        Long valueOf = Long.valueOf(envelope.getDeliveryTag());
        byte[] body = aMQPMessageBundle.getBody();
        AMQP.BasicProperties basicProperties = aMQPMessageBundle.getBasicProperties();
        try {
            switch (actionResponse.getAction()) {
                case Acknowledge:
                    ack(valueOf);
                    break;
                case RejectAndDiscard:
                    log.warn("Discarding message, body = " + new String(body));
                    publishToPoisonQueue(envelope, basicProperties, actionResponse.getReason(), body);
                    reject(valueOf.longValue());
                    break;
                case RejectAndRequeue:
                    log.warn("Received an unknown message, body = " + new String(body));
                    log.warn("\tAdjusting headers for retry.");
                    if (!retry(envelope, basicProperties, body)) {
                        publishToPoisonQueue(envelope, basicProperties, actionResponse.getReason(), body);
                    }
                    reject(valueOf.longValue());
                    break;
            }
        } catch (Exception e) {
            this.callback.notifyOfActionFailure(e);
        }
    }

    private void ack(Long l) throws IOException {
        this.channel.basicAck(l.longValue(), false);
    }

    private void reject(long j) throws IOException {
        this.channel.basicReject(j, false);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean retry(Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        Map headers = basicProperties.getHeaders();
        Object obj = headers.get(HEADER_RETRY_COUNT);
        int i = 0;
        if (obj != null) {
            try {
                i = Integer.parseInt(obj.toString());
            } catch (NumberFormatException e) {
                log.error("Received an invalid retry-count header, body = " + new String(bArr) + ", header = " + obj);
            }
        } else {
            log.warn("Received message without retry-count header, body = " + new String(bArr));
        }
        if (i >= this.threshold) {
            return false;
        }
        HashMap hashMap = new HashMap(headers);
        hashMap.put(HEADER_RETRY_COUNT, Integer.valueOf(i + 1));
        this.channel.basicPublish(envelope.getExchange(), envelope.getRoutingKey(), new AMQP.BasicProperties().builder().type(basicProperties.getType()).deliveryMode(basicProperties.getDeliveryMode()).priority(basicProperties.getPriority()).headers(hashMap).build(), bArr);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void publishToPoisonQueue(Envelope envelope, AMQP.BasicProperties basicProperties, String str, byte[] bArr) throws IOException {
        if (this.poisonQueueEnabled) {
            if (str != null && !str.trim().isEmpty()) {
                HashMap hashMap = new HashMap(basicProperties.getHeaders());
                hashMap.put(ActionResponse.REASON_KEY, str);
                basicProperties = createCopyWithNewHeaders(basicProperties, hashMap);
            }
            this.channel.basicPublish(envelope.getExchange(), envelope.getRoutingKey() + this.poisonPrefix + ".poison", basicProperties, bArr);
        }
    }

    protected AMQP.BasicProperties createCopyWithNewHeaders(AMQP.BasicProperties basicProperties, Map<String, Object> map) {
        return new AMQP.BasicProperties().builder().contentType(basicProperties.getContentType()).contentEncoding(basicProperties.getContentEncoding()).headers(map).deliveryMode(basicProperties.getDeliveryMode()).priority(basicProperties.getPriority()).correlationId(basicProperties.getCorrelationId()).replyTo(basicProperties.getReplyTo()).expiration(basicProperties.getExpiration()).messageId(basicProperties.getMessageId()).timestamp(basicProperties.getTimestamp()).type(basicProperties.getType()).userId(basicProperties.getUserId()).appId(basicProperties.getAppId()).clusterId(basicProperties.getClusterId()).build();
    }
}
