package io.atleon.rabbitmq;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.CorrelableOutboundMessage;
import reactor.rabbitmq.SendOptions;
import reactor.rabbitmq.Sender;
import reactor.rabbitmq.SenderOptions;

/* loaded from: input_file:io/atleon/rabbitmq/AloRabbitMQSender.class */
/**
 * Sends RabbitMQ messages through a reactor-rabbitmq {@link Sender} using typed publish confirms.
 *
 * <p>Two families of operations are offered: plain ones ({@code sendBodies}, {@code sendMessages})
 * that correlate each result with the item that was sent, and {@link Alo}-aware ones
 * ({@code sendAloBodies}, {@code sendAloMessages}) that correlate each result with the originating
 * {@link Alo} and negatively acknowledge that {@link Alo} when the send raises an exception.
 *
 * <p>The {@link Sender}, the send interceptors and the body serializer are resolved from the
 * supplied config source through a cached {@link Mono}, so every send operation on one instance
 * works with the same resources.
 *
 * <p>NOTE(review): nothing in this class closes the {@link Sender} it creates - confirm that its
 * lifecycle is managed elsewhere.
 *
 * @param <T> the type of message bodies sent by this sender
 */
public class AloRabbitMQSender<T> {

    /** Prefix shared by the configuration keys declared on this class. */
    public static final String CONFIG_PREFIX = "rabbitmq-sender-";

    /** Configuration key from which the list of send interceptors is loaded. */
    public static final String INTERCEPTORS_CONFIG = "rabbitmq-sender-send-interceptors";

    /** Configuration key from which the (mandatory) body serializer is loaded. */
    public static final String BODY_SERIALIZER_CONFIG = "rabbitmq-sender-body-serializer";

    /** Options used for the non-Alo send operations: library defaults, no custom handler. */
    private static final SendOptions SEND_OPTIONS = new SendOptions();

    /** Options used for the Alo send operations: send exceptions are routed to the Alo. */
    private static final SendOptions ALO_SEND_OPTIONS = new SendOptions().exceptionHandler(AloRabbitMQSender::handleAloSendException);

    /** Cached, deferred handle on the resources every send operation is built from. */
    private final Mono<SendResources<T>> futureResources;

    private AloRabbitMQSender(RabbitMQConfigSource rabbitMQConfigSource) {
        // The raw Mono cast is a decompilation artifact that is kept because the declared return
        // type of create() is not visible from this file.
        this.futureResources = ((Mono) rabbitMQConfigSource.create()).map(SendResources::fromConfig).cache();
    }

    /**
     * Creates a sender whose resources are built from the given config source.
     *
     * @param rabbitMQConfigSource source of the configuration used to build the send resources
     * @param <T> the type of message bodies to send
     * @return a new sender
     */
    public static <T> AloRabbitMQSender<T> from(RabbitMQConfigSource rabbitMQConfigSource) {
        return new AloRabbitMQSender<>(rabbitMQConfigSource);
    }

    /**
     * Curried form of {@link #sendBodies(Publisher, RabbitMQMessageCreator)}.
     *
     * @param rabbitMQMessageCreator turns each body into the message to send
     * @return a function that sends every body of the publisher it is applied to
     */
    public Function<Publisher<T>, Flux<RabbitMQSenderResult<T>>> sendBodies(RabbitMQMessageCreator<T> rabbitMQMessageCreator) {
        return bodies -> sendBodies(bodies, rabbitMQMessageCreator);
    }

    /**
     * Sends each published body as a message built by the given creator.
     *
     * @param publisher the bodies to send
     * @param rabbitMQMessageCreator turns each body into the message to send
     * @return the send results, each correlated with the body that was sent
     */
    public Flux<RabbitMQSenderResult<T>> sendBodies(Publisher<T> publisher, RabbitMQMessageCreator<T> rabbitMQMessageCreator) {
        return this.futureResources.flatMapMany(resources -> sendBodies(resources, publisher, rabbitMQMessageCreator));
    }

    /**
     * Sends each published message as-is.
     *
     * @param publisher the messages to send
     * @return the send results, each correlated with the message that was sent
     */
    public Flux<RabbitMQSenderResult<RabbitMQMessage<T>>> sendMessages(Publisher<RabbitMQMessage<T>> publisher) {
        return this.futureResources.flatMapMany(resources -> sendMessages(resources, publisher));
    }

