/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.sopremo.operator;

import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.util.OperatorUtil;
import eu.stratosphere.pact.common.plan.PactModule;
import eu.stratosphere.sopremo.EvaluationContext;
import eu.stratosphere.sopremo.Schema;
import eu.stratosphere.sopremo.expressions.EvaluationExpression;
import eu.stratosphere.sopremo.io.Sink;
import eu.stratosphere.sopremo.io.Source;
import eu.stratosphere.sopremo.operator.ElementaryOperator;
import eu.stratosphere.sopremo.operator.Operator;
import eu.stratosphere.sopremo.operator.OperatorNavigator;
import eu.stratosphere.sopremo.operator.SopremoModule;
import eu.stratosphere.sopremo.serialization.SopremoRecordLayout;
import eu.stratosphere.util.IdentityList;
import eu.stratosphere.util.dag.GraphTraverseListener;
import eu.stratosphere.util.dag.OneTimeTraverser;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class ElementarySopremoModule
extends SopremoModule {
    public ElementarySopremoModule(int numberOfInputs, int numberOfOutputs) {
        super(numberOfInputs, numberOfOutputs);
    }

    ElementarySopremoModule() {
    }

    @Override
    public ElementarySopremoModule asElementary() {
        return this;
    }

    public PactModule asPactModule(EvaluationContext context, SopremoRecordLayout layout) {
        return PactModule.valueOf(this.assemblePact(context, layout));
    }

    public Collection<eu.stratosphere.api.common.operators.Operator> assemblePact(EvaluationContext context, SopremoRecordLayout layout) {
        return new PactAssembler(context).assemble(layout);
    }

    @Override
    public ElementarySopremoModule clone() {
        ElementarySopremoModule module = new ElementarySopremoModule(this.getNumInputs(), this.getNumOutputs());
        module.copyPropertiesFrom(this);
        return module;
    }

    public Iterable<? extends ElementaryOperator<?>> getReachableNodes() {
        return super.getReachableNodes();
    }

    public Schema getSchema() {
        HashSet<EvaluationExpression> keyExpressions = new HashSet<EvaluationExpression>();
        for (ElementaryOperator<?> operator : this.getReachableNodes()) {
            keyExpressions.addAll(operator.getAllKeyExpressions());
        }
        return new Schema(new ArrayList<EvaluationExpression>(keyExpressions));
    }

    public static ElementarySopremoModule valueOf(Collection<? extends Operator<?>> sinks) {
        List<Operator<?>> inputs = ElementarySopremoModule.findInputs(sinks);
        ElementarySopremoModule module = new ElementarySopremoModule(inputs.size(), sinks.size());
        ElementarySopremoModule.connectOutputs(module, sinks);
        ElementarySopremoModule.connectInputs(module, inputs);
        return module;
    }

    public static ElementarySopremoModule valueOf(Operator<?> ... sinks) {
        return ElementarySopremoModule.valueOf(Arrays.asList(sinks));
    }

    private class PactAssembler {
        private final Map<Operator<?>, PactModule> modules = new IdentityHashMap();
        private final Map<Operator<?>, List<List<eu.stratosphere.api.common.operators.Operator>>> operatorOutputs = new IdentityHashMap();
        private final EvaluationContext context;

        public PactAssembler(EvaluationContext context) {
            this.context = context;
        }

        public Collection<eu.stratosphere.api.common.operators.Operator> assemble(SopremoRecordLayout layout) {
            this.convertDAGToModules(layout);
            this.connectModules();
            List<eu.stratosphere.api.common.operators.Operator> pactSinks = this.findPACTSinks();
            return pactSinks;
        }

        private void addOutputtingPactInOperator(Operator<?> operator, eu.stratosphere.api.common.operators.Operator o, List<eu.stratosphere.api.common.operators.Operator> connectedInputs) {
            int inputIndex = new IdentityList((Collection)this.modules.get(operator).getInputs()).indexOf((Object)o);
            if (inputIndex >= operator.getInputs().size() || inputIndex == -1) {
                connectedInputs.add(o);
                return;
            }
            Operator.Output inputSource = operator.getInputs().get(inputIndex).getSource();
            List<eu.stratosphere.api.common.operators.Operator> outputtingOperators = this.operatorOutputs.get(inputSource.getOperator()).get(inputSource.getIndex());
            for (eu.stratosphere.api.common.operators.Operator outputtingOperator : outputtingOperators) {
                if (outputtingOperator instanceof FileDataSource && !(inputSource.getOperator() instanceof Source)) {
                    this.addOutputtingPactInOperator(inputSource.getOperator(), outputtingOperator, connectedInputs);
                    continue;
                }
                connectedInputs.add(outputtingOperator);
            }
        }

        private void connectModules() {
            for (Map.Entry<Operator<?>, PactModule> operatorModule : this.modules.entrySet()) {
                Operator<?> operator = operatorModule.getKey();
                PactModule module = operatorModule.getValue();
                for (eu.stratosphere.api.common.operators.Operator contract : module.getReachableNodes()) {
                    List inputLists = OperatorUtil.getInputs((eu.stratosphere.api.common.operators.Operator)contract);
                    for (int listIndex = 0; listIndex < inputLists.size(); ++listIndex) {
                        ArrayList<eu.stratosphere.api.common.operators.Operator> connectedInputs = new ArrayList<eu.stratosphere.api.common.operators.Operator>();
                        List inputs = (List)inputLists.get(listIndex);
                        for (int inputIndex = 0; inputIndex < inputs.size(); ++inputIndex) {
                            this.addOutputtingPactInOperator(operator, (eu.stratosphere.api.common.operators.Operator)inputs.get(inputIndex), connectedInputs);
                        }
                        inputLists.set(listIndex, connectedInputs);
                    }
                    OperatorUtil.setInputs((eu.stratosphere.api.common.operators.Operator)contract, (List)inputLists);
                }
            }
        }

        private void convertDAGToModules(final SopremoRecordLayout layout) {
            OneTimeTraverser.INSTANCE.traverse((Iterable)ElementarySopremoModule.this.getAllOutputs(), OperatorNavigator.ELEMENTARY, new GraphTraverseListener<ElementaryOperator<?>>(){

                public void nodeTraversed(ElementaryOperator<?> node) {
                    EvaluationContext context = PactAssembler.this.context;
                    context.setOperatorDescription(node.getName());
                    PactModule module = node.asPactModule(context, layout);
                    PactAssembler.this.modules.put(node, module);
                    List outputFunctions = module.getOutputs();
                    ArrayList<List> outputOperators = new ArrayList<List>();
                    for (GenericDataSink sink : outputFunctions) {
                        outputOperators.add(sink.getInputs());
                    }
                    PactAssembler.this.operatorOutputs.put(node, outputOperators);
                }
            });
            for (PactModule module : this.modules.values()) {
                module.validate();
            }
        }

        private List<eu.stratosphere.api.common.operators.Operator> findPACTSinks() {
            ArrayList<eu.stratosphere.api.common.operators.Operator> pactSinks = new ArrayList<eu.stratosphere.api.common.operators.Operator>();
            for (Operator sink : ElementarySopremoModule.this.getAllOutputs()) {
                for (GenericDataSink outputFunction : this.modules.get(sink).getAllOutputs()) {
                    if (sink instanceof Sink) {
                        pactSinks.add((eu.stratosphere.api.common.operators.Operator)outputFunction);
                        continue;
                    }
                    pactSinks.addAll(outputFunction.getInputs());
                }
            }
            return pactSinks;
        }
    }
}

