package org.springframework.amqp.rabbit.batch;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-rabbit-2.3.9.jar:org/springframework/amqp/rabbit/batch/SimpleBatchingStrategy.class */
public class SimpleBatchingStrategy implements BatchingStrategy {
    private final int batchSize;
    private final int bufferLimit;
    private final long timeout;
    private final List<Message> messages = new ArrayList();
    private String exchange;
    private String routingKey;
    private int currentSize;

    public SimpleBatchingStrategy(int i, int i2, long j) {
        this.batchSize = i;
        this.bufferLimit = i2;
        this.timeout = j;
    }

    @Override // org.springframework.amqp.rabbit.batch.BatchingStrategy
    public MessageBatch addToBatch(String str, String str2, Message message) {
        if (this.exchange != null) {
            Assert.isTrue(this.exchange.equals(str), "Cannot send to different exchanges in the same batch");
        } else {
            this.exchange = str;
        }
        if (this.routingKey != null) {
            Assert.isTrue(this.routingKey.equals(str2), "Cannot send with different routing keys in the same batch");
        } else {
            this.routingKey = str2;
        }
        int length = 4 + message.getBody().length;
        MessageBatch messageBatch = null;
        if (this.messages.size() > 0 && this.currentSize + length > this.bufferLimit) {
            messageBatch = doReleaseBatch();
            this.exchange = str;
            this.routingKey = str2;
        }
        this.currentSize += length;
        this.messages.add(message);
        if (messageBatch == null && (this.messages.size() >= this.batchSize || this.currentSize >= this.bufferLimit)) {
            messageBatch = doReleaseBatch();
        }
        return messageBatch;
    }

    @Override // org.springframework.amqp.rabbit.batch.BatchingStrategy
    public Date nextRelease() {
        if (this.messages.size() == 0 || this.timeout <= 0) {
            return null;
        }
        return this.currentSize >= this.bufferLimit ? new Date() : new Date(System.currentTimeMillis() + this.timeout);
    }

    @Override // org.springframework.amqp.rabbit.batch.BatchingStrategy
    public Collection<MessageBatch> releaseBatches() {
        MessageBatch doReleaseBatch = doReleaseBatch();
        return doReleaseBatch == null ? Collections.emptyList() : Collections.singletonList(doReleaseBatch);
    }

    private MessageBatch doReleaseBatch() {
        if (this.messages.size() < 1) {
            return null;
        }
        MessageBatch messageBatch = new MessageBatch(this.exchange, this.routingKey, assembleMessage());
        this.messages.clear();
        this.currentSize = 0;
        this.exchange = null;
        this.routingKey = null;
        return messageBatch;
    }

    private Message assembleMessage() {
        if (this.messages.size() == 1) {
            return this.messages.get(0);
        }
        MessageProperties messageProperties = this.messages.get(0).getMessageProperties();
        byte[] bArr = new byte[this.currentSize];
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        for (Message message : this.messages) {
            wrap.putInt(message.getBody().length);
            wrap.put(message.getBody());
        }
        messageProperties.getHeaders().put(MessageProperties.SPRING_BATCH_FORMAT, MessageProperties.BATCH_FORMAT_LENGTH_HEADER4);
        messageProperties.getHeaders().put(AmqpHeaders.BATCH_SIZE, Integer.valueOf(this.messages.size()));
        return new Message(bArr, messageProperties);
    }

    @Override // org.springframework.amqp.rabbit.batch.BatchingStrategy
    public boolean canDebatch(MessageProperties messageProperties) {
        return MessageProperties.BATCH_FORMAT_LENGTH_HEADER4.equals(messageProperties.getHeaders().get(MessageProperties.SPRING_BATCH_FORMAT));
    }

    @Override // org.springframework.amqp.rabbit.batch.BatchingStrategy
    public void deBatch(Message message, Consumer<Message> consumer) {
        ByteBuffer wrap = ByteBuffer.wrap(message.getBody());
        MessageProperties messageProperties = message.getMessageProperties();
        messageProperties.getHeaders().remove(MessageProperties.SPRING_BATCH_FORMAT);
        while (wrap.hasRemaining()) {
            int i = wrap.getInt();
            if (i < 0 || i > wrap.remaining()) {
                throw new ListenerExecutionFailedException("Bad batched message received", new MessageConversionException("Insufficient batch data at offset " + wrap.position()), message);
            }
            byte[] bArr = new byte[i];
            wrap.get(bArr);
            messageProperties.setContentLength(i);
            Message message2 = new Message(bArr, messageProperties);
            if (!wrap.hasRemaining()) {
                messageProperties.setLastInBatch(true);
            }
            consumer.accept(message2);
        }
    }
}
