package org.apache.camel.component.reactor.engine;

import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.ReactiveStreamsCamelSubscriber;
import org.apache.camel.component.reactive.streams.ReactiveStreamsConsumer;
import org.apache.camel.component.reactive.streams.ReactiveStreamsHelper;
import org.apache.camel.component.reactive.streams.ReactiveStreamsProducer;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.util.BodyConverter;
import org.apache.camel.component.reactive.streams.util.ConvertingPublisher;
import org.apache.camel.component.reactive.streams.util.ConvertingSubscriber;
import org.apache.camel.component.reactive.streams.util.UnwrapStreamProcessor;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.function.Suppliers;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/camel/component/reactor/engine/ReactorStreamsService.class */
public final class ReactorStreamsService extends ServiceSupport implements CamelReactiveStreamsService {
    private final CamelContext context;
    private final ConcurrentMap<String, ReactorCamelProcessor> publishers = new ConcurrentHashMap();
    private final ConcurrentMap<String, ReactiveStreamsCamelSubscriber> subscribers = new ConcurrentHashMap();
    private final ConcurrentMap<String, String> publishedUriToStream = new ConcurrentHashMap();
    private final ConcurrentMap<String, String> requestedUriToStream = new ConcurrentHashMap();
    private final Supplier<UnwrapStreamProcessor> unwrapStreamProcessorSupplier = Suppliers.memorize(UnwrapStreamProcessor::new);

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReactorStreamsService(CamelContext camelContext) {
        this.context = camelContext;
    }

    public String getId() {
        return ReactorStreamsConstants.SERVICE_NAME;
    }

    public void doStart() throws Exception {
    }

    public void doStop() throws Exception {
        Iterator<ReactorCamelProcessor> it = this.publishers.values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        Iterator<ReactiveStreamsCamelSubscriber> it2 = this.subscribers.values().iterator();
        while (it2.hasNext()) {
            it2.next().close();
        }
    }

    public Publisher<Exchange> fromStream(String str) {
        return getCamelProcessor(str).getPublisher();
    }

    public <T> Publisher<T> fromStream(String str, Class<T> cls) {
        Publisher<Exchange> fromStream = fromStream(str);
        return Exchange.class.isAssignableFrom(cls) ? (Publisher) Publisher.class.cast(fromStream) : Flux.from(fromStream).map(BodyConverter.forType(cls));
    }

    /* renamed from: streamSubscriber, reason: merged with bridge method [inline-methods] */
    public ReactiveStreamsCamelSubscriber m1streamSubscriber(String str) {
        return this.subscribers.computeIfAbsent(str, str2 -> {
            return new ReactiveStreamsCamelSubscriber(str);
        });
    }

    public <T> Subscriber<T> streamSubscriber(String str, Class<T> cls) {
        ReactiveStreamsCamelSubscriber m1streamSubscriber = m1streamSubscriber(str);
        return Exchange.class.equals(cls) ? (Subscriber) Subscriber.class.cast(m1streamSubscriber) : new ConvertingSubscriber(m1streamSubscriber, this.context);
    }

    public Publisher<Exchange> toStream(String str, Object obj) {
        return doRequest(str, ReactiveStreamsHelper.convertToExchange(this.context, obj));
    }

    public Function<?, ? extends Publisher<Exchange>> toStream(String str) {
        return obj -> {
            return toStream(str, obj);
        };
    }

    public <T> Publisher<T> toStream(String str, Object obj, Class<T> cls) {
        return new ConvertingPublisher(toStream(str, obj), cls);
    }

    public <T> Function<Object, Publisher<T>> toStream(String str, Class<T> cls) {
        return obj -> {
            return toStream(str, obj, cls);
        };
    }

    public Publisher<Exchange> from(String str) {
        return fromStream(this.publishedUriToStream.computeIfAbsent(str, str2 -> {
            try {
                String generateUuid = this.context.getUuidGenerator().generateUuid();
                RouteBuilder.addRoutes(this.context, routeBuilder -> {
                    routeBuilder.from(str2).to("reactive-streams:" + generateUuid);
                });
                return generateUuid;
            } catch (Exception e) {
                throw new IllegalStateException("Unable to create source reactive stream from direct URI: " + str, e);
            }
        }));
    }

    public <T> Publisher<T> from(String str, Class<T> cls) {
        Publisher<Exchange> from = from(str);
        return Exchange.class.isAssignableFrom(cls) ? (Publisher) Publisher.class.cast(from) : Flux.from(from).map(BodyConverter.forType(cls));
    }

