/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.runtime.core.sources;

import io.mantisrx.runtime.Context;
import io.mantisrx.runtime.core.sources.SourceFunction;
import io.mantisrx.runtime.source.Index;
import io.mantisrx.runtime.source.Source;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Subscription;

/**
 * Adapts an Rx-based {@link Source} to the pull-style {@link SourceFunction} interface.
 *
 * <p>{@link #init()} subscribes to the flattened output of the wrapped source and stores each
 * emitted element in a single-slot holder, overwriting whatever was there before.
 * {@link #next()} simply reads that slot. Consequently {@code next()} never blocks, may return
 * the same element on consecutive calls, and may skip elements that were overwritten between
 * two calls.
 *
 * <p>NOTE(review): the {@code subscription} field is a plain (non-volatile) field; this class
 * appears to assume {@code init()} and {@code close()} are invoked from the same thread —
 * confirm against callers.
 *
 * @param <R> type of the elements emitted by the wrapped source
 */
public class ObservableSourceImpl<R>
implements SourceFunction<R> {
    private final Source<R> source;
    /** Holds the most recently emitted element; {@code null} until the first emission. */
    private final AtomicReference<R> elemContainer = new AtomicReference<>();
    /** Active subscription to the wrapped source, or {@code null} when not subscribed. */
    private Subscription subscription;

    /**
     * Creates an adapter around the given source. Nothing is subscribed until {@link #init()}.
     *
     * @param source the Rx source to pull elements from
     */
    public ObservableSourceImpl(Source<R> source) {
        this.source = source;
    }

    /**
     * Initializes the wrapped source and subscribes to its flattened stream.
     *
     * <p>The source is initialized and invoked with a freshly constructed {@link Context} and
     * an {@code Index(0, 0)}. If this adapter is already subscribed (for example because
     * {@code init()} is called a second time), the earlier subscription is released first so
     * that it does not keep running and writing into the element holder.
     *
     * <p>NOTE(review): only an {@code onNext} handler is registered, so an error emitted by
     * the source is not handled here — confirm that is intended.
     */
    @Override
    public void init() {
        // Release a previous subscription, if any, so re-initialization does not leak it.
        this.unsubscribeIfActive();

        Context context = new Context();
        Index index = new Index(0, 0);
        this.source.init(context, index);

        // The source yields a stream of streams; flatten it and keep only the latest element.
        Observable<Observable<R>> streams = this.source.call(context, index);
        this.subscription = streams
                .flatMap(inner -> inner)
                .subscribe(this.elemContainer::set);
    }

    /**
     * Unsubscribes from the wrapped source if a subscription is active. Safe to call when
     * {@link #init()} was never invoked, and safe to call more than once.
     *
     * @throws IOException declared by the interface; not thrown by this implementation
     */
    @Override
    public void close() throws IOException {
        this.unsubscribeIfActive();
    }

    /**
     * Returns the most recently emitted element without blocking.
     *
     * @return the latest element, or {@code null} if the source has not emitted anything yet
     */
    @Override
    public R next() {
        return this.elemContainer.get();
    }

    /**
     * Returns the wrapped source.
     *
     * @return the source passed to the constructor
     */
    public Source<R> getSource() {
        return this.source;
    }

    /** Unsubscribes the current subscription, if any, and clears the field. */
    private void unsubscribeIfActive() {
        Subscription current = this.subscription;
        if (current != null) {
            current.unsubscribe();
            this.subscription = null;
        }
    }
}

