package io.atleon.aws.sns;

import io.atleon.core.Batcher;
import java.io.Closeable;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.retry.Retry;
import software.amazon.awssdk.services.sns.SnsAsyncClient;
import software.amazon.awssdk.services.sns.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sns.model.PublishBatchRequest;
import software.amazon.awssdk.services.sns.model.PublishBatchRequestEntry;
import software.amazon.awssdk.services.sns.model.PublishBatchResponse;
import software.amazon.awssdk.services.sns.model.PublishBatchResultEntry;
import software.amazon.awssdk.services.sns.model.PublishRequest;
import software.amazon.awssdk.services.sns.model.PublishResponse;

/**
 * Reactive sender of messages to AWS SNS.
 *
 * <p>Messages can be published one at a time to any {@link SnsAddress}, or as a stream that is
 * grouped into batches by a {@link Batcher} and published to a single topic using SNS batch
 * publishing. The underlying {@link SnsAsyncClient} is created lazily on first use, cached, and
 * closed when {@link #close()} is invoked.
 */
public final class SnsSender implements Closeable {

    /** Retry policy applied to every publish call: at most 3 retries, backoff starting at 10ms. */
    private static final Retry DEFAULT_RETRY = Retry.backoff(3, Duration.ofMillis(10));

    /** Lazily created, cached client; the cached value is invalidated by an emission on {@link #closeSink}. */
    private final Mono<SnsAsyncClient> futureClient;

    /** Groups streamed messages into batches for batch publishing. */
    private final Batcher batcher;

    /** Concurrency limit handed to the batcher when mapping batches to publish requests. */
    private final int maxRequestsInFlight;

    /** Signals closure; any emission invalidates (and closes) the currently cached client. */
    private final Sinks.Many<Long> closeSink = Sinks.many().multicast().directBestEffort();

    private SnsSender(SnsSenderOptions snsSenderOptions) {
        Objects.requireNonNull(snsSenderOptions, "snsSenderOptions");
        this.futureClient = Mono.fromSupplier(snsSenderOptions::createClient)
            .cacheInvalidateWhen(client -> closeSink.asFlux().next().then(), SnsAsyncClient::close);
        this.batcher = Batcher.create(
            snsSenderOptions.batchSize(),
            snsSenderOptions.batchDuration(),
            snsSenderOptions.batchPrefetch());
        this.maxRequestsInFlight = snsSenderOptions.maxRequestsInFlight();
    }

    /**
     * Creates a sender configured by the provided options.
     *
     * @param snsSenderOptions options supplying the client factory and batching configuration
     * @return a new sender
     * @throws NullPointerException if {@code snsSenderOptions} is null
     */
    public static SnsSender create(SnsSenderOptions snsSenderOptions) {
        return new SnsSender(snsSenderOptions);
    }

    /**
     * Publishes a single message to the given address.
     *
     * <p>Errors from the publish call (after retries) are not propagated as an error signal;
     * they are converted into a failure {@link SnsSenderResult} carrying the message's request
     * ID and correlation metadata.
     *
     * @param snsSenderMessage the message to publish
     * @param snsAddress       destination (topic ARN, target ARN or phone number)
     * @param <C>              type of correlation metadata carried through to the result
     * @return the result of publishing the message
     */
    public <C> Mono<SnsSenderResult<C>> send(SnsSenderMessage<C> snsSenderMessage, SnsAddress snsAddress) {
        return futureClient.flatMap(client -> send(client, snsSenderMessage, snsAddress));
    }

    /**
     * Publishes a stream of messages to a topic using batch publishing.
     *
     * <p>Per-entry failures reported in a batch response are emitted as failure results. Unlike
     * the single-message variant, a failure of a whole batch request (after retries) is
     * propagated as an error signal of the returned {@link Flux}.
     *
     * @param publisher source of messages to publish
     * @param str       ARN of the topic to publish to
     * @param <C>       type of correlation metadata carried through to the results
     * @return the results of publishing each message
     */
    public <C> Flux<SnsSenderResult<C>> send(Publisher<SnsSenderMessage<C>> publisher, String str) {
        return futureClient.flatMapMany(client ->
            batcher.applyMapping(publisher, batch -> send(client, batch, str), maxRequestsInFlight));
    }

    /**
     * Signals that the cached client (if any) should be invalidated and closed. The outcome of
     * the emission is not checked.
     */
    @Override
    public void close() {
        closeSink.tryEmitNext(System.currentTimeMillis());
    }

    /** Publishes one message with the given client, mapping any error to a failure result. */
    private <C> Mono<SnsSenderResult<C>> send(SnsAsyncClient client, SnsSenderMessage<C> message, SnsAddress address) {
        // Built eagerly so an unsupported address type fails before any call is attempted.
        PublishRequest request = createPublishRequest(message, address);
        String requestId = message.requestId();
        C correlationMetadata = message.correlationMetadata();
        return Mono.fromFuture(() -> client.publish(request))
            .retryWhen(DEFAULT_RETRY)
            .map(response -> createSuccessResult(requestId, response, correlationMetadata))
            .onErrorResume(error -> Mono.just(createFailureResult(requestId, error, correlationMetadata)));
    }

