/*
 * Decompiled with CFR 0.152.
 */
package io.polyglotted.aws.message;

import com.amazonaws.AbortedException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import io.polyglotted.aws.common.AwsClientFactory;
import io.polyglotted.aws.config.AwsConfig;
import io.polyglotted.aws.message.QueueConfig;
import io.polyglotted.aws.message.SubscriberUtil;
import io.polyglotted.common.util.CollUtil;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Long-polls an SQS queue on a single receiver thread and hands every received message to a
 * fixed-size worker pool (one worker per available processor).
 *
 * <p>For each message the worker calls {@code SubscriberUtil.safeInvoke} for the method
 * {@code handleMessage(String)} on the supplied handler object, passing the message body, and
 * then deletes the message from the queue. If either step throws, the failure is logged and the
 * message is not deleted by this class.
 *
 * <p>NOTE(review): receive failures are logged and retried immediately, without any back-off.
 */
public class SqsSubscriber implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(SqsSubscriber.class);

    private final ExecutorService receiverPool = Executors.newSingleThreadExecutor();
    private final ExecutorService handlerPool = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private final AmazonSQS sqsClient;
    private final QueueConfig subscriber;
    private final Object messageHandler;

    /**
     * Creates an SQS client from {@code config} and immediately starts the polling loop on the
     * receiver thread.
     *
     * @param config         AWS settings passed to {@code AwsClientFactory.createSqsClient}
     * @param subscriber     queue settings (queue, wait time, batch size, shutdown timeout, enabled flag)
     * @param messageHandler object on which {@code handleMessage(String)} is invoked per message
     */
    public SqsSubscriber(AwsConfig config, QueueConfig subscriber, Object messageHandler) {
        this(AwsClientFactory.createSqsClient(config), subscriber, messageHandler);
        this.receiverPool.execute(this::receiveMessage);
    }

    /**
     * Stores the collaborators only; unlike the {@link AwsConfig} constructor, this one does not
     * start the polling loop.
     *
     * @param sqsClient      client used for receiving and deleting messages
     * @param subscriber     queue settings
     * @param messageHandler object on which {@code handleMessage(String)} is invoked per message
     */
    public SqsSubscriber(AmazonSQS sqsClient, QueueConfig subscriber, Object messageHandler) {
        this.sqsClient = sqsClient;
        this.subscriber = subscriber;
        this.messageHandler = messageHandler;
    }

    /**
     * Stops the receiver pool first and then the handler pool, each with the configured shutdown
     * timeout in milliseconds.
     */
    @Override
    public void close() {
        try {
            SubscriberUtil.stopThreadPool(this.receiverPool, this.subscriber.getShutdownTimeInMillis(), TimeUnit.MILLISECONDS);
        }
        finally {
            // Always attempt to stop the workers, even if stopping the receiver failed,
            // so the handler threads are not leaked.
            SubscriberUtil.stopThreadPool(this.handlerPool, this.subscriber.getShutdownTimeInMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Receiver loop: resolves the queue URL once, then polls until the thread is interrupted or
     * the SDK reports an aborted call. Does nothing when the subscriber is disabled.
     */
    private void receiveMessage() {
        if (!this.subscriber.isEnabled()) {
            return;
        }

        String queueUrl;
        try {
            queueUrl = this.sqsClient.getQueueUrl(this.subscriber.getQueue()).getQueueUrl();
        }
        catch (AbortedException ignored) {
            // Aborted while resolving the URL: treated as a stop request, same as in the loop below.
            return;
        }
        catch (Exception ex) {
            // Without a queue URL nothing can be polled; record why the receiver is giving up.
            log.error("error resolving url for sqs queue {}", this.subscriber.getQueue(), ex);
            return;
        }

        // Thread.interrupted() also clears the interrupt flag when it ends the loop.
        while (!Thread.interrupted()) {
            try {
                ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
                        .withWaitTimeSeconds(Integer.valueOf(this.subscriber.getWaitTimeInSeconds()))
                        .withMaxNumberOfMessages(Integer.valueOf(this.subscriber.getMaxMessages()));
                List<Message> messages = this.sqsClient.receiveMessage(request).getMessages();
                if (!messages.isEmpty()) {
                    this.handleMessages(queueUrl, messages);
                }
            }
            catch (AbortedException ignored) {
                // Aborted receive call: leave the loop quietly instead of logging an error.
                break;
            }
            catch (Exception ex) {
                // Any other failure is logged and the loop keeps polling.
                log.error("error receiving message from sqs", ex);
            }
        }
    }

    /**
     * Submits one task per message to the handler pool. Each task invokes the handler and then
     * deletes the message; a failure in one task is logged with that message's id and does not
     * affect the other tasks.
     *
     * @param queueUrl URL of the queue the messages were received from
     * @param messages batch returned by a single receive call
     */
    private void handleMessages(String queueUrl, List<Message> messages) {
        for (Message message : messages) {
            this.handlerPool.submit(() -> {
                try {
                    SubscriberUtil.safeInvoke(this.messageHandler, "handleMessage", new Class[]{String.class}, message.getBody());
                    // Reached only when the invocation above did not throw.
                    this.sqsClient.deleteMessage(new DeleteMessageRequest(queueUrl, message.getReceiptHandle()));
                }
                catch (Exception ex) {
                    // Report the single message that failed, not the whole batch.
                    log.error("error processing message {}", message.getMessageId(), ex);
                }
            });
        }
    }
}

