/*
 * Decompiled with CFR 0.152.
 */
package org.awsutils.sqs.autoconfigure;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.vavr.Tuple;
import io.vavr.Tuple2;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.validation.ValidationException;
import jakarta.validation.constraints.NotNull;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
import org.awsutils.common.util.LimitedQueue;
import org.awsutils.common.util.Utils;
import org.awsutils.sqs.autoconfigure.SqsCommonProperties;
import org.awsutils.sqs.autoconfigure.SqsConfig;
import org.awsutils.sqs.autoconfigure.SqsListenerScheduleConfig;
import org.awsutils.sqs.autoconfigure.SqsMessageListenerListProperties;
import org.awsutils.sqs.autoconfigure.SqsMessageListenerProperties;
import org.awsutils.sqs.client.SqsMessageClient;
import org.awsutils.sqs.config.WorkerNodeCheckFunc;
import org.awsutils.sqs.handler.MessageHandlerFactory;
import org.awsutils.sqs.listener.SqsMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.ConstructorArgumentValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.CollectionUtils;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

@Configuration
@ConditionalOnBean(value={TaskScheduler.class, SqsMessageClient.class, MessageHandlerFactory.class, SqsAsyncClient.class})
public class SqsMessageListenerInitializer {
    private static final Logger LOGGER = LoggerFactory.getLogger(SqsMessageListenerInitializer.class);
    /** Holder of the per-listener configuration map read by {@link #init()}. */
    private final SqsMessageListenerListProperties sqsMessageListenerListProperties;
    /** Shared settings; consulted to decide whether listeners use the common thread pool. */
    private final SqsCommonProperties sqsCommonProperties;
    /** Used to obtain the bean definition registry and to look up the registered listener beans. */
    private final ApplicationContext applicationContext;
    /** Property reader handed to every listener and to the schedule configuration. */
    private final SqsConfig.SqsPropertyFunc1<String, Integer> propertyFunc;
    private final MessageHandlerFactory messageHandlerFactory;
    private final SqsMessageClient sqsMessageClient;
    private final SqsAsyncClient sqsAsyncClient;
    /** Receives each registered listener together with its schedule-related property keys. */
    private final SqsListenerScheduleConfig schedulingConfigurer;
    /** Source of the status and number-of-listeners properties. */
    private final Environment environment;
    /** Wait time applied when a listener configures none, or a value that is not positive. */
    private static final Integer DEFAULT_WAIT_TIME_IN_SECONDS = 10;
    /**
     * Every executor created on behalf of the listeners; all of them are shut down in {@link #cleanUp()}.
     * The list is mutated both by this class (locking on the initializer instance) and by the listener
     * wrappers (locking on the wrapper class). Those are different monitors, so the list itself must be
     * thread-safe; a copy-on-write list also makes iteration safe against concurrent changes.
     */
    private final List<ExecutorService> executorServices = new CopyOnWriteArrayList<>();
    /** Optional shared executor; only dereferenced when the common thread pool is enabled. */
    @Autowired(required=false)
    private SqsConfig.CommonExecutorService commonExecutorService;
    /** Optional extra check; when present it is combined with the listener status property check. */
    @Autowired(required=false)
    private WorkerNodeCheckFunc workerNodeCheckFunc;
    /** {@link MessageFormat} pattern for listener bean names; {@code {0}} is the listener key. */
    private static final String SQS_MESSAGE_LISTENER_KEY = "sqsMessageListener_{0}";

    /**
     * Stores the collaborators needed to register the configured SQS listeners.
     * No work is done here; registration happens in {@link #init()}.
     */
    public SqsMessageListenerInitializer(
            SqsMessageListenerListProperties sqsMessageListenerListProperties,
            SqsCommonProperties sqsCommonProperties,
            ApplicationContext applicationContext,
            SqsConfig.SqsPropertyFunc1<String, Integer> propertyFunc,
            MessageHandlerFactory messageHandlerFactory,
            @Qualifier("sqsMessageClient") SqsMessageClient sqsMessageClient,
            SqsAsyncClient sqsAsyncClient,
            SqsListenerScheduleConfig schedulingConfigurer,
            Environment environment) {
        // Configuration sources.
        this.sqsMessageListenerListProperties = sqsMessageListenerListProperties;
        this.sqsCommonProperties = sqsCommonProperties;
        this.environment = environment;
        this.propertyFunc = propertyFunc;
        // Spring infrastructure.
        this.applicationContext = applicationContext;
        this.schedulingConfigurer = schedulingConfigurer;
        // Messaging collaborators passed on to each listener.
        this.messageHandlerFactory = messageHandlerFactory;
        this.sqsMessageClient = sqsMessageClient;
        this.sqsAsyncClient = sqsAsyncClient;
    }

