/*
 * Decompiled with CFR 0.152.
 */
package org.awsutils.sqs.listener;

import io.vavr.Tuple;
import io.vavr.Tuple3;
import java.lang.reflect.Proxy;
import java.math.BigInteger;
import java.text.MessageFormat;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.awsutils.common.exceptions.UtilsException;
import org.awsutils.common.ratelimiter.RateLimiter;
import org.awsutils.common.ratelimiter.RateLimiterFactory;
import org.awsutils.common.util.ApplicationContextUtils;
import org.awsutils.common.util.Utils;
import org.awsutils.sqs.client.SqsMessageClient;
import org.awsutils.sqs.config.WorkerNodeCheckFunc;
import org.awsutils.sqs.handler.MessageHandlerFactory;
import org.awsutils.sqs.handler.SqsMessageHandler;
import org.awsutils.sqs.listener.SqsMessageListener;
import org.awsutils.sqs.message.MessageAttribute;
import org.awsutils.sqs.message.SnsSubscriptionMessage;
import org.awsutils.sqs.message.SqsMessage;
import org.awsutils.sqs.message.TaskInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.env.Environment;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

final class SqsMessageListenerImpl
implements SqsMessageListener {
    private static final Logger log = LoggerFactory.getLogger(SqsMessageListenerImpl.class);
    private final String queueName;
    private final Environment environment;
    private String queueUrl;
    private final SqsAsyncClient sqsAsyncClient;
    private final MessageHandlerFactory messageHandlerFactory;
    private final SqsMessageClient sqsMessageClient;
    private final String rateLimiterName;
    private final ExecutorService executorService;
    private final String maximumNumberOfMessagesKey;
    private final Function<String, Integer> propertyReaderFunction;
    private final WorkerNodeCheckFunc workerNodeCheck;
    private final Semaphore semaphore;
    private final String listenerName;
    private final String messageHandlerRateLimiterName;
    private RateLimiter rateLimiter;
    private RateLimiter messageHandlerRateLimiter;
    private final Function<String, String> queueUrlFunc;
    private final boolean listenerEnabled;
    private final Integer waitTimeInSeconds;
    private static final Integer MAX_NUMBER_OF_SQS_MESSAGES = 10;
    private static final int MAXIMUM_NUMBER_OF_MESSAGES = 2000;
    private static final long SEMAPHORE_TIMEOUT_IN_SECONDS = 15L;
    private static final long CHANGE_VISIBILITY_PERIOD_IN_SECONDS = TimeUnit.MINUTES.toSeconds(15L);
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsMessageListenerImpl.class);
    private static final Thread SHUTDOWN_HOOK = new Thread();

    private SqsMessageListenerImpl(SqsAsyncClient sqsAsyncClient, String queueName, String queueUrl, SqsMessageClient sqsMessageClient, MessageHandlerFactory messageHandlerFactory, ExecutorService executorService, String rateLimiterName, String maximumNumberOfMessagesKey, Semaphore semaphore, Function<String, Integer> propertyReaderFunction, WorkerNodeCheckFunc workerNodeCheck, String listenerName, String messageHandlerRateLimiterName, String statusProperty, Integer waitTimeInSeconds) {
        this.rateLimiterName = rateLimiterName;
        this.messageHandlerRateLimiterName = messageHandlerRateLimiterName;
        this.waitTimeInSeconds = waitTimeInSeconds;
        this.sqsAsyncClient = sqsAsyncClient;
        this.messageHandlerFactory = messageHandlerFactory;
        this.propertyReaderFunction = propertyReaderFunction;
        this.listenerName = listenerName;
        this.sqsMessageClient = sqsMessageClient;
        this.executorService = executorService;
        this.maximumNumberOfMessagesKey = maximumNumberOfMessagesKey;
        this.workerNodeCheck = workerNodeCheck == null ? () -> true : workerNodeCheck;
        this.semaphore = semaphore == null ? new Semaphore(BigInteger.ONE.intValue()) : semaphore;
        this.environment = (Environment)ApplicationContextUtils.getInstance().getBean(Environment.class);
        this.listenerEnabled = StringUtils.hasLength((String)statusProperty) ? (Boolean)this.environment.getProperty(statusProperty, Boolean.class, (Object)true) : true;
        this.queueName = queueName;
        this.queueUrl = queueUrl;
        Function<String, String> function = this.queueUrlFunc = StringUtils.hasLength((String)queueUrl) ? qName -> this.queueUrl : qName -> this.getQueueUrl(sqsMessageClient, (String)qName);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Creating SqsMessageListener: {}, Queue: {}", (Object)listenerName, (Object)queueName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private String getQueueUrl(SqsMessageClient sqsMessageClient, String queueName) {
        if (!StringUtils.hasLength((String)this.queueUrl)) {
            SqsMessageListenerImpl sqsMessageListenerImpl = this;
            synchronized (sqsMessageListenerImpl) {
                if (!StringUtils.hasLength((String)this.queueUrl)) {
                    this.queueUrl = sqsMessageClient.getQueueUrl(queueName);
                }
            }
        }
        return this.queueUrl;
    }

    @Override
    public void receive() {
        if (this.listenerEnabled && this.workerNodeCheck.check()) {
            this.setRateLimiters();
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(MessageFormat.format("Receiving messages after starter in listener [{0}]", this.listenerName));
            }
            try {
                this.processUsingLock();
            }
            catch (InterruptedException e) {
                Utils.handleInterruptedException((InterruptedException)e, () -> {});
            }
        } else if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Not receiving messages since worker node check returned false");
        }
    }

    private void setRateLimiters() {
        try {
            if (this.rateLimiter == null && StringUtils.hasLength((String)this.rateLimiterName)) {
                this.rateLimiter = RateLimiterFactory.getInstance().getRateLimiter(this.rateLimiterName);
            }
            if (this.messageHandlerRateLimiter == null && StringUtils.hasLength((String)this.messageHandlerRateLimiterName)) {
                this.messageHandlerRateLimiter = RateLimiterFactory.getInstance().getRateLimiter(this.messageHandlerRateLimiterName);
            }
        }
        catch (Exception ex) {
            log.error("Exception: {}", (Object)ex, (Object)ex);
            throw ex instanceof RuntimeException ? (RuntimeException)ex : new RuntimeException(ex);
        }
    }

    private void processUsingLock() throws InterruptedException {
        Utils.executeUsingSemaphore((Semaphore)this.semaphore, (long)15L, (TimeUnit)TimeUnit.SECONDS, () -> {
            int messageCounter = 0;
            boolean proceed = true;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(MessageFormat.format("Checking for messages from SQS in listener [{0}]: {1}", this.listenerName, this.queueUrlFunc.apply(this.queueName)));
            }
            while (proceed) {
                List<Message> messages = this.receiveMessages();
                proceed = this.processSqsMessages(messages, messageCounter += messages.size());
                if (!LOGGER.isDebugEnabled()) continue;
                LOGGER.debug(MessageFormat.format("Proceed with receiving messages [{0}]: {1}", this.listenerName, proceed));
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(MessageFormat.format("Total number of messages received: {0}", messageCounter));
                LOGGER.debug(MessageFormat.format("Rate limiter used: {0}", this.rateLimiter != null ? this.rateLimiter.getRateLimiterName() : null));
            }
        });
    }

    private boolean processSqsMessages(List<Message> messages, int messageCounter) {
        boolean proceed;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("In processSqsMessages: " + messages);
        }
        if (!CollectionUtils.isEmpty(messages)) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Message list is not empty..");
            }
            messages.forEach(this::processSqsMessage);
            proceed = (!StringUtils.hasLength((String)this.maximumNumberOfMessagesKey) ? 2000 : this.propertyReaderFunction.apply(this.maximumNumberOfMessagesKey)) > messageCounter;
        } else {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(MessageFormat.format("List of messages is empty in listener [{0}]: {1}", this.listenerName, messages != null ? messages.size() : 0));
            }
            proceed = false;
        }
        return proceed;
    }

    private void processSqsMessage(Message message) {
        long startTime = System.currentTimeMillis();
        ChangeMessageVisibilityResponse changeVisibilityResp = SqsMessageListenerImpl.getResultFromFuture(this.sqsMessageClient.changeVisibility(this.queueUrlFunc.apply(this.queueName), message.receiptHandle(), (int)CHANGE_VISIBILITY_PERIOD_IN_SECONDS));
        Runnable action0 = () -> {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Processing message: " + message.messageId());
            }
            this.executorService.submit(() -> this.processMessage(message, startTime));
        };
        if (this.rateLimiter == null) {
            action0.run();
        } else {
            this.rateLimiter.execute(action0::run);
        }
    }

    private static <T> T getResultFromFuture(Future<T> future) {
        try {
            return future.get();
        }
        catch (InterruptedException e) {
            return (T)Utils.handleInterruptedException((InterruptedException)e, () -> null);
        }
        catch (ExecutionException e) {
            throw new UtilsException("UNKNOWN_ERROR", (Throwable)e);
        }
    }

    private void processMessage(Message message, long startTime) {
        try {
            long timeTakenToStartProcessing;
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(MessageFormat.format("Processing message in listener[{0}]: {1}", this.listenerName, message));
            }
            if ((timeTakenToStartProcessing = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startTime)) < CHANGE_VISIBILITY_PERIOD_IN_SECONDS) {
                SqsMessageHandler messageHandler;
                String body = message.body();
                String receiptHandle = message.receiptHandle();
                String receiveCount = (String)message.attributes().get(MessageSystemAttributeName.APPROXIMATE_RECEIVE_COUNT);
                Map messageAttributes = message.messageAttributes().entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, a -> ((MessageAttributeValue)a.getValue()).stringValue()));
                String sqsMessageWrapperPresent = messageAttributes.get("sqsMessageWrapperPresent");
                if (StringUtils.hasLength((String)sqsMessageWrapperPresent) && "true".equalsIgnoreCase(sqsMessageWrapperPresent) || message.body().contains("\"messageType")) {
                    Tuple3<SqsMessage<?>, Map<String, String>, TaskInput<?>> sqsMessage = this.constructSqsMessage(body, receiptHandle);
                    messageHandler = this.messageHandlerFactory.getMessageHandler((SqsMessage)sqsMessage._1(), receiptHandle, this.queueUrlFunc.apply(this.queueName), StringUtils.hasLength((String)receiveCount) ? Integer.parseInt(receiveCount) : 0, CollectionUtils.isEmpty(messageAttributes) ? (Map)sqsMessage._2() : messageAttributes, this.messageHandlerRateLimiter);
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(MessageFormat.format("Handling message by {0}", messageHandler));
                    }
                } else if (StringUtils.hasLength((String)messageAttributes.get("messageType"))) {
                    String messageType = messageAttributes.get("messageType");
                    String transactionId = messageAttributes.get("transactionId");
                    messageHandler = this.messageHandlerFactory.getMessageHandler(body, messageType, transactionId, receiptHandle, this.queueUrl, StringUtils.hasLength((String)receiveCount) ? Integer.parseInt(receiveCount) : 0, messageAttributes, this.messageHandlerRateLimiter);
                } else {
                    throw new UtilsException("INVALID_MESSAGE", "The message body should be of SqsMessage type or should contain `messageType` attribute");
                }
                messageHandler.handle();
            }
        }
        catch (UtilsException e) {
            this.handleUtilsException(message, e);
        }
        catch (Exception e) {
            LOGGER.error(MessageFormat.format("Exception in listener[{0}]: {1}", this.listenerName, e.getMessage()), (Throwable)e);
        }
    }

    private void handleUtilsException(Message message, UtilsException e) {
        if ("NO_HANDLER_FOR_MESSAGE_TYPE".equalsIgnoreCase(e.getErrorType()) || "INVALID_JSON".equalsIgnoreCase(e.getErrorType())) {
            LOGGER.error(MessageFormat.format("Exception in listener[{0}]: {1}", this.listenerName, e.getMessage()), (Throwable)e);
            this.sqsMessageClient.deleteMessage(this.queueUrlFunc.apply(this.queueName), message.receiptHandle());
        } else {
            LOGGER.error(MessageFormat.format("Exception in listener[{0}]: {1}", this.listenerName, e.getMessage()), (Throwable)e);
        }
    }

    private Tuple3<SqsMessage<?>, Map<String, String>, TaskInput<?>> constructSqsMessage(String body, String receiptHandle) {
        if (!body.contains("\"Type\" : \"Notification\"")) {
            SqsMessage sqsMessage = (SqsMessage)Utils.constructFromJson(SqsMessage.class, (String)body, cause -> new UtilsException("INVALID_JSON", cause));
            return Tuple.of((Object)sqsMessage, null, null);
        }
        return this.processSnsNotification(body, receiptHandle);
    }

    private Tuple3<SqsMessage<?>, Map<String, String>, TaskInput<?>> processSnsNotification(String body, String receiptHandle) {
        SqsMessage sqsMessage;
        TaskInput taskInput;
        SnsSubscriptionMessage snsSubscriptionMessage = (SnsSubscriptionMessage)Utils.constructFromJson(SnsSubscriptionMessage.class, (String)body);
        String snsMessage = snsSubscriptionMessage.getMessage();
        Map<String, MessageAttribute> messageAttributes = snsSubscriptionMessage.getMessageAttributes();
        if (snsMessage.contains("\"Input\"")) {
            taskInput = (TaskInput)Utils.constructFromJson(TaskInput.class, (String)snsMessage);
            if (taskInput.getInput() == null) {
                this.sqsMessageClient.deleteMessage(this.queueName, receiptHandle);
                throw new UtilsException("EMPTY_MESSAGE_BODY", "Empty sqs message body");
            }
            sqsMessage = taskInput.getInput();
        } else {
            taskInput = null;
            sqsMessage = (SqsMessage)Utils.constructFromJson(SqsMessage.class, (String)snsMessage, cause -> new UtilsException("INVALID_JSON", cause));
        }
        Map<String, String> messAttr = CollectionUtils.isEmpty(messageAttributes) ? null : messageAttributes.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, b -> ((MessageAttribute)b.getValue()).getValue()));
        return Tuple.of(sqsMessage, messAttr, (Object)taskInput);
    }

    private List<Message> receiveMessages() {
        try {
            ReceiveMessageRequest request = (ReceiveMessageRequest)ReceiveMessageRequest.builder().queueUrl(this.queueUrlFunc.apply(this.queueName)).attributeNames(new QueueAttributeName[]{QueueAttributeName.ALL}).messageAttributeNames(new String[]{"All"}).maxNumberOfMessages(MAX_NUMBER_OF_SQS_MESSAGES).waitTimeSeconds(this.waitTimeInSeconds).build();
            return ((ReceiveMessageResponse)this.sqsAsyncClient.receiveMessage(request).get()).messages();
        }
        catch (InterruptedException e) {
            return (List)Utils.handleInterruptedException((InterruptedException)e, Collections::emptyList);
        }
        catch (ExecutionException e) {
            LOGGER.error(MessageFormat.format("Exception in receiveMessages in listener[{0}]: {1}", this.listenerName, e), (Throwable)e);
            throw new UtilsException("UNKNOWN_ERROR", (Throwable)e);
        }
    }

    static SqsMessageListener.Builder builder() {
        return new SqsMessageListenerBuilder();
    }

    private static class SqsMessageListenerBuilder
    implements SqsMessageListener.Builder {
        private String queueName;
        private SqsAsyncClient sqsAsyncClient;
        private MessageHandlerFactory messageHandlerFactory;
        private SqsMessageClient sqsMessageClient;
        private ExecutorService executorService;
        private String maximumNumberOfMessagesKey;
        private Function<String, Integer> propertyReaderFunction;
        private WorkerNodeCheckFunc workerNodeCheck;
        private Semaphore semaphore;
        private String listenerName;
        private String rateLimiterName;
        private String messageHandlerRateLimiter;
        private String statusProperty;
        private Integer waitTimeInSeconds;
        private String queueUrl;

        private SqsMessageListenerBuilder() {
        }

        @Override
        public SqsMessageListener.Builder queueName(String queueName) {
            this.queueName = queueName;
            return this;
        }

        @Override
        public SqsMessageListener.Builder queueUrl(String queueUrl) {
            this.queueUrl = queueUrl;
            return this;
        }

        @Override
        public SqsMessageListener.Builder sqsAsyncClient(SqsAsyncClient sqsAsyncClient) {
            this.sqsAsyncClient = sqsAsyncClient;
            return this;
        }

        @Override
        public SqsMessageListener.Builder messageHandlerFactory(MessageHandlerFactory messageHandlerFactory) {
            this.messageHandlerFactory = messageHandlerFactory;
            return this;
        }

        @Override
        public SqsMessageListener.Builder sqsMessageClient(SqsMessageClient sqsMessageClient) {
            this.sqsMessageClient = sqsMessageClient;
            return this;
        }

        @Override
        public SqsMessageListener.Builder executorService(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        @Override
        public SqsMessageListener.Builder maximumNumberOfMessagesKey(String maximumNumberOfMessagesKey) {
            this.maximumNumberOfMessagesKey = maximumNumberOfMessagesKey;
            return this;
        }

        @Override
        public SqsMessageListener.Builder propertyReaderFunction(Function<String, Integer> propertyReaderFunction) {
            this.propertyReaderFunction = propertyReaderFunction;
            return this;
        }

        @Override
        public SqsMessageListener.Builder workerNodeCheck(WorkerNodeCheckFunc workerNodeCheck) {
            this.workerNodeCheck = workerNodeCheck;
            return this;
        }

        @Override
        public SqsMessageListener.Builder semaphore(Semaphore semaphore) {
            this.semaphore = semaphore;
            return this;
        }

        @Override
        public SqsMessageListener.Builder listenerName(String listenerName) {
            this.listenerName = listenerName;
            return this;
        }

        @Override
        public SqsMessageListener.Builder rateLimiterName(String rateLimiterName) {
            this.rateLimiterName = rateLimiterName;
            return this;
        }

        @Override
        public SqsMessageListener.Builder messageHandlerRateLimiter(String messageHandlerRateLimiter) {
            this.messageHandlerRateLimiter = messageHandlerRateLimiter;
            return this;
        }

        @Override
        public SqsMessageListener.Builder statusProperty(String enabled) {
            this.statusProperty = enabled;
            return this;
        }

        @Override
        public SqsMessageListener.Builder waitTimeInSeconds(Integer waitTimeInSeconds) {
            this.waitTimeInSeconds = waitTimeInSeconds;
            return this;
        }

        @Override
        public SqsMessageListener build() {
            SqsMessageListenerImpl sqsMessageListener = new SqsMessageListenerImpl(this.sqsAsyncClient, this.queueName, this.queueUrl, this.sqsMessageClient, this.messageHandlerFactory, this.executorService, this.rateLimiterName, this.maximumNumberOfMessagesKey, this.semaphore, this.propertyReaderFunction, this.workerNodeCheck, !StringUtils.hasLength((String)this.listenerName) ? UUID.randomUUID().toString() : this.listenerName.trim(), this.messageHandlerRateLimiter, this.statusProperty, this.waitTimeInSeconds);
            return (SqsMessageListener)Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[]{SqsMessageListener.class}, (proxy, method, args) -> method.invoke((Object)sqsMessageListener, args));
        }
    }
}

