/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.sopremo.serialization;

import eu.stratosphere.api.common.operators.DualInputOperator;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Ordering;
import eu.stratosphere.api.common.operators.SingleInputOperator;
import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypePairComparatorFactory;
import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
import eu.stratosphere.compiler.CompilerException;
import eu.stratosphere.compiler.CompilerPostPassException;
import eu.stratosphere.compiler.plan.Channel;
import eu.stratosphere.compiler.plan.DualInputPlanNode;
import eu.stratosphere.compiler.plan.OptimizedPlan;
import eu.stratosphere.compiler.plan.PlanNode;
import eu.stratosphere.compiler.plan.SingleInputPlanNode;
import eu.stratosphere.compiler.plan.SinkPlanNode;
import eu.stratosphere.compiler.postpass.AbstractSchema;
import eu.stratosphere.compiler.postpass.ConflictingFieldTypeInfoException;
import eu.stratosphere.compiler.postpass.GenericFlatTypePostPass;
import eu.stratosphere.compiler.postpass.MissingFieldTypeInfoException;
import eu.stratosphere.sopremo.operator.PlanWithSopremoPostPass;
import eu.stratosphere.sopremo.pact.SopremoCoGroupOperator;
import eu.stratosphere.sopremo.pact.SopremoReduceOperator;
import eu.stratosphere.sopremo.serialization.SopremoRecordComparatorFactory;
import eu.stratosphere.sopremo.serialization.SopremoRecordLayout;
import eu.stratosphere.sopremo.serialization.SopremoRecordPairComparatorFactory;
import eu.stratosphere.sopremo.serialization.SopremoRecordSchema;
import eu.stratosphere.sopremo.serialization.SopremoRecordSerializerFactory;
import eu.stratosphere.sopremo.type.IJsonNode;

public class SopremoRecordPostPass
extends GenericFlatTypePostPass<Class<? extends IJsonNode>, SopremoRecordSchema> {
    private SopremoRecordLayout layout;

    public SopremoRecordPostPass() {
        this.setPropagateParentSchemaDown(false);
    }

    public void postPass(OptimizedPlan plan) {
        this.layout = ((PlanWithSopremoPostPass)plan.getOriginalPactPlan()).getLayout();
        super.postPass(plan);
    }

    protected TypeComparatorFactory<?> createComparator(FieldList fields, boolean[] directions, SopremoRecordSchema schema) {
        return new SopremoRecordComparatorFactory(this.layout, fields.toArray(), directions);
    }

    protected SopremoRecordSchema createEmptySchema() {
        return new SopremoRecordSchema();
    }

    protected TypePairComparatorFactory<?, ?> createPairComparator(FieldList fields1, FieldList fields2, boolean[] sortDirections, SopremoRecordSchema schema1, SopremoRecordSchema schema2) throws MissingFieldTypeInfoException {
        return new SopremoRecordPairComparatorFactory();
    }

    protected TypeSerializerFactory<?> createSerializer(SopremoRecordSchema schema) throws MissingFieldTypeInfoException {
        return new SopremoRecordSerializerFactory(this.layout);
    }

    protected void getDualInputNodeSchema(DualInputPlanNode node, SopremoRecordSchema input1Schema, SopremoRecordSchema input2Schema) {
        int i;
        int[] localPositions2;
        DualInputOperator contract = node.getTwoInputNode().getPactContract();
        int[] localPositions1 = contract.getKeyColumns(0);
        if (localPositions1.length != (localPositions2 = contract.getKeyColumns(1)).length) {
            throw new CompilerException("Error: The keys for the first and second input have a different number of fields.");
        }
        for (i = 0; i < localPositions1.length; ++i) {
            input1Schema.add(localPositions1[i]);
        }
        for (i = 0; i < localPositions2.length; ++i) {
            input2Schema.add(localPositions2[i]);
        }
        if (contract instanceof SopremoCoGroupOperator) {
            Ordering groupOrder1 = ((SopremoCoGroupOperator)contract).getFirstInnerGroupOrdering();
            Ordering groupOrder2 = ((SopremoCoGroupOperator)contract).getSecondInnerGroupOrdering();
            if (groupOrder1 != null) {
                this.addOrderingToSchema(groupOrder1, input1Schema);
            }
            if (groupOrder2 != null) {
                this.addOrderingToSchema(groupOrder2, input2Schema);
            }
        }
    }

    protected void getSingleInputNodeSchema(SingleInputPlanNode node, SopremoRecordSchema schema) throws CompilerPostPassException, ConflictingFieldTypeInfoException {
        Ordering groupOrder;
        SingleInputOperator contract = node.getSingleInputNode().getPactContract();
        int[] localPositions = contract.getKeyColumns(0);
        for (int i = 0; i < localPositions.length; ++i) {
            schema.add(localPositions[i]);
        }
        if (contract instanceof SopremoReduceOperator && (groupOrder = ((SopremoReduceOperator)contract).getInnerGroupOrder()) != null) {
            this.addOrderingToSchema(groupOrder, schema);
        }
    }

    protected void getSinkSchema(SinkPlanNode sinkPlanNode, SopremoRecordSchema schema) throws CompilerPostPassException {
        GenericDataSink sink = sinkPlanNode.getSinkNode().getPactContract();
        Ordering partitioning = sink.getPartitionOrdering();
        Ordering sorting = sink.getLocalOrder();
        if (partitioning != null) {
            this.addOrderingToSchema(partitioning, schema);
        }
        if (sorting != null) {
            this.addOrderingToSchema(sorting, schema);
        }
    }

    protected void traverse(PlanNode node, SopremoRecordSchema parentSchema, boolean createUtilities) {
        if (node instanceof SinkPlanNode) {
            this.setOrdering(((SingleInputPlanNode)node).getInput(), ((GenericDataSink)node.getPactContract()).getLocalOrder());
        } else if (node.getPactContract() instanceof SopremoReduceOperator) {
            this.setOrdering(((SingleInputPlanNode)node).getInput(), ((SopremoReduceOperator)node.getPactContract()).getInnerGroupOrder());
        } else if (node.getPactContract() instanceof SopremoCoGroupOperator) {
            this.setOrdering(((DualInputPlanNode)node).getInput1(), ((SopremoCoGroupOperator)node.getPactContract()).getFirstInnerGroupOrdering());
            this.setOrdering(((DualInputPlanNode)node).getInput2(), ((SopremoCoGroupOperator)node.getPactContract()).getSecondInnerGroupOrdering());
        }
        super.traverse(node, (AbstractSchema)parentSchema, createUtilities);
    }

    private void addOrderingToSchema(Ordering o, SopremoRecordSchema schema) {
        for (int i = 0; i < o.getNumberOfFields(); ++i) {
            schema.add(o.getFieldNumber(i));
        }
    }

    private void setOrdering(Channel input, Ordering localOrder) {
        if (localOrder != null) {
            input.getLocalProperties().setOrdering(localOrder);
            input.setLocalStrategy(input.getLocalStrategy(), new FieldList(localOrder.getFieldPositions()), localOrder.getFieldSortDirections());
        }
    }
}

