/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.sopremo.operator;

import eu.stratosphere.sopremo.ISopremoType;
import eu.stratosphere.sopremo.io.Sink;
import eu.stratosphere.sopremo.io.Source;
import eu.stratosphere.sopremo.operator.CompositeOperator;
import eu.stratosphere.sopremo.operator.ElementarySopremoModule;
import eu.stratosphere.sopremo.operator.JsonStream;
import eu.stratosphere.sopremo.operator.Operator;
import eu.stratosphere.sopremo.operator.OperatorNavigator;
import eu.stratosphere.util.dag.GraphModule;
import eu.stratosphere.util.dag.GraphPrinter;
import eu.stratosphere.util.dag.GraphTraverseListener;
import eu.stratosphere.util.dag.OneTimeTraverser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class SopremoModule
extends GraphModule<Operator<?>, Source, Sink>
implements ISopremoType {
    public SopremoModule(int numberOfInputs, int numberOfOutputs) {
        super(numberOfInputs, numberOfOutputs, OperatorNavigator.INSTANCE);
        int index;
        for (index = 0; index < numberOfInputs; ++index) {
            this.setInput(index, new Source(String.format("file:///%d", index)).withName("Source " + index));
        }
        for (index = 0; index < numberOfOutputs; ++index) {
            this.setOutput(index, new Sink(String.format("file:///%d", index)).withName("Sink " + index));
        }
    }

    protected SopremoModule() {
    }

    public void appendAsString(Appendable appendable) throws IOException {
        GraphPrinter graphPrinter = new GraphPrinter();
        graphPrinter.setWidth(80);
        graphPrinter.print(appendable, (Iterable)this.getAllOutputs(), OperatorNavigator.INSTANCE);
    }

    public ElementarySopremoModule asElementary() {
        return new ElementaryAssembler().assemble(this);
    }

    public Operator<?> asOperator() {
        return new ModuleOperator(this.getInputs(), this.getOutputs());
    }

    public SopremoModule clone() {
        SopremoModule module = new SopremoModule(this.getNumInputs(), this.getNumOutputs());
        module.copyPropertiesFrom(module);
        return module;
    }

    public void embed(Collection<? extends Operator<?>> sinks) {
        List<Operator<?>> inputs = SopremoModule.findInputs(sinks);
        if (inputs.size() != this.getNumInputs()) {
            throw new IllegalArgumentException(String.format("Expected %d instead of %d inputs", this.getNumInputs(), inputs.size()));
        }
        SopremoModule.connectOutputs(this, sinks);
        SopremoModule.connectInputs(this, inputs);
    }

    public void embed(Operator<?> ... sinks) {
        this.embed(Arrays.asList(sinks));
    }

    public String toString() {
        GraphPrinter graphPrinter = new GraphPrinter();
        graphPrinter.setWidth(40);
        return graphPrinter.toString((Iterable)this.getAllOutputs(), OperatorNavigator.INSTANCE);
    }

    protected void copyPropertiesFrom(ISopremoType original) {
        int index;
        SopremoModule module = (SopremoModule)original;
        this.setName(module.getName());
        for (index = 0; index < module.getNumInputs(); ++index) {
            this.setInput(index, module.getInput(index));
        }
        for (index = 0; index < this.getNumOutputs(); ++index) {
            this.setOutput(index, module.getOutput(index));
        }
        for (Sink internal : module.getInternalOutputNodes()) {
            this.addInternalOutput(internal);
        }
    }

    public static SopremoModule valueOf(Collection<? extends Operator<?>> sinks) {
        List<Operator<?>> inputs = SopremoModule.findInputs(sinks);
        SopremoModule module = new SopremoModule(inputs.size(), sinks.size());
        SopremoModule.connectOutputs(module, sinks);
        SopremoModule.connectInputs(module, inputs);
        return module;
    }

    public static SopremoModule valueOf(Operator<?> ... sinks) {
        return SopremoModule.valueOf(Arrays.asList(sinks));
    }

    protected static void connectInputs(SopremoModule module, List<Operator<?>> inputs) {
        int moduleIndex = 0;
        for (int operatorIndex = 0; operatorIndex < inputs.size(); ++operatorIndex) {
            Operator<?> operator = inputs.get(operatorIndex);
            ArrayList<JsonStream> operatorInputs = new ArrayList<JsonStream>(operator.getInputs());
            for (int inputIndex = 0; inputIndex < operatorInputs.size(); ++inputIndex) {
                if (operatorInputs.get(inputIndex) != null) continue;
                operatorInputs.set(inputIndex, ((Source)module.getInput(moduleIndex++)).getOutput(0));
            }
            operator.setInputs(operatorInputs);
        }
    }

    protected static void connectOutputs(SopremoModule module, Collection<? extends Operator<?>> sinks) {
        int sinkIndex = 0;
        for (Operator<?> sink : sinks) {
            if (sink instanceof Sink) {
                module.setOutput(sinkIndex++, (Sink)sink);
                continue;
            }
            for (JsonStream output : sink.getOutputs()) {
                ((Sink)module.getOutput(sinkIndex++)).setInput(0, output);
            }
        }
    }

    protected static List<Operator<?>> findInputs(Collection<? extends Operator<?>> sinks) {
        final ArrayList inputs = new ArrayList();
        OneTimeTraverser.INSTANCE.traverse(sinks, OperatorNavigator.INSTANCE, new GraphTraverseListener<Operator<?>>(){

            public void nodeTraversed(Operator<?> node) {
                if (node instanceof Source) {
                    inputs.add(node);
                } else {
                    for (JsonStream input : node.getInputs()) {
                        if (input != null) continue;
                        inputs.add(node);
                    }
                }
            }
        });
        return inputs;
    }

    private final class ModuleOperator
    extends CompositeOperator<ModuleOperator> {
        public ModuleOperator(List<Source> inputs, List<Sink> outputs) {
            super(inputs.size(), outputs.size());
            this.setInputs(inputs);
            this.setOutputs(outputs);
        }

        @Override
        public void addImplementation(SopremoModule module) {
            module.inputNodes.addAll(SopremoModule.this.inputNodes);
            module.outputNodes.addAll(SopremoModule.this.outputNodes);
            module.internalOutputNodes.addAll(SopremoModule.this.internalOutputNodes);
        }
    }

    private static class ElementaryAssembler {
        private final Map<Operator<?>, ElementarySopremoModule> modules = new IdentityHashMap();

        private ElementaryAssembler() {
        }

        public ElementarySopremoModule assemble(SopremoModule sopremoModule) {
            this.convertDAGToModules(sopremoModule);
            int sinkCount = sopremoModule.getNumOutputs();
            int sourceCount = sopremoModule.getNumInputs();
            ElementarySopremoModule elementarySopremoModule = new ElementarySopremoModule(sourceCount, sinkCount);
            for (int sourceIndex = 0; sourceIndex < sourceCount; ++sourceIndex) {
                ElementarySopremoModule connectedInput = this.modules.get(sopremoModule.getInput(sourceIndex));
                if (connectedInput == null) continue;
                ((Sink)connectedInput.getOutput(0)).setInput(0, (JsonStream)elementarySopremoModule.getInput(sourceIndex));
            }
            this.connectModules();
            for (int sinkIndex = 0; sinkIndex < sinkCount; ++sinkIndex) {
                ((Sink)elementarySopremoModule.getOutput(sinkIndex)).setInput(0, ((Sink)this.modules.get(sopremoModule.getOutput(sinkIndex)).getInternalOutputNodes(0)).getInput(0));
            }
            for (Sink sink : sopremoModule.getInternalOutputNodes()) {
                elementarySopremoModule.addInternalOutput(this.modules.get(sink).getInternalOutputNodes(0));
            }
            if (sopremoModule.getName() != null) {
                elementarySopremoModule.setName(sopremoModule.getName());
            }
            return elementarySopremoModule;
        }

        protected JsonStream traceInput(Operator<?> operator, int index) {
            Operator.Output inputSource = operator.getInput(index).getSource();
            ElementarySopremoModule inputModule = this.modules.get(inputSource.getOperator());
            JsonStream input = ((Sink)inputModule.getOutput(inputSource.getIndex())).getInput(0);
            Operator<?> inputOperator = input.getSource().getOperator();
            if (inputOperator instanceof Source) {
                List inputs = inputModule.getInputs();
                for (int i = 0; i < inputs.size(); ++i) {
                    if (inputOperator != inputs.get(i)) continue;
                    JsonStream inputStream = operator.getInput(index);
                    return this.traceInput(inputStream.getSource().getOperator(), inputStream.getSource().getIndex());
                }
            }
            return input;
        }

        private void connectModules() {
            for (Map.Entry<Operator<?>, ElementarySopremoModule> operatorModule : this.modules.entrySet()) {
                Operator<?> operator = operatorModule.getKey();
                ElementarySopremoModule module = operatorModule.getValue();
                final IdentityHashMap<JsonStream, JsonStream> operatorInputToModuleOutput = new IdentityHashMap<JsonStream, JsonStream>();
                for (int index = 0; index < operator.getInputs().size(); ++index) {
                    JsonStream input = this.traceInput(operator, index);
                    operatorInputToModuleOutput.put(((Source)module.getInput(index)).getOutput(0), input);
                }
                OneTimeTraverser.INSTANCE.traverse((Iterable)module.getAllOutputs(), OperatorNavigator.INSTANCE, new GraphTraverseListener<Operator<?>>(){

                    public void nodeTraversed(Operator<?> innerNode) {
                        List<JsonStream> innerNodeInputs = innerNode.getInputs();
                        for (int index = 0; index < innerNodeInputs.size(); ++index) {
                            JsonStream moduleOutput = (JsonStream)operatorInputToModuleOutput.get(innerNodeInputs.get(index));
                            if (moduleOutput == null) continue;
                            innerNodeInputs.set(index, moduleOutput);
                        }
                        innerNode.setInputs(innerNodeInputs);
                    }
                });
            }
        }

        private void convertDAGToModules(final SopremoModule sopremoModule) {
            OneTimeTraverser.INSTANCE.traverse((Iterable)sopremoModule.getAllOutputs(), OperatorNavigator.INSTANCE, new GraphTraverseListener<Operator<?>>(){

                public void nodeTraversed(Operator<?> node) {
                    if (sopremoModule.getName() != null) {
                        node.setName(sopremoModule.getName() + " - " + node.getName());
                    }
                    ElementarySopremoModule elementaryModule = node.asElementaryOperators();
                    ElementaryAssembler.this.modules.put(node, elementaryModule);
                }
            });
        }
    }
}