    /**
     * Registers one listener bean per entry of the configured listener map.
     * Does nothing when the properties holder is absent or the map is empty.
     */
    @PostConstruct
    public void init() {
        if (this.sqsMessageListenerListProperties == null) {
            return;
        }
        Map<String, SqsMessageListenerProperties> configuredListeners = this.sqsMessageListenerListProperties.getListener();
        if (CollectionUtils.isEmpty(configuredListeners)) {
            return;
        }
        // The context's bean factory is used as the registry for the new definitions.
        BeanDefinitionRegistry registry = (BeanDefinitionRegistry) this.applicationContext.getAutowireCapableBeanFactory();
        for (Map.Entry<String, SqsMessageListenerProperties> entry : configuredListeners.entrySet()) {
            this.registerSqsListener(registry, entry.getKey(), entry.getValue());
        }
    }

    /**
     * Validates one listener configuration, registers a {@code SqsMessageListenerWrapper} bean
     * definition for it and hands the resulting bean to the schedule configuration.
     *
     * @param registry registry that receives the bean definition
     * @param listenerKey key of the listener in the configuration map; used in the bean name and as
     *        the listener name when none is configured
     * @param sqsMessageListenerProperties configuration of the listener
     * @throws IllegalArgumentException if {@code sqsMessageListenerProperties} is {@code null}
     * @throws ValidationException if a {@code @NotNull} field of the configuration is unset
     * @throws IllegalStateException if the configuration cannot be inspected, or if the common thread
     *         pool is enabled but no common executor service was injected
     */
    public void registerSqsListener(BeanDefinitionRegistry registry, String listenerKey, SqsMessageListenerProperties sqsMessageListenerProperties) {
        if (sqsMessageListenerProperties == null) {
            throw new IllegalArgumentException("No properties configured for SQS listener '" + listenerKey + "'");
        }
        // Fail fast: nothing is built or registered for an incomplete configuration.
        try {
            SqsMessageListenerInitializer.validate(sqsMessageListenerProperties);
        }
        catch (IllegalAccessException | JsonProcessingException e) {
            throw new IllegalStateException("Unable to validate properties of SQS listener '" + listenerKey + "'", e);
        }

        String configuredName = sqsMessageListenerProperties.getListenerName();
        String effectiveName = StringUtils.isEmpty(configuredName) ? listenerKey : configuredName;
        String beanName = MessageFormat.format(SQS_MESSAGE_LISTENER_KEY, listenerKey);
        String rateLimiterName = emptyToNull(sqsMessageListenerProperties.getRateLimiterName());
        String messageHandlerRateLimiterName = emptyToNull(sqsMessageListenerProperties.getMessageHandlerRateLimiterName());
        String statusProperty = emptyToNull(sqsMessageListenerProperties.getStatusProperty());
        Integer configuredWaitTime = sqsMessageListenerProperties.getWaitTimeInSeconds();
        Integer waitTimeInSeconds = configuredWaitTime != null && configuredWaitTime > 0 ? configuredWaitTime : DEFAULT_WAIT_TIME_IN_SECONDS;
        WorkerNodeCheckFunc workerNodeCheck = this.buildWorkerNodeCheck(statusProperty);

        // Factory invoked by the wrapper once per listener index, initially and whenever it re-creates its listeners.
        Function<Integer, SqsMessageListener> sqsMessageListenerFunc = listenerIndex -> SqsMessageListener.builder()
                .sqsAsyncClient(this.sqsAsyncClient)
                .queueName(sqsMessageListenerProperties.getQueueName())
                .sqsMessageClient(this.sqsMessageClient)
                .messageHandlerFactory(this.messageHandlerFactory)
                .executorService(this.resolveExecutorService(sqsMessageListenerProperties))
                .rateLimiterName(rateLimiterName)
                .maximumNumberOfMessagesKey(sqsMessageListenerProperties.getMaximumNumberOfMessagesKey())
                .semaphore(new Semaphore(1))
                .propertyReaderFunction(this.propertyFunc)
                .workerNodeCheck(workerNodeCheck)
                .listenerName(String.format("%s_%d", effectiveName, listenerIndex))
                .messageHandlerRateLimiter(messageHandlerRateLimiterName)
                .statusProperty(statusProperty)
                .waitTimeInSeconds(waitTimeInSeconds)
                .queueUrl(sqsMessageListenerProperties.getQueueUrl())
                .build();

        // Argument order must match the SqsMessageListenerWrapper constructor.
        ConstructorArgumentValues constructorArgumentValues = new ConstructorArgumentValues();
        constructorArgumentValues.addIndexedArgumentValue(0, this.executorServices);
        constructorArgumentValues.addIndexedArgumentValue(1, sqsMessageListenerProperties.getNumberOfListenersProperty());
        constructorArgumentValues.addIndexedArgumentValue(2, this.environment);
        constructorArgumentValues.addIndexedArgumentValue(3, sqsMessageListenerFunc);
        constructorArgumentValues.addIndexedArgumentValue(4, effectiveName);

        GenericBeanDefinition definition = new GenericBeanDefinition();
        // Reference the nested class directly rather than by a source-style name string.
        definition.setBeanClass(SqsMessageListenerWrapper.class);
        definition.setConstructorArgumentValues(constructorArgumentValues);
        registry.registerBeanDefinition(beanName, definition);

        this.schedulingConfigurer.addListener(
                this.applicationContext.getBean(beanName, SqsMessageListener.class),
                sqsMessageListenerProperties.getMaximumNumberOfMessagesKey(),
                sqsMessageListenerProperties.getScheduleRunIntervalKey(),
                this.propertyFunc);
    }

