package org.springframework.integration.endpoint;

import java.time.Duration;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.aopalliance.aop.Advice;
import org.eclipse.jgit.lib.ConfigConstants;
import org.reactivestreams.Subscription;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
import org.springframework.beans.factory.BeanClassLoaderAware;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.integration.aop.ReceiveMessageAdvice;
import org.springframework.integration.channel.ChannelUtils;
import org.springframework.integration.channel.MessagePublishingErrorHandler;
import org.springframework.integration.support.MessagingExceptionWrapper;
import org.springframework.integration.transaction.IntegrationResourceHolder;
import org.springframework.integration.transaction.IntegrationResourceHolderSynchronization;
import org.springframework.integration.transaction.PassThroughTransactionSynchronizationFactory;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
import org.springframework.integration.util.ErrorHandlingTaskExecutor;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.scheduling.support.SimpleTriggerContext;
import org.springframework.transaction.interceptor.TransactionInterceptor;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ErrorHandler;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-5.5.18.jar:org/springframework/integration/endpoint/AbstractPollingEndpoint.class */
public abstract class AbstractPollingEndpoint extends AbstractEndpoint implements BeanClassLoaderAware {
    public static final long DEFAULT_POLLING_PERIOD = 10;
    private ErrorHandler errorHandler;
    private boolean errorHandlerIsDefault;
    private List<Advice> adviceChain;
    private TransactionSynchronizationFactory transactionSynchronizationFactory;
    private volatile Callable<Message<?>> pollingTask;
    private volatile Flux<Message<?>> pollingFlux;
    private volatile Subscription subscription;
    private volatile ScheduledFuture<?> runningTask;
    private volatile boolean initialized;
    private final Collection<Advice> appliedAdvices = new HashSet();
    private final Object initializationMonitor = new Object();
    private Executor taskExecutor = new SyncTaskExecutor();
    private boolean syncExecutor = true;
    private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader();
    private Trigger trigger = new PeriodicTrigger(10);
    private volatile long maxMessagesPerPoll = -1;

    public AbstractPollingEndpoint() {
        setPhase(1073741823);
    }

    public void setTaskExecutor(Executor executor) {
        this.taskExecutor = executor != null ? executor : new SyncTaskExecutor();
        this.syncExecutor = (this.taskExecutor instanceof SyncTaskExecutor) || ((this.taskExecutor instanceof ErrorHandlingTaskExecutor) && ((ErrorHandlingTaskExecutor) this.taskExecutor).isSyncExecutor());
    }

    protected Executor getTaskExecutor() {
        return this.taskExecutor;
    }

    protected boolean isSyncExecutor() {
        return this.syncExecutor;
    }

    public void setTrigger(Trigger trigger) {
        this.trigger = trigger != null ? trigger : new PeriodicTrigger(10L);
    }

    public void setAdviceChain(List<Advice> list) {
        this.adviceChain = list;
    }

    @ManagedAttribute
    public void setMaxMessagesPerPoll(long j) {
        this.maxMessagesPerPoll = j;
    }

    public long getMaxMessagesPerPoll() {
        return this.maxMessagesPerPoll;
    }

    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    @Override // org.springframework.beans.factory.BeanClassLoaderAware
    public void setBeanClassLoader(ClassLoader classLoader) {
        this.beanClassLoader = classLoader;
    }

    public void setTransactionSynchronizationFactory(TransactionSynchronizationFactory transactionSynchronizationFactory) {
        this.transactionSynchronizationFactory = transactionSynchronizationFactory;
    }

    public MessageChannel getDefaultErrorChannel() {
        if (this.errorHandlerIsDefault || !(this.errorHandler instanceof MessagePublishingErrorHandler)) {
            return null;
        }
        return ((MessagePublishingErrorHandler) this.errorHandler).getDefaultErrorChannel();
    }

    protected ClassLoader getBeanClassLoader() {
        return this.beanClassLoader;
    }