    /**
     * Curried form of {@link #sendAloBodies(Publisher, RabbitMQMessageCreator)}.
     *
     * @param rabbitMQMessageCreator turns each body into the message to send
     * @return a function that sends every Alo body of the publisher it is applied to
     */
    public Function<Publisher<Alo<T>>, AloFlux<RabbitMQSenderResult<T>>> sendAloBodies(RabbitMQMessageCreator<T> rabbitMQMessageCreator) {
        return aloBodies -> sendAloBodies(aloBodies, rabbitMQMessageCreator);
    }

    /**
     * Sends the body held by each published {@link Alo} as a message built by the given creator.
     *
     * @param publisher the Alo-wrapped bodies to send
     * @param rabbitMQMessageCreator turns each body into the message to send
     * @return the send results, wrapped as an {@link AloFlux}
     */
    public AloFlux<RabbitMQSenderResult<T>> sendAloBodies(Publisher<Alo<T>> publisher, RabbitMQMessageCreator<T> rabbitMQMessageCreator) {
        return (AloFlux) this.futureResources
            .flatMapMany(resources -> sendAloBodies(resources, publisher, rabbitMQMessageCreator))
            .as(AloFlux::wrap);
    }

    /**
     * Sends the message held by each published {@link Alo}.
     *
     * @param publisher the Alo-wrapped messages to send
     * @return the send results, wrapped as an {@link AloFlux}
     */
    public AloFlux<RabbitMQSenderResult<RabbitMQMessage<T>>> sendAloMessages(Publisher<Alo<RabbitMQMessage<T>>> publisher) {
        return (AloFlux) this.futureResources
            .flatMapMany(resources -> sendAloMessages(resources, publisher))
            .as(AloFlux::wrap);
    }

    /** Sends bodies with already-resolved resources; each body is its own correlation metadata. */
    private Flux<RabbitMQSenderResult<T>> sendBodies(SendResources<T> sendResources, Publisher<T> publisher, RabbitMQMessageCreator<T> rabbitMQMessageCreator) {
        return Flux.from(publisher)
            .map(body -> sendResources.createOutboundMessage(rabbitMQMessageCreator.apply(body), body))
            .transform(outbound -> sendResources.getSender().sendWithTypedPublishConfirms(outbound, SEND_OPTIONS))
            .map(RabbitMQSenderResult::fromMessageResult);
    }

    /** Sends messages with already-resolved resources; each message is its own correlation metadata. */
    private Flux<RabbitMQSenderResult<RabbitMQMessage<T>>> sendMessages(SendResources<T> sendResources, Publisher<RabbitMQMessage<T>> publisher) {
        return Flux.from(publisher)
            .map(message -> sendResources.createOutboundMessage(message, message))
            .transform(outbound -> sendResources.getSender().sendWithTypedPublishConfirms(outbound, SEND_OPTIONS))
            .map(RabbitMQSenderResult::fromMessageResult);
    }

    /** Sends Alo bodies with already-resolved resources; the Alo travels as correlation metadata. */
    private Flux<Alo<RabbitMQSenderResult<T>>> sendAloBodies(SendResources<T> sendResources, Publisher<Alo<T>> publisher, RabbitMQMessageCreator<T> rabbitMQMessageCreator) {
        return AloFlux.toFlux(publisher)
            .map(aloBody -> sendResources.createOutboundMessage((RabbitMQMessage) rabbitMQMessageCreator.apply(aloBody.get()), aloBody))
            .transform(outbound -> sendResources.getSender().sendWithTypedPublishConfirms(outbound, ALO_SEND_OPTIONS))
            .map(RabbitMQSenderResult::fromMessageResultOfAlo);
    }

    /** Sends Alo messages with already-resolved resources; the Alo travels as correlation metadata. */
    private Flux<Alo<RabbitMQSenderResult<RabbitMQMessage<T>>>> sendAloMessages(SendResources<T> sendResources, Publisher<Alo<RabbitMQMessage<T>>> publisher) {
        return AloFlux.toFlux(publisher)
            .map(aloMessage -> sendResources.createOutboundMessage((RabbitMQMessage) aloMessage.get(), aloMessage))
            .transform(outbound -> sendResources.getSender().sendWithTypedPublishConfirms(outbound, ALO_SEND_OPTIONS))
            .map(RabbitMQSenderResult::fromMessageResultOfAlo);
    }

