package org.springframework.integration.amqp.inbound;

import com.rabbitmq.client.Channel;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Address;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.amqp.support.EndpointUtils;
import org.springframework.integration.amqp.support.MappingUtils;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.gateway.MessagingGatewaySupport;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-amqp-5.5.1.jar:org/springframework/integration/amqp/inbound/AmqpInboundGateway.class */
public class AmqpInboundGateway extends MessagingGatewaySupport {
    private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
    private final AbstractMessageListenerContainer messageListenerContainer;
    private final AmqpTemplate amqpTemplate;
    private final boolean amqpTemplateExplicitlySet;
    private MessageConverter amqpMessageConverter;
    private MessageConverter templateMessageConverter;
    private AmqpHeaderMapper headerMapper;
    private Address defaultReplyTo;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<?> recoveryCallback;
    private MessageRecoverer messageRecoverer;
    private BatchingStrategy batchingStrategy;
    private boolean bindSourceMessage;
    private boolean replyHeadersMappedLast;

    /* loaded from: input_file:BOOT-INF/lib/spring-integration-amqp-5.5.1.jar:org/springframework/integration/amqp/inbound/AmqpInboundGateway$Listener.class */
    protected class Listener implements ChannelAwareMessageListener {
        protected Listener() {
        }

        @Override // org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener
        public void onMessage(Message message, Channel channel) {
            if (AmqpInboundGateway.this.retryTemplate != null) {
                org.springframework.messaging.Message<Object> convert = convert(message, channel);
                if (convert != null) {
                    AmqpInboundGateway.this.retryTemplate.execute(retryContext -> {
                        StaticMessageHeaderAccessor.getDeliveryAttempt(convert).incrementAndGet();
                        process(message, convert);
                        return null;
                    }, AmqpInboundGateway.this.recoveryCallback);
                    return;
                }
                return;
            }
            try {
                org.springframework.messaging.Message<Object> convert2 = convert(message, channel);
                if (convert2 != null) {
                    process(message, convert2);
                }
                AmqpInboundGateway.ATTRIBUTES_HOLDER.remove();
            } catch (Throwable th) {
                AmqpInboundGateway.ATTRIBUTES_HOLDER.remove();
                throw th;
            }
        }

        private org.springframework.messaging.Message<Object> convert(Message message, Channel channel) {
            Object fromMessage;
            boolean z = AmqpInboundGateway.this.messageListenerContainer.getAcknowledgeMode() == AcknowledgeMode.MANUAL;
            try {
                if (AmqpInboundGateway.this.batchingStrategy.canDebatch(message.getMessageProperties())) {
                    ArrayList arrayList = new ArrayList();
                    AmqpInboundGateway.this.batchingStrategy.deBatch(message, message2 -> {
                        arrayList.add(AmqpInboundGateway.this.amqpMessageConverter.fromMessage(message2));
                    });
                    fromMessage = arrayList;
                } else {
                    fromMessage = AmqpInboundGateway.this.amqpMessageConverter.fromMessage(message);
                }
                Map<String, ?> headersFromRequest = AmqpInboundGateway.this.headerMapper.toHeadersFromRequest(message.getMessageProperties());
                if (z) {
                    headersFromRequest.put(AmqpHeaders.DELIVERY_TAG, Long.valueOf(message.getMessageProperties().getDeliveryTag()));
                    headersFromRequest.put(AmqpHeaders.CHANNEL, channel);
                }
                if (AmqpInboundGateway.this.retryTemplate != null) {
                    headersFromRequest.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
                }
                if (AmqpInboundGateway.this.bindSourceMessage) {
                    headersFromRequest.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message);
                }
                return AmqpInboundGateway.this.getMessageBuilderFactory().withPayload(fromMessage).copyHeaders(headersFromRequest).build();
            } catch (RuntimeException e) {
                MessageChannel errorChannel = AmqpInboundGateway.this.getErrorChannel();
                if (errorChannel == null) {
                    throw e;
                }
                AmqpInboundGateway.this.setAttributesIfNecessary(message, null);
                AmqpInboundGateway.this.messagingTemplate.send((MessagingTemplate) errorChannel, (org.springframework.messaging.Message<?>) AmqpInboundGateway.this.buildErrorMessage(null, EndpointUtils.errorMessagePayload(message, channel, z, e)));
                return null;
            }
        }

