/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.sopremo.operator;

import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import eu.stratosphere.api.common.operators.BulkIteration;
import eu.stratosphere.api.common.operators.DeltaIteration;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.GenericDataSource;
import eu.stratosphere.api.common.operators.util.OperatorUtil;
import eu.stratosphere.pact.common.plan.PactModule;
import eu.stratosphere.sopremo.EvaluationContext;
import eu.stratosphere.sopremo.expressions.EvaluationExpression;
import eu.stratosphere.sopremo.io.Sink;
import eu.stratosphere.sopremo.io.Source;
import eu.stratosphere.sopremo.operator.ElementaryOperator;
import eu.stratosphere.sopremo.operator.ElementarySopremoModule;
import eu.stratosphere.sopremo.operator.IdentitySetSupplier;
import eu.stratosphere.sopremo.operator.InputCardinality;
import eu.stratosphere.sopremo.operator.JsonStream;
import eu.stratosphere.sopremo.operator.NopOperator;
import eu.stratosphere.sopremo.operator.Operator;
import eu.stratosphere.sopremo.operator.OperatorNavigator;
import eu.stratosphere.sopremo.operator.OutputCardinality;
import eu.stratosphere.sopremo.operator.SopremoModule;
import eu.stratosphere.sopremo.serialization.SopremoRecordLayout;
import eu.stratosphere.util.IdentityList;
import eu.stratosphere.util.IdentitySet;
import eu.stratosphere.util.dag.GraphTraverseListener;
import eu.stratosphere.util.dag.OneTimeTraverser;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
 * A {@link SopremoModule} that describes an iterative computation.
 * <p>
 * The body of the iteration (the "step") reads from the placeholder streams returned by
 * {@link #getSolutionSet()} and {@link #getWorkingSet()} and is terminated by the streams registered
 * through {@link #setSolutionSetDelta(JsonStream)}, {@link #setNextWorkset(JsonStream)} and
 * {@link #setTerminationCriterion(JsonStream)}. {@link #embedInto(SopremoModule)} wraps the step in a
 * {@link CoreIteration} operator, which is translated into a {@link BulkIteration} when no next workset
 * is set and into a {@link DeltaIteration} otherwise.
 */
public class IterativeSopremoModule extends SopremoModule {
    /** Value of {@link #maxNumberOfIterations} while no limit has been configured. */
    private static final int NO_ITERATION_LIMIT = -1;

    private JsonStream terminationCriterion;
    private JsonStream nextWorkset;
    private JsonStream solutionSetDelta;
    private int maxNumberOfIterations = NO_ITERATION_LIMIT;
    /** Placeholder operator whose output represents the working set inside the step. */
    private final NopOperator workingSet = (NopOperator)new NopOperator().withName("workingSet");
    /** Placeholder operator whose output represents the solution set inside the step. */
    private final NopOperator solutionSet = (NopOperator)new NopOperator().withName("solutionSet");
    private List<? extends EvaluationExpression> solutionSetKeyExpressions;

    /**
     * Creates an iterative module with the given number of inputs and outputs.
     *
     * @param numberOfInputs the number of module inputs, forwarded to the super constructor
     * @param numberOfOutputs the number of module outputs, forwarded to the super constructor
     */
    public IterativeSopremoModule(int numberOfInputs, int numberOfOutputs) {
        super(numberOfInputs, numberOfOutputs);
    }

    /**
     * Package-private no-argument constructor.
     * NOTE(review): presumably required for reflective instantiation/deserialization - confirm.
     */
    IterativeSopremoModule() {
    }

    /**
     * Embeds this iteration into the given module.
     * <p>
     * The step operators are wrapped in a {@link CoreIteration}; the outputs of {@code module} are wired
     * to the streams feeding this module's outputs, every reference to one of this module's inputs is
     * redirected to the corresponding input of {@code module}, and finally every consumer of the solution
     * set placeholder is redirected to the result of the core iteration.
     *
     * @param module the module that receives the iteration
     */
    public void embedInto(SopremoModule module) {
        Set<Operator<?>> stepOutputs = this.getStepOutputs();
        Set<Operator<?>> step = this.getStepOperators(stepOutputs);
        // Streams that enter the step from operators outside of the step.
        IdentityList moduleInputs = new IdentityList(this.getIncomingEdges(step));
        CoreIteration core = CoreIteration.valueOf(this, (List<JsonStream>)moduleInputs);
        core.setInputs((List<JsonStream>)moduleInputs);

        for (int index = 0; index < this.getNumOutputs(); ++index) {
            ((Sink)module.getOutput(index)).setInput(0, ((Sink)this.getOutput(index)).getInput(0));
        }
        for (int index = 0; index < this.getNumInputs(); ++index) {
            JsonStream ownInput = ((Source)this.getInput(index)).getOutput(0);
            JsonStream targetInput = ((Source)module.getInput(index)).getOutput(0);
            IterativeSopremoModule.replace(this.getReachableNodes(), ownInput, targetInput);
            // The core iteration is handled separately from the nodes returned by getReachableNodes().
            IterativeSopremoModule.replace(Collections.singleton(core), ownInput, targetInput);
        }
        // Everything that consumed the solution set placeholder now consumes the iteration result.
        IterativeSopremoModule.replace(this.getReachableNodes(), this.solutionSet.getOutput(0), core.getOutput(0));
    }

    /**
     * Returns the configured maximum number of iterations.
     *
     * @return the maximum number of iterations, or {@code -1} if none has been set
     */
    public int getMaxNumberOfIterations() {
        return this.maxNumberOfIterations;
    }

    /**
     * Returns the placeholder stream that represents the solution set within the step.
     *
     * @return the solution set placeholder
     */
    public JsonStream getSolutionSet() {
        return this.solutionSet;
    }

    /**
     * Returns the stream used as termination criterion.
     *
     * @return the termination criterion, or {@code null} if none has been set
     */
    public JsonStream getTerminationCriterion() {
        return this.terminationCriterion;
    }

    /**
     * Returns the placeholder stream that represents the working set within the step.
     *
     * @return the working set placeholder
     */
    public JsonStream getWorkingSet() {
        return this.workingSet;
    }

    /**
     * Sets the stream that feeds the solution set placeholder.
     *
     * @param initialSolutionSet the initial solution set
     * @throws NullPointerException if {@code initialSolutionSet} is {@code null}
     */
    public void setInitialSolutionSet(JsonStream initialSolutionSet) {
        if (initialSolutionSet == null) {
            throw new NullPointerException("initialSolutionSet must not be null");
        }
        this.solutionSet.setInput(0, initialSolutionSet);
    }

    /**
     * Sets the stream that feeds the working set placeholder.
     *
     * @param initialWorkingset the initial working set
     * @throws NullPointerException if {@code initialWorkingset} is {@code null}
     */
    public void setInitialWorkingset(JsonStream initialWorkingset) {
        if (initialWorkingset == null) {
            throw new NullPointerException("initialWorkingset must not be null");
        }
        this.workingSet.setInput(0, initialWorkingset);
    }

    /**
     * Sets the maximum number of iterations.
     *
     * @param maxNumberOfIterations the upper bound of iterations; must be at least 1
     * @throws IllegalArgumentException if {@code maxNumberOfIterations} is smaller than 1
     */
    public void setMaxNumberOfIterations(int maxNumberOfIterations) {
        if (maxNumberOfIterations < 1) {
            // An out-of-range primitive is an illegal argument, not a null reference.
            throw new IllegalArgumentException("maxNumberOfIterations must be >= 1, but was " + maxNumberOfIterations);
        }
        this.maxNumberOfIterations = maxNumberOfIterations;
    }

    /**
     * Sets the stream that provides the working set of the next iteration. Setting it makes
     * {@link CoreIteration} translate the step into a {@link DeltaIteration}.
     *
     * @param nextWorkset the next working set
     * @throws NullPointerException if {@code nextWorkset} is {@code null}
     */
    public void setNextWorkset(JsonStream nextWorkset) {
        if (nextWorkset == null) {
            throw new NullPointerException("nextWorkset must not be null");
        }
        this.nextWorkset = nextWorkset;
    }

    /**
     * Sets the stream that provides the solution set delta of the step.
     *
     * @param solutionSetDelta the solution set delta
     * @throws NullPointerException if {@code solutionSetDelta} is {@code null}
     */
    public void setSolutionSetDelta(JsonStream solutionSetDelta) {
        if (solutionSetDelta == null) {
            throw new NullPointerException("solutionSetDelta must not be null");
        }
        this.solutionSetDelta = solutionSetDelta;
    }

    /**
     * Sets the stream that is used as termination criterion.
     *
     * @param terminationCriterion the termination criterion
     * @throws NullPointerException if {@code terminationCriterion} is {@code null}
     */
    public void setTerminationCriterion(JsonStream terminationCriterion) {
        if (terminationCriterion == null) {
            throw new NullPointerException("terminationCriterion must not be null");
        }
        this.terminationCriterion = terminationCriterion;
    }

    /**
     * Checks that the iteration is configured consistently and then delegates to the super class.
     *
     * @throws IllegalStateException if no solution set delta is set, if the working set is used without
     *         a next working set, if neither a termination criterion nor a maximum number of iterations
     *         is set, if both a termination criterion and a next working set are set, or if no solution
     *         set key expressions are set
     */
    @Override
    public void validate() {
        if (this.solutionSetDelta == null) {
            throw new IllegalStateException("the module must provide a solution set delta");
        }
        if (this.nextWorkset == null) {
            for (Operator<?> operator : this.getReachableNodes()) {
                if (operator.getInputs().contains(this.getWorkingSet())) {
                    throw new IllegalStateException("the module can only use working set, when it also provides a next working set");
                }
            }
        }
        if (this.terminationCriterion == null && this.maxNumberOfIterations == NO_ITERATION_LIMIT) {
            throw new IllegalStateException("must set terminationCriterion and/or maxNumberOfIterations");
        }
        if (this.terminationCriterion != null && this.nextWorkset != null) {
            throw new IllegalStateException("cannot use terminationCriterion and nextWorkset at the same time (iteration must be either bulk or delta)");
        }
        // The field has no initializer, so "never set" shows up as null rather than as an empty list.
        if (this.solutionSetKeyExpressions == null || this.solutionSetKeyExpressions.isEmpty()) {
            throw new IllegalStateException("solutionSetKeyExpressions must be set");
        }
        super.validate();
    }

    /**
     * Returns the key expressions of the solution set.
     *
     * @return the key expressions, or {@code null} if none have been set
     */
    List<? extends EvaluationExpression> getSolutionSetKeyExpressions() {
        return this.solutionSetKeyExpressions;
    }

    /**
     * Sets the key expressions of the solution set.
     *
     * @param solutionSetKeyExpressions the key expressions
     * @throws NullPointerException if {@code solutionSetKeyExpressions} is {@code null}
     */
    void setSolutionSetKeyExpressions(List<? extends EvaluationExpression> solutionSetKeyExpressions) {
        if (solutionSetKeyExpressions == null) {
            throw new NullPointerException("solutionSetKeyExpressions must not be null");
        }
        this.solutionSetKeyExpressions = solutionSetKeyExpressions;
    }

    /**
     * Collects all input streams of the given operators whose producing operator is not part of the
     * given set.
     *
     * @param partition the operators to inspect
     * @return the streams that enter {@code partition} from outside
     */
    private Set<JsonStream> getIncomingEdges(Set<Operator<?>> partition) {
        IdentitySet incomingEdges = new IdentitySet();
        for (Operator<?> op : partition) {
            for (JsonStream input : op.getInputs()) {
                if (!partition.contains(input.getSource().getOperator())) {
                    incomingEdges.add(input);
                }
            }
        }
        return incomingEdges;
    }

    /**
     * Determines the operators that form the step: the recorded successors of the working set and
     * solution set placeholders (as computed by {@link #getSuccessorRelations(Set)}) plus the two
     * placeholders themselves.
     *
     * @param stepOutputs the operators that produce the outputs of the step
     * @return the operators of the step
     */
    private Set<Operator<?>> getStepOperators(Set<Operator<?>> stepOutputs) {
        IdentitySet step = new IdentitySet();
        Multimap<Operator<?>, Operator<?>> successors = this.getSuccessorRelations(stepOutputs);
        step.addAll(successors.get(this.getWorkingSet().getSource().getOperator()));
        step.addAll(successors.get(this.getSolutionSet().getSource().getOperator()));
        step.add(this.workingSet);
        step.add(this.solutionSet);
        return step;
    }

    /**
     * Returns the operators that produce the solution set delta and, if set, the next workset and the
     * termination criterion.
     *
     * @return the operators producing the outputs of the step
     */
    private Set<Operator<?>> getStepOutputs() {
        IdentitySet stepOutputs = new IdentitySet();
        stepOutputs.add(this.solutionSetDelta.getSource().getOperator());
        if (this.nextWorkset != null) {
            stepOutputs.add(this.nextWorkset.getSource().getOperator());
        }
        if (this.terminationCriterion != null) {
            stepOutputs.add(this.terminationCriterion.getSource().getOperator());
        }
        return stepOutputs;
    }

    /**
     * Traverses the graph starting at the given operators and records, for the producer of every input
     * of a visited node, the node itself and all successors recorded for that node so far.
     * NOTE(review): the result is only the full transitive successor relation if the traverser visits a
     * node after its consumers - confirm against OneTimeTraverser.
     *
     * @param stepOutputs the operators at which the traversal starts
     * @return an identity-based multimap from operator to its recorded successors
     */
    private Multimap<Operator<?>, Operator<?>> getSuccessorRelations(Set<Operator<?>> stepOutputs) {
        final Multimap successors = Multimaps.newMultimap(new IdentityHashMap(), IdentitySetSupplier.getInstance());
        OneTimeTraverser.INSTANCE.traverse(stepOutputs, OperatorNavigator.INSTANCE, new GraphTraverseListener<Operator<?>>(){

            @Override
            public void nodeTraversed(Operator<?> node) {
                for (JsonStream input : node.getInputs()) {
                    successors.put(input.getSource().getOperator(), node);
                    successors.putAll(input.getSource().getOperator(), (Iterable)successors.get(node));
                }
            }
        });
        return successors;
    }

    /**
     * Replaces every occurrence of {@code toReplace} (compared by identity) in the inputs of the given
     * pact operators with {@code replaceWith} and writes the inputs back to each operator.
     *
     * @param nodes the operators whose inputs are rewritten
     * @param toReplace the operator to look for
     * @param replaceWith the operator to insert instead
     */
    private static void replace(Iterable<? extends eu.stratosphere.api.common.operators.Operator> nodes, eu.stratosphere.api.common.operators.Operator toReplace, eu.stratosphere.api.common.operators.Operator replaceWith) {
        for (eu.stratosphere.api.common.operators.Operator operator : nodes) {
            // One inner list per input slot; each inner list holds the operators unioned into that slot.
            List<List<eu.stratosphere.api.common.operators.Operator>> inputs = OperatorUtil.getInputs(operator);
            for (List<eu.stratosphere.api.common.operators.Operator> unionedInputs : inputs) {
                for (int index = 0; index < unionedInputs.size(); ++index) {
                    if (unionedInputs.get(index) == toReplace) {
                        unionedInputs.set(index, replaceWith);
                    }
                }
            }
            OperatorUtil.setInputs(operator, inputs);
        }
    }

    /**
     * Replaces every input of the given Sopremo operators that is identical to {@code toReplace} with
     * {@code replaceWith}.
     *
     * @param nodes the operators whose inputs are rewritten
     * @param toReplace the stream to look for
     * @param replaceWith the stream to insert instead
     */
    private static void replace(Iterable<? extends Operator<?>> nodes, JsonStream toReplace, JsonStream replaceWith) {
        for (Operator<?> operator : nodes) {
            int size = operator.getNumInputs();
            for (int index = 0; index < size; ++index) {
                if (operator.getInput(index) == toReplace) {
                    operator.setInput(index, replaceWith);
                }
            }
        }
    }

    /**
     * Elementary operator that wraps the step of an {@link IterativeSopremoModule} and translates it
     * into a {@link BulkIteration} or a {@link DeltaIteration}.
     */
    @InputCardinality(min=1)
    @OutputCardinality(value=1)
    static final class CoreIteration
    extends ElementaryOperator<CoreIteration> {
        private final IterativeSopremoModule module;
        /** The streams entering the step from outside, in the order of this operator's inputs. */
        private final List<JsonStream> moduleInputs;
        /** The inputs of the step module: placeholder outputs first, then the remaining module inputs. */
        private final List<JsonStream> stepInputs;
        private final ElementarySopremoModule stepSopremoModule;

        /**
         * Creates an instance without any state.
         * NOTE(review): presumably required for reflective instantiation - confirm.
         */
        public CoreIteration() {
            this(null, null, null, null);
        }

        /**
         * Creates a core iteration.
         *
         * @param module the iterative module that owns the step
         * @param moduleInputs the streams entering the step from outside
         * @param stepSopremoModule the step as elementary module
         * @param stepInputs the inputs of the step module
         */
        public CoreIteration(IterativeSopremoModule module, List<JsonStream> moduleInputs, ElementarySopremoModule stepSopremoModule, List<JsonStream> stepInputs) {
            this.module = module;
            this.moduleInputs = moduleInputs;
            this.stepSopremoModule = stepSopremoModule;
            this.stepInputs = stepInputs;
        }

        /**
         * Translates the step into a pact module whose single output is fed by a bulk iteration (no next
         * workset set on the module) or by a delta iteration (next workset set).
         *
         * @param context the evaluation context, forwarded to the translation of the step module
         * @param layout the record layout, forwarded to the translation of the step module and used to
         *        resolve the solution set key indices of a delta iteration
         * @return the pact module containing the iteration
         */
        @Override
        public PactModule asPactModule(EvaluationContext context, SopremoRecordLayout layout) {
            PactModule iterationModule = new PactModule(this.getNumInputs(), this.getNumOutputs());
            PactModule stepModule = this.stepSopremoModule.asPactModule(context, layout);
            if (this.module.nextWorkset == null) {
                BulkIteration bulkIteration = new BulkIteration();
                bulkIteration.setDegreeOfParallelism(this.getDegreeOfParallelism());
                bulkIteration.setMaximumNumberOfIterations(this.module.maxNumberOfIterations);
                // Step output 0 carries the solution set delta, output 1 the termination criterion (see getBulkStep).
                bulkIteration.setNextPartialSolution((eu.stratosphere.api.common.operators.Operator)((GenericDataSink)stepModule.getOutput(0)).getInputs().get(0));
                if (this.module.terminationCriterion != null) {
                    bulkIteration.setTerminationCriterion((eu.stratosphere.api.common.operators.Operator)((GenericDataSink)stepModule.getOutput(1)).getInputs().get(0));
                }
                // NOTE(review): no call in this branch sets the initial partial solution of the bulk
                // iteration - confirm whether that happens elsewhere.
                ((GenericDataSink)iterationModule.getOutput(0)).setInput((eu.stratosphere.api.common.operators.Operator)bulkIteration);
                // Step input 0 is the solution set placeholder; it becomes the partial solution.
                IterativeSopremoModule.replace(stepModule.getReachableNodes(), (eu.stratosphere.api.common.operators.Operator)stepModule.getInput(this.getInputIndex(stepModule, (Source)this.stepSopremoModule.getInput(0))), bulkIteration.getPartialSolution());
                // NOTE(review): valueOf inserts only one placeholder for bulk steps, so external inputs
                // start at index 1, yet this loop starts at 2 - confirm that skipping index 1 is intended.
                for (int index = 2; index < this.stepInputs.size(); ++index) {
                    int moduleIndex = this.moduleInputs.indexOf(this.stepInputs.get(index));
                    IterativeSopremoModule.replace(stepModule.getReachableNodes(), (eu.stratosphere.api.common.operators.Operator)stepModule.getInput(this.getInputIndex(stepModule, (Source)this.stepSopremoModule.getInput(index))), (eu.stratosphere.api.common.operators.Operator)iterationModule.getInput(moduleIndex));
                }
            } else {
                DeltaIteration deltaIteration = new DeltaIteration(this.getKeyIndices(layout, this.module.solutionSetKeyExpressions));
                deltaIteration.setDegreeOfParallelism(this.getDegreeOfParallelism());
                deltaIteration.setMaximumNumberOfIterations(this.module.maxNumberOfIterations);
                // Step output 0 carries the solution set delta, output 1 the next workset (see getDeltaStep).
                deltaIteration.setSolutionSetDelta((eu.stratosphere.api.common.operators.Operator)((GenericDataSink)stepModule.getOutput(0)).getInputs().get(0));
                deltaIteration.setNextWorkset((eu.stratosphere.api.common.operators.Operator)((GenericDataSink)stepModule.getOutput(1)).getInputs().get(0));
                ((GenericDataSink)iterationModule.getOutput(0)).setInput((eu.stratosphere.api.common.operators.Operator)deltaIteration);
                // Step inputs 0 and 1 are the solution set and working set placeholders (see valueOf).
                IterativeSopremoModule.replace(stepModule.getReachableNodes(), (eu.stratosphere.api.common.operators.Operator)stepModule.getInput(this.getInputIndex(stepModule, (Source)this.stepSopremoModule.getInput(0))), deltaIteration.getSolutionSet());
                IterativeSopremoModule.replace(stepModule.getReachableNodes(), (eu.stratosphere.api.common.operators.Operator)stepModule.getInput(this.getInputIndex(stepModule, (Source)this.stepSopremoModule.getInput(1))), deltaIteration.getWorkset());
                // The remaining step inputs are redirected to the matching inputs of the iteration module.
                for (int index = 2; index < this.stepInputs.size(); ++index) {
                    int moduleIndex = this.moduleInputs.indexOf(this.stepInputs.get(index));
                    IterativeSopremoModule.replace(stepModule.getReachableNodes(), (eu.stratosphere.api.common.operators.Operator)stepModule.getInput(this.getInputIndex(stepModule, (Source)this.stepSopremoModule.getInput(index))), (eu.stratosphere.api.common.operators.Operator)iterationModule.getInput(moduleIndex));
                }
                deltaIteration.setInitialSolutionSet(new eu.stratosphere.api.common.operators.Operator[]{(eu.stratosphere.api.common.operators.Operator)iterationModule.getInput(this.moduleInputs.indexOf(this.module.solutionSet.getInput(0)))});
                deltaIteration.setInitialWorkset(new eu.stratosphere.api.common.operators.Operator[]{(eu.stratosphere.api.common.operators.Operator)iterationModule.getInput(this.moduleInputs.indexOf(this.module.workingSet.getInput(0)))});
            }
            return iterationModule;
        }

        /**
         * Returns the solution set key expressions of the module together with the key expressions of
         * the step module's schema.
         *
         * @return a new set containing all key expressions
         */
        @Override
        public Set<EvaluationExpression> getAllKeyExpressions() {
            HashSet<EvaluationExpression> keyExpressions = new HashSet<EvaluationExpression>(this.module.solutionSetKeyExpressions);
            keyExpressions.addAll(this.stepSopremoModule.getSchema().getKeyExpressions());
            return keyExpressions;
        }

        /**
         * Finds the input of the pact module that has the same name as the given Sopremo source.
         *
         * @param pactModule the pact module to search
         * @param sopremoSource the source whose name is looked up
         * @return the index of the first input with a matching name
         * @throws IllegalStateException if no input of {@code pactModule} has that name
         */
        private int getInputIndex(PactModule pactModule, Source sopremoSource) {
            for (int index = 0; index < pactModule.getNumInputs(); ++index) {
                if (((GenericDataSource)pactModule.getInput(index)).getName().equals(sopremoSource.getName())) {
                    return index;
                }
            }
            throw new IllegalStateException("pact module has no input named " + sopremoSource.getName());
        }

        /**
         * Creates the core iteration for the given module.
         * <p>
         * The step inputs are the module inputs without the initial solution set and initial working set,
         * preceded by the solution set placeholder and, for delta iterations, the working set placeholder.
         *
         * @param module the iterative module
         * @param moduleInputs the streams entering the step from outside
         * @return the core iteration wrapping the step
         */
        public static CoreIteration valueOf(IterativeSopremoModule module, List<JsonStream> moduleInputs) {
            ElementarySopremoModule stepSopremoModule;
            IdentityList stepInputs = new IdentityList(moduleInputs);
            stepInputs.remove(module.solutionSet.getInput(0));
            stepInputs.remove(module.workingSet.getInput(0));
            stepInputs.add(0, module.solutionSet.getOutput(0));
            if (module.nextWorkset == null) {
                stepSopremoModule = CoreIteration.getBulkStep(module, (Collection<JsonStream>)stepInputs).asElementary();
            } else {
                stepInputs.add(1, module.workingSet.getOutput(0));
                stepSopremoModule = CoreIteration.getDeltaStep(module, (Collection<JsonStream>)stepInputs).asElementary();
            }
            return new CoreIteration(module, moduleInputs, stepSopremoModule, (List<JsonStream>)stepInputs);
        }

        /**
         * Builds the step module of a bulk iteration: output 0 is the solution set delta and, if a
         * termination criterion is set, output 1 is the termination criterion.
         *
         * @param module the iterative module
         * @param stepInputs the streams that become the inputs of the step module
         * @return the step module
         */
        private static SopremoModule getBulkStep(IterativeSopremoModule module, Collection<JsonStream> stepInputs) {
            SopremoModule stepModule;
            if (module.terminationCriterion != null) {
                stepModule = new SopremoModule(stepInputs.size(), 2);
                ((Sink)stepModule.getOutput(1)).setInput(0, module.terminationCriterion);
            } else {
                stepModule = new SopremoModule(stepInputs.size(), 1);
            }
            ((Sink)stepModule.getOutput(0)).setInput(0, module.solutionSetDelta);
            CoreIteration.bindStepInputs(stepModule, stepInputs);
            return stepModule;
        }

        /**
         * Builds the step module of a delta iteration: output 0 is the solution set delta and output 1
         * is the next workset.
         *
         * @param module the iterative module
         * @param stepInputs the streams that become the inputs of the step module
         * @return the step module
         */
        private static SopremoModule getDeltaStep(IterativeSopremoModule module, Collection<JsonStream> stepInputs) {
            SopremoModule stepModule = new SopremoModule(stepInputs.size(), 2);
            ((Sink)stepModule.getOutput(0)).setInput(0, module.solutionSetDelta);
            ((Sink)stepModule.getOutput(1)).setInput(0, module.nextWorkset);
            CoreIteration.bindStepInputs(stepModule, stepInputs);
            return stepModule;
        }

        /**
         * Redirects every consumer of the i-th step input within the step module to the i-th input of
         * the step module, following the iteration order of {@code stepInputs}.
         *
         * @param stepModule the step module whose reachable nodes are rewired
         * @param stepInputs the streams to replace, in input order
         */
        private static void bindStepInputs(SopremoModule stepModule, Collection<JsonStream> stepInputs) {
            int index = 0;
            for (JsonStream stepInput : stepInputs) {
                IterativeSopremoModule.replace(stepModule.getReachableNodes(), stepInput, (JsonStream)stepModule.getInput(index));
                ++index;
            }
        }
    }
}

