package io.atleon.aws.sns;

import io.atleon.core.Alo;
import io.atleon.core.AloFlux;
import java.io.Closeable;
import java.util.Objects;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:io/atleon/aws/sns/AloSnsSender.class */
/**
 * Sends messages through an {@link SnsSender} whose configuration comes from an
 * {@link SnsConfigSource}.
 *
 * <p>The underlying resources (the {@code SnsSender} and the {@link BodySerializer}) are not
 * created in the constructor. They are built when one of the send pipelines is subscribed, and
 * are then cached via {@code Mono.cacheInvalidateWhen}. Calling {@link #close()} signals that
 * cache to invalidate, which closes the cached {@code SnsSender}. Per Reactor's
 * {@code cacheInvalidateWhen} contract, a subscription made after invalidation re-subscribes to
 * the config source and builds fresh resources.
 *
 * @param <T> type of message bodies handed to the configured {@link BodySerializer}
 */
public class AloSnsSender<T> implements Closeable {

    /** Prefix shared by all configuration keys read by this sender. */
    public static final String CONFIG_PREFIX = "sns.sender.";

    /** Key of the {@link BodySerializer} to use; loaded with {@code loadConfiguredOrThrow}. */
    public static final String BODY_SERIALIZER_CONFIG = CONFIG_PREFIX + "body.serializer";

    /** Key of the batch size handed to the sender options (default {@code 1}). */
    public static final String BATCH_SIZE_CONFIG = CONFIG_PREFIX + "batch.size";

    /** Key of the batch duration (default {@code SnsSenderOptions.DEFAULT_BATCH_DURATION}). */
    public static final String BATCH_DURATION_CONFIG = CONFIG_PREFIX + "batch.duration";

    /** Key of the batch prefetch handed to the sender options (default {@code 32}). */
    public static final String BATCH_PREFETCH_CONFIG = CONFIG_PREFIX + "batch.prefetch";

    /** Key of the maximum number of in-flight requests (default {@code 1}). */
    public static final String MAX_REQUESTS_IN_FLIGHT_CONFIG = CONFIG_PREFIX + "max.requests.in.flight";

    private static final Logger LOGGER = LoggerFactory.getLogger(AloSnsSender.class);

    // Fallbacks used when the corresponding key is absent from the configuration.
    private static final int DEFAULT_BATCH_SIZE = 1;
    private static final int DEFAULT_BATCH_PREFETCH = 32;
    private static final int DEFAULT_MAX_REQUESTS_IN_FLIGHT = 1;

    /** Lazily created, cached resources; invalidated (and closed) by a signal on {@link #closeSink}. */
    private final Mono<Resources<T>> futureResources;

    /** Carries "close" signals from {@link #close()} to the cache-invalidation trigger. */
    private final Sinks.Many<Long> closeSink = Sinks.many().multicast().directBestEffort();

    /**
     * Holder for the sender and serializer built from one {@link SnsConfig}.
     *
     * <p>Internal to {@link AloSnsSender}; the decompiler reports that this class was originally
     * declared {@code private}. Its visibility is left as found.
     *
     * @param <T> type of message bodies accepted by the serializer
     */
    public static final class Resources<T> {
        private final SnsSender sender;
        private final BodySerializer<T> bodySerializer;

        private Resources(SnsSender snsSender, BodySerializer<T> bodySerializer) {
            this.sender = snsSender;
            this.bodySerializer = bodySerializer;
        }

        /**
         * Builds the serializer and the sender described by the given configuration.
         *
         * @param snsConfig configuration to read; must not be {@code null}
         * @param <T> type of message bodies
         * @return resources backed by a newly created {@link SnsSender}
         * @throws NullPointerException if {@code snsConfig} is {@code null}
         */
        public static <T> Resources<T> fromConfig(SnsConfig snsConfig) {
            Objects.requireNonNull(snsConfig, "snsConfig");

            // Load the serializer BEFORE creating the sender: if this lookup throws, no
            // SnsSender has been created yet, so nothing is left behind unclosed.
            // The configured serializer's element type cannot be verified here.
            @SuppressWarnings("unchecked")
            BodySerializer<T> serializer =
                (BodySerializer<T>) snsConfig.loadConfiguredOrThrow(BODY_SERIALIZER_CONFIG);

            SnsSender snsSender = SnsSender.create(
                SnsSenderOptions.newBuilder(snsConfig::buildClient)
                    .batchSize(snsConfig.loadInt(BATCH_SIZE_CONFIG, DEFAULT_BATCH_SIZE))
                    .batchDuration(
                        snsConfig.loadDuration(BATCH_DURATION_CONFIG, SnsSenderOptions.DEFAULT_BATCH_DURATION))
                    .batchPrefetch(snsConfig.loadInt(BATCH_PREFETCH_CONFIG, DEFAULT_BATCH_PREFETCH))
                    .maxRequestsInFlight(
                        snsConfig.loadInt(MAX_REQUESTS_IN_FLIGHT_CONFIG, DEFAULT_MAX_REQUESTS_IN_FLIGHT))
                    .build());

            return new Resources<>(snsSender, serializer);
        }