    protected boolean isReceiveOnlyAdvice(Advice advice) {
        return advice instanceof ReceiveMessageAdvice;
    }

    protected void applyReceiveOnlyAdviceChain(Collection<Advice> collection) {
        if (CollectionUtils.isEmpty(collection)) {
            return;
        }
        Object receiveMessageSource = getReceiveMessageSource();
        if (receiveMessageSource != null) {
            if (AopUtils.isAopProxy(receiveMessageSource)) {
                Advised advised = (Advised) receiveMessageSource;
                Collection<Advice> collection2 = this.appliedAdvices;
                Objects.requireNonNull(advised);
                collection2.forEach(advised::removeAdvice);
                collection.forEach(advice -> {
                    advised.addAdvisor(adviceToReceiveAdvisor(advice));
                });
            } else {
                ProxyFactory proxyFactory = new ProxyFactory(receiveMessageSource);
                collection.forEach(advice2 -> {
                    proxyFactory.addAdvisor(adviceToReceiveAdvisor(advice2));
                });
                receiveMessageSource = proxyFactory.getProxy(getBeanClassLoader());
            }
            this.appliedAdvices.clear();
            this.appliedAdvices.addAll(collection);
            if (!isSyncExecutor()) {
                this.logger.warn(() -> {
                    return getComponentName() + ": A task executor is supplied and " + collection.size() + "ReceiveMessageAdvice(s) is/are provided. If an advice mutates the source, such mutations are not thread safe and could cause unexpected results, especially with high frequency pollers. Consider using a downstream ExecutorChannel instead of adding an executor to the poller";
                });
            }
            setReceiveMessageSource(receiveMessageSource);
        }
    }

    private NameMatchMethodPointcutAdvisor adviceToReceiveAdvisor(Advice advice) {
        NameMatchMethodPointcutAdvisor nameMatchMethodPointcutAdvisor = new NameMatchMethodPointcutAdvisor(advice);
        nameMatchMethodPointcutAdvisor.addMethodName(ConfigConstants.CONFIG_RECEIVE_SECTION);
        return nameMatchMethodPointcutAdvisor;
    }

    protected boolean isReactive() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Flux<Message<?>> getPollingFlux() {
        return this.pollingFlux;
    }

    protected Object getReceiveMessageSource() {
        return null;
    }