    /**
     * Exception handler installed on {@link #ALO_SEND_OPTIONS}: negatively acknowledges the
     * {@link Alo} that the failed outbound message carries as its correlation metadata.
     *
     * @param sendContext context of the failed send; its message is cast to a
     *                    {@link CorrelableOutboundMessage}
     * @param exc the exception raised by the send
     */
    private static void handleAloSendException(Sender.SendContext sendContext, Exception exc) {
        CorrelableOutboundMessage<?> failedMessage = CorrelableOutboundMessage.class.cast(sendContext.getMessage());
        Alo<?> alo = Alo.class.cast(failedMessage.getCorrelationMetadata());
        Alo.nacknowledge(alo, exc);
    }

    /**
     * Bundle of everything needed to turn a {@link RabbitMQMessage} into an outbound message and
     * send it: the {@link Sender}, the send interceptors and the body serializer.
     *
     * <p>NOTE(review): the decompiler reports that this class was originally declared private.
     *
     * @param <T> the type of message bodies handled
     */
    public static final class SendResources<T> {

        private final Sender sender;

        private final List<RabbitMQMessageSendInterceptor<T>> interceptors;

        private final BodySerializer<T> bodySerializer;

        public SendResources(Sender sender, List<RabbitMQMessageSendInterceptor<T>> list, BodySerializer<T> bodySerializer) {
            this.sender = sender;
            this.interceptors = list;
            this.bodySerializer = bodySerializer;
        }

        /**
         * Builds the resources from configuration: a {@link Sender} on the configured connection
         * factory, the interceptors listed under {@link AloRabbitMQSender#INTERCEPTORS_CONFIG} and
         * the serializer configured under {@link AloRabbitMQSender#BODY_SERIALIZER_CONFIG}.
         *
         * @param rabbitMQConfig the configuration to read from
         * @param <T> the type of message bodies handled
         * @return the assembled resources
         */
        public static <T> SendResources<T> fromConfig(RabbitMQConfig rabbitMQConfig) {
            // Created in the same order as the original single expression: sender, interceptors, serializer.
            Sender configuredSender = new Sender(new SenderOptions().connectionFactory(rabbitMQConfig.getConnectionFactory()));
            List<RabbitMQMessageSendInterceptor<T>> configuredInterceptors =
                rabbitMQConfig.loadListOfConfigured(AloRabbitMQSender.INTERCEPTORS_CONFIG);
            BodySerializer<T> configuredSerializer =
                (BodySerializer) rabbitMQConfig.loadConfiguredOrThrow(AloRabbitMQSender.BODY_SERIALIZER_CONFIG);
            return new SendResources<>(configuredSender, configuredInterceptors, configuredSerializer);
        }

        /**
         * Serializes the message body, runs the message through every interceptor in list order
         * and wraps the outcome in an outbound message carrying the given correlation metadata.
         *
         * @param rabbitMQMessage the message to send
         * @param c correlation metadata to attach to the outbound message
         * @param <C> the type of the correlation metadata
         * @return the outbound message to hand to the {@link Sender}
         */
        public <C> CorrelableOutboundMessage<C> createOutboundMessage(RabbitMQMessage<T> rabbitMQMessage, C c) {
            // The body is serialized once, from the message as it was before interception.
            SerializedBody serializedBody = bodySerializer.serialize(rabbitMQMessage.getBody());

            RabbitMQMessage<T> intercepted = rabbitMQMessage;
            for (RabbitMQMessageSendInterceptor<T> interceptor : interceptors) {
                intercepted = interceptor.onSend(intercepted, serializedBody);
            }

            // Exchange, routing key and properties come from the intercepted message.
            return new CorrelableOutboundMessage<>(
                intercepted.getExchange(),
                intercepted.getRoutingKey(),
                intercepted.getProperties(),
                serializedBody.bytes(),
                c);
        }

        public Sender getSender() {
            return sender;
        }
    }
}