        /**
         * Sends a single message to the given address. The message itself is used as the
         * correlation metadata of the result.
         *
         * @param snsMessage message to send
         * @param snsAddress destination, passed unchanged to the underlying sender
         * @return the result produced by the underlying sender
         */
        public Mono<SnsSenderResult<SnsMessage<T>>> send(SnsMessage<T> snsMessage, SnsAddress snsAddress) {
            SnsSenderMessage<SnsMessage<T>> senderMessage = toSenderMessage(snsMessage, Function.identity());
            return this.sender.send(senderMessage, snsAddress);
        }

        /**
         * Converts each published item to an {@link SnsMessage}, sends it, and emits one result
         * per item with the original item as correlation metadata.
         *
         * @param publisher source of items
         * @param function converts an item to the message to send
         * @param str destination, passed unchanged to the underlying sender
         *            (NOTE(review): presumably a topic ARN - confirm against SnsSender)
         * @param <R> type of the items
         * @return results emitted by the underlying sender
         */
        public <R> Flux<SnsSenderResult<R>> send(Publisher<R> publisher, Function<R, SnsMessage<T>> function, String str) {
            return Flux.from(publisher)
                .map(item -> toSenderMessage(item, function))
                .transform(senderMessages -> this.sender.send(senderMessages, str));
        }

        /**
         * Same as {@link #send(Publisher, Function, String)} but for items wrapped in {@link Alo}.
         * The message is created from the wrapped value, while the {@code Alo} itself travels as
         * correlation metadata so each result can be re-wrapped with the originating
         * acknowledger and nacknowledger.
         *
         * @param publisher source of {@code Alo}-wrapped items
         * @param function converts an unwrapped item to the message to send
         * @param str destination, passed unchanged to the underlying sender
         * @param <R> type of the wrapped items
         * @return {@code Alo}-wrapped results
         */
        public <R> Flux<Alo<SnsSenderResult<R>>> sendAlos(Publisher<Alo<R>> publisher, Function<R, SnsMessage<T>> function, String str) {
            return AloFlux.toFlux(publisher)
                .map(alo -> toSenderMessage(alo, function.compose(Alo::get)))
                .transform(senderMessages -> this.sender.send(senderMessages, str))
                .map(this::toAloOfMessageResult);
        }

        /** Closes the underlying {@link SnsSender}. */
        public void close() {
            this.sender.close();
        }

        /**
         * Builds the sender-level message for {@code r}: every attribute of the created
         * {@link SnsMessage} is copied (absent optionals become {@code null}), the body is
         * serialized, and {@code r} itself is attached as correlation metadata.
         */
        private <R> SnsSenderMessage<R> toSenderMessage(R r, Function<R, SnsMessage<T>> function) {
            SnsMessage<T> message = function.apply(r);
            return SnsSenderMessage.<R>newBuilder()
                .messageDeduplicationId(message.messageDeduplicationId().orElse(null))
                .messageGroupId(message.messageGroupId().orElse(null))
                .messageAttributes(message.messageAttributes())
                .messageStructure(message.messageStructure().orElse(null))
                .subject(message.subject().orElse(null))
                .body(this.bodySerializer.serialize(message.body()))
                .correlationMetadata(r)
                .build();
        }

        /**
         * Moves the {@link Alo} from inside the result (its correlation metadata) to the outside:
         * the returned {@code Alo} wraps a result correlated with the unwrapped value and reuses
         * the original acknowledger and nacknowledger.
         */
        private <R> Alo<SnsSenderResult<R>> toAloOfMessageResult(SnsSenderResult<Alo<R>> snsSenderResult) {
            Alo<R> alo = snsSenderResult.correlationMetadata();
            SnsSenderResult<R> unwrappedResult = snsSenderResult.replaceCorrelationMetadata(alo.get());
            return alo.<SnsSenderResult<R>>propagator()
                .create(unwrappedResult, alo.getAcknowledger(), alo.getNacknowledger());
        }
    }

    private AloSnsSender(SnsConfigSource snsConfigSource) {
        // The invalidation trigger completes on the first close signal; the cached resources are
        // then handed to Resources::close.
        this.futureResources = snsConfigSource.create()
            .map(Resources::<T>fromConfig)
            .cacheInvalidateWhen(resources -> this.closeSink.asFlux().next().then(), Resources::close);
    }

    /**
     * Creates a sender whose resources are built from the given config source.
     *
     * @param snsConfigSource source of the {@link SnsConfig} used to build resources
     * @param <T> type of message bodies
     * @return a new sender
     */
    public static <T> AloSnsSender<T> from(SnsConfigSource snsConfigSource) {
        return new AloSnsSender<>(snsConfigSource);
    }

