package io.irain.reactor.rabbitmq.producer;

import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.core.util.StrUtil;
import com.rabbitmq.client.AMQP;
import io.irain.reactor.rabbitmq.common.ExchangeType;
import io.irain.reactor.rabbitmq.common.RabbitParamConstant;
import io.irain.reactor.rabbitmq.message.QueueEvent;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.rabbitmq.ExchangeSpecification;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.ResourcesSpecification;
import reactor.rabbitmq.Sender;

@Component
/* loaded from: input_file:io/irain/reactor/rabbitmq/producer/RabbitMQSender.class */
public class RabbitMQSender {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQSender.class);
    private final Sender sender;

    public Disposable send(QueueEvent queueEvent) {
        queueEvent.setExchange(getExchange(queueEvent.getExchange(), queueEvent.getQueue()));
        OutboundMessage outboundMessage = StringUtils.hasText(queueEvent.getRoutingKey()) ? new OutboundMessage(queueEvent.getExchange(), queueEvent.getRoutingKey(), queueEvent.getPayload()) : new OutboundMessage(queueEvent.getExchange(), queueEvent.getQueue(), queueEvent.getPayload());
        ExchangeSpecification exchange = ResourcesSpecification.exchange((String) Objects.requireNonNullElse(queueEvent.getExchange(), ""));
        HashMap hashMap = new HashMap();
        exchange.type(ExchangeType.TOPIC.getType());
        exchange.durable(true);
        exchange.arguments(hashMap);
        return this.sender.declare(exchange).then(this.sender.bind(ResourcesSpecification.binding(queueEvent.getExchange(), StrUtil.emptyToDefault(queueEvent.getRoutingKey(), RabbitParamConstant.DEFAULT_ROUTING_KEY), queueEvent.getQueue()))).thenMany(this.sender.sendWithPublishConfirms(Flux.just(outboundMessage))).doOnError(th -> {
            log.error("RabbitMQ queue [{}] send failed", queueEvent.getQueue(), th);
        }).subscribe(outboundMessageResult -> {
            if (outboundMessageResult.isAck()) {
                log.info("RabbitMQ queue [{}] send success", queueEvent.getQueue());
            } else {
                log.error("RabbitMQ queue [{}] send failed", queueEvent.getQueue());
            }
        });
    }

    public Disposable send(QueueEvent queueEvent, int i) {
        if (i < 0) {
            throw new RuntimeException("seconds must be greater than 0");
        }
        HashMap hashMap = new HashMap();
        hashMap.put(RabbitParamConstant.DELAY_HEADER, Integer.valueOf(i * 1000));
        AMQP.BasicProperties build = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding(StandardCharsets.UTF_8.name()).headers(hashMap).build();
        queueEvent.setExchange(getExchange(queueEvent.getExchange(), queueEvent.getQueue()));
        String emptyToDefault = StrUtil.emptyToDefault(queueEvent.getRoutingKey(), RabbitParamConstant.DEFAULT_ROUTING_KEY);
        OutboundMessage outboundMessage = new OutboundMessage(queueEvent.getExchange(), emptyToDefault, build, queueEvent.getPayload());
        ExchangeSpecification exchange = ResourcesSpecification.exchange((String) Objects.requireNonNullElse(queueEvent.getExchange(), ""));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(RabbitParamConstant.DELAYED_TYPE, ExchangeType.DIRECT.getType());
        exchange.type(ExchangeType.DELAYED.getType());
        exchange.durable(true);
        exchange.arguments(hashMap2);
        return this.sender.declare(exchange).then(this.sender.declare(ResourcesSpecification.queue(queueEvent.getQueue()).durable(true))).then(this.sender.bind(ResourcesSpecification.binding(queueEvent.getExchange(), emptyToDefault, queueEvent.getQueue()))).thenMany(this.sender.sendWithPublishConfirms(Flux.just(outboundMessage))).doOnError(th -> {
            log.error("RabbitMQ queue [{}] send failed", queueEvent.getQueue(), th);
        }).subscribe(outboundMessageResult -> {
            if (outboundMessageResult.isAck()) {
                log.info("RabbitMQ queue [{}] send success", queueEvent.getQueue());
            } else {
                log.error("RabbitMQ queue [{}] send failed", queueEvent.getQueue());
            }
        });
    }

    private String getExchange(String str, String str2) {
        return CharSequenceUtil.isEmpty(str) ? str2 + "-exchange" : str;
    }

    public RabbitMQSender(Sender sender) {
        this.sender = sender;
    }
}