    public Subscriber<Exchange> subscriber(final String str) {
        try {
            final String generateUuid = this.context.getUuidGenerator().generateUuid();
            this.context.addRoutes(new RouteBuilder() { // from class: org.apache.camel.component.reactor.engine.ReactorStreamsService.1
                public void configure() throws Exception {
                    from("reactive-streams:" + generateUuid).to(str);
                }
            });
            return m1streamSubscriber(generateUuid);
        } catch (Exception e) {
            throw new IllegalStateException("Unable to create source reactive stream towards direct URI: " + str, e);
        }
    }

    public <T> Subscriber<T> subscriber(String str, Class<T> cls) {
        return new ConvertingSubscriber(subscriber(str), this.context);
    }

    public Publisher<Exchange> to(String str, Object obj) {
        return toStream(this.requestedUriToStream.computeIfAbsent(str, str2 -> {
            try {
                final String generateUuid = this.context.getUuidGenerator().generateUuid();
                this.context.addRoutes(new RouteBuilder() { // from class: org.apache.camel.component.reactor.engine.ReactorStreamsService.2
                    public void configure() throws Exception {
                        from("reactive-streams:" + generateUuid).to(str2);
                    }
                });
                return generateUuid;
            } catch (Exception e) {
                throw new IllegalStateException("Unable to create requested reactive stream from direct URI: " + str, e);
            }
        }), obj);
    }

    public Function<Object, Publisher<Exchange>> to(String str) {
        return obj -> {
            return to(str, obj);
        };
    }

    public <T> Publisher<T> to(String str, Object obj, Class<T> cls) {
        return Flux.from(to(str, obj)).map(BodyConverter.forType(cls));
    }

    public <T> Function<Object, Publisher<T>> to(String str, Class<T> cls) {
        return obj -> {
            return to(str, obj, cls);
        };
    }

    public void process(final String str, final Function<? super Publisher<Exchange>, ?> function) {
        try {
            this.context.addRoutes(new RouteBuilder() { // from class: org.apache.camel.component.reactor.engine.ReactorStreamsService.3
                public void configure() throws Exception {
                    RouteDefinition from = from(str);
                    Function function2 = function;
                    from.process(exchange -> {
                        exchange.getIn().setBody(function2.apply(Mono.just(exchange.copy())));
                    }).process((Processor) ReactorStreamsService.this.unwrapStreamProcessorSupplier.get());
                }
            });
        } catch (Exception e) {
            throw new IllegalStateException("Unable to add reactive stream processor to the direct URI: " + str, e);
        }
    }

    public <T> void process(String str, Class<T> cls, Function<? super Publisher<T>, ?> function) {
        process(str, publisher -> {
            return function.apply(Flux.from(publisher).map(BodyConverter.forType(cls)));
        });
    }

    public void attachCamelProducer(String str, ReactiveStreamsProducer reactiveStreamsProducer) {
        getCamelProcessor(str).attach(reactiveStreamsProducer);
    }

    public void detachCamelProducer(String str) {
        getCamelProcessor(str).detach();
    }

    public void sendCamelExchange(String str, Exchange exchange) {
        getCamelProcessor(str).send(exchange);
    }

    private ReactorCamelProcessor getCamelProcessor(String str) {
        return this.publishers.computeIfAbsent(str, str2 -> {
            return new ReactorCamelProcessor(this, str2);
        });
    }

    public ReactiveStreamsCamelSubscriber attachCamelConsumer(String str, ReactiveStreamsConsumer reactiveStreamsConsumer) {
        ReactiveStreamsCamelSubscriber m1streamSubscriber = m1streamSubscriber(str);
        m1streamSubscriber.attachConsumer(reactiveStreamsConsumer);
        return m1streamSubscriber;
    }

    public void detachCamelConsumer(String str) {
        m1streamSubscriber(str).detachConsumer();
    }

    protected Publisher<Exchange> doRequest(String str, Exchange exchange) {
        ReactiveStreamsConsumer consumer = m1streamSubscriber(str).getConsumer();
        if (consumer == null) {
            throw new IllegalStateException("No consumers attached to the stream " + str);
        }
        return Mono.create(monoSink -> {
            exchange.addOnCompletion(new Synchronization() { // from class: org.apache.camel.component.reactor.engine.ReactorStreamsService.4
                public void onComplete(Exchange exchange2) {
                    monoSink.success(exchange2);
                }

                public void onFailure(Exchange exchange2) {
                    Exception exception = exchange2.getException();
                    if (exception == null) {
                        exception = new IllegalStateException("Unknown Exception");
                    }
                    monoSink.error(exception);
                }
            });
        }).doOnSubscribe(subscription -> {
            consumer.process(exchange, ReactorStreamsConstants.EMPTY_ASYNC_CALLBACK);
        });
    }
}