    /** Returns the shared executor when the common pool is enabled, otherwise a new dedicated pool. */
    private ExecutorService resolveExecutorService(SqsMessageListenerProperties sqsMessageListenerProperties) {
        if (!this.sqsCommonProperties.isUseCommonThreadPool()) {
            return this.createExecutorService(sqsMessageListenerProperties.getThreadPoolSize());
        }
        if (this.commonExecutorService == null) {
            // The common executor is injected with required=false, so it can legitimately be missing.
            throw new IllegalStateException("Common thread pool is enabled but no common executor service bean is available");
        }
        return this.commonExecutorService.executorService();
    }

    /** Builds the check that gates a listener: its status property and, when present, the injected worker-node check. */
    private WorkerNodeCheckFunc buildWorkerNodeCheck(String statusProperty) {
        // A listener without a status property is treated as enabled.
        WorkerNodeCheckFunc statusCheck = () -> StringUtils.isEmpty(statusProperty) || this.isSqsListenerEnabled(statusProperty);
        if (this.workerNodeCheckFunc == null) {
            return statusCheck;
        }
        return () -> statusCheck.check() && this.workerNodeCheckFunc.check();
    }

    /** Maps {@code null} and the empty string to {@code null}; any other value is returned unchanged. */
    private static String emptyToNull(String value) {
        return StringUtils.isEmpty(value) ? null : value;
    }

    /**
     * Reads the given property as a boolean from the environment.
     *
     * @return {@code false} only when the property resolves to {@code false}; an unset property counts as enabled
     */
    private boolean isSqsListenerEnabled(String statusPropertyName) {
        return !Boolean.FALSE.equals(this.environment.getProperty(statusPropertyName, Boolean.class));
    }