    /**
     * Returns a function form of {@link #sendBodies(Publisher, SnsMessageCreator, String)}.
     *
     * @param snsMessageCreator converts a body to the message to send
     * @param str destination, passed unchanged to the underlying sender
     * @return function that sends the bodies of the publisher it is applied to
     */
    public Function<Publisher<T>, Flux<SnsSenderResult<T>>> sendBodies(SnsMessageCreator<T> snsMessageCreator, String str) {
        return publisher -> sendBodies(publisher, snsMessageCreator, str);
    }

    /**
     * Sends a message for every published body; each result carries the body as correlation
     * metadata.
     *
     * @param publisher source of bodies
     * @param snsMessageCreator converts a body to the message to send
     * @param str destination, passed unchanged to the underlying sender
     *            (NOTE(review): presumably a topic ARN - confirm against SnsSender)
     * @return results of the sends
     */
    public Flux<SnsSenderResult<T>> sendBodies(Publisher<T> publisher, SnsMessageCreator<T> snsMessageCreator, String str) {
        return this.futureResources.flatMapMany(resources -> resources.send(publisher, snsMessageCreator, str));
    }

    /**
     * Sends one message to the given address.
     *
     * @param snsMessage message to send
     * @param snsAddress destination, passed unchanged to the underlying sender
     * @return result of the send, correlated with the message
     */
    public Mono<SnsSenderResult<SnsMessage<T>>> sendMessage(SnsMessage<T> snsMessage, SnsAddress snsAddress) {
        return this.futureResources.flatMap(resources -> resources.send(snsMessage, snsAddress));
    }

    /**
     * Sends every published message; each result carries the message as correlation metadata.
     *
     * @param publisher source of messages
     * @param str destination, passed unchanged to the underlying sender
     * @return results of the sends
     */
    public Flux<SnsSenderResult<SnsMessage<T>>> sendMessages(Publisher<SnsMessage<T>> publisher, String str) {
        return this.futureResources.flatMapMany(resources -> resources.send(publisher, Function.identity(), str));
    }

    /**
     * Returns a function form of {@link #sendAloBodies(Publisher, SnsMessageCreator, String)}.
     *
     * @param snsMessageCreator converts a body to the message to send
     * @param str destination, passed unchanged to the underlying sender
     * @return function that sends the {@code Alo}-wrapped bodies of the publisher it is applied to
     */
    public Function<Publisher<Alo<T>>, AloFlux<SnsSenderResult<T>>> sendAloBodies(SnsMessageCreator<T> snsMessageCreator, String str) {
        return publisher -> sendAloBodies(publisher, snsMessageCreator, str);
    }

    /**
     * Sends a message for every {@link Alo}-wrapped body and wraps the results in an
     * {@link AloFlux}.
     *
     * @param publisher source of {@code Alo}-wrapped bodies
     * @param snsMessageCreator converts a body to the message to send
     * @param str destination, passed unchanged to the underlying sender
     * @return {@code AloFlux} of results
     */
    public AloFlux<SnsSenderResult<T>> sendAloBodies(Publisher<Alo<T>> publisher, SnsMessageCreator<T> snsMessageCreator, String str) {
        return this.futureResources
            .flatMapMany(resources -> resources.sendAlos(publisher, snsMessageCreator, str))
            .as(AloFlux::wrap);
    }

    /**
     * Returns a function form of {@link #sendAloMessages(Publisher, String)}.
     *
     * @param str destination, passed unchanged to the underlying sender
     * @return function that sends the {@code Alo}-wrapped messages of the publisher it is applied to
     */
    public Function<Publisher<Alo<SnsMessage<T>>>, AloFlux<SnsSenderResult<SnsMessage<T>>>> sendAloMessages(String str) {
        return publisher -> sendAloMessages(publisher, str);
    }

    /**
     * Sends every {@link Alo}-wrapped message and wraps the results in an {@link AloFlux}.
     *
     * @param publisher source of {@code Alo}-wrapped messages
     * @param str destination, passed unchanged to the underlying sender
     * @return {@code AloFlux} of results
     */
    public AloFlux<SnsSenderResult<SnsMessage<T>>> sendAloMessages(Publisher<Alo<SnsMessage<T>>> publisher, String str) {
        return this.futureResources
            .flatMapMany(resources -> resources.sendAlos(publisher, Function.identity(), str))
            .as(AloFlux::wrap);
    }

    /**
     * Logs the given reason at INFO level and then closes this sender.
     *
     * @param obj reason for closing; only used in the log message
     */
    public void close(Object obj) {
        LOGGER.info("Closing AloSnsSender due to reason={}", obj);
        close();
    }

    /**
     * Signals the resource cache to invalidate, which closes the cached {@link SnsSender}.
     *
     * <p>The emit result is not inspected. NOTE(review): with a best-effort sink a signal
     * emitted while nothing is subscribed (presumably when no resources are cached) is dropped
     * - confirm against Reactor's {@code Sinks} documentation.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.closeSink.tryEmitNext(System.currentTimeMillis());
    }
}
