package rx.quasar;

import co.paralleluniverse.fibers.FiberAsync;
import co.paralleluniverse.fibers.SuspendExecution;
import co.paralleluniverse.fibers.Suspendable;
import co.paralleluniverse.strands.Strand;
import co.paralleluniverse.strands.channels.Channel;
import co.paralleluniverse.strands.channels.Channels;
import co.paralleluniverse.strands.channels.ReceivePort;
import co.paralleluniverse.strands.channels.SendPort;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;

/* loaded from: input_file:rx/quasar/ChannelObservable.class */
public final class ChannelObservable {

    /* loaded from: input_file:rx/quasar/ChannelObservable$AsyncObservable.class */
    private static class AsyncObservable<T> extends FiberAsync<T, ExecutionException> implements Observer<T> {
        private final Observable<T> o;

        public AsyncObservable(Observable<T> observable) {
            this.o = observable;
        }

        protected void requestAsync() {
            this.o.subscribe(this);
        }

        public void onNext(T t) {
            if (isCompleted()) {
                throw new IllegalStateException("Operation already completed");
            }
            asyncCompleted(t);
        }

        public void onError(Throwable th) {
            if (isCompleted()) {
                throw new IllegalStateException("Operation already completed");
            }
            asyncFailed(th);
        }

        public void onCompleted() {
            if (isCompleted()) {
                return;
            }
            asyncCompleted(null);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: wrapException, reason: merged with bridge method [inline-methods] */
        public ExecutionException m0wrapException(Throwable th) {
            return new ExecutionException(th);
        }
    }

    private ChannelObservable() {
    }

    public static <T> Observable<T> from(ReceivePort<T> receivePort) {
        return Observable.create(new OnSubscribeFromChannel(receivePort));
    }

    public static <T> Observable<T> from(ReceivePort<T> receivePort, Scheduler scheduler) {
        return Observable.create(new OnSubscribeFromChannel(receivePort)).subscribeOn(scheduler);
    }

    public static <T> Observer<T> to(final SendPort<T> sendPort) {
        return new Observer<T>() { // from class: rx.quasar.ChannelObservable.1
            @Suspendable
            public void onNext(T t) {
                try {
                    sendPort.send(t);
                } catch (InterruptedException e) {
                    Strand.interrupted();
                } catch (SuspendExecution e2) {
                    throw new AssertionError(e2);
                }
            }

            public void onCompleted() {
                sendPort.close();
            }

            public void onError(Throwable th) {
                sendPort.close(th);
            }
        };
    }

    public static <T> ReceivePort<T> subscribe(int i, Channels.OverflowPolicy overflowPolicy, Observable<? extends T> observable) {
        final Channel newChannel = Channels.newChannel(i, overflowPolicy);
        observable.subscribe(new Observer<T>() { // from class: rx.quasar.ChannelObservable.2
            @Suspendable
            public void onNext(T t) {
                try {
                    newChannel.send(t);
                } catch (InterruptedException e) {
                    Strand.interrupted();
                } catch (SuspendExecution e2) {
                    throw new AssertionError(e2);
                }
            }

            public void onCompleted() {
                newChannel.close();
            }

            public void onError(Throwable th) {
                newChannel.close(th);
            }
        });
        return newChannel;
    }

    public static <T> T get(Observable<T> observable) throws ExecutionException, SuspendExecution, InterruptedException {
        return (T) new AsyncObservable(observable).run();
    }

    public static <T> T get(Observable<T> observable, long j, TimeUnit timeUnit) throws ExecutionException, SuspendExecution, InterruptedException, TimeoutException {
        return (T) new AsyncObservable(observable).run(j, timeUnit);
    }
}