    /**
     * Creates a pool with exactly {@code fixedThreadPoolSize} threads backed by a
     * {@link LimitedQueue} constructed with 1000, and records it so that {@link #cleanUp()}
     * can shut it down.
     *
     * @param fixedThreadPoolSize used as both the core and the maximum pool size
     * @return the newly created executor
     */
    private ExecutorService createExecutorService(int fixedThreadPoolSize) {
        synchronized (this) {
            final LimitedQueue workQueue = new LimitedQueue(1000);
            final ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    fixedThreadPoolSize,
                    fixedThreadPoolSize,
                    60L,
                    TimeUnit.SECONDS,
                    (BlockingQueue)workQueue,
                    (RejectedExecutionHandler)new LimitedQueue.LimitedQueueRejectedExecutionPolicy()) {

                // Only adds a log line; the actual shutdown is delegated to ThreadPoolExecutor.
                @Override
                public void shutdown() {
                    LOGGER.info("Shutting down executor service");
                    super.shutdown();
                }
            };
            // The queue is handed a reference to the pool it feeds.
            workQueue.setThreadPoolExecutor(pool);
            this.executorServices.add(pool);
            return pool;
        }
    }

    /**
     * Shuts down every executor recorded in {@code executorServices}.
     * Shutdown is best effort: a failure on one executor is logged and the remaining ones are still shut down.
     */
    @PreDestroy
    public void cleanUp() {
        for (ExecutorService executorService : this.executorServices) {
            try {
                executorService.shutdown();
            }
            catch (Exception e) {
                // Keep going so one misbehaving executor cannot prevent the others from stopping.
                LOGGER.warn("Failed to shut down executor service {}", executorService, e);
            }
        }
    }

    /**
     * Checks that every non-static field declared directly on the class of {@code a} and annotated
     * with {@link NotNull} holds a value. Fields declared on superclasses are not inspected.
     *
     * <p>When at least one such field is {@code null}, the problems are printed to standard error and
     * a {@link ValidationException} carrying a JSON description is thrown.
     *
     * @param a object to inspect; must not be {@code null}
     * @throws IllegalAccessException if a field value cannot be read
     * @throws JsonProcessingException if the error description cannot be rendered
     * @throws ValidationException if any annotated field is {@code null}
     */
    public static void validate(Object a) throws IllegalAccessException, JsonProcessingException {
        List<String> missingFields = new ArrayList<>();
        for (Field field : a.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers())) {
                continue;
            }
            NotNull notNull = field.getAnnotation(NotNull.class);
            if (notNull == null) {
                continue;
            }
            field.setAccessible(true);
            if (field.get(a) == null) {
                missingFields.add(describeMissingField(field, notNull));
            }
        }
        if (missingFields.isEmpty()) {
            return;
        }
        Map<String, Object> errorMap = new HashMap<>();
        errorMap.put("message", "Following fields have not been populated");
        errorMap.put("fields", missingFields);
        SqsMessageListenerInitializer.logErrorMessageToConsole(missingFields);
        throw new ValidationException(Utils.constructJson(errorMap));
    }

    /**
     * Returns the text reported for an unset field: the annotation message when it is usable,
     * otherwise the field name.
     */
    private static String describeMissingField(Field field, NotNull notNull) {
        String message = notNull.message();
        // No message interpolation happens here, so a brace-wrapped template (including the
        // annotation default) would be printed verbatim and would not identify the field.
        boolean uninterpolatedTemplate = message != null && message.startsWith("{") && message.endsWith("}");
        if (message == null || message.isEmpty() || uninterpolatedTemplate) {
            return field.getName();
        }
        return message;
    }

    /**
     * Prints a banner-framed, numbered list of the given entries to standard error.
     *
     * @param errorList entries to print, numbered from 1 in list order
     */
    private static void logErrorMessageToConsole(List<String> errorList) {
        final String banner = "####################### ALL REQUIRED PROPERTIES NOT POPULATED - Stopping Application #######################";
        // Three blank lines set the report apart from preceding output.
        for (int i = 0; i < 3; i++) {
            System.err.println();
        }
        System.err.println(banner);
        System.err.println();
        System.err.println("Following fields not populated, Please add to configuration property/yaml file: ");
        int position = 0;
        for (String entry : errorList) {
            position++;
            System.err.println(position + ": " + entry);
        }
        System.err.println();
        System.err.println(banner);
        for (int i = 0; i < 3; i++) {
            System.err.println();
        }
    }

    private static class SqsMessageListenerWrapper
    implements SqsMessageListener {
        /** How long, in seconds, {@code submitJobToListener} waits for a semaphore permit. */
        private static final long SEMAPHORE_TIMEOUT_IN_SECONDS = 15L;
        /** Used to resolve {@link #numberOfListenersProperty}. */
        private final Environment environment;
        /** Name of the property holding the listener count; when empty, a single listener is used. */
        private final String numberOfListenersProperty;
        /** Creates the delegate listener for a given zero-based index. */
        private final Function<Integer, SqsMessageListener> sqsMessageListenerFunc;
        /** List supplied by the creator; this wrapper adds its own executor to it and swaps it on resize. */
        private final List<ExecutorService> executorServices;
        /** Name used when logging a change in the number of listeners. */
        private final String listenerName;
        /** Current delegates; replaced as a whole by {@code checkForUpdates} under the write lock. */
        private List<SqsMessageListener> sqsMessageListeners;
        /** Pool on which the delegates' {@code receive()} calls are run. */
        private ExecutorService executorService;
        /** A permit is acquired before a delegate is submitted and released once it has completed. */
        private Semaphore semaphore;
        /** {@code System.currentTimeMillis()} value recorded at the last listener-count check. */
        private long lastCheckedTime;
        /** Read lock is held while delegates are dispatched; write lock while they are replaced. */
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        /** Minimum time between two listener-count checks. */
        private static final long TEN_MINUTES_IN_MILLIS = TimeUnit.MINUTES.toMillis(10L);

        /**
         * Creates the initial set of delegate listeners and the pool that runs them.
         *
         * @param executorServices list to which the wrapper's own executor is added
         * @param numberOfListenersProperty name of the property holding the listener count; may be empty
         * @param environment used to resolve {@code numberOfListenersProperty}
         * @param sqsMessageListenerFunc creates the delegate for a zero-based index
         * @param listenerName name used in log output
         */
        public SqsMessageListenerWrapper(List<ExecutorService> executorServices, String numberOfListenersProperty, Environment environment, Function<Integer, SqsMessageListener> sqsMessageListenerFunc, String listenerName) {
            this.listenerName = listenerName;
            this.sqsMessageListenerFunc = sqsMessageListenerFunc;
            // Both fields below must be set before the listener count can be resolved.
            this.numberOfListenersProperty = numberOfListenersProperty;
            this.environment = environment;

            final int listenerCount = this.getNumberOfListeners();
            this.semaphore = new Semaphore(listenerCount);
            List<SqsMessageListener> delegates = new ArrayList<>();
            for (int listenerIndex = 0; listenerIndex < listenerCount; listenerIndex++) {
                delegates.add(sqsMessageListenerFunc.apply(listenerIndex));
            }
            this.sqsMessageListeners = delegates;
            this.lastCheckedTime = System.currentTimeMillis();

            synchronized (SqsMessageListenerWrapper.class) {
                this.executorService = Executors.newFixedThreadPool(listenerCount);
                this.executorServices = executorServices;
                executorServices.add(this.executorService);
            }
        }

        /**
         * Runs one receive cycle on every delegate listener and waits for all of them to finish.
         *
         * <p>At most once every ten minutes the configured listener count is re-read first. All
         * delegates are submitted before any of them is awaited, so they run concurrently on the
         * wrapper's pool. Every submitted delegate is awaited even when another one failed, so that
         * each acquired semaphore permit is released; the first failure is then rethrown with later
         * failures attached as suppressed exceptions.
         */
        public void receive() {
            long startTime = System.currentTimeMillis();
            if (startTime - this.lastCheckedTime >= TEN_MINUTES_IN_MILLIS) {
                this.checkForUpdates();
                this.lastCheckedTime = startTime;
            }
            Utils.executeUsingLock((Lock)this.lock.readLock(), () -> {
                List<Tuple2<Boolean, Future<?>>> submissions = new ArrayList<>(this.sqsMessageListeners.size());
                RuntimeException failure = null;
                try {
                    // Submit eagerly: a lazy stream pipeline would submit and await one delegate at a time.
                    for (SqsMessageListener listener : this.sqsMessageListeners) {
                        submissions.add(this.submitJobToListener(listener));
                    }
                }
                catch (RuntimeException e) {
                    failure = e;
                }
                for (Tuple2<Boolean, Future<?>> submission : submissions) {
                    try {
                        this.waitForCompletion(submission);
                    }
                    catch (RuntimeException e) {
                        if (failure == null) {
                            failure = e;
                        } else {
                            failure.addSuppressed(e);
                        }
                    }
                }
                if (failure != null) {
                    throw failure;
                }
            });
        }

        /**
         * Re-reads the configured listener count and, when it differs from the current number of
         * delegates, replaces the delegates, the semaphore and the pool under the write lock.
         *
         * <p>A count below one is ignored and the current state is kept. The replacement delegates and
         * pool are created before the old state is touched, so a failure while creating them leaves the
         * wrapper unchanged.
         */
        private void checkForUpdates() {
            int numberOfListeners = this.getNumberOfListeners();
            int currentNumberOfListeners = this.sqsMessageListeners.size();
            if (numberOfListeners == currentNumberOfListeners) {
                return;
            }
            if (numberOfListeners < 1) {
                // A fixed thread pool cannot be created with fewer than one thread.
                LOGGER.warn("Ignoring invalid number of listeners {} for {}; keeping {}", this.listenerName.isEmpty() ? numberOfListeners : numberOfListeners, this.listenerName, currentNumberOfListeners);
                return;
            }
            LOGGER.info("Number of listeners have changed for {} from {} to {}", this.listenerName, currentNumberOfListeners, numberOfListeners);
            Utils.executeUsingLock((Lock)this.lock.writeLock(), () -> {
                // Build everything new first; nothing below this point is expected to fail.
                // NOTE(review): the delegates being replaced may own executors created by the factory
                // function; nothing here shuts those down - confirm whether that is intended.
                List<SqsMessageListener> replacementListeners = IntStream.range(0, numberOfListeners).boxed().map(this.sqsMessageListenerFunc).collect(Collectors.toList());
                ExecutorService replacementExecutor = Executors.newFixedThreadPool(numberOfListeners);
                ExecutorService retiredExecutor = this.executorService;

                this.sqsMessageListeners = replacementListeners;
                this.semaphore = new Semaphore(numberOfListeners);
                this.executorService = replacementExecutor;
                synchronized (SqsMessageListenerWrapper.class) {
                    this.executorServices.remove(retiredExecutor);
                    this.executorServices.add(replacementExecutor);
                }
                retiredExecutor.shutdown();
            });
        }

        /**
         * Resolves how many delegate listeners should exist.
         *
         * @return the integer value of the configured property, or {@code 1} when no property name is
         *         configured or the property has no value
         */
        private int getNumberOfListeners() {
            if (StringUtils.isEmpty(this.numberOfListenersProperty)) {
                return 1;
            }
            // Supplying a default avoids unboxing null when the property is not set.
            return this.environment.getProperty(this.numberOfListenersProperty, Integer.class, 1);
        }

        /**
         * Waits for a submitted delegate to finish and releases its semaphore permit if one was acquired.
         *
         * @param future pair of "permit acquired" flag and the future to wait on
         * @throws RuntimeException the task's own runtime exception, or a wrapper around its checked exception
         * @throws Error the task's own error, rethrown unchanged
         */
        public void waitForCompletion(Tuple2<Boolean, Future<?>> future) {
            try {
                future._2().get();
            }
            catch (InterruptedException e) {
                Utils.handleInterruptedException(e, () -> {});
            }
            catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof RuntimeException) {
                    throw (RuntimeException)cause;
                }
                if (cause instanceof Error) {
                    // Casting an Error to Exception would fail; surface it unchanged instead.
                    throw (Error)cause;
                }
                throw new RuntimeException(cause != null ? cause : e);
            }
            finally {
                // Only give back a permit that submitJobToListener actually acquired.
                if (future._1()) {
                    this.semaphore.release();
                }
            }
        }

        /**
         * Acquires a semaphore permit and submits the listener's {@code receive()} call to the pool.
         *
         * @param a listener to run
         * @return {@code (true, future)} when the job was submitted; {@code (false, completed future)}
         *         when no permit became available within {@code SEMAPHORE_TIMEOUT_IN_SECONDS}; on
         *         interruption, whatever the interruption handler yields for the same
         *         {@code (false, completed future)} supplier
         * @throws RuntimeException if the pool refuses the job; the permit is released first
         */
        @SuppressWarnings("unchecked")
        public Tuple2<Boolean, Future<?>> submitJobToListener(SqsMessageListener a) {
            Semaphore permits = this.semaphore;
            try {
                if (!permits.tryAcquire(SEMAPHORE_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS)) {
                    return Tuple.<Boolean, Future<?>>of(false, CompletableFuture.completedFuture(null));
                }
            }
            catch (InterruptedException ex) {
                return (Tuple2<Boolean, Future<?>>)Utils.handleInterruptedException(ex, () -> Tuple.<Boolean, Future<?>>of(false, CompletableFuture.completedFuture(null)));
            }
            try {
                return Tuple.<Boolean, Future<?>>of(true, this.executorService.submit(() -> a.receive()));
            }
            catch (RuntimeException e) {
                // Nobody will wait on a future for this job, so the permit must be returned here.
                permits.release();
                throw e;
            }
        }
    }
}

