/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.sopremo.testing;

import eu.stratosphere.api.common.Plan;
import eu.stratosphere.api.common.operators.FileDataSink;
import eu.stratosphere.api.common.operators.FileDataSource;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.GenericDataSource;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.core.fs.FSDataInputStream;
import eu.stratosphere.core.fs.FileSystem;
import eu.stratosphere.core.fs.Path;
import eu.stratosphere.core.testing.AssertUtil;
import eu.stratosphere.core.testing.GenericTestPlan;
import eu.stratosphere.core.testing.GenericTestRecords;
import eu.stratosphere.core.testing.TypeConfig;
import eu.stratosphere.pact.common.plan.PactModule;
import eu.stratosphere.sopremo.EvaluationContext;
import eu.stratosphere.sopremo.expressions.EvaluationExpression;
import eu.stratosphere.sopremo.io.JsonFormat;
import eu.stratosphere.sopremo.io.JsonParser;
import eu.stratosphere.sopremo.io.Sink;
import eu.stratosphere.sopremo.io.SopremoFormat;
import eu.stratosphere.sopremo.io.Source;
import eu.stratosphere.sopremo.operator.JsonStream;
import eu.stratosphere.sopremo.operator.Operator;
import eu.stratosphere.sopremo.operator.OperatorNavigator;
import eu.stratosphere.sopremo.operator.PlanWithSopremoPostPass;
import eu.stratosphere.sopremo.operator.SopremoModule;
import eu.stratosphere.sopremo.operator.SopremoPlan;
import eu.stratosphere.sopremo.pact.SopremoUtil;
import eu.stratosphere.sopremo.pact.UntypedRecordToJsonIterator;
import eu.stratosphere.sopremo.serialization.SopremoRecord;
import eu.stratosphere.sopremo.serialization.SopremoRecordLayout;
import eu.stratosphere.sopremo.testing.SopremoTestRecords;
import eu.stratosphere.sopremo.type.IJsonNode;
import eu.stratosphere.sopremo.type.JsonUtil;
import eu.stratosphere.util.AbstractIterator;
import eu.stratosphere.util.IteratorUtil;
import eu.stratosphere.util.dag.ConnectionNavigator;
import eu.stratosphere.util.dag.OneTimeTraverser;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class SopremoTestPlan {
    private Input[] inputs;
    private ActualOutput[] actualOutputs;
    private ExpectedOutput[] expectedOutputs;
    private SopremoRecordTestPlan testPlan;
    private final EvaluationContext evaluationContext = new EvaluationContext();
    private boolean trace;
    private int dop = -1;

    public SopremoTestPlan(int numInputs, int numOutputs) {
        this.initInputsAndOutputs(numInputs, numOutputs);
    }

    public SopremoTestPlan(List<Operator<?>> sinks) {
        int index;
        ArrayList unconnectedOutputs = new ArrayList();
        ArrayList<Operator> unconnectedInputs = new ArrayList<Operator>();
        for (Operator<?> operator : sinks) {
            unconnectedOutputs.addAll(operator.getOutputs());
            if (operator.getNumOutputs() != 0) continue;
            unconnectedOutputs.add(operator);
        }
        for (Operator operator : OneTimeTraverser.INSTANCE.getReachableNodes(sinks, (ConnectionNavigator)OperatorNavigator.INSTANCE)) {
            if (operator instanceof Source) {
                unconnectedInputs.add(operator);
                continue;
            }
            for (JsonStream input : operator.getInputs()) {
                if (input != null) continue;
                unconnectedInputs.add(operator);
            }
        }
        this.inputs = new Input[unconnectedInputs.size()];
        for (index = 0; index < this.inputs.length; ++index) {
            this.inputs[index] = new Input(this, index);
            Operator operator = (Operator)unconnectedInputs.get(index);
            if (operator instanceof Source) {
                this.setInputOperator(index, (Source)operator);
                continue;
            }
            ArrayList<JsonStream> missingInputs = new ArrayList<JsonStream>(operator.getInputs());
            for (int missingIndex = 0; missingIndex < missingInputs.size(); ++missingIndex) {
                if (missingInputs.get(missingIndex) != null) continue;
                missingInputs.set(missingIndex, ((Source)this.inputs[index].getOperator()).getOutput(0));
                break;
            }
            operator.setInputs(missingInputs);
        }
        this.actualOutputs = new ActualOutput[unconnectedOutputs.size()];
        this.expectedOutputs = new ExpectedOutput[unconnectedOutputs.size()];
        for (index = 0; index < this.actualOutputs.length; ++index) {
            this.actualOutputs[index] = new ActualOutput(index);
            if (unconnectedOutputs.get(index) instanceof Sink) {
                this.actualOutputs[index].setOperator((Sink)unconnectedOutputs.get(index));
            } else {
                ((Sink)this.actualOutputs[index].getOperator()).setInput(0, (JsonStream)unconnectedOutputs.get(index));
            }
            this.expectedOutputs[index] = new ExpectedOutput(this, index);
        }
    }

    public SopremoTestPlan(Operator<?> ... sinks) {
        this(Arrays.asList(sinks));
    }

    SopremoTestPlan() {
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null) {
            return false;
        }
        if (this.getClass() != obj.getClass()) {
            return false;
        }
        SopremoTestPlan other = (SopremoTestPlan)obj;
        return Arrays.equals(this.inputs, other.inputs) && Arrays.equals(this.expectedOutputs, other.expectedOutputs) && this.getSopremoPlan().equals((Object)other.getSopremoPlan());
    }

    public ActualOutput getActualOutput(int index) {
        return this.actualOutputs[index];
    }

    public ActualOutput getActualOutputForStream(JsonStream stream) {
        for (ActualOutput output : this.actualOutputs) {
            if (((Sink)output.getOperator()).getInput(0) != stream.getSource()) continue;
            return output;
        }
        return null;
    }

    public EvaluationContext getCompilationContext() {
        return this.evaluationContext;
    }

    public ExpectedOutput getExpectedOutput(int index) {
        return this.expectedOutputs[index];
    }

    public ExpectedOutput getExpectedOutputForStream(JsonStream stream) {
        return this.expectedOutputs[this.getActualOutputForStream(stream).getIndex()];
    }

    public Input getInput(int index) {
        return this.inputs[index];
    }

    public Input getInputForStream(JsonStream stream) {
        for (Input input : this.inputs) {
            if (((Source)input.getOperator()).getOutput(0) != stream.getSource()) continue;
            return input;
        }
        return null;
    }

    public Source getInputOperator(int index) {
        return (Source)this.getInput(index).getOperator();
    }

    public Source[] getInputOperators(int from, int to) {
        Source[] operators = new Source[to - from];
        for (int index = 0; index < operators.length; ++index) {
            operators[index] = this.getInputOperator(from + index);
        }
        return operators;
    }

    public Sink getOutputOperator(int index) {
        return (Sink)this.getActualOutput(index).getOperator();
    }

    public Sink[] getOutputOperators(int from, int to) {
        Sink[] operators = new Sink[to - from];
        for (int index = 0; index < operators.length; ++index) {
            operators[index] = this.getOutputOperator(from + index);
        }
        return operators;
    }

    public int hashCode() {
        int prime = 31;
        int result = 1;
        result = 31 * result + Arrays.hashCode(this.inputs);
        result = 31 * result + Arrays.hashCode(this.actualOutputs);
        result = 31 * result + Arrays.hashCode(this.expectedOutputs);
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        SopremoPlan sopremoPlan = this.getSopremoPlan();
        Collection sinks = sopremoPlan.assemblePact();
        SopremoRecordLayout layout = sopremoPlan.getLayout();
        this.testPlan = new SopremoRecordTestPlan(layout, sinks);
        for (Input input : this.inputs) {
            input.prepare(this.testPlan, layout);
        }
        for (ModifiableChannel modifiableChannel : this.expectedOutputs) {
            modifiableChannel.prepare(this.testPlan, layout);
        }
        if (this.dop > 0) {
            this.testPlan.setDegreeOfParallelism(this.dop);
        }
        if (this.trace) {
            SopremoUtil.trace();
        }
        try {
            this.testPlan.run();
        }
        finally {
            if (this.trace) {
                SopremoUtil.untrace();
            }
        }
        for (InternalChannel internalChannel : this.actualOutputs) {
            ((ActualOutput)internalChannel).load(this.testPlan);
        }
    }

    public void setDegreeOfParallelism(int dop) {
        if (dop < 1) {
            throw new IllegalArgumentException("Degree of parallelism must be greater than 0!");
        }
        this.dop = dop;
    }

    public void setInputOperator(int index, Source operator) {
        this.inputs[index].setOperator(operator);
    }

    public void setOutputOperator(int index, Sink operator) {
        this.actualOutputs[index].setOperator(operator);
    }

    public String toString() {
        return SopremoModule.valueOf((Operator[])this.getOutputOperators(0, this.actualOutputs.length)).toString();
    }

    public void trace() {
        this.trace = true;
    }

    protected void initInputsAndOutputs(int numInputs, int numOutputs) {
        int index;
        this.inputs = new Input[numInputs];
        for (index = 0; index < numInputs; ++index) {
            this.inputs[index] = new Input(this, index);
        }
        this.expectedOutputs = new ExpectedOutput[numOutputs];
        for (index = 0; index < numOutputs; ++index) {
            this.expectedOutputs[index] = new ExpectedOutput(this, index);
        }
        this.actualOutputs = new ActualOutput[numOutputs];
        for (index = 0; index < numOutputs; ++index) {
            this.actualOutputs[index] = new ActualOutput(index);
        }
    }

    private SopremoPlan getSopremoPlan() {
        SopremoPlan sopremoPlan = new SopremoPlan();
        sopremoPlan.setContext(this.evaluationContext);
        sopremoPlan.setSinks(this.getOutputOperators(0, this.expectedOutputs.length));
        return sopremoPlan;
    }

    static abstract class ModifiableChannel<O extends Operator<?>, C extends ModifiableChannel<O, C>>
    extends InternalChannel<O, C>
    implements Iterable<IJsonNode> {
        private final List<IJsonNode> values = new ArrayList<IJsonNode>();
        protected SopremoTestRecords testRecords;
        private boolean empty = false;
        private final transient SopremoTestPlan testPlan;

        public ModifiableChannel(SopremoTestPlan testPlan, O operator, int index) {
            super(operator, index);
            this.testPlan = testPlan;
        }

        ModifiableChannel() {
            this.testPlan = null;
        }

        public C add(IJsonNode value) {
            this.empty = false;
            this.file = null;
            this.values.add(value);
            return (C)this;
        }

        public C addArray(Object ... values) {
            return this.add((IJsonNode)JsonUtil.createArrayNode((Object[])values));
        }

        public C addObject(Object ... fields) {
            return this.add((IJsonNode)JsonUtil.createObjectNode((Object[])fields));
        }

        public C addValue(Object value) {
            return this.add(JsonUtil.createValueNode((Object)value));
        }

        public boolean isEmpty() {
            return this.empty;
        }

        @Override
        public Iterator<IJsonNode> iterator() {
            if (this.isEmpty()) {
                return Collections.EMPTY_LIST.iterator();
            }
            if (this.testRecords != null) {
                UntypedRecordToJsonIterator recordToJsonIterator = new UntypedRecordToJsonIterator();
                recordToJsonIterator.setIterator(this.testRecords.iterator());
                return recordToJsonIterator;
            }
            if (this.file != null) {
                return this.iteratorFromFile(this.file);
            }
            return this.values.iterator();
        }

        public void load(String file) throws IOException {
            try {
                if (!FileSystem.get((URI)new URI(file)).exists(new Path(file))) {
                    throw new FileNotFoundException();
                }
            }
            catch (URISyntaxException e) {
                throw new IllegalArgumentException(String.format("File %s is not a valid URI", file));
            }
            this.empty = false;
            this.values.clear();
            this.file = file;
        }

        public C setEmpty() {
            this.empty = true;
            this.file = null;
            return (C)this;
        }

        protected EvaluationContext getContext() {
            return this.testPlan.getCompilationContext();
        }

        protected Iterator<IJsonNode> iteratorFromFile(final String file) {
            try {
                FSDataInputStream stream = FileSystem.get((URI)new URI(file)).open(new Path(file));
                final JsonParser parser = new JsonParser(stream);
                parser.setWrappingArraySkipping(true);
                return new AbstractIterator<IJsonNode>(){

                    protected IJsonNode loadNext() {
                        if (parser.checkEnd()) {
                            return (IJsonNode)this.noMoreElements();
                        }
                        try {
                            return parser.readValueAsTree();
                        }
                        catch (IOException e) {
                            throw new IllegalStateException(String.format("Cannot parse json file %s", file), e);
                        }
                    }
                };
            }
            catch (IOException e) {
                throw new IllegalStateException(String.format("Cannot open json file %s", this.file), e);
            }
            catch (URISyntaxException e) {
                throw new IllegalStateException();
            }
        }

        abstract SopremoTestRecords getTestRecords(SopremoRecordTestPlan var1, SopremoRecordLayout var2);

        void prepare(SopremoRecordTestPlan testPlan, SopremoRecordLayout layout) {
            this.testRecords = this.getTestRecords(testPlan, layout);
            if (this.operator instanceof MockupSource) {
                if (this.isEmpty()) {
                    this.testRecords.setEmpty();
                } else if (this.file != null) {
                    Configuration configuration = new Configuration();
                    SopremoUtil.setEvaluationContext((Configuration)configuration, (EvaluationContext)this.getContext().clone());
                    SopremoUtil.transferFieldsToConfiguration((Object)new JsonFormat(), SopremoFormat.class, (Configuration)configuration, JsonFormat.JsonInputFormat.class, SopremoFormat.SopremoFileInputFormat.class);
                    this.testRecords.load(JsonFormat.JsonInputFormat.class, this.file, configuration);
                } else {
                    for (IJsonNode node : this.values) {
                        SopremoRecord record = new SopremoRecord();
                        record.setNode(node);
                        this.testRecords.add(new SopremoRecord[]{record});
                    }
                }
            }
        }
    }

    static abstract class InternalChannel<O extends Operator<?>, C extends InternalChannel<O, C>>
    implements Iterable<IJsonNode> {
        protected String file;
        protected O operator;
        private final int index;

        public InternalChannel(O operator, int index) {
            this.operator = operator;
            this.index = index;
        }

        InternalChannel() {
            this.index = 0;
        }

        public void assertEquals(ActualOutput expectedValues) {
            AssertUtil.assertIteratorEquals(this.iterator(), expectedValues.iterator());
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (!(obj instanceof InternalChannel)) {
                return false;
            }
            InternalChannel other = (InternalChannel)obj;
            return IteratorUtil.equal(this.iterator(), other.iterator());
        }

        public List<IJsonNode> getAllNodes() {
            ArrayList<IJsonNode> list = new ArrayList<IJsonNode>();
            Iterator iterator = this.iterator();
            while (iterator.hasNext()) {
                list.add(((IJsonNode)iterator.next()).clone());
            }
            return list;
        }

        public int hashCode() {
            return IteratorUtil.hashCode(this.iterator());
        }

        public String toString() {
            return IteratorUtil.toString(this.iterator(), (int)10);
        }

        int getIndex() {
            return this.index;
        }

        O getOperator() {
            return this.operator;
        }

        void setOperator(O operator) {
            if (operator == null) {
                throw new NullPointerException("operator must not be null");
            }
            this.operator = operator;
        }
    }

    public class SopremoRecordTestPlan
    extends GenericTestPlan<SopremoRecord, SopremoTestRecords> {
        private final SopremoRecordLayout layout;

        protected SopremoRecordTestPlan(SopremoRecordLayout layout, Collection<? extends eu.stratosphere.api.common.operators.Operator> contracts) {
            super(SopremoTestRecords.getTypeConfig(SopremoRecordLayout.create((EvaluationExpression[])new EvaluationExpression[0])), contracts);
            this.layout = layout;
        }

        public String toString() {
            return PactModule.valueOf((Collection)this.getSinks()).toString();
        }

        protected Plan createPlan(Collection<GenericDataSink> wrappedSinks) {
            return new PlanWithSopremoPostPass(this.layout, wrappedSinks);
        }

        protected SopremoTestRecords createTestRecords(TypeConfig<SopremoRecord> typeConfig) {
            return new SopremoTestRecords(typeConfig);
        }
    }

    public static class MockupSource
    extends Source {
        private final int index;

        public MockupSource(int index) {
            super("file:///" + index);
            this.setName("Mockup-input " + index);
            this.index = index;
        }

        MockupSource() {
            super("file:///");
            this.index = 0;
        }

        public PactModule asPactModule(EvaluationContext context, SopremoRecordLayout layout) {
            PactModule pactModule = new PactModule(0, 1);
            FileDataSource contract = GenericTestPlan.createDefaultSource((String)this.getInputPath(), null);
            ((GenericDataSink)pactModule.getOutput(0)).setInput((eu.stratosphere.api.common.operators.Operator)contract);
            SopremoUtil.setEvaluationContext((Configuration)contract.getParameters(), (EvaluationContext)context);
            return pactModule;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!super.equals(obj)) {
                return false;
            }
            if (((Object)((Object)this)).getClass() != obj.getClass()) {
                return false;
            }
            MockupSource other = (MockupSource)((Object)obj);
            return this.index == other.index;
        }

        public int hashCode() {
            int prime = 31;
            int result = super.hashCode();
            result = 31 * result + this.index;
            return result;
        }

        public String toString() {
            return String.format("MockupSource [%s]", this.index);
        }
    }

    public static class MockupSink
    extends Sink {
        private final int index;

        public MockupSink(int index) {
            super("file:///" + index);
            this.setName("Mockup Output" + index);
            this.index = index;
        }

        MockupSink() {
            super("file:///");
            this.index = 0;
        }

        public PactModule asPactModule(EvaluationContext context, SopremoRecordLayout layout) {
            PactModule pactModule = new PactModule(1, 0);
            FileDataSink contract = GenericTestPlan.createDefaultSink((String)this.getOutputPath(), null);
            contract.setInput((eu.stratosphere.api.common.operators.Operator)pactModule.getInput(0));
            pactModule.addInternalOutput((Object)contract);
            SopremoUtil.setEvaluationContext((Configuration)contract.getParameters(), (EvaluationContext)context);
            return pactModule;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!super.equals(obj)) {
                return false;
            }
            if (((Object)((Object)this)).getClass() != obj.getClass()) {
                return false;
            }
            MockupSink other = (MockupSink)((Object)obj);
            return this.index == other.index;
        }

        public int hashCode() {
            int prime = 31;
            int result = super.hashCode();
            result = 31 * result + this.index;
            return result;
        }

        public String toString() {
            return String.format("MockupSink [%s]", this.index);
        }
    }

    public static class Input
    extends ModifiableChannel<Source, Input> {
        public Input(SopremoTestPlan testPlan, int index) {
            super(testPlan, new MockupSource(index), index);
        }

        Input() {
        }

        public int getSourceIndex(GenericTestPlan<SopremoRecord, SopremoTestRecords> testPlan) {
            int sourceIndex = -1;
            List sources = testPlan.getSources();
            for (int index = 0; index < sources.size(); ++index) {
                if (!((GenericDataSource)sources.get(index)).getName().equals(((Source)this.getOperator()).getInputPath())) continue;
                sourceIndex = index;
                break;
            }
            return sourceIndex;
        }

        @Override
        public Iterator<IJsonNode> iterator() {
            if (this.operator != null && !(this.operator instanceof MockupSource)) {
                if (((Source)this.operator).isAdhoc()) {
                    return JsonUtil.asArray((IJsonNode[])new IJsonNode[]{((Source)this.operator).getAdhocValues()}).iterator();
                }
                if (this.testRecords == null) {
                    return this.iteratorFromFile(((Source)this.operator).getInputPath());
                }
            }
            return super.iterator();
        }

        @Override
        SopremoTestRecords getTestRecords(SopremoRecordTestPlan testPlan, SopremoRecordLayout layout) {
            int sourceIndex = this.getSourceIndex(testPlan);
            return (SopremoTestRecords)testPlan.getInput(sourceIndex == -1 ? this.getIndex() : sourceIndex, SopremoTestRecords.getTypeConfig(layout));
        }
    }

    public static class ExpectedOutput
    extends ModifiableChannel<Source, ExpectedOutput> {
        private double doublePrecision;

        public ExpectedOutput(SopremoTestPlan testPlan, int index) {
            super(testPlan, new MockupSource(index), index);
        }

        ExpectedOutput() {
        }

        public double getDoublePrecision() {
            return this.doublePrecision;
        }

        public ExpectedOutput setDoublePrecision(double doublePrecision) {
            this.doublePrecision = doublePrecision;
            throw new UnsupportedOperationException("Currently unsupported; please add ticket when needed");
        }

        @Override
        SopremoTestRecords getTestRecords(SopremoRecordTestPlan testPlan, SopremoRecordLayout layout) {
            int sinkIndex = this.findSinkIndex(testPlan);
            return (SopremoTestRecords)testPlan.getExpectedOutput(sinkIndex, SopremoTestRecords.getTypeConfig(layout));
        }

        private int findSinkIndex(GenericTestPlan<SopremoRecord, SopremoTestRecords> testPlan) {
            int sinkIndex = -1;
            List sinks = testPlan.getSinks();
            for (int index = 0; index < sinks.size(); ++index) {
                if (!((GenericDataSink)sinks.get(index)).getName().equals(((Source)this.getOperator()).getInputPath())) continue;
                sinkIndex = index;
                break;
            }
            return sinkIndex == -1 ? this.getIndex() : sinkIndex;
        }
    }

    public static class ActualOutput
    extends InternalChannel<Sink, ActualOutput> {
        private GenericTestRecords<SopremoRecord> actualRecords;

        public ActualOutput(int index) {
            super(new MockupSink(index), index);
        }

        ActualOutput() {
        }

        @Override
        public Iterator<IJsonNode> iterator() {
            if (this.actualRecords == null) {
                throw new IllegalStateException("Can only access actual output after a complete test run");
            }
            UntypedRecordToJsonIterator iterator = new UntypedRecordToJsonIterator();
            iterator.setIterator(this.actualRecords.iterator());
            return iterator;
        }

        public Iterator<IJsonNode> unsortedIterator() {
            if (this.actualRecords == null) {
                throw new IllegalStateException("Can only access actual output after a complete test run");
            }
            UntypedRecordToJsonIterator iterator = new UntypedRecordToJsonIterator();
            iterator.setIterator(this.actualRecords.unsortedIterator());
            return iterator;
        }

        void load(GenericTestPlan<SopremoRecord, SopremoTestRecords> testPlan) {
            this.actualRecords = testPlan.getActualOutput(this.getIndex());
        }
    }
}

