package rx.quasar;

import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.channels.ReceivePort;
import rx.Observable;
import rx.Subscriber;

/* loaded from: input_file:rx/quasar/OnSubscribeFromChannel.class */
public final class OnSubscribeFromChannel<T> implements Observable.OnSubscribe<T> {
    final ReceivePort<? extends T> channel;

    public OnSubscribeFromChannel(ReceivePort<? extends T> receivePort) {
        this.channel = receivePort;
    }

    @Suspendable
    public void call(Subscriber<? super T> subscriber) {
        while (true) {
            try {
                Object receive = this.channel.receive();
                if (receive == null) {
                    break;
                } else if (subscriber.isUnsubscribed()) {
                    return;
                } else {
                    subscriber.onNext(receive);
                }
            } catch (InterruptedException e) {
            } catch (Exception e2) {
                subscriber.onError(e2);
                return;
            }
        }
        subscriber.onCompleted();
    }
}