        private void process(Message message, org.springframework.messaging.Message<Object> message2) {
            AmqpInboundGateway.this.setAttributesIfNecessary(message, message2);
            org.springframework.messaging.Message sendAndReceiveMessage = AmqpInboundGateway.this.sendAndReceiveMessage(message2);
            if (sendAndReceiveMessage != null) {
                String replyTo = message.getMessageProperties().getReplyTo();
                Address address = replyTo != null ? new Address(replyTo) : AmqpInboundGateway.this.defaultReplyTo;
                Message mapReplyMessage = MappingUtils.mapReplyMessage(sendAndReceiveMessage, AmqpInboundGateway.this.templateMessageConverter, AmqpInboundGateway.this.headerMapper, message.getMessageProperties().getReceivedDeliveryMode(), AmqpInboundGateway.this.replyHeadersMappedLast);
                if (address != null) {
                    AmqpInboundGateway.this.amqpTemplate.send(address.getExchangeName(), address.getRoutingKey(), mapReplyMessage);
                } else {
                    if (!AmqpInboundGateway.this.amqpTemplateExplicitlySet) {
                        throw new IllegalStateException("There is no 'replyTo' message property and the `defaultReplyTo` hasn't been configured.");
                    }
                    AmqpInboundGateway.this.amqpTemplate.send(mapReplyMessage);
                }
            }
        }
    }

    public AmqpInboundGateway(AbstractMessageListenerContainer abstractMessageListenerContainer) {
        this(abstractMessageListenerContainer, new RabbitTemplate(abstractMessageListenerContainer.getConnectionFactory()), false);
    }

    public AmqpInboundGateway(AbstractMessageListenerContainer abstractMessageListenerContainer, AmqpTemplate amqpTemplate) {
        this(abstractMessageListenerContainer, amqpTemplate, true);
    }

    private AmqpInboundGateway(AbstractMessageListenerContainer abstractMessageListenerContainer, AmqpTemplate amqpTemplate, boolean z) {
        this.amqpMessageConverter = new SimpleMessageConverter();
        this.templateMessageConverter = this.amqpMessageConverter;
        this.headerMapper = DefaultAmqpHeaderMapper.inboundMapper();
        this.batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L);
        Assert.notNull(abstractMessageListenerContainer, "listenerContainer must not be null");
        Assert.notNull(amqpTemplate, "'amqpTemplate' must not be null");
        Assert.isNull(abstractMessageListenerContainer.getMessageListener(), "The listenerContainer provided to an AMQP inbound Gateway must not have a MessageListener configured since the adapter needs to configure its own listener implementation.");
        this.messageListenerContainer = abstractMessageListenerContainer;
        this.messageListenerContainer.setAutoStartup(false);
        this.amqpTemplate = amqpTemplate;
        this.amqpTemplateExplicitlySet = z;
        if (this.amqpTemplateExplicitlySet && (this.amqpTemplate instanceof RabbitTemplate)) {
            this.templateMessageConverter = ((RabbitTemplate) this.amqpTemplate).getMessageConverter();
        }
        setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy());
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        Assert.notNull(messageConverter, "MessageConverter must not be null");
        this.amqpMessageConverter = messageConverter;
        if (this.amqpTemplateExplicitlySet) {
            return;
        }
        ((RabbitTemplate) this.amqpTemplate).setMessageConverter(messageConverter);
        this.templateMessageConverter = messageConverter;
    }

    public void setHeaderMapper(AmqpHeaderMapper amqpHeaderMapper) {
        Assert.notNull(amqpHeaderMapper, "headerMapper must not be null");
        this.headerMapper = amqpHeaderMapper;
    }

    public void setDefaultReplyTo(String str) {
        this.defaultReplyTo = new Address(str);
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<? extends Object> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    public void setMessageRecoverer(MessageRecoverer messageRecoverer) {
        this.messageRecoverer = messageRecoverer;
    }

    public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
        Assert.notNull(batchingStrategy, "'batchingStrategy' cannot be null");
        this.batchingStrategy = batchingStrategy;
    }

    public void setBindSourceMessage(boolean z) {
        this.bindSourceMessage = z;
    }

    public void setReplyHeadersMappedLast(boolean z) {
        this.replyHeadersMappedLast = z;
    }

    @Override // org.springframework.integration.gateway.MessagingGatewaySupport, org.springframework.integration.context.IntegrationObjectSupport, org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "amqp:inbound-gateway";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.gateway.MessagingGatewaySupport, org.springframework.integration.endpoint.AbstractEndpoint, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        if (this.retryTemplate != null) {
            Assert.state(getErrorChannel() == null, "Cannot have an 'errorChannel' property when a 'RetryTemplate' is provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to send an error message when retries are exhausted");
            setupRecoveryCallbackIfAny();
        }
        this.messageListenerContainer.setMessageListener(new Listener());
        this.messageListenerContainer.afterPropertiesSet();
        if (!this.amqpTemplateExplicitlySet) {
            ((RabbitTemplate) this.amqpTemplate).afterPropertiesSet();
        }
        super.onInit();
        if (this.retryTemplate == null || getErrorChannel() == null) {
            return;
        }
        this.logger.warn("Usually, when using a RetryTemplate you should use an ErrorMessageSendingRecoverer and not provide an errorChannel. Using an errorChannel could defeat retry and will receive an error message for each delivery attempt.");
    }

    private void setupRecoveryCallbackIfAny() {
        Assert.state(this.recoveryCallback == null || this.messageRecoverer == null, "Only one of 'recoveryCallback' or 'messageRecoverer' may be provided, but not both");
        if (this.messageRecoverer != null) {
            this.recoveryCallback = retryContext -> {
                this.messageRecoverer.recover((Message) RetrySynchronizationManager.getContext().getAttribute("amqp_raw_message"), retryContext.getLastThrowable());
                return null;
            };
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.gateway.MessagingGatewaySupport, org.springframework.integration.endpoint.AbstractEndpoint
    public void doStart() {
        super.doStart();
        this.messageListenerContainer.start();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.gateway.MessagingGatewaySupport, org.springframework.integration.endpoint.AbstractEndpoint
    public void doStop() {
        super.doStop();
        this.messageListenerContainer.stop();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setAttributesIfNecessary(Message message, org.springframework.messaging.Message<?> message2) {
        boolean z = getErrorChannel() != null && this.retryTemplate == null;
        boolean z2 = z || this.retryTemplate != null;
        if (z) {
            ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
        }
        if (z2) {
            AttributeAccessor context = this.retryTemplate != null ? RetrySynchronizationManager.getContext() : ATTRIBUTES_HOLDER.get();
            if (context != null) {
                context.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message2);
                context.setAttribute("amqp_raw_message", message);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.gateway.MessagingGatewaySupport
    public AttributeAccessor getErrorMessageAttributes(org.springframework.messaging.Message<?> message) {
        AttributeAccessor attributeAccessor = ATTRIBUTES_HOLDER.get();
        return attributeAccessor == null ? super.getErrorMessageAttributes(message) : attributeAccessor;
    }
}
