package org.axonframework.queryhandling;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import javax.annotation.Nonnull;
import org.axonframework.common.AxonConfigurationException;
import org.axonframework.common.BuilderUtils;
import org.axonframework.common.Registration;
import org.axonframework.messaging.Message;
import org.axonframework.messaging.MessageDispatchInterceptor;
import org.axonframework.messaging.responsetypes.MultipleInstancesResponseType;
import org.axonframework.messaging.responsetypes.OptionalResponseType;
import org.axonframework.messaging.responsetypes.PublisherResponseType;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.axonframework.monitoring.NoOpMessageMonitor;
import org.axonframework.tracing.NoOpSpanFactory;
import org.axonframework.tracing.SpanFactory;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:BOOT-INF/lib/axon-messaging-4.6.7.jar:org/axonframework/queryhandling/SimpleQueryUpdateEmitter.class */
public class SimpleQueryUpdateEmitter implements QueryUpdateEmitter {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) SimpleQueryUpdateEmitter.class);
    private static final String QUERY_UPDATE_TASKS_RESOURCE_KEY = "/update-tasks";
    private final MessageMonitor<? super SubscriptionQueryUpdateMessage<?>> updateMessageMonitor;
    private final SpanFactory spanFactory;
    private final ConcurrentMap<SubscriptionQueryMessage<?, ?, ?>, SinkWrapper<?>> updateHandlers = new ConcurrentHashMap();
    private final List<MessageDispatchInterceptor<? super SubscriptionQueryUpdateMessage<?>>> dispatchInterceptors = new CopyOnWriteArrayList();

    /* loaded from: input_file:BOOT-INF/lib/axon-messaging-4.6.7.jar:org/axonframework/queryhandling/SimpleQueryUpdateEmitter$Builder.class */
    public static class Builder {
        private MessageMonitor<? super SubscriptionQueryUpdateMessage<?>> updateMessageMonitor = NoOpMessageMonitor.INSTANCE;
        private SpanFactory spanFactory = NoOpSpanFactory.INSTANCE;

        public Builder updateMessageMonitor(@Nonnull MessageMonitor<? super SubscriptionQueryUpdateMessage<?>> messageMonitor) {
            BuilderUtils.assertNonNull(messageMonitor, "MessageMonitor may not be null");
            this.updateMessageMonitor = messageMonitor;
            return this;
        }

        public Builder spanFactory(@Nonnull SpanFactory spanFactory) {
            BuilderUtils.assertNonNull(spanFactory, "SpanFactory may not be null");
            this.spanFactory = spanFactory;
            return this;
        }

        public SimpleQueryUpdateEmitter build() {
            return new SimpleQueryUpdateEmitter(this);
        }

        protected void validate() throws AxonConfigurationException {
        }
    }

    protected SimpleQueryUpdateEmitter(Builder builder) {
        builder.validate();
        this.updateMessageMonitor = builder.updateMessageMonitor;
        this.spanFactory = builder.spanFactory;
    }

    public static Builder builder() {
        return new Builder();
    }

    @Override // org.axonframework.queryhandling.QueryUpdateEmitter
    public boolean queryUpdateHandlerRegistered(@Nonnull SubscriptionQueryMessage<?, ?, ?> subscriptionQueryMessage) {
        return this.updateHandlers.keySet().stream().anyMatch(subscriptionQueryMessage2 -> {
            return subscriptionQueryMessage2.getIdentifier().equals(subscriptionQueryMessage.getIdentifier());
        });
    }

    @Override // org.axonframework.queryhandling.QueryUpdateEmitter
    @Deprecated
    public <U> UpdateHandlerRegistration<U> registerUpdateHandler(SubscriptionQueryMessage<?, ?, ?> subscriptionQueryMessage, SubscriptionQueryBackpressure subscriptionQueryBackpressure, int i) {
        EmitterProcessor create = EmitterProcessor.create(i);
        FluxSink<T> sink = create.sink(subscriptionQueryBackpressure.getOverflowStrategy());
        sink.onDispose(() -> {
            this.updateHandlers.remove(subscriptionQueryMessage);
        });
        FluxSinkWrapper fluxSinkWrapper = new FluxSinkWrapper(sink);
        this.updateHandlers.put(subscriptionQueryMessage, fluxSinkWrapper);
        Registration registration = () -> {
            this.updateHandlers.remove(subscriptionQueryMessage);
            return true;
        };
        Flux autoConnect = create.replay(i).autoConnect();
        fluxSinkWrapper.getClass();
        return new UpdateHandlerRegistration<>(registration, autoConnect, fluxSinkWrapper::complete);
    }

    @Override // org.axonframework.queryhandling.QueryUpdateEmitter
    public <U> UpdateHandlerRegistration<U> registerUpdateHandler(@Nonnull SubscriptionQueryMessage<?, ?, ?> subscriptionQueryMessage, int i) {
        Sinks.Many limit = Sinks.many().replay().limit(i);
        SinksManyWrapper sinksManyWrapper = new SinksManyWrapper(limit);
        Runnable runnable = () -> {
            this.updateHandlers.remove(subscriptionQueryMessage);
        };
        Registration registration = () -> {
            runnable.run();
            return true;
        };
        this.updateHandlers.put(subscriptionQueryMessage, sinksManyWrapper);
        Flux doOnTerminate = limit.asFlux().doOnCancel(runnable).doOnTerminate(runnable);
        sinksManyWrapper.getClass();
        return new UpdateHandlerRegistration<>(registration, doOnTerminate, sinksManyWrapper::complete);
    }

    @Override // org.axonframework.queryhandling.QueryUpdateEmitter
    public <U> void emit(@Nonnull Predicate<SubscriptionQueryMessage<?, ?, U>> predicate, @Nonnull SubscriptionQueryUpdateMessage<U> subscriptionQueryUpdateMessage) {
        SubscriptionQueryUpdateMessage subscriptionQueryUpdateMessage2 = (SubscriptionQueryUpdateMessage) this.spanFactory.propagateContext(subscriptionQueryUpdateMessage);
        this.spanFactory.createInternalSpan(() -> {
            return "SimpleQueryUpdateEmitter.emit";
        }, subscriptionQueryUpdateMessage2).run(() -> {
            runOnAfterCommitOrNow(this.spanFactory.createDispatchSpan(() -> {
                return "SimpleQueryUpdateEmitter.doEmit";
            }, subscriptionQueryUpdateMessage2, new Message[0]).wrapRunnable(() -> {
                doEmit(predicate, intercept((SubscriptionQueryUpdateMessage) this.spanFactory.propagateContext(subscriptionQueryUpdateMessage2)));
            }));
        });
    }

    private <U> SubscriptionQueryUpdateMessage<U> intercept(SubscriptionQueryUpdateMessage<U> subscriptionQueryUpdateMessage) {
        SubscriptionQueryUpdateMessage<U> subscriptionQueryUpdateMessage2 = subscriptionQueryUpdateMessage;
        Iterator<MessageDispatchInterceptor<? super SubscriptionQueryUpdateMessage<?>>> it = this.dispatchInterceptors.iterator();
        while (it.hasNext()) {
            subscriptionQueryUpdateMessage2 = (SubscriptionQueryUpdateMessage) it.next().handle((MessageDispatchInterceptor<? super SubscriptionQueryUpdateMessage<?>>) subscriptionQueryUpdateMessage2);
        }
        return subscriptionQueryUpdateMessage2;
    }

    @Override // org.axonframework.queryhandling.QueryUpdateEmitter
    public void complete(@Nonnull Predicate<SubscriptionQueryMessage<?, ?, ?>> predicate) {
        runOnAfterCommitOrNow(() -> {
            doComplete(predicate);
        });
    }

    @Override // org.axonframework.queryhandling.QueryUpdateEmitter
    public void completeExceptionally(@Nonnull Predicate<SubscriptionQueryMessage<?, ?, ?>> predicate, @Nonnull Throwable th) {
        runOnAfterCommitOrNow(() -> {
            doCompleteExceptionally(predicate, th);
        });
    }

    @Override // org.axonframework.messaging.MessageDispatchInterceptorSupport
    @Nonnull
    public Registration registerDispatchInterceptor(@Nonnull MessageDispatchInterceptor<? super SubscriptionQueryUpdateMessage<?>> messageDispatchInterceptor) {
        this.dispatchInterceptors.add(messageDispatchInterceptor);
        return () -> {
            return this.dispatchInterceptors.remove(messageDispatchInterceptor);
        };
    }

    private <U> void doEmit(Predicate<SubscriptionQueryMessage<?, ?, U>> predicate, SubscriptionQueryUpdateMessage<U> subscriptionQueryUpdateMessage) {
        this.updateHandlers.keySet().stream().filter(payloadMatchesQueryResponseType(subscriptionQueryUpdateMessage.getPayloadType())).filter(subscriptionQueryMessage -> {
            return predicate.test(subscriptionQueryMessage);
        }).forEach(subscriptionQueryMessage2 -> {
            Optional.ofNullable(this.updateHandlers.get(subscriptionQueryMessage2)).ifPresent(sinkWrapper -> {
                doEmit(subscriptionQueryMessage2, sinkWrapper, subscriptionQueryUpdateMessage);
            });
        });
    }

    private Predicate<SubscriptionQueryMessage<?, ?, ?>> payloadMatchesQueryResponseType(Class<?> cls) {
        return subscriptionQueryMessage -> {
            return subscriptionQueryMessage.getUpdateResponseType() instanceof MultipleInstancesResponseType ? cls.isArray() || Iterable.class.isAssignableFrom(cls) : subscriptionQueryMessage.getUpdateResponseType() instanceof OptionalResponseType ? Optional.class.isAssignableFrom(cls) : subscriptionQueryMessage.getUpdateResponseType() instanceof PublisherResponseType ? Publisher.class.isAssignableFrom(cls) : subscriptionQueryMessage.getUpdateResponseType().getExpectedResponseType().isAssignableFrom(cls);
        };
    }

    private <U> void doEmit(SubscriptionQueryMessage<?, ?, ?> subscriptionQueryMessage, SinkWrapper<?> sinkWrapper, SubscriptionQueryUpdateMessage<U> subscriptionQueryUpdateMessage) {
        MessageMonitor.MonitorCallback onMessageIngested = this.updateMessageMonitor.onMessageIngested(subscriptionQueryUpdateMessage);
        try {
            sinkWrapper.next(subscriptionQueryUpdateMessage);
            onMessageIngested.reportSuccess();
        } catch (Exception e) {
            logger.info("An error occurred while trying to emit an update to a query '{}'. The subscription will be cancelled. Exception summary: {}", subscriptionQueryMessage.getQueryName(), e.toString());
            onMessageIngested.reportFailure(e);
            this.updateHandlers.remove(subscriptionQueryMessage);
            emitError(subscriptionQueryMessage, e, sinkWrapper);
        }
    }

    private void doComplete(Predicate<SubscriptionQueryMessage<?, ?, ?>> predicate) {
        this.updateHandlers.keySet().stream().filter(predicate).forEach(subscriptionQueryMessage -> {
            Optional.ofNullable(this.updateHandlers.get(subscriptionQueryMessage)).ifPresent(sinkWrapper -> {
                try {
                    sinkWrapper.complete();
                } catch (Exception e) {
                    emitError(subscriptionQueryMessage, e, sinkWrapper);
                }
            });
        });
    }

    private void emitError(SubscriptionQueryMessage<?, ?, ?> subscriptionQueryMessage, Throwable th, SinkWrapper<?> sinkWrapper) {
        try {
            sinkWrapper.error(th);
        } catch (Exception e) {
            logger.error(String.format("An error happened while trying to inform update handler about the error. Query: %s", subscriptionQueryMessage));
        }
    }

    private void doCompleteExceptionally(Predicate<SubscriptionQueryMessage<?, ?, ?>> predicate, Throwable th) {
        this.updateHandlers.keySet().stream().filter(predicate).forEach(subscriptionQueryMessage -> {
            Optional.ofNullable(this.updateHandlers.get(subscriptionQueryMessage)).ifPresent(sinkWrapper -> {
                emitError(subscriptionQueryMessage, th, sinkWrapper);
            });
        });
    }

    private void runOnAfterCommitOrNow(Runnable runnable) {
        if (!inStartedPhaseOfUnitOfWork()) {
            runnable.run();
        } else {
            UnitOfWork<?> unitOfWork = CurrentUnitOfWork.get();
            ((List) unitOfWork.getOrComputeResource(toString() + QUERY_UPDATE_TASKS_RESOURCE_KEY, str -> {
                ArrayList arrayList = new ArrayList();
                unitOfWork.afterCommit(unitOfWork2 -> {
                    arrayList.forEach((v0) -> {
                        v0.run();
                    });
                });
                return arrayList;
            })).add(runnable);
        }
    }

    private boolean inStartedPhaseOfUnitOfWork() {
        return CurrentUnitOfWork.isStarted() && UnitOfWork.Phase.STARTED.equals(CurrentUnitOfWork.get().phase());
    }

    @Override // org.axonframework.queryhandling.QueryUpdateEmitter
    public Set<SubscriptionQueryMessage<?, ?, ?>> activeSubscriptions() {
        return Collections.unmodifiableSet(this.updateHandlers.keySet());
    }
}
