/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.arraymodel.optimizer;

import eu.stratosphere.api.common.functions.Function;
import eu.stratosphere.api.common.io.OutputFormat;
import eu.stratosphere.api.common.operators.DualInputOperator;
import eu.stratosphere.api.common.operators.GenericDataSink;
import eu.stratosphere.api.common.operators.Ordering;
import eu.stratosphere.api.common.operators.SingleInputOperator;
import eu.stratosphere.api.common.operators.util.FieldList;
import eu.stratosphere.arraymodel.functions.AbstractArrayModelFunction;
import eu.stratosphere.arraymodel.io.ArrayModelOutputFormat;
import eu.stratosphere.compiler.CompilerException;
import eu.stratosphere.compiler.CompilerPostPassException;
import eu.stratosphere.compiler.plan.DualInputPlanNode;
import eu.stratosphere.compiler.plan.SingleInputPlanNode;
import eu.stratosphere.compiler.plan.SinkPlanNode;
import eu.stratosphere.compiler.postpass.AbstractSchema;
import eu.stratosphere.compiler.postpass.ConflictingFieldTypeInfoException;
import eu.stratosphere.compiler.postpass.DenseValueSchema;
import eu.stratosphere.compiler.postpass.GenericFlatTypePostPass;
import eu.stratosphere.compiler.postpass.MissingFieldTypeInfoException;
import eu.stratosphere.compiler.postpass.PostPassUtils;
import eu.stratosphere.pact.runtime.plugable.arrayrecord.ArrayRecordComparatorFactory;
import eu.stratosphere.pact.runtime.plugable.arrayrecord.ArrayRecordPairComparatorFactory;
import eu.stratosphere.pact.runtime.plugable.arrayrecord.ArrayRecordSerializerFactory;
import eu.stratosphere.types.Value;

public class ArrayRecordOptimizerPostPass
extends GenericFlatTypePostPass<Class<? extends Value>, DenseValueSchema> {
    protected DenseValueSchema createEmptySchema() {
        return new DenseValueSchema();
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    protected void getSinkSchema(SinkPlanNode sinkPlanNode, DenseValueSchema schema) throws CompilerPostPassException {
        GenericDataSink sink = sinkPlanNode.getSinkNode().getPactContract();
        OutputFormat format = (OutputFormat)sink.getFormatWrapper().getUserCodeObject();
        if (!ArrayModelOutputFormat.class.isAssignableFrom(format.getClass())) throw new CompilerException("Incompatibe input format type. Array model programs require an " + ArrayModelOutputFormat.class.getName());
        ArrayModelOutputFormat formatInstance = (ArrayModelOutputFormat)format;
        Class<? extends Value>[] types = formatInstance.getDataTypes();
        try {
            this.addToSchema(types, schema);
        }
        catch (ConflictingFieldTypeInfoException ex) {
            throw new RuntimeException("Bug! Conflict on first set of type entries in the data sink.");
        }
        Ordering partitioning = sink.getPartitionOrdering();
        Ordering sorting = sink.getLocalOrder();
        try {
            if (partitioning != null) {
                this.addOrderingToSchema(partitioning, schema);
            }
            if (sorting != null) {
                this.addOrderingToSchema(sorting, schema);
            }
        }
        catch (ConflictingFieldTypeInfoException ex) {
            throw new CompilerPostPassException("Conflicting information found when adding data sink types.");
        }
        schema.setNumFields(types.length);
    }

    protected void getSingleInputNodeSchema(SingleInputPlanNode node, DenseValueSchema schema) throws CompilerPostPassException, ConflictingFieldTypeInfoException {
        Class<? extends Value>[] types;
        SingleInputOperator contract = node.getSingleInputNode().getPactContract();
        Function stub = (Function)contract.getUserCodeWrapper().getUserCodeObject();
        if (AbstractArrayModelFunction.class.isAssignableFrom(stub.getClass())) {
            AbstractArrayModelFunction ams = (AbstractArrayModelFunction)stub;
            types = ams.getDataTypes(0);
            if (types == null) {
                throw new CompilerPostPassException("Missing type annotation in UDF for '" + contract.getName() + "'.");
            }
        } else {
            throw new CompilerException("Incompatibe stub type. Array data model programs require array data model stubs.");
        }
        this.addToSchema(types, schema);
        schema.setNumFields(types.length);
    }

    protected void getDualInputNodeSchema(DualInputPlanNode node, DenseValueSchema input1Schema, DenseValueSchema input2Schema) throws CompilerPostPassException, ConflictingFieldTypeInfoException {
        Class<? extends Value>[] types2;
        Class<? extends Value>[] types1;
        DualInputOperator contract = node.getTwoInputNode().getPactContract();
        Function stub = (Function)contract.getUserCodeWrapper().getUserCodeObject();
        if (AbstractArrayModelFunction.class.isAssignableFrom(stub.getClass())) {
            AbstractArrayModelFunction ams = (AbstractArrayModelFunction)stub;
            types1 = ams.getDataTypes(0);
            types2 = ams.getDataTypes(1);
            if (types1 == null) {
                throw new CompilerPostPassException("Missing type annotation for first parameter type in UDF for '" + contract.getName() + "'.");
            }
            if (types2 == null) {
                throw new CompilerPostPassException("Missing type annotation for second parameter type in UDF for '" + contract.getName() + "'.");
            }
        } else {
            throw new CompilerException("Incompatibe stub type. Array data model programs require array data model stubs.");
        }
        this.addToSchema(types1, input1Schema);
        this.addToSchema(types2, input2Schema);
        input1Schema.setNumFields(types1.length);
        input2Schema.setNumFields(types2.length);
    }

    protected ArrayRecordSerializerFactory createSerializer(DenseValueSchema schema) throws MissingFieldTypeInfoException {
        int numFields = schema.getNumFields();
        if (numFields <= 0) {
            throw new IllegalArgumentException("Bug: Attempt to create serializer for " + numFields + " fields.");
        }
        Class[] types = new Class[numFields];
        for (int i = 0; i < numFields; ++i) {
            Class type = schema.getType(i);
            if (type == null) {
                throw new MissingFieldTypeInfoException(i);
            }
            types[i] = type;
        }
        return new ArrayRecordSerializerFactory(types);
    }

    protected ArrayRecordComparatorFactory createComparator(FieldList fields, boolean[] directions, DenseValueSchema schema) throws MissingFieldTypeInfoException {
        int[] positions = fields.toArray();
        Class[] keyTypes = PostPassUtils.getKeys((AbstractSchema)schema, (int[])positions);
        return new ArrayRecordComparatorFactory(positions, keyTypes, directions);
    }

    protected ArrayRecordPairComparatorFactory createPairComparator(FieldList fields1, FieldList fields2, boolean[] sortDirections, DenseValueSchema schema1, DenseValueSchema schema2) {
        return ArrayRecordPairComparatorFactory.get();
    }

    private void addOrderingToSchema(Ordering o, DenseValueSchema schema) throws ConflictingFieldTypeInfoException {
        for (int i = 0; i < o.getNumberOfFields(); ++i) {
            Integer pos = o.getFieldNumber(i);
            Class type = o.getType(i);
            schema.addType(pos.intValue(), type);
        }
    }

    private void addToSchema(Class<? extends Value>[] types, DenseValueSchema schema) throws ConflictingFieldTypeInfoException {
        for (int i = 0; i < types.length; ++i) {
            schema.addType(i, types[i]);
        }
    }
}

