package io.irain.reactor.rabbitmq.consumer;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.irain.reactor.rabbitmq.exception.QueueException;
import io.irain.reactor.rabbitmq.message.QueueEvent;
import io.irain.reactor.rabbitmq.properties.RabbitMQProperties;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.ResourcesSpecification;
import reactor.rabbitmq.Sender;
import reactor.util.retry.Retry;

/* loaded from: input_file:io/irain/reactor/rabbitmq/consumer/RabbitMQReceiver.class */
/**
 * Base class for reactive RabbitMQ consumers that receive JSON payloads and
 * hand them to {@link #handle(QueueEvent)} as instances of {@code T}.
 *
 * <p>After dependency injection the receiver declares the queue returned by
 * {@link #getQueue()} as durable and starts consuming from it. The payload
 * class is taken from the type argument the concrete subclass supplies for
 * its generic superclass.
 *
 * @param <T> event type the message body is deserialized into
 */
public abstract class RabbitMQReceiver<T extends QueueEvent> {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQReceiver.class);

    @Resource
    private Receiver receiver;

    @Resource
    private Sender sender;

    @Resource
    private ObjectMapper objectMapper;

    @Resource
    private RabbitMQProperties rabbitMQProperties;

    /**
     * Event class resolved on first use. Resolution is idempotent, so a racy
     * double computation is harmless; {@code volatile} only guarantees that
     * the published value is visible to the scheduler threads.
     */
    private volatile Class<T> eventType;

    /**
     * Starts consuming from {@link #getQueue()} once the bean is wired.
     *
     * <p>An explicit error callback is registered so that a failure of the
     * stream itself (for example while declaring the queue) is logged together
     * with the queue name instead of being left to Reactor's default handling
     * of unhandled errors.
     */
    @PostConstruct
    private void init() {
        String queue = getQueue();
        consume(queue).subscribe(
                unused -> { },
                error -> log.error("RabbitMQ consumer for queue {} terminated", queue, error));
    }

    /**
     * Declares the given queue as durable and then processes every delivery
     * received from it.
     *
     * <p>Each delivery is deserialized and passed to {@link #handle(QueueEvent)}.
     * A failure while processing one message is logged and that message is
     * dropped, so the returned stream keeps consuming subsequent messages.
     * Deliveries are consumed through {@code consumeAutoAck}; presumably a
     * dropped message is therefore not redelivered by the broker.
     *
     * @param str name of the queue to declare and consume from
     * @return a stream that emits no elements; it errors only if the stream
     *         itself fails (declaration or consumption), not on a single message
     */
    public Flux<Void> consume(String str) {
        return this.sender
                .declare(ResourcesSpecification.queue(str).durable(true))
                .thenMany(this.receiver
                        .consumeAutoAck(str)
                        .flatMap(delivery -> processMessage(str, delivery.getBody())));
    }

    /**
     * Handles one successfully deserialized event.
     *
     * <p>The method is invoked again for every retry attempt, so
     * implementations should tolerate being called more than once for the
     * same event.
     *
     * @param t the deserialized event
     * @return a {@code Mono} that completes when handling is done
     */
    public abstract Mono<Void> handle(T t);

    /**
     * Returns the name of the queue this receiver consumes from.
     *
     * @return the queue name
     */
    public abstract String getQueue();

    /**
     * Deserializes one message body, logs the event and handles it with retry.
     *
     * <p>Only the call to {@link #handle(QueueEvent)} is retried: parsing the
     * same bytes again cannot produce a different outcome, so a malformed
     * payload fails immediately. Any error that remains after the retries is
     * logged and swallowed here so it cannot terminate the enclosing stream.
     */
    private Mono<Void> processMessage(String queue, byte[] body) {
        return deserializeEvent(body)
                .doOnNext(event -> log.info("RabbitMQ received message {}", event))
                // defer so that handle(...) is re-invoked on every retry attempt
                .flatMap(event -> Mono.defer(() -> handle(event)).retryWhen(retryPolicy()))
                .onErrorResume(error -> {
                    log.error("RabbitMQ message from queue {} could not be processed and was dropped", queue, error);
                    return Mono.empty();
                });
    }

    /**
     * Builds the backoff retry policy from the configured retry count and
     * minimum backoff (interpreted as seconds).
     */
    private Retry retryPolicy() {
        long maxAttempts = this.rabbitMQProperties.getRetry().longValue();
        Duration minBackoff = Duration.ofSeconds(this.rabbitMQProperties.getMinBackoff().longValue());
        return Retry.backoff(maxAttempts, minBackoff);
    }

    /**
     * Parses a message body into the event type on the bounded-elastic
     * scheduler.
     *
     * <p>NOTE(review): only {@link JsonParseException} is translated into a
     * {@link QueueException}; other failures raised while reading the value
     * propagate unchanged. Confirm whether they should be wrapped as well.
     */
    private Mono<T> deserializeEvent(byte[] bArr) {
        Mono<T> parsed = Mono.fromCallable(() -> this.objectMapper.readValue(bArr, getType()));
        return parsed
                .onErrorMap(JsonParseException.class, QueueException::new)
                .subscribeOn(Schedulers.boundedElastic());
    }

    /**
     * Returns the event class, resolving it on first use and caching it
     * afterwards so reflection is not repeated for every message.
     *
     * @throws IllegalStateException if the event class cannot be determined
     */
    private Class<T> getType() {
        Class<T> resolved = this.eventType;
        if (resolved == null) {
            resolved = resolveEventType();
            this.eventType = resolved;
        }
        return resolved;
    }

    /**
     * Walks up the class hierarchy to the nearest parameterized superclass and
     * returns its first type argument.
     *
     * <p>Walking up, rather than inspecting only the direct superclass, keeps
     * resolution working when a non-generic class sits between the concrete
     * receiver and the class that binds the event type. As before, the event
     * type is expected to be the first type argument of that superclass.
     *
     * @throws IllegalStateException if no parameterized superclass exists or
     *         its first type argument is not a concrete {@link QueueEvent} class
     */
    private Class<T> resolveEventType() {
        for (Class<?> current = getClass();
                current != null && current != Object.class;
                current = current.getSuperclass()) {
            Type genericSuperclass = current.getGenericSuperclass();
            if (!(genericSuperclass instanceof ParameterizedType)) {
                continue;
            }
            Type argument = ((ParameterizedType) genericSuperclass).getActualTypeArguments()[0];
            if (argument instanceof ParameterizedType) {
                // e.g. Receiver<Wrapper<Foo>>: fall back to the raw class Wrapper
                argument = ((ParameterizedType) argument).getRawType();
            }
            if (!(argument instanceof Class) || !QueueEvent.class.isAssignableFrom((Class<?>) argument)) {
                throw new IllegalStateException("Cannot resolve the event type of " + getClass().getName()
                        + ": type argument '" + argument + "' is not a concrete QueueEvent class");
            }
            // Safe: the argument was just verified to be a Class assignable to QueueEvent.
            @SuppressWarnings("unchecked")
            Class<T> resolved = (Class<T>) argument;
            return resolved;
        }
        throw new IllegalStateException("Cannot resolve the event type of " + getClass().getName()
                + ": no parameterized superclass found");
    }
}