    /**
     * Publishes one batch of messages to a topic with the given client.
     *
     * <p>Correlation metadata is looked up by request ID when building results, so request IDs
     * of messages that carry (non-null) correlation metadata must be unique within a batch;
     * otherwise {@link Collectors#toMap} throws {@link IllegalStateException}.
     */
    private <C> Flux<SnsSenderResult<C>> send(SnsAsyncClient client, List<SnsSenderMessage<C>> messages, String topicArn) {
        if (messages.isEmpty()) {
            // SNS rejects batch requests without entries; there is nothing to publish anyway.
            return Flux.empty();
        }

        List<PublishBatchRequestEntry> entries = messages.stream()
            .map(this::createBatchRequestEntry)
            .collect(Collectors.toList());
        PublishBatchRequest request = PublishBatchRequest.builder()
            .topicArn(topicArn)
            .publishBatchRequestEntries(entries)
            .build();

        // toMap rejects null values, hence messages without correlation metadata are left out;
        // lookups for them yield null, which matches their (absent) metadata.
        Map<String, C> correlationMetadataByRequestId = messages.stream()
            .filter(message -> message.correlationMetadata() != null)
            .collect(Collectors.toMap(SnsSenderMessage::requestId, SnsSenderMessage::correlationMetadata));

        return Mono.fromFuture(() -> client.publishBatch(request))
            .retryWhen(DEFAULT_RETRY)
            .flatMapIterable(response -> createResults(response, correlationMetadataByRequestId));
    }

    /** Builds the single-message publish request for the given message and destination. */
    private <C> PublishRequest createPublishRequest(SnsSenderMessage<C> message, SnsAddress address) {
        return newPublishRequestBuilder(address)
            .messageDeduplicationId(message.messageDeduplicationId().orElse(null))
            .messageGroupId(message.messageGroupId().orElse(null))
            .messageAttributes(message.messageAttributes())
            .messageStructure(message.messageStructure().orElse(null))
            .subject(message.subject().orElse(null))
            .message(message.body())
            .build();
    }

    /**
     * Creates a request builder with the destination field matching the address type.
     *
     * @throws UnsupportedOperationException if the address type is not one of the handled types
     */
    private PublishRequest.Builder newPublishRequestBuilder(SnsAddress address) {
        switch (address.type()) {
            case TOPIC_ARN:
                return PublishRequest.builder().topicArn(address.value());
            case TARGET_ARN:
                return PublishRequest.builder().targetArn(address.value());
            case PHONE_NUMBER:
                return PublishRequest.builder().phoneNumber(address.value());
            default:
                throw new UnsupportedOperationException("Publishing not supported for addressType=" + address.type());
        }
    }

    /** Builds a batch entry for the message, using the message's request ID as the entry ID. */
    private <C> PublishBatchRequestEntry createBatchRequestEntry(SnsSenderMessage<C> message) {
        return PublishBatchRequestEntry.builder()
            .id(message.requestId())
            .messageDeduplicationId(message.messageDeduplicationId().orElse(null))
            .messageGroupId(message.messageGroupId().orElse(null))
            .messageAttributes(message.messageAttributes())
            .messageStructure(message.messageStructure().orElse(null))
            .subject(message.subject().orElse(null))
            .message(message.body())
            .build();
    }

    /**
     * Converts a batch response into results: all failed entries first, then all successful
     * ones, each paired with the correlation metadata registered under its entry ID (or null).
     */
    private <C> List<SnsSenderResult<C>> createResults(
            PublishBatchResponse response, Map<String, C> correlationMetadataByRequestId) {
        Stream<SnsSenderResult<C>> failures = response.failed().stream()
            .map(entry -> createFailureResult(entry, correlationMetadataByRequestId.get(entry.id())));
        Stream<SnsSenderResult<C>> successes = response.successful().stream()
            .map(entry -> createSuccessResult(entry, correlationMetadataByRequestId.get(entry.id())));
        return Stream.concat(failures, successes).collect(Collectors.toList());
    }

    /** Failure result for a single-message publish that ended with an error. */
    private <C> SnsSenderResult<C> createFailureResult(String requestId, Throwable error, C correlationMetadata) {
        return SnsSenderResult.failure(requestId, error, correlationMetadata);
    }

    /** Failure result for a batch entry reported as failed, wrapping its code and message. */
    private <C> SnsSenderResult<C> createFailureResult(BatchResultErrorEntry entry, C correlationMetadata) {
        MessagePublishFailedException error = new MessagePublishFailedException(entry.code(), entry.message());
        return SnsSenderResult.failure(entry.id(), error, correlationMetadata);
    }

    /** Success result for a single-message publish. */
    private <C> SnsSenderResult<C> createSuccessResult(String requestId, PublishResponse response, C correlationMetadata) {
        return SnsSenderResult.success(requestId, response.messageId(), response.sequenceNumber(), correlationMetadata);
    }

    /** Success result for a batch entry reported as successful. */
    private <C> SnsSenderResult<C> createSuccessResult(PublishBatchResultEntry entry, C correlationMetadata) {
        return SnsSenderResult.success(entry.id(), entry.messageId(), entry.sequenceNumber(), correlationMetadata);
    }

    /**
     * Describes a message that was reported as failed in a batch publish response.
     */
    public static final class MessagePublishFailedException extends RuntimeException {

        private final String code;
        private final String message;

        /**
         * @param str  error code reported for the failed entry
         * @param str2 error message reported for the failed entry
         */
        public MessagePublishFailedException(String str, String str2) {
            super(String.format("Publishing message failed with code=%s: %s", str, str2));
            this.code = str;
            this.message = str2;
        }

        /** @return the error code this exception was created with */
        public String code() {
            return code;
        }

        /** @return the raw error message this exception was created with (without the code prefix) */
        public String message() {
            return message;
        }
    }
}
