package io.atleon.aws.sqs;

import io.atleon.core.ReactivePhaser;
import io.atleon.core.SerialQueue;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.IntPredicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.retry.Retry;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

/* loaded from: input_file:io/atleon/aws/sqs/SqsReceiver.class */
public final class SqsReceiver {
    /** Logger used by the receiver and its pollers. */
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsReceiver.class);
    /** Retry policy applied to SQS client calls: up to 3 retries with backoff starting at 10 milliseconds. */
    private static final Retry DEFAULT_RETRY = Retry.backoff(3, Duration.ofMillis(10));
    /** Empty response substituted when a reception request completes without producing a response. */
    private static final ReceiveMessageResponse EMPTY_RECEIVE_MESSAGE_RESPONSE = (ReceiveMessageResponse) ReceiveMessageResponse.builder().build();
    /** Options that configure client creation, polling limits, deletion batching and close timeout. */
    private final SqsReceiverOptions options;

    /* loaded from: input_file:io/atleon/aws/sqs/SqsReceiver$BatchRequestFailedException.class */
    /**
     * Raised when an SQS batch request completes but its response reports failed entries.
     */
    public static final class BatchRequestFailedException extends RuntimeException {
        /**
         * @param type   name of the batch operation that failed
         * @param errors the failed entries reported by SQS
         */
        private BatchRequestFailedException(String type, List<BatchResultErrorEntry> errors) {
            super(describe(type, errors));
        }

        /** Builds the exception message from the operation type and its failed entries. */
        private static String describe(String type, List<BatchResultErrorEntry> errors) {
            return String.format("Batch request failed! type=%s errors=%s", type, errors);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atleon/aws/sqs/SqsReceiver$Poller.class */
    public final class Poller {
        /** Client used for every SQS call made by this poller; closed when the poller is disposed. */
        private final SqsAsyncClient client;
        /** URL of the queue this poller receives from. */
        private final String queueUrl;
        /** Downstream sink; set exactly once by {@code start} and {@code null} until then. */
        private final AtomicReference<FluxSink<SqsReceiverMessage>> sinkRef = new AtomicReference<>();
        /**
         * Tracks in-progress operations. Created with one party (the poller itself); operations register
         * while running and callers treat phase {@code 0} as "not yet disposing".
         * NOTE(review): presumably mirrors {@code java.util.concurrent.Phaser} semantics - confirm.
         */
        private final ReactivePhaser executionPhaser = new ReactivePhaser(1);
        /** {@code true} while a ReceiveMessage request is outstanding, so at most one is issued at a time. */
        private final AtomicBoolean receptionPending = new AtomicBoolean(false);
        /** Flipped to {@code true} by the first disposal; guards against repeated disposal. */
        private final AtomicBoolean done = new AtomicBoolean(false);
        /** Receipt handles of messages emitted downstream that have not yet been deleted or released. */
        private final Set<String> inFlightReceiptHandles = Collections.newSetFromMap(new ConcurrentHashMap<>());
        /** Unicast sink of receipt handles awaiting deletion; signals an error on backpressure overflow. */
        private final Sinks.Many<String> receiptHandlesToDelete = Sinks.unsafe().many().unicast().onBackpressureError();
        /**
         * Queue through which handles are emitted into {@link #receiptHandlesToDelete}.
         * NOTE(review): presumably serializes concurrent emissions into the unsafe sink - confirm.
         */
        private final SerialQueue<String> receiptHandlesToDeleteQueue = SerialQueue.onEmitNext(this.receiptHandlesToDelete);

        /**
         * Creates a poller for one queue and immediately wires up the deletion pipeline: receipt handles
         * pushed into the deletion sink are grouped into batches and handed to {@code deleteMessages}.
         *
         * <p>The pipeline registers with the execution phaser when the sink completes and deregisters
         * after the pipeline terminates. Pipeline errors are routed to {@code doError}.
         *
         * @param sqsAsyncClient client used for all SQS calls
         * @param str            URL of the queue to poll
         */
        public Poller(SqsAsyncClient sqsAsyncClient, String str) {
            this.client = sqsAsyncClient;
            this.queueUrl = str;
            // Keep the chain fully parameterized (Flux<String> -> Flux<List<String>>) so that the
            // subscriber receives a typed batch rather than an erased Object.
            this.receiptHandlesToDelete
                .asFlux()
                .doOnComplete(this.executionPhaser::register)
                .transform(handles -> batch(
                    handles,
                    SqsReceiver.this.options.deleteBatchSize(),
                    SqsReceiver.this.options.deleteInterval()))
                .doAfterTerminate(this.executionPhaser::arriveAndDeregister)
                .subscribe(this::deleteMessages, this::doError);
        }

        /**
         * Attaches the downstream sink and begins reacting to its signals: cancellation disposes the
         * poller, and each request for more items may trigger a message reception.
         *
         * @param fluxSink sink that received messages are emitted to
         * @throws IllegalStateException if a sink has already been attached
         */
        public void start(FluxSink<SqsReceiverMessage> fluxSink) {
            boolean firstStart = this.sinkRef.compareAndSet(null, fluxSink);
            if (!firstStart) {
                throw new IllegalStateException("SQS Poller cannot be started more than once");
            }
            fluxSink.onCancel(() -> dispose().subscribe());
            fluxSink.onRequest(requested -> maybeScheduleMessageReception());
        }

        /**
         * Disposes this poller at most once.
         *
         * @return a Mono emitting {@code true} if this subscription performed the disposal, or
         *         {@code false} if the poller had already been marked done
         */
        public Mono<Boolean> dispose() {
            return Mono.fromSupplier(() -> this.done.compareAndSet(false, true))
                .flatMap(wonDisposal -> {
                    if (!wonDisposal) {
                        return Mono.just(false);
                    }
                    return doDispose().thenReturn(true);
                });
        }

        /** Maximum number of entries SQS accepts in a single batch request. */
        private static final int MAX_BATCH_REQUEST_ENTRIES = 10;

        /**
         * Performs the actual disposal sequence:
         * <ol>
         *   <li>arrive at the execution phaser and wait for it to advance;</li>
         *   <li>complete the deletion sink;</li>
         *   <li>reset the visibility timeout of all still-in-flight messages to zero;</li>
         *   <li>arrive at the phaser and wait for it to advance once more.</li>
         * </ol>
         * The whole sequence is bounded by the configured close timeout. The client is closed however
         * the sequence ends, and any error is logged and then suppressed.
         *
         * @return a Mono that completes (never errors) once disposal has finished
         */
        private Mono<?> doDispose() {
            return this.executionPhaser
                .arriveAndAwaitAdvanceReactively()
                .then(Mono.fromRunnable(this.receiptHandlesToDelete::tryEmitComplete))
                .then(Mono.defer(this::releaseInFlightMessages))
                .then(this.executionPhaser.arriveAndAwaitAdvanceReactively())
                .timeout(SqsReceiver.this.options.closeTimeout())
                .doFinally(signalType -> this.client.close())
                .doOnError(error -> SqsReceiver.LOGGER.error("Encountered error while disposing SQS Poller", error))
                .onErrorResume(error -> Mono.empty());
        }

        /**
         * Resets the visibility timeout of every in-flight message to zero so the messages become
         * receivable again. Handles are sent in sequential batches of at most
         * {@link #MAX_BATCH_REQUEST_ENTRIES}, because SQS rejects larger batch requests outright.
         */
        private Mono<Void> releaseInFlightMessages() {
            return Flux.fromIterable(this.inFlightReceiptHandles)
                .buffer(MAX_BATCH_REQUEST_ENTRIES)
                .concatMap(handles -> createChangeMessageVisibilities(handles, Duration.ZERO, phase -> true))
                .then();
        }

        /**
         * Issues one ReceiveMessage request when there is capacity for at least one message, the poller
         * is not done, and no other reception is currently pending. The request is only executed while
         * the execution phaser is in phase {@code 0}; if it is skipped, an empty response is handled
         * instead so that the pending flag is still cleared.
         */
        private void maybeScheduleMessageReception() {
            int maxMessages = calculateMaxNumberOfMessagesToRequest();
            if (maxMessages <= 0) {
                return;
            }
            if (this.done.get()) {
                return;
            }
            // Claim the single reception slot last, so it is never taken when nothing will be requested.
            if (!this.receptionPending.compareAndSet(false, true)) {
                return;
            }

            ReceiveMessageRequest request = ReceiveMessageRequest.builder()
                .receiveRequestAttemptId(UUID.randomUUID().toString())
                .queueUrl(this.queueUrl)
                .maxNumberOfMessages(maxMessages)
                .messageAttributeNames(SqsReceiver.this.options.messageAttributesToRequest())
                .attributeNamesWithStrings(SqsReceiver.this.options.messageSystemAttributesToRequest())
                .waitTimeSeconds(SqsReceiver.this.options.waitTimeSecondsPerReception())
                .visibilityTimeout(SqsReceiver.this.options.visibilityTimeoutSeconds())
                .build();

            maybeExecute(SqsAsyncClient::receiveMessage, request, phase -> phase == 0)
                .defaultIfEmpty(SqsReceiver.EMPTY_RECEIVE_MESSAGE_RESPONSE)
                .subscribe(this::handleMessagesReceived, this::handleMessagesReceivedError);
        }

        /**
         * Computes how many messages the next reception may ask for: the smallest of the configured
         * per-reception maximum, the outstanding downstream demand (capped at {@code Integer.MAX_VALUE}),
         * and the remaining in-flight allowance.
         *
         * @return the number of messages to request; zero or negative means nothing should be requested
         */
        private int calculateMaxNumberOfMessagesToRequest() {
            int perReceptionLimit = SqsReceiver.this.options.maxMessagesPerReception();
            long requested = this.sinkRef.get().requestedFromDownstream();
            int downstreamDemand = (int) Math.min(Integer.MAX_VALUE, requested);
            int inFlightAllowance =
                SqsReceiver.this.options.maxInFlightPerSubscription() - this.inFlightReceiptHandles.size();
            return Math.min(perReceptionLimit, Math.min(downstreamDemand, inFlightAllowance));
        }

        /**
         * Emits every message in the response downstream, clears the pending-reception flag, and then
         * attempts to schedule the next reception.
         *
         * @param receiveMessageResponse the (possibly empty) response of a ReceiveMessage request
         */
        private void handleMessagesReceived(ReceiveMessageResponse receiveMessageResponse) {
            for (Message message : receiveMessageResponse.messages()) {
                emit(message);
            }
            this.receptionPending.set(false);
            maybeScheduleMessageReception();
        }

        /**
         * Handles a failed reception by routing the error through {@code doError} and then clearing the
         * pending-reception flag.
         *
         * @param th the error raised by the ReceiveMessage request
         */
        private void handleMessagesReceivedError(Throwable th) {
            doError(th);
            // The flag is cleared only after the error has been routed; no new reception is scheduled here.
            this.receptionPending.set(false);
        }

        /**
         * Marks the message's receipt handle as in flight and emits the message downstream together with
         * two callbacks: one that deletes the message and one that changes its visibility timeout.
         *
         * @param message the received SQS message
         */
        private void emit(Message message) {
            String handle = message.receiptHandle();
            Runnable deleter = () -> deleteIfActive(handle);
            SqsMessageVisibilityChanger visibilityChanger =
                (timeout, stillInFlight) -> changeVisibilityIfActive(handle, timeout, stillInFlight);
            this.inFlightReceiptHandles.add(handle);
            doNext(SqsReceiverMessage.create(message, deleter, visibilityChanger));
        }

        /**
         * Queues the handle for deletion if the phaser is still in phase 0, the poller is not done, and
         * the handle was still in flight; then tries to schedule another reception. The phaser
         * registration taken for the check is always released afterwards.
         */
        private void deleteIfActive(String handle) {
            boolean shouldDelete = this.executionPhaser.register() == 0
                && !this.done.get()
                && this.inFlightReceiptHandles.remove(handle);
            if (shouldDelete) {
                this.receiptHandlesToDeleteQueue.addAndDrain(handle);
                maybeScheduleMessageReception();
            }
            this.executionPhaser.arriveAndDeregister();
        }

        /**
         * Changes the visibility timeout of the message if the phaser is still in phase 0 and the poller
         * is not done. When {@code stillInFlight} is true the handle must currently be in flight and
         * stays so; otherwise the handle is removed from the in-flight set and, if it was present,
         * another reception may be scheduled. The phaser registration is always released afterwards.
         */
        private void changeVisibilityIfActive(String handle, Duration timeout, boolean stillInFlight) {
            boolean active = this.executionPhaser.register() == 0 && !this.done.get();
            if (active) {
                if (stillInFlight) {
                    if (this.inFlightReceiptHandles.contains(handle)) {
                        maybeChangeMessageVisibility(handle, timeout);
                    }
                } else if (this.inFlightReceiptHandles.remove(handle)) {
                    maybeChangeMessageVisibility(handle, timeout);
                    maybeScheduleMessageReception();
                }
            }
            this.executionPhaser.arriveAndDeregister();
        }

        /**
         * Sends one DeleteMessageBatch request covering the given receipt handles. Does nothing for an
         * empty collection. The request is executed regardless of the current phaser phase, so deletions
         * queued before disposal are still attempted. Failures are routed to {@code doError}.
         *
         * @param collection receipt handles of the messages to delete
         */
        private void deleteMessages(Collection<String> collection) {
            if (collection.isEmpty()) {
                return;
            }
            // Build a typed entry list instead of passing a raw List to the request builder.
            List<DeleteMessageBatchRequestEntry> entries = collection.stream()
                .map(receiptHandle -> DeleteMessageBatchRequestEntry.builder()
                    .id(newReceiptHandleId())
                    .receiptHandle(receiptHandle)
                    .build())
                .collect(Collectors.toList());
            DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder()
                .queueUrl(this.queueUrl)
                .entries(entries)
                .build();
            maybeExecute(SqsAsyncClient::deleteMessageBatch, request, phase -> true)
                .subscribe(this::handleMessagesDeleted, this::doError);
        }

        /**
         * Inspects a DeleteMessageBatch response and raises a {@link BatchRequestFailedException}
         * through {@code doError} when any entry failed.
         *
         * @param deleteMessageBatchResponse the response to inspect
         */
        private void handleMessagesDeleted(DeleteMessageBatchResponse deleteMessageBatchResponse) {
            if (!deleteMessageBatchResponse.hasFailed()) {
                return;
            }
            List<BatchResultErrorEntry> failures = deleteMessageBatchResponse.failed();
            doError(new BatchRequestFailedException("DeleteMessage", failures));
        }

        /**
         * Changes the visibility timeout of a single message, but only while the execution phaser is
         * still in phase {@code 0}. Failures are routed to {@code doError}.
         *
         * @param str      receipt handle of the message
         * @param duration new visibility timeout
         */
        private void maybeChangeMessageVisibility(String str, Duration duration) {
            List<String> singleHandle = Collections.singletonList(str);
            Mono<ChangeMessageVisibilityBatchResponse> change =
                createChangeMessageVisibilities(singleHandle, duration, phase -> phase == 0);
            change.subscribe(this::handleMessageVisibilitiesChanged, this::doError);
        }

        /**
         * Creates a Mono that, when subscribed, sends one ChangeMessageVisibilityBatch request for the
         * given receipt handles.
         *
         * @param collection   receipt handles whose visibility timeout should change
         * @param duration     new visibility timeout; only its whole seconds are used
         * @param intPredicate decides, from the phaser phase observed at execution time, whether the
         *                     request is actually sent
         * @return a Mono of the batch response; empty if {@code collection} is empty or the predicate
         *         rejects the phase
         * @throws ArithmeticException if the duration's seconds do not fit in an {@code int}
         */
        private Mono<ChangeMessageVisibilityBatchResponse> createChangeMessageVisibilities(Collection<String> collection, Duration duration, IntPredicate intPredicate) {
            if (collection.isEmpty()) {
                return Mono.empty();
            }
            int timeoutSeconds = Math.toIntExact(duration.getSeconds());
            // Build a typed entry list instead of passing a raw List to the request builder.
            List<ChangeMessageVisibilityBatchRequestEntry> entries = collection.stream()
                .map(receiptHandle -> createChangeMessageVisibilityRequestEntry(receiptHandle, timeoutSeconds))
                .collect(Collectors.toList());
            ChangeMessageVisibilityBatchRequest request = ChangeMessageVisibilityBatchRequest.builder()
                .queueUrl(this.queueUrl)
                .entries(entries)
                .build();
            return maybeExecute(SqsAsyncClient::changeMessageVisibilityBatch, request, intPredicate);
        }

        /**
         * Builds one batch entry that sets the visibility timeout of a message.
         *
         * @param str receipt handle of the message
         * @param i   new visibility timeout in seconds
         * @return the entry, identified by a freshly generated random id
         */
        private ChangeMessageVisibilityBatchRequestEntry createChangeMessageVisibilityRequestEntry(String str, int i) {
            String entryId = newReceiptHandleId();
            return ChangeMessageVisibilityBatchRequestEntry.builder()
                .id(entryId)
                .receiptHandle(str)
                .visibilityTimeout(i)
                .build();
        }

        /**
         * Inspects a ChangeMessageVisibilityBatch response and raises a
         * {@link BatchRequestFailedException} through {@code doError} when any entry failed.
         *
         * @param changeMessageVisibilityBatchResponse the response to inspect
         */
        private void handleMessageVisibilitiesChanged(ChangeMessageVisibilityBatchResponse changeMessageVisibilityBatchResponse) {
            if (!changeMessageVisibilityBatchResponse.hasFailed()) {
                return;
            }
            List<BatchResultErrorEntry> failures = changeMessageVisibilityBatchResponse.failed();
            doError(new BatchRequestFailedException("ChangeMessageVisibility", failures));
        }

        /**
         * Wraps an asynchronous client call so that it participates in the execution phaser.
         *
         * <p>On subscription the call registers with the phaser and tests the observed phase with
         * {@code intPredicate}. That result is cached so retries re-issue the client call without
         * registering again. The call is retried using the default retry policy, and the registration
         * is released once when the resulting Mono terminates or is cancelled.
         *
         * @param biFunction   the client method to invoke
         * @param t            the request passed to the client method
         * @param intPredicate decides from the phaser phase whether the call is made at all
         * @param <T>          request type
         * @param <V>          response type
         * @return a Mono of the response, or an empty Mono if the predicate rejected the phase
         */
        private <T, V> Mono<V> maybeExecute(BiFunction<SqsAsyncClient, T, CompletableFuture<V>> biFunction, T t, IntPredicate intPredicate) {
            return Mono.fromSupplier(() -> intPredicate.test(this.executionPhaser.register()))
                .cache()
                // No raw CompletableFuture cast: the element type V flows through to the result.
                .flatMap(phaseAccepted -> phaseAccepted
                    ? Mono.fromFuture(biFunction.apply(this.client, t))
                    : Mono.<V>empty())
                .retryWhen(SqsReceiver.DEFAULT_RETRY)
                .doFinally(signalType -> this.executionPhaser.arriveAndDeregister());
        }

        /**
         * Emits a message to the downstream sink, routing anything thrown during emission to
         * {@code doError}.
         *
         * @param sqsReceiverMessage the message to emit
         */
        private void doNext(SqsReceiverMessage sqsReceiverMessage) {
            FluxSink<SqsReceiverMessage> sink = this.sinkRef.get();
            try {
                sink.next(sqsReceiverMessage);
            } catch (Throwable failure) {
                doError(failure);
            }
        }

        /**
         * Disposes the poller and, only if this call performed the disposal, signals the error to the
         * downstream sink. Errors arriving after the poller is already done are not propagated.
         *
         * @param th the error to propagate
         */
        private void doError(Throwable th) {
            dispose().subscribe(disposedByThisCall -> {
                if (!disposedByThisCall) {
                    return;
                }
                this.sinkRef.get().error(th);
            });
        }

        /**
         * Groups the elements of a flux into lists.
         *
         * @param flux     source elements
         * @param i        maximum batch size; values of 1 or less produce single-element lists
         * @param duration maximum time to wait before emitting a partially filled batch
         * @param <T>      element type
         * @return a flux of batches
         */
        private <T> Flux<List<T>> batch(Flux<T> flux, int i, Duration duration) {
            if (i > 1) {
                return flux.bufferTimeout(i, duration);
            }
            return flux.map(Collections::singletonList);
        }

        /**
         * Generates a random identifier used as the {@code id} of a batch request entry. The value is
         * not derived from any receipt handle.
         *
         * @return a random UUID string
         */
        private String newReceiptHandleId() {
            UUID entryId = UUID.randomUUID();
            return entryId.toString();
        }
    }

    /**
     * Creates a receiver backed by the given options. Use {@link #create(SqsReceiverOptions)} to
     * obtain instances.
     *
     * @param sqsReceiverOptions options used for client creation and polling behavior
     */
    private SqsReceiver(SqsReceiverOptions sqsReceiverOptions) {
        this.options = sqsReceiverOptions;
    }

    /**
     * Creates a new receiver.
     *
     * @param sqsReceiverOptions options used for client creation and polling behavior
     * @return a new {@code SqsReceiver}
     */
    public static SqsReceiver create(SqsReceiverOptions sqsReceiverOptions) {
        return new SqsReceiver(sqsReceiverOptions);
    }

    /**
     * Receives messages from a queue, leaving deletion and visibility changes to the consumer through
     * the callbacks carried by each emitted message.
     *
     * <p>Each subscription creates its own client and poller; the poller is disposed when the
     * subscription terminates or is cancelled. Emitting beyond downstream demand results in an
     * overflow error.
     *
     * @param str URL of the queue to receive from
     * @return a flux of received messages
     */
    public Flux<SqsReceiverMessage> receiveManual(String str) {
        Mono<Poller> pollerResource = Mono.fromSupplier(() -> new Poller(this.options.createClient(), str));
        return Flux.usingWhen(
            pollerResource,
            poller -> Flux.create(poller::start, FluxSink.OverflowStrategy.ERROR),
            Poller::dispose);
    }
}
