package reactor.groovy.config;

import java.util.Iterator;
import java.util.List;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.bus.Event;
import reactor.bus.filter.Filter;
import reactor.bus.registry.Registration;
import reactor.bus.registry.Registry;
import reactor.bus.routing.ConsumerFilteringRouter;
import reactor.fn.Consumer;

/* loaded from: input_file:reactor/groovy/config/StreamRouter.class */
public class StreamRouter extends ConsumerFilteringRouter {
    private final Registry<Object, Processor<Event<?>, Event<?>>> processorRegistry;

    public StreamRouter(Filter filter, Registry<Object, Processor<Event<?>, Event<?>>> registry) {
        super(filter);
        this.processorRegistry = registry;
    }

    public <E extends Event<?>> void route(Object obj, final E e, final List<Registration<Object, ? extends Consumer<? extends Event<?>>>> list, final Consumer<E> consumer, final Consumer<Throwable> consumer2) {
        Iterator it = this.processorRegistry.select(obj).iterator();
        while (it.hasNext()) {
            Processor processor = (Processor) ((Registration) it.next()).getObject();
            processor.onNext(e);
            processor.subscribe(new Subscriber<Event<?>>() { // from class: reactor.groovy.config.StreamRouter.1
                public void onSubscribe(Subscription subscription) {
                    subscription.request(2147483647L);
                }

                public void onNext(Event<?> event) {
                    StreamRouter.super.route(event.getKey(), event, list, consumer, consumer2);
                }

                public void onComplete() {
                }

                public void onError(Throwable th) {
                    e.consumeError(th);
                }
            });
        }
    }
}
