/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.operator.impl;

import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.context.RuntimeContext;
import io.ray.streaming.api.function.impl.SourceFunction;
import io.ray.streaming.message.Record;
import io.ray.streaming.operator.ChainStrategy;
import io.ray.streaming.operator.OperatorType;
import io.ray.streaming.operator.SourceOperator;
import io.ray.streaming.operator.StreamOperator;
import java.util.List;

public class SourceOperatorImpl<T>
extends StreamOperator<SourceFunction<T>>
implements SourceOperator {
    private SourceContextImpl sourceContext;

    public SourceOperatorImpl(SourceFunction<T> function) {
        super(function);
        this.setChainStrategy(ChainStrategy.HEAD);
    }

    @Override
    public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
        super.open(collectorList, runtimeContext);
        this.sourceContext = new SourceContextImpl(collectorList);
        ((SourceFunction)this.function).init(runtimeContext.getParallelism(), runtimeContext.getTaskIndex());
    }

    @Override
    public void fetch() {
        try {
            ((SourceFunction)this.function).fetch(this.sourceContext);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public SourceFunction.SourceContext getSourceContext() {
        return this.sourceContext;
    }

    @Override
    public OperatorType getOpType() {
        return OperatorType.SOURCE;
    }

    class SourceContextImpl
    implements SourceFunction.SourceContext<T> {
        private List<Collector> collectors;

        public SourceContextImpl(List<Collector> collectors) {
            this.collectors = collectors;
        }

        @Override
        public void collect(T t) throws Exception {
            for (Collector collector : this.collectors) {
                collector.collect(new Record(t));
            }
        }
    }
}