    protected void setReceiveMessageSource(Object obj) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.AbstractEndpoint, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        synchronized (this.initializationMonitor) {
            if (this.initialized) {
                return;
            }
            Assert.notNull(this.trigger, "Trigger is required");
            if (this.taskExecutor != null && !(this.taskExecutor instanceof ErrorHandlingTaskExecutor)) {
                if (this.errorHandler == null) {
                    this.errorHandler = ChannelUtils.getErrorHandler(getBeanFactory());
                    this.errorHandlerIsDefault = true;
                }
                this.taskExecutor = new ErrorHandlingTaskExecutor(this.taskExecutor, this.errorHandler);
            }
            if (this.transactionSynchronizationFactory == null && this.adviceChain != null) {
                Stream<Advice> stream = this.adviceChain.stream();
                Class<TransactionInterceptor> cls = TransactionInterceptor.class;
                Objects.requireNonNull(TransactionInterceptor.class);
                if (stream.anyMatch((v1) -> {
                    return r1.isInstance(v1);
                })) {
                    this.transactionSynchronizationFactory = new PassThroughTransactionSynchronizationFactory();
                }
            }
            this.initialized = true;
            try {
                super.onInit();
            } catch (Exception e) {
                throw new BeanInitializationException("Cannot initialize: " + this, e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.AbstractEndpoint
    public void doStart() {
        if (!this.initialized) {
            onInit();
        }
        this.pollingTask = createPollingTask();
        if (isReactive()) {
            this.pollingFlux = createFluxGenerator();
            return;
        }
        TaskScheduler taskScheduler = getTaskScheduler();
        Assert.state(taskScheduler != null, "unable to start polling, no taskScheduler available");
        this.runningTask = taskScheduler.schedule(createPoller(), this.trigger);
    }

    private Callable<Message<?>> createPollingTask() {
        List list = null;
        if (!CollectionUtils.isEmpty(this.adviceChain)) {
            list = (List) this.adviceChain.stream().filter(this::isReceiveOnlyAdvice).collect(Collectors.toList());
        }
        Callable<Message<?>> callable = this::doPoll;
        List<Advice> list2 = this.adviceChain;
        if (!CollectionUtils.isEmpty(list2)) {
            ProxyFactory proxyFactory = new ProxyFactory(callable);
            if (!CollectionUtils.isEmpty(list2)) {
                Stream<Advice> filter = list2.stream().filter(advice -> {
                    return !isReceiveOnlyAdvice(advice);
                });
                Objects.requireNonNull(proxyFactory);
                filter.forEach(proxyFactory::addAdvice);
            }
            callable = (Callable) proxyFactory.getProxy(this.beanClassLoader);
        }
        if (!CollectionUtils.isEmpty(list)) {
            applyReceiveOnlyAdviceChain(list);
        }
        return callable;
    }

    private Runnable createPoller() {
        return () -> {
            this.taskExecutor.execute(() -> {
                int i = 0;
                while (this.initialized) {
                    if (this.maxMessagesPerPoll > 0 && i >= this.maxMessagesPerPoll) {
                        return;
                    }
                    if (this.maxMessagesPerPoll == 0) {
                        this.logger.info("Polling disabled while 'maxMessagesPerPoll == 0'");
                        return;
                    } else if (pollForMessage() == null) {
                        return;
                    } else {
                        i++;
                    }
                }
            });
        };
    }

    private Flux<Message<?>> createFluxGenerator() {
        SimpleTriggerContext simpleTriggerContext = new SimpleTriggerContext();
        return Flux.generate(synchronousSink -> {
            Date nextExecutionTime = this.trigger.nextExecutionTime(simpleTriggerContext);
            if (nextExecutionTime == null) {
                synchronousSink.complete();
            } else {
                simpleTriggerContext.update(nextExecutionTime, null, null);
                synchronousSink.next(Duration.ofMillis(nextExecutionTime.getTime() - System.currentTimeMillis()));
            }
        }).concatMap(duration -> {
            return Mono.delay(duration).doOnNext(l -> {
                simpleTriggerContext.update(simpleTriggerContext.lastScheduledExecutionTime(), new Date(), null);
            }).flatMapMany(l2 -> {
                return Flux.defer(() -> {
                    if (this.maxMessagesPerPoll != 0) {
                        return Flux.generate(synchronousSink2 -> {
                            Message<?> pollForMessage = pollForMessage();
                            if (pollForMessage != null) {
                                synchronousSink2.next(pollForMessage);
                            } else {
                                synchronousSink2.complete();
                            }
                        }).take(this.maxMessagesPerPoll < 0 ? Long.MAX_VALUE : this.maxMessagesPerPoll, true);
                    }
                    this.logger.info("Polling disabled while 'maxMessagesPerPoll == 0'");
                    return Mono.empty();
                }).subscribeOn(Schedulers.fromExecutor(this.taskExecutor)).doOnComplete(() -> {
                    simpleTriggerContext.update(simpleTriggerContext.lastScheduledExecutionTime(), simpleTriggerContext.lastActualExecutionTime(), new Date());
                });
            });
        }, 0).repeat(this::isActive).doOnSubscribe(subscription -> {
            this.subscription = subscription;
        });
    }

    private Message<?> pollForMessage() {
        try {
            try {
                Message<?> call = this.pollingTask.call();
                if (this.transactionSynchronizationFactory != null) {
                    Object resourceToBind = getResourceToBind();
                    if (TransactionSynchronizationManager.hasResource(resourceToBind)) {
                        TransactionSynchronizationManager.unbindResource(resourceToBind);
                    }
                }
                return call;
            } catch (Exception e) {
                if (e instanceof MessagingException) {
                    throw ((MessagingException) e);
                }
                Message<?> message = null;
                if (this.transactionSynchronizationFactory != null) {
                    Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
                    if (resource instanceof IntegrationResourceHolder) {
                        message = ((IntegrationResourceHolder) resource).getMessage();
                    }
                }
                throw new MessagingException(message, e);
            }
        } catch (Throwable th) {
            if (this.transactionSynchronizationFactory != null) {
                Object resourceToBind2 = getResourceToBind();
                if (TransactionSynchronizationManager.hasResource(resourceToBind2)) {
                    TransactionSynchronizationManager.unbindResource(resourceToBind2);
                }
            }
            throw th;
        }
    }

    private Message<?> doPoll() {
        IntegrationResourceHolder bindResourceHolderIfNecessary = bindResourceHolderIfNecessary(getResourceKey(), getResourceToBind());
        Message<?> message = null;
        try {
            message = receiveMessage();
        } catch (Exception e) {
            if (Thread.interrupted()) {
                this.logger.debug(() -> {
                    return "Poll interrupted - during stop()? : " + e.getMessage();
                });
                return null;
            }
            ReflectionUtils.rethrowRuntimeException(e);
        }
        if (message == null) {
            this.logger.debug("Received no Message during the poll, returning 'false'");
            return null;
        }
        messageReceived(bindResourceHolderIfNecessary, message);
        return message;
    }

    private void messageReceived(IntegrationResourceHolder integrationResourceHolder, Message<?> message) {
        this.logger.debug(() -> {
            return "Poll resulted in Message: " + message;
        });
        if (integrationResourceHolder != null) {
            integrationResourceHolder.setMessage(message);
        }
        if (isReactive()) {
            return;
        }
        try {
            handleMessage(message);
        } catch (MessagingException e) {
            throw new MessagingExceptionWrapper(message, e);
        } catch (Exception e2) {
            throw new MessagingException(message, e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.AbstractEndpoint
    public void doStop() {
        if (this.runningTask != null) {
            this.runningTask.cancel(true);
        }
        this.runningTask = null;
        if (this.subscription != null) {
            this.subscription.cancel();
        }
    }

    protected abstract Message<?> receiveMessage();

    protected abstract void handleMessage(Message<?> message);

    protected Object getResourceToBind() {
        return null;
    }

    protected String getResourceKey() {
        return null;
    }

    private IntegrationResourceHolder bindResourceHolderIfNecessary(String str, Object obj) {
        if (this.transactionSynchronizationFactory == null || obj == null || !TransactionSynchronizationManager.isActualTransactionActive()) {
            return null;
        }
        TransactionSynchronization create = this.transactionSynchronizationFactory.create(obj);
        if (create != null) {
            TransactionSynchronizationManager.registerSynchronization(create);
            if (create instanceof IntegrationResourceHolderSynchronization) {
                IntegrationResourceHolderSynchronization integrationResourceHolderSynchronization = (IntegrationResourceHolderSynchronization) create;
                integrationResourceHolderSynchronization.setShouldUnbindAtCompletion(false);
                if (!TransactionSynchronizationManager.hasResource(obj)) {
                    TransactionSynchronizationManager.bindResource(obj, integrationResourceHolderSynchronization.getResourceHolder());
                }
            }
        }
        Object resource = TransactionSynchronizationManager.getResource(obj);
        if (!(resource instanceof IntegrationResourceHolder)) {
            return null;
        }
        IntegrationResourceHolder integrationResourceHolder = (IntegrationResourceHolder) resource;
        if (str != null) {
            integrationResourceHolder.addAttribute(str, obj);
        }
        return integrationResourceHolder;
    }
}
