package io.vertx.rxcore.java.eventbus;

import io.vertx.rxcore.java.impl.HandlerSubscription;
import io.vertx.rxcore.java.impl.SingleSubscriptionHandler;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
import rx.Observable;
import rx.Subscriber;

/* loaded from: input_file:io/vertx/rxcore/java/eventbus/RxEventBus.class */
/**
 * Rx-style facade over a Vert.x {@link EventBus}: every operation returns an
 * {@link Observable} instead of taking a reply handler.
 *
 * <p>Two families of send methods exist and they differ in <em>when</em> the
 * message leaves:
 * <ul>
 *   <li>{@link #send} / {@link #sendWithTimeout} hand the message to the bus
 *       immediately, inside the method call itself;</li>
 *   <li>{@link #observeSend} / {@link #observeSendWithTimeout} hand the message
 *       to the bus each time the returned observable is subscribed to.</li>
 * </ul>
 *
 * <p>NOTE(review): several locals and constructor calls below use raw types and
 * explicit {@code (Message)} casts. They are kept as-is because the generic
 * signatures of the collaborating types ({@code EventBus}, {@code RxMessage},
 * {@code RxStream}, {@code HandlerSubscription}) are not visible from this file;
 * confirm against those declarations before parameterising them.
 */
public class RxEventBus {
    /**
     * Timeout used by {@link #observeSend} when no explicit default is supplied
     * to the constructor. Presumably milliseconds, as it is passed straight to
     * {@code EventBus.sendWithTimeout} -- confirm against the Vert.x API.
     */
    public static final int DEFAULT_TIMEOUT = 60000;

    /** The underlying bus every operation delegates to. */
    private final EventBus eventBus;

    /** Timeout passed to {@code sendWithTimeout} by {@link #observeSend}. */
    private final int defaultTimeout;

    /**
     * Reply handler for an eager timed send. A succeeded result is wrapped in an
     * {@link RxMessage} and passed to {@code fireResult}; a failed result passes
     * its cause to {@code fireError}.
     *
     * @param <R> body type of the reply
     */
    protected static class AsyncSendHandler<R> extends SingleSubscriptionHandler<RxMessage<R>, AsyncResult<Message<R>>> {
        @Override
        public void handle(AsyncResult<Message<R>> asyncResult) {
            if (asyncResult.succeeded()) {
                fireResult(new RxMessage((Message) asyncResult.result()));
            } else {
                fireError(asyncResult.cause());
            }
        }
    }

    /**
     * Per-subscriber reply handler for a deferred timed send. A succeeded result
     * is wrapped in an {@link RxMessage} and passed to {@code fireComplete}; a
     * failed result passes its cause to {@code fireError}.
     *
     * @param <R> body type of the reply
     */
    public static class AsyncSendSubscription<R> extends HandlerSubscription<AsyncResult<Message<R>>, RxMessage<R>> {
        /**
         * @param subscriber the subscriber this subscription reports to
         */
        public AsyncSendSubscription(Subscriber<RxMessage<R>> subscriber) {
            super(subscriber);
        }

        @Override
        public void handle(AsyncResult<Message<R>> asyncResult) {
            if (asyncResult.succeeded()) {
                fireComplete(new RxMessage((Message) asyncResult.result()));
            } else {
                fireError(asyncResult.cause());
            }
        }
    }

    /**
     * Handler for messages arriving at a registered address. Every incoming
     * message is wrapped in an {@link RxMessage} and passed to {@code fireNext}.
     *
     * @param <R> body type of the received messages
     */
    protected static class ReceiveHandler<R> extends SingleSubscriptionHandler<RxMessage<R>, Message> {
        protected ReceiveHandler() {
        }

        @Override
        public void handle(Message message) {
            fireNext(new RxMessage(message));
        }
    }

    /**
     * Reply handler for an eager send without timeout. The reply is wrapped in
     * an {@link RxMessage} and passed to {@code fireResult}.
     *
     * @param <R> body type of the reply
     */
    protected static class SendHandler<R> extends SingleSubscriptionHandler<RxMessage<R>, Message<R>> {
        @Override
        public void handle(Message message) {
            fireResult(new RxMessage(message));
        }
    }

    /**
     * Wraps {@code eventBus} using {@link #DEFAULT_TIMEOUT} as the default timeout.
     *
     * @param eventBus the bus to delegate to
     */
    public RxEventBus(EventBus eventBus) {
        this(eventBus, DEFAULT_TIMEOUT);
    }

    /**
     * Wraps {@code eventBus} with an explicit default timeout.
     *
     * @param eventBus       the bus to delegate to
     * @param defaultTimeout timeout used by {@link #observeSend}
     */
    public RxEventBus(EventBus eventBus, int defaultTimeout) {
        this.eventBus = eventBus;
        this.defaultTimeout = defaultTimeout;
    }

    /**
     * Sends {@code payload} to {@code address} right away and returns an
     * observable backed by the reply handler.
     *
     * <p>The send happens during this call, not on subscription.
     *
     * @param address bus address to send to
     * @param payload message to send
     * @param <S>     type of the outgoing message
     * @param <R>     body type of the reply
     * @return observable created from the {@link SendHandler} that receives the reply
     */
    public <S, R> Observable<RxMessage<R>> send(String address, S payload) {
        SendHandler replyHandler = new SendHandler();
        this.eventBus.send(address, payload, replyHandler);
        return Observable.create(replyHandler);
    }

