/*
 * Decompiled with CFR 0.152.
 */
package io.ray.streaming.operator.chain;

import com.google.common.base.Preconditions;
import io.ray.streaming.api.Language;
import io.ray.streaming.api.collector.Collector;
import io.ray.streaming.api.context.RuntimeContext;
import io.ray.streaming.api.function.Function;
import io.ray.streaming.api.function.impl.SourceFunction;
import io.ray.streaming.message.Record;
import io.ray.streaming.operator.OneInputOperator;
import io.ray.streaming.operator.Operator;
import io.ray.streaming.operator.OperatorType;
import io.ray.streaming.operator.SourceOperator;
import io.ray.streaming.operator.StreamOperator;
import io.ray.streaming.operator.TwoInputOperator;
import io.ray.streaming.operator.chain.ForwardCollector;
import java.io.Serializable;
import java.lang.reflect.Proxy;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public abstract class ChainedOperator
extends StreamOperator<Function> {
    protected final List<StreamOperator> operators;
    protected final Operator headOperator;
    protected final Operator tailOperator;
    private final List<Map<String, String>> configs;

    public ChainedOperator(List<StreamOperator> operators, List<Map<String, String>> configs) {
        Preconditions.checkArgument((operators.size() >= 2 ? 1 : 0) != 0, (Object)"Need at lease two operators to be chained together");
        operators.stream().skip(1L).forEach(operator -> Preconditions.checkArgument((boolean)(operator instanceof OneInputOperator)));
        this.operators = operators;
        this.configs = configs;
        this.headOperator = operators.get(0);
        this.tailOperator = operators.get(operators.size() - 1);
    }

    @Override
    public void open(List<Collector> collectorList, RuntimeContext runtimeContext) {
        List succeedingCollectors = this.operators.stream().skip(1L).map(operator -> new ForwardCollector((OneInputOperator)((Object)operator))).collect(Collectors.toList());
        for (int i = 0; i < this.operators.size() - 1; ++i) {
            StreamOperator operator2 = this.operators.get(i);
            List<Collector> forwardCollectors = Collections.singletonList(succeedingCollectors.get(i));
            operator2.open(forwardCollectors, this.createRuntimeContext(runtimeContext, i));
        }
        this.tailOperator.open(collectorList, this.createRuntimeContext(runtimeContext, this.operators.size() - 1));
    }

    @Override
    public OperatorType getOpType() {
        return this.headOperator.getOpType();
    }

    @Override
    public Language getLanguage() {
        return this.headOperator.getLanguage();
    }

    @Override
    public String getName() {
        return this.operators.stream().map(Operator::getName).collect(Collectors.joining(" -> ", "[", "]"));
    }

    public List<StreamOperator> getOperators() {
        return this.operators;
    }

    public Operator getHeadOperator() {
        return this.headOperator;
    }

    public Operator getTailOperator() {
        return this.tailOperator;
    }

    @Override
    public Serializable saveCheckpoint() {
        Object[] checkpoints = new Object[this.operators.size()];
        for (int i = 0; i < this.operators.size(); ++i) {
            checkpoints[i] = this.operators.get(i).saveCheckpoint();
        }
        return checkpoints;
    }

    @Override
    public void loadCheckpoint(Serializable checkpointObject) {
        Serializable[] checkpoints = (Serializable[])checkpointObject;
        for (int i = 0; i < this.operators.size(); ++i) {
            this.operators.get(i).loadCheckpoint(checkpoints[i]);
        }
    }

    private RuntimeContext createRuntimeContext(RuntimeContext runtimeContext, int index) {
        return (RuntimeContext)Proxy.newProxyInstance(runtimeContext.getClass().getClassLoader(), new Class[]{RuntimeContext.class}, (proxy, method, methodArgs) -> {
            if (method.getName().equals("getConfig")) {
                return this.configs.get(index);
            }
            return method.invoke((Object)runtimeContext, methodArgs);
        });
    }

    public static ChainedOperator newChainedOperator(List<StreamOperator> operators, List<Map<String, String>> configs) {
        switch (operators.get(0).getOpType()) {
            case SOURCE: {
                return new ChainedSourceOperator(operators, configs);
            }
            case ONE_INPUT: {
                return new ChainedOneInputOperator(operators, configs);
            }
            case TWO_INPUT: {
                return new ChainedTwoInputOperator(operators, configs);
            }
        }
        throw new IllegalArgumentException("Unsupported operator type " + (Object)((Object)operators.get(0).getOpType()));
    }

    static class ChainedTwoInputOperator<L, R>
    extends ChainedOperator
    implements TwoInputOperator<L, R> {
        private final TwoInputOperator<L, R> inputOperator;

        ChainedTwoInputOperator(List<StreamOperator> operators, List<Map<String, String>> configs) {
            super(operators, configs);
            this.inputOperator = (TwoInputOperator)this.headOperator;
        }

        @Override
        public void processElement(Record<L> record1, Record<R> record2) {
            this.inputOperator.processElement(record1, record2);
        }
    }

    static class ChainedOneInputOperator<T>
    extends ChainedOperator
    implements OneInputOperator<T> {
        private final OneInputOperator<T> inputOperator;

        ChainedOneInputOperator(List<StreamOperator> operators, List<Map<String, String>> configs) {
            super(operators, configs);
            this.inputOperator = (OneInputOperator)this.headOperator;
        }

        @Override
        public void processElement(Record<T> record) throws Exception {
            this.inputOperator.processElement(record);
        }
    }

    static class ChainedSourceOperator<T>
    extends ChainedOperator
    implements SourceOperator<T> {
        private final SourceOperator<T> sourceOperator;

        ChainedSourceOperator(List<StreamOperator> operators, List<Map<String, String>> configs) {
            super(operators, configs);
            this.sourceOperator = (SourceOperator)this.headOperator;
        }

        @Override
        public void fetch() {
            this.sourceOperator.fetch();
        }

        @Override
        public SourceFunction.SourceContext<T> getSourceContext() {
            return this.sourceOperator.getSourceContext();
        }
    }
}

