/*
 * Decompiled with CFR 0.152.
 */
package io.helidon.messaging;

import io.helidon.common.reactive.Multi;
import io.helidon.config.Config;
import io.helidon.messaging.Channel;
import io.helidon.messaging.Emitter;
import io.helidon.messaging.MessagingImpl;
import java.util.concurrent.Flow;
import java.util.function.Consumer;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.reactivestreams.FlowAdapters;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public interface Messaging {
    /**
     * Starts this messaging instance.
     *
     * @return a {@link Messaging} reference; presumably this same instance so that calls can be
     *         chained (the implementation is not visible here, confirm against {@code MessagingImpl})
     */
    public Messaging start();

    /**
     * Stops this messaging instance.
     *
     * <p>NOTE(review): what exactly is shut down (connectors, channels, subscriptions) is decided
     * by the implementation, which is not visible here.
     */
    public void stop();

    /**
     * Creates a new {@link Builder} used to assemble a {@link Messaging} instance.
     *
     * @return a new builder
     */
    public static Builder builder() {
        final Builder freshBuilder = new Builder();
        return freshBuilder;
    }

    public static final class Builder
    implements io.helidon.common.Builder<Messaging> {
        private final MessagingImpl messaging = new MessagingImpl();

        // Private on purpose: builders are obtained through Messaging.builder().
        private Builder() {
        }

        /**
         * Sets the configuration on the messaging instance being built.
         *
         * <p>If this method is never called, {@link #build()} falls back to {@code Config.create()}.
         *
         * @param config configuration handed to the messaging instance
         * @return this builder
         */
        public Builder config(Config config) {
            messaging.setConfig(config);
            return this;
        }

        /**
         * Registers a connector factory in every role it supports.
         *
         * <p>A factory implementing both {@link IncomingConnectorFactory} and
         * {@link OutgoingConnectorFactory} is registered twice, once per role. A factory
         * implementing neither (or {@code null}) is silently ignored.
         *
         * @param connector connector factory to register
         * @return this builder
         */
        public Builder connector(ConnectorFactory connector) {
            final boolean supportsIncoming = connector instanceof IncomingConnectorFactory;
            final boolean supportsOutgoing = connector instanceof OutgoingConnectorFactory;

            if (supportsIncoming) {
                messaging.addIncomingConnector((IncomingConnectorFactory) connector);
            }
            if (supportsOutgoing) {
                messaging.addOutgoingConnector((OutgoingConnectorFactory) connector);
            }
            return this;
        }

        /**
         * Registers an emitter and wires it as the publisher of each of its channels.
         *
         * <p>The emitter is added to the messaging instance first; then every channel returned by
         * {@code emitter.channels()} is registered and gets the emitter set as its publisher.
         *
         * @param emitter   emitter to register
         * @param <PAYLOAD> payload type of the emitter and its channels
         * @return this builder
         */
        public <PAYLOAD> Builder emitter(Emitter<PAYLOAD> emitter) {
            messaging.addEmitter(emitter);
            for (Channel<PAYLOAD> emitterChannel : emitter.channels()) {
                messaging.registerChannel(emitterChannel);
                emitterChannel.setPublisher(emitter);
            }
            return this;
        }

        /**
         * Registers a publisher, supplied as a not-yet-built {@link PublisherBuilder}, for a channel.
         *
         * <p>The builder is materialised with {@code buildRs()} and the result is passed to the
         * overload accepting a Reactive Streams {@link Publisher} of messages.
         *
         * @param channel          channel the publisher feeds
         * @param publisherBuilder builder producing the publisher of messages
         * @param <PAYLOAD>        payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder publisher(Channel<PAYLOAD> channel, PublisherBuilder<? extends Message<? extends PAYLOAD>> publisherBuilder) {
            final Publisher<? extends Message<? extends PAYLOAD>> builtPublisher = publisherBuilder.buildRs();
            return this.publisher(channel, builtPublisher);
        }

        /**
         * Registers a publisher of raw payloads for a channel, wrapping each payload into a
         * {@link Message}.
         *
         * @param channel   channel the publisher feeds
         * @param publisher Reactive Streams publisher emitting unwrapped payloads
         * @param wrapper   function turning a payload into a message; when {@code null},
         *                  {@code Message::of} is used
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder publisher(Channel<PAYLOAD> channel, Publisher<? extends PAYLOAD> publisher, Function<? super PAYLOAD, ? extends Message<? extends PAYLOAD>> wrapper) {
            final Function<? super PAYLOAD, ? extends Message<? extends PAYLOAD>> effectiveWrapper;
            if (wrapper != null) {
                effectiveWrapper = wrapper;
            } else {
                effectiveWrapper = Message::of;
            }
            return this.publisher(channel, ReactiveStreams.fromPublisher(publisher).map(effectiveWrapper).buildRs());
        }

        /**
         * Registers a {@link Flow.Publisher} of raw payloads for a channel, wrapping each payload
         * into a {@link Message}.
         *
         * <p>The flow publisher is adapted with {@code FlowAdapters.toPublisher} and passed, together
         * with the wrapper, to the overload accepting a Reactive Streams {@link Publisher}.
         *
         * @param channel   channel the publisher feeds
         * @param publisher flow publisher emitting unwrapped payloads
         * @param wrapper   function turning a payload into a message; when {@code null},
         *                  {@code Message::of} is used
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder publisher(Channel<PAYLOAD> channel, Flow.Publisher<? extends PAYLOAD> publisher, Function<? super PAYLOAD, ? extends Message<? extends PAYLOAD>> wrapper) {
            final Function<? super PAYLOAD, ? extends Message<? extends PAYLOAD>> effectiveWrapper =
                    wrapper == null ? Message::of : wrapper;
            return this.publisher(channel, FlowAdapters.toPublisher(publisher), effectiveWrapper);
        }

        /**
         * Registers a {@link Flow.Publisher} of messages for a channel.
         *
         * <p>The flow publisher is adapted with {@code FlowAdapters.toPublisher} and passed to the
         * overload accepting a Reactive Streams {@link Publisher} of messages.
         *
         * @param channel   channel the publisher feeds
         * @param publisher flow publisher emitting messages
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder publisher(Channel<PAYLOAD> channel, Flow.Publisher<? extends Message<? extends PAYLOAD>> publisher) {
            final Publisher<? extends Message<? extends PAYLOAD>> adapted = FlowAdapters.toPublisher(publisher);
            return this.publisher(channel, adapted);
        }

        /**
         * Registers a Reactive Streams {@link Publisher} of messages for a channel.
         *
         * <p>The channel is registered with the messaging instance and the publisher is set on it.
         *
         * @param channel   channel the publisher feeds
         * @param publisher publisher emitting messages
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder publisher(Channel<PAYLOAD> channel, Publisher<? extends Message<? extends PAYLOAD>> publisher) {
            messaging.registerChannel(channel);
            channel.setPublisher(publisher);
            return this;
        }

        /**
         * Registers a payload consumer as the subscriber of a channel.
         *
         * <p>Each incoming message passes through {@code unwrapProcessorBuilder()} (which
         * acknowledges it and extracts its payload) before the payload reaches {@code consumer}.
         *
         * @param channel   channel to listen on
         * @param consumer  callback invoked with every unwrapped payload
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder listener(Channel<PAYLOAD> channel, Consumer<? super PAYLOAD> consumer) {
            this.messaging.registerChannel(channel);
            // The explicit <PAYLOAD> witness is required: without it the helper's type parameter is
            // inferred as Object and forEach(consumer) no longer type-checks. With it, the built
            // subscriber is already of the type the channel accepts, so no unchecked cast is needed.
            Subscriber<? extends Message<? extends PAYLOAD>> payloadSubscriber =
                    Builder.<PAYLOAD>unwrapProcessorBuilder()
                            .forEach(consumer)
                            .build();
            channel.setSubscriber(payloadSubscriber);
            return this;
        }

        /**
         * Registers a {@link Flow.Subscriber} of messages for a channel.
         *
         * <p>The flow subscriber is adapted with {@code FlowAdapters.toSubscriber} and passed to the
         * overload accepting a Reactive Streams {@link Subscriber}.
         *
         * @param channel    channel to subscribe to
         * @param subscriber flow subscriber consuming messages
         * @param <PAYLOAD>  payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder subscriber(Channel<PAYLOAD> channel, Flow.Subscriber<? extends Message<? extends PAYLOAD>> subscriber) {
            final Subscriber<? extends Message<? extends PAYLOAD>> adapted = FlowAdapters.toSubscriber(subscriber);
            this.subscriber(channel, adapted);
            return this;
        }

        /**
         * Registers a subscriber for a channel by exposing the channel's messages as a {@link Multi}.
         *
         * <p>A processor is built from an empty {@code ReactiveStreams.builder()}. Its publisher side
         * is wrapped into a {@link Multi} and handed to {@code consumer} immediately, during this
         * call; its subscriber side is then registered as the channel's subscriber.
         *
         * @param channel   channel to subscribe to
         * @param consumer  callback receiving the {@link Multi} of messages flowing through the channel
         * @param <PAYLOAD> payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder subscriber(Channel<PAYLOAD> channel, Consumer<Multi<? extends Message<? extends PAYLOAD>>> consumer) {
            // Fully parameterised instead of a raw Processor: keeps the payload type through the
            // adapter calls and removes the need for raw/unchecked casts.
            Processor<Message<? extends PAYLOAD>, Message<? extends PAYLOAD>> processor =
                    ReactiveStreams.<Message<? extends PAYLOAD>>builder().buildRs();
            Flow.Publisher<Message<? extends PAYLOAD>> flowPublisher = FlowAdapters.toFlowPublisher(processor);
            consumer.accept(Multi.create(flowPublisher));
            this.subscriber(channel, processor);
            return this;
        }

        /**
         * Registers a subscriber, supplied as a not-yet-built {@link SubscriberBuilder}, for a channel.
         *
         * <p>The builder is materialised with {@code build()} and the result is passed to the
         * overload accepting a Reactive Streams {@link Subscriber}.
         *
         * @param channel           channel to subscribe to
         * @param subscriberBuilder builder producing the subscriber of messages
         * @param <PAYLOAD>         payload type of the channel
         * @param <RESULT>          result type of the subscriber builder
         * @return this builder
         */
        public <PAYLOAD, RESULT> Builder subscriber(Channel<PAYLOAD> channel, SubscriberBuilder<? extends Message<? extends PAYLOAD>, RESULT> subscriberBuilder) {
            final Subscriber<? extends Message<? extends PAYLOAD>> builtSubscriber = subscriberBuilder.build();
            this.subscriber(channel, builtSubscriber);
            return this;
        }

        /**
         * Registers a Reactive Streams {@link Subscriber} of messages for a channel.
         *
         * <p>The channel is registered with the messaging instance and the subscriber is set on it.
         *
         * @param channel    channel to subscribe to
         * @param subscriber subscriber consuming messages
         * @param <PAYLOAD>  payload type of the channel
         * @return this builder
         */
        public <PAYLOAD> Builder subscriber(Channel<PAYLOAD> channel, Subscriber<? extends Message<? extends PAYLOAD>> subscriber) {
            messaging.registerChannel(channel);
            channel.setSubscriber(subscriber);
            return this;
        }

        /**
         * Connects two channels through a Reactive Streams {@link Processor}.
         *
         * <p>Both channels are registered; the processor is set as the subscriber of {@code in} and
         * as the publisher of {@code out}.
         *
         * @param in        channel whose messages the processor consumes
         * @param out       channel the processor publishes to
         * @param processor processor transforming messages from {@code in} into messages for {@code out}
         * @param <PAYLOAD> payload type of the incoming channel
         * @param <RESULT>  payload type of the outgoing channel
         * @return this builder
         */
        public <PAYLOAD, RESULT> Builder processor(Channel<PAYLOAD> in, Channel<RESULT> out, Processor<? extends Message<? extends PAYLOAD>, ? extends Message<? extends RESULT>> processor) {
            this.messaging.registerChannel(in);
            this.messaging.registerChannel(out);
            // A Processor<T, R> is both a Subscriber<T> and a Publisher<R>, so it can be passed to
            // the channels directly; no unchecked cast to Message<?> is needed.
            in.setSubscriber(processor);
            out.setPublisher(processor);
            return this;
        }

        /**
         * Connects two channels through a processor supplied as a not-yet-built
         * {@link ProcessorBuilder}.
         *
         * <p>The builder is materialised with {@code buildRs()} and the result is passed to the
         * overload accepting a Reactive Streams {@link Processor}.
         *
         * @param in               channel whose messages the processor consumes
         * @param out              channel the processor publishes to
         * @param processorBuilder builder producing the processor
         * @param <PAYLOAD>        payload type of the incoming channel
         * @param <RESULT>         payload type of the outgoing channel
         * @return this builder
         */
        public <PAYLOAD, RESULT> Builder processor(Channel<PAYLOAD> in, Channel<RESULT> out, ProcessorBuilder<? extends Message<? extends PAYLOAD>, ? extends Message<? extends RESULT>> processorBuilder) {
            final Processor<? extends Message<? extends PAYLOAD>, ? extends Message<? extends RESULT>> builtProcessor =
                    processorBuilder.buildRs();
            return this.processor(in, out, builtProcessor);
        }

        /**
         * Connects two channels through a plain payload-to-payload function.
         *
         * <p>The resulting processor unwraps each incoming message via
         * {@code unwrapProcessorBuilder()} (acknowledge, then extract payload), applies
         * {@code messageFunction}, and wraps the result into a new message via
         * {@code wrapProcessorBuilder()}.
         *
         * @param in              channel whose payloads are fed to the function
         * @param out             channel receiving the function's results
         * @param messageFunction function mapping an incoming payload to an outgoing payload
         * @param <PAYLOAD>       payload type of the incoming channel
         * @param <RESULT>        payload type of the outgoing channel
         * @return this builder
         */
        public <PAYLOAD, RESULT> Builder processor(Channel<PAYLOAD> in, Channel<RESULT> out, Function<? super PAYLOAD, ? extends RESULT> messageFunction) {
            // Explicit type witnesses are required on both helpers: without them their type
            // parameters are inferred as Object and map(messageFunction) does not type-check.
            Processor<? extends Message<? extends PAYLOAD>, ? extends Message<? extends RESULT>> processor =
                    Builder.<PAYLOAD>unwrapProcessorBuilder()
                            .map(messageFunction)
                            .via(Builder.<RESULT>wrapProcessorBuilder())
                            .buildRs();
            return this.processor(in, out, processor);
        }

        /**
         * Finishes the builder and returns the assembled {@link Messaging} instance.
         *
         * <p>If no configuration has been set, {@code Config.create()} is applied first.
         *
         * @return the messaging instance this builder has been configuring
         */
        public Messaging build() {
            final boolean configMissing = messaging.getConfig() == null;
            if (configMissing) {
                messaging.setConfig(Config.create());
            }
            return messaging;
        }

        /**
         * Creates a processor builder that wraps every payload into a {@link Message} using
         * {@code Message::of}.
         *
         * @param <PAYLOAD> payload type to wrap
         * @return processor builder mapping payloads to messages
         */
        private static <PAYLOAD> ProcessorBuilder<? super PAYLOAD, Message<? extends PAYLOAD>> wrapProcessorBuilder() {
            // The <PAYLOAD> witness pins the element type; left to inference it becomes Object,
            // and Message::of would then produce Message<Object>, which does not fit the return type.
            return ReactiveStreams.<PAYLOAD>builder()
                    .map(Message::of);
        }

        /**
         * Creates a processor builder that acknowledges every incoming {@link Message} and then
         * replaces it with its payload.
         *
         * @param <PAYLOAD> payload type carried by the incoming messages
         * @return processor builder mapping messages to their payloads
         */
        private static <PAYLOAD> ProcessorBuilder<? extends Message<? extends PAYLOAD>, ? extends PAYLOAD> unwrapProcessorBuilder() {
            // The witness pins the element type to Message<? extends PAYLOAD>; left to inference
            // it becomes Object, on which Message::ack and Message::getPayload cannot be applied.
            return ReactiveStreams.<Message<? extends PAYLOAD>>builder()
                    .peek(Message::ack)
                    .map(Message::getPayload);
        }
    }
}