    /**
     * Sends {@code payload} to {@code address} right away with a reply timeout
     * and returns an observable backed by the reply handler.
     *
     * <p>The send happens during this call, not on subscription.
     *
     * @param address bus address to send to
     * @param payload message to send
     * @param timeout timeout forwarded to {@code EventBus.sendWithTimeout}
     * @param <S>     type of the outgoing message
     * @param <R>     body type of the reply
     * @return observable created from the {@link AsyncSendHandler} that receives
     *         the reply or the failure
     */
    public <S, R> Observable<RxMessage<R>> sendWithTimeout(String address, S payload, long timeout) {
        AsyncSendHandler replyHandler = new AsyncSendHandler();
        this.eventBus.sendWithTimeout(address, payload, timeout, replyHandler);
        return Observable.create(replyHandler);
    }

    /**
     * Returns an observable that sends {@code payload} to {@code address} each
     * time it is subscribed to, using this bus's default timeout.
     *
     * @param address bus address to send to
     * @param payload message to send
     * @param <S>     type of the outgoing message
     * @param <R>     body type of the reply
     * @return observable whose subscription triggers the send
     */
    public <S, R> Observable<RxMessage<R>> observeSend(final String address, final S payload) {
        return this.<S, R>deferredSend(address, payload, this.defaultTimeout);
    }

    /**
     * Returns an observable that sends {@code payload} to {@code address} each
     * time it is subscribed to, using the given timeout.
     *
     * @param address bus address to send to
     * @param payload message to send
     * @param timeout timeout forwarded to {@code EventBus.sendWithTimeout}
     * @param <S>     type of the outgoing message
     * @param <R>     body type of the reply
     * @return observable whose subscription triggers the send
     */
    public <S, R> Observable<RxMessage<R>> observeSendWithTimeout(final String address, final S payload, final long timeout) {
        return this.<S, R>deferredSend(address, payload, timeout);
    }

    /**
     * Shared implementation of the two {@code observeSend*} methods: builds an
     * observable whose {@code OnSubscribe} performs a timed send per subscriber.
     */
    private <S, R> Observable<RxMessage<R>> deferredSend(final String address, final S payload, final long timeout) {
        return Observable.create(new Observable.OnSubscribe<RxMessage<R>>() {
            @Override
            public void call(Subscriber<? super RxMessage<R>> subscriber) {
                AsyncSendSubscription subscription = new AsyncSendSubscription(subscriber);
                RxEventBus.this.eventBus.sendWithTimeout(address, payload, timeout, subscription);
                // Attached after the send, same order as before the refactoring.
                subscriber.add(subscription);
            }
        });
    }

    /**
     * Returns an observable of messages arriving at {@code address} through a
     * handler registered with {@code EventBus.registerLocalHandler}.
     *
     * <p>Registration is performed in {@code execute()}; when that is invoked is
     * decided by {@code SingleSubscriptionHandler} (presumably on subscription --
     * confirm).
     *
     * @param address bus address to listen on
     * @param <T>     body type of the received messages
     * @return observable emitting each received message wrapped in an {@link RxMessage}
     */
    public <T> Observable<RxMessage<T>> registerLocalHandler(final String address) {
        return Observable.create(new ReceiveHandler<T>() {
            @Override
            public void execute() {
                RxEventBus.this.eventBus.registerLocalHandler(address, this);
            }
        });
    }

    /**
     * Returns an observable of messages arriving at {@code address} through a
     * handler registered with {@code EventBus.registerHandler}.
     *
     * <p>Registration is performed in {@code execute()}; when that is invoked is
     * decided by {@code SingleSubscriptionHandler} (presumably on subscription --
     * confirm).
     *
     * @param address bus address to listen on
     * @param <T>     body type of the received messages
     * @return observable emitting each received message wrapped in an {@link RxMessage}
     */
    public <T> Observable<RxMessage<T>> registerHandler(final String address) {
        return Observable.create(new ReceiveHandler<T>() {
            @Override
            public void execute() {
                RxEventBus.this.eventBus.registerHandler(address, this);
            }
        });
    }

    /**
     * Starts a request/reply conversation: sends {@code payload} to
     * {@code address} and emits the same {@link RxStream} instance for every
     * reply that arrives.
     *
     * <p>The stream object is created once, when this method is called. The
     * observable completes after a reply that carries no reply address.
     *
     * @param address bus address to send the initial message to
     * @param payload initial message
     * @param <S>     type of the outgoing messages
     * @param <R>     body type of the replies
     * @return observable emitting the stream after each reply
     */
    public <S, R> Observable<RxStream<S, R>> observeStream(final String address, final S payload) {
        final RxStream rxStream = new RxStream();
        return Observable.create(new SingleSubscriptionHandler<RxStream<S, R>, Message<R>>() {
            @Override
            public void execute() {
                // The stream keeps this handler as its callback before the first send goes out.
                rxStream.callback = this;
                RxEventBus.this.eventBus.send(address, payload, this);
            }

            @Override
            public void handle(Message<R> message) {
                rxStream.handle((Message) message);
                fireNext(rxStream);
                // No reply address on the incoming message: complete the observable.
                if (message.replyAddress() == null) {
                    fireComplete();
                }
            }
        });
    }
}
