/*
 * Decompiled with CFR 0.152.
 */
package eu.stratosphere.spargel.java;

import eu.stratosphere.api.common.aggregators.Aggregator;
import eu.stratosphere.api.common.operators.BinaryOperatorInformation;
import eu.stratosphere.api.common.operators.DualInputOperator;
import eu.stratosphere.api.common.operators.DualInputSemanticProperties;
import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.api.common.operators.base.CoGroupOperatorBase;
import eu.stratosphere.api.common.operators.base.DeltaIterationBase;
import eu.stratosphere.api.java.DataSet;
import eu.stratosphere.api.java.functions.CoGroupFunction;
import eu.stratosphere.api.java.operators.CustomUnaryOperation;
import eu.stratosphere.api.java.operators.TwoInputOperator;
import eu.stratosphere.api.java.tuple.Tuple;
import eu.stratosphere.api.java.tuple.Tuple2;
import eu.stratosphere.api.java.tuple.Tuple3;
import eu.stratosphere.api.java.typeutils.TupleTypeInfo;
import eu.stratosphere.api.java.typeutils.TypeExtractor;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.spargel.java.MessageIterator;
import eu.stratosphere.spargel.java.MessagingFunction;
import eu.stratosphere.spargel.java.VertexUpdateFunction;
import eu.stratosphere.types.TypeInformation;
import eu.stratosphere.util.Collector;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.lang3.Validate;

/**
 * Describes a vertex-centric iteration over a graph given as a data set of vertices
 * ({@code Tuple2<VertexKey, VertexValue>}) and a data set of edges.
 *
 * <p>Edges are either plain ({@code Tuple2<source, target>}) or carry a value
 * ({@code Tuple3<source, target, value>}); exactly one of the two edge data sets is
 * non-null for any instance. Instances are obtained through {@link #withPlainEdges} or
 * {@link #withValuedEdges}. The initial vertices are supplied via {@link #setInput} and the
 * resulting operator is built by {@link #createOperator()}.
 *
 * <p>The generated data flow is a delta iteration whose step consists of two co-groups:
 * a "Messaging" co-group of the edges with the workset, followed by a "Vertex State Updates"
 * co-group of the produced messages with the solution set.
 *
 * @param <VertexKey> type of the vertex ids; must be comparable
 * @param <VertexValue> type of the value attached to each vertex
 * @param <Message> type of the messages produced by the messaging function
 * @param <EdgeValue> type of the value attached to each edge (unused for plain edges)
 */
public class VertexCentricIteration<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
implements CustomUnaryOperation<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> {
    private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
    private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
    /** Edges without values; {@code null} when the iteration was created with valued edges. */
    private final DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue;
    /** Edges with values; {@code null} when the iteration was created with plain edges. */
    private final DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue;
    private final TypeInformation<Message> messageType;
    /** Aggregators (by name) that are registered on the delta iteration when it is translated. */
    private final Map<String, Class<? extends Aggregator<?>>> aggregators;
    private final int maximumNumberOfIterations;
    /** The initial vertices; stays {@code null} until {@link #setInput} has been called. */
    private DataSet<Tuple2<VertexKey, VertexValue>> initialVertices;

    /**
     * Creates an iteration over edges that carry no value.
     *
     * @param uf the function that updates a vertex from its incoming messages
     * @param mf the function that produces the messages of a vertex
     * @param edgesWithoutValue the edges as (source id, target id) pairs
     * @param maximumNumberOfIterations the iteration limit handed to the delta iteration
     * @throws NullPointerException if a function or the edge data set is {@code null}
     * @throws IllegalArgumentException if the edge type is not a 2-tuple of equal, comparable key types
     */
    private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf, MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue, int maximumNumberOfIterations) {
        Validate.notNull(uf, "The vertex update function must not be null.");
        Validate.notNull(mf, "The messaging function must not be null.");
        Validate.notNull(edgesWithoutValue, "The edges data set must not be null.");
        validateEdgesType(
                edgesWithoutValue.getType(),
                2,
                "The edges data set (for edges without edge values) must consist of 2-tuples.",
                "Both tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
        this.updateFunction = uf;
        this.messagingFunction = mf;
        this.edgesWithoutValue = edgesWithoutValue;
        this.edgesWithValue = null;
        this.maximumNumberOfIterations = maximumNumberOfIterations;
        this.aggregators = new HashMap<String, Class<? extends Aggregator<?>>>();
        this.messageType = this.getMessageType(mf);
    }

    /**
     * Creates an iteration over edges that carry a value.
     *
     * @param uf the function that updates a vertex from its incoming messages
     * @param mf the function that produces the messages of a vertex
     * @param edgesWithValue the edges as (source id, target id, edge value) triples
     * @param maximumNumberOfIterations the iteration limit handed to the delta iteration
     * @param edgeHasValueMarker never read; it only distinguishes this constructor from the
     *        plain-edge constructor, which would otherwise have the same erased signature
     * @throws NullPointerException if a function or the edge data set is {@code null}
     * @throws IllegalArgumentException if the edge type is not a 3-tuple whose first two
     *         fields are of equal, comparable key types
     */
    private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf, MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue, int maximumNumberOfIterations, boolean edgeHasValueMarker) {
        Validate.notNull(uf, "The vertex update function must not be null.");
        Validate.notNull(mf, "The messaging function must not be null.");
        Validate.notNull(edgesWithValue, "The edges data set must not be null.");
        validateEdgesType(
                edgesWithValue.getType(),
                3,
                "The edges data set (for edges with edge values) must consist of 3-tuples.",
                "The first two tuple fields (source and target vertex id) must be of the data type that represents the vertex key and implement the java.lang.Comparable interface.");
        this.updateFunction = uf;
        this.messagingFunction = mf;
        this.edgesWithoutValue = null;
        this.edgesWithValue = edgesWithValue;
        this.maximumNumberOfIterations = maximumNumberOfIterations;
        this.aggregators = new HashMap<String, Class<? extends Aggregator<?>>>();
        this.messageType = this.getMessageType(mf);
    }

    /**
     * Checks that the edge type is a tuple of the expected arity whose first two fields
     * have the same type and that this type implements {@link Comparable}.
     *
     * @param edgesType the type information of the edge data set
     * @param expectedArity the required tuple arity (2 for plain edges, 3 for valued edges)
     * @param arityMessage the failure message for a wrong tuple shape
     * @param keyMessage the failure message for mismatching or non-comparable key fields
     * @throws IllegalArgumentException if one of the checks fails
     */
    private static void validateEdgesType(TypeInformation<?> edgesType, int expectedArity, String arityMessage, String keyMessage) {
        Validate.isTrue(edgesType.isTupleType() && edgesType.getArity() == expectedArity, arityMessage);
        // The cast is safe only after the tuple check above has passed.
        TupleTypeInfo<?> tupleInfo = (TupleTypeInfo<?>) edgesType;
        TypeInformation<?> sourceIdType = tupleInfo.getTypeAt(0);
        TypeInformation<?> targetIdType = tupleInfo.getTypeAt(1);
        Validate.isTrue(sourceIdType.equals(targetIdType) && Comparable.class.isAssignableFrom(sourceIdType.getTypeClass()), keyMessage);
    }

    /**
     * Extracts the type information of the message type from the concrete class of the
     * messaging function (type parameter at index 2 of {@code MessagingFunction}).
     */
    private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf) {
        return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
    }

    /**
     * Registers an aggregator class under the given name. All registered aggregators are
     * added to the delta iteration when the operator is translated to the data flow.
     * Registering a second aggregator under the same name replaces the first.
     *
     * @param name the name under which the aggregator is registered
     * @param aggregator the aggregator class
     */
    public void registerAggregator(String name, Class<? extends Aggregator<?>> aggregator) {
        this.aggregators.put(name, aggregator);
    }

    /**
     * Sets the initial vertices of the iteration.
     *
     * @param inputData the vertices as (vertex id, vertex value) pairs
     * @throws NullPointerException if {@code inputData} is {@code null}
     * @throws IllegalArgumentException if the input is not a 2-tuple type, or if its key type
     *         differs from the type of the first field of the edge data set
     */
    public void setInput(DataSet<Tuple2<VertexKey, VertexValue>> inputData) {
        Validate.notNull(inputData, "The input data set (the initial vertices) must not be null.");
        TypeInformation<?> inputType = inputData.getType();
        Validate.isTrue(inputType.isTupleType() && inputType.getArity() == 2, "The input data set (the initial vertices) must consist of 2-tuples.");
        TypeInformation<?> keyType = ((TupleTypeInfo<?>) inputType).getTypeAt(0);
        // Exactly one of the two edge data sets is non-null (see the constructors).
        TypeInformation<?> edgeType = this.edgesWithoutValue != null ? this.edgesWithoutValue.getType() : this.edgesWithValue.getType();
        TypeInformation<?> edgeKeyType = ((TupleTypeInfo<?>) edgeType).getTypeAt(0);
        Validate.isTrue(keyType.equals(edgeKeyType), "The first tuple field (the vertex id) of the input data set (the initial vertices) must be the same data type as the first fields of the edge data set (the source vertex id). Here, the key type for the vertex ids is '%s' and the key type  for the edges is '%s'.", keyType, edgeKeyType);
        this.initialVertices = inputData;
    }

    /**
     * Builds the operator that represents this iteration, wrapping the user functions into
     * co-group functions that match the kind of edges this iteration was created with.
     *
     * @return the operator over the initial vertices and the edges
     * @throws IllegalStateException if {@link #setInput} has not been called before
     */
    public GraphIterationOperator<VertexKey, VertexValue, Message, ?> createOperator() {
        if (this.initialVertices == null) {
            // Fail with a clear message instead of a NullPointerException in the operator constructor.
            throw new IllegalStateException("The input data set has not been set.");
        }
        VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(this.updateFunction);
        if (this.edgesWithoutValue != null) {
            MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message> messenger = new MessagingUdfNoEdgeValues<VertexKey, VertexValue, Message>(this.messagingFunction);
            return new GraphIterationOperator<VertexKey, VertexValue, Message, Tuple2<VertexKey, VertexKey>>(this.initialVertices, this.edgesWithoutValue, updateUdf, messenger, this.messageType, this.aggregators, this.maximumNumberOfIterations);
        }
        MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(this.messagingFunction);
        return new GraphIterationOperator<VertexKey, VertexValue, Message, Tuple3<VertexKey, VertexKey, EdgeValue>>(this.initialVertices, this.edgesWithValue, updateUdf, messenger, this.messageType, this.aggregators, this.maximumNumberOfIterations);
    }

    /**
     * Creates a vertex-centric iteration for a graph whose edges carry no value.
     *
     * @param edgesWithoutValue the edges as (source id, target id) pairs
     * @param vertexUpdateFunction the function that updates a vertex from its incoming messages
     * @param messagingFunction the function that produces the messages of a vertex
     * @param maximumNumberOfIterations the iteration limit handed to the delta iteration
     * @return the iteration; its input still has to be set via {@link #setInput}
     */
    public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message> VertexCentricIteration<VertexKey, VertexValue, Message, ?> withPlainEdges(DataSet<Tuple2<VertexKey, VertexKey>> edgesWithoutValue, VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction, int maximumNumberOfIterations) {
        // The edge value type is irrelevant for plain edges, so it is pinned to Object to
        // obtain a concrete type argument for the constructor call.
        @SuppressWarnings("unchecked")
        MessagingFunction<VertexKey, VertexValue, Message, Object> tmf = (MessagingFunction<VertexKey, VertexValue, Message, Object>) messagingFunction;
        return new VertexCentricIteration<VertexKey, VertexValue, Message, Object>(vertexUpdateFunction, tmf, edgesWithoutValue, maximumNumberOfIterations);
    }

    /**
     * Creates a vertex-centric iteration for a graph whose edges carry a value.
     *
     * @param edgesWithValue the edges as (source id, target id, edge value) triples
     * @param uf the function that updates a vertex from its incoming messages
     * @param mf the function that produces the messages of a vertex
     * @param maximumNumberOfIterations the iteration limit handed to the delta iteration
     * @return the iteration; its input still has to be set via {@link #setInput}
     */
    public static final <VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue> VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withValuedEdges(DataSet<Tuple3<VertexKey, VertexKey, EdgeValue>> edgesWithValue, VertexUpdateFunction<VertexKey, VertexValue, Message> uf, MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf, int maximumNumberOfIterations) {
        return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations, true);
    }

    /**
     * Two-input operator (initial vertices, edges) that is translated into a delta
     * iteration with a messaging co-group and a vertex-update co-group.
     */
    private static final class GraphIterationOperator<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeType extends Tuple>
    extends TwoInputOperator<Tuple2<VertexKey, VertexValue>, EdgeType, Tuple2<VertexKey, VertexValue>, GraphIterationOperator<VertexKey, VertexValue, Message, EdgeType>> {
        private final DataSet<EdgeType> edges;
        private final CoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> updateFunction;
        private final CoGroupFunction<EdgeType, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> messagingFunction;
        /** Type of the (target vertex id, message) pairs flowing from messaging to update. */
        private final TypeInformation<Tuple2<VertexKey, Message>> messageType;
        private final Map<String, Class<? extends Aggregator<?>>> aggregators;
        private final int maximumNumberOfIterations;

        /**
         * The result type of the operator is the type of the initial vertices. The type of
         * the message tuples is derived from the vertex key type and the plain message type.
         */
        private GraphIterationOperator(DataSet<Tuple2<VertexKey, VertexValue>> initialVertices, DataSet<EdgeType> edges, VertexUpdateUdf<VertexKey, VertexValue, Message> updateFunction, CoGroupFunction<EdgeType, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> messagingFunction, TypeInformation<Message> messageType, Map<String, Class<? extends Aggregator<?>>> aggregators, int maximumNumberOfIterations) {
            super(initialVertices, edges, initialVertices.getType());
            this.edges = edges;
            this.updateFunction = updateFunction;
            this.messagingFunction = messagingFunction;
            this.aggregators = aggregators;
            this.maximumNumberOfIterations = maximumNumberOfIterations;
            TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
            this.messageType = new TupleTypeInfo<Tuple2<VertexKey, Message>>(new TypeInformation<?>[] {keyType, messageType});
        }

        /**
         * Builds the delta iteration: the vertices (input1) serve as both inputs of the
         * iteration, the edges (input2) feed the messaging co-group, and the vertex-update
         * co-group provides both the solution set delta and the next workset.
         *
         * @param input1 the translated initial vertices
         * @param input2 the translated edges
         * @return the delta iteration operator
         */
        // The operator base classes are used as raw types here because their full generic
        // signatures are not visible in this file.
        @SuppressWarnings({"unchecked", "rawtypes"})
        protected DualInputOperator<Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>, ?> translateToDataFlow(Operator<Tuple2<VertexKey, VertexValue>> input1, Operator<EdgeType> input2) {
            String name = this.getName() != null ? this.getName() : "Vertex-centric iteration (" + this.updateFunction + " | " + this.messagingFunction + ")";
            // Both co-groups and the iteration's solution set are keyed on tuple field 0 (the vertex id).
            int[] zeroKeyPos = new int[]{0};
            DeltaIterationBase iteration = new DeltaIterationBase(new BinaryOperatorInformation(this.getInput1Type(), this.getInput1Type(), this.getInput1Type()), zeroKeyPos, name);
            iteration.setMaximumNumberOfIterations(this.maximumNumberOfIterations);
            for (Map.Entry<String, Class<? extends Aggregator<?>>> entry : this.aggregators.entrySet()) {
                iteration.getAggregators().registerAggregator(entry.getKey(), entry.getValue());
            }

            // Step 1: edges x workset -> (target vertex id, message)
            CoGroupOperatorBase messenger = new CoGroupOperatorBase(this.messagingFunction, new BinaryOperatorInformation(this.edges.getType(), this.getInput1Type(), this.messageType), zeroKeyPos, zeroKeyPos, "Messaging");
            messenger.setSecondInput(iteration.getWorkset());

            // Step 2: messages x solution set -> updated vertices
            CoGroupOperatorBase updater = new CoGroupOperatorBase(this.updateFunction, new BinaryOperatorInformation(this.messageType, this.getInput1Type(), this.getInput1Type()), zeroKeyPos, zeroKeyPos, "Vertex State Updates");
            updater.setFirstInput((Operator)messenger);
            updater.setSecondInput(iteration.getSolutionSet());

            // Declare field 0 of both updater inputs as forwarded to field 0 of its output.
            DualInputSemanticProperties semanticProps = new DualInputSemanticProperties();
            semanticProps.addForwardedField1(0, 0);
            semanticProps.addForwardedField2(0, 0);
            updater.setSemanticProperties(semanticProps);

            // The updated vertices are both the delta of the solution set and the next workset.
            iteration.setSolutionSetDelta((Operator)updater);
            iteration.setNextWorkset((Operator)updater);
            iteration.setFirstInput(input1);
            iteration.setSecondInput(input1);
            messenger.setFirstInput(input2);
            return iteration;
        }
    }

    /**
     * Co-group function that wraps a {@link MessagingFunction} for edges with values. For
     * every key with a vertex state it hands the edges of that key and the collector to the
     * messaging function and lets it send its messages.
     */
    private static final class MessagingUdfWithEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message, EdgeValue>
    extends CoGroupFunction<Tuple3<VertexKey, VertexKey, EdgeValue>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> {
        private static final long serialVersionUID = 1L;
        private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;

        private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction) {
            this.messagingFunction = messagingFunction;
        }

        /**
         * Sends the messages of one vertex. Keys without a vertex state are skipped; only the
         * first state element of a key is used.
         */
        public void coGroup(Iterator<Tuple3<VertexKey, VertexKey, EdgeValue>> edges, Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out) throws Exception {
            if (!state.hasNext()) {
                return;
            }
            Tuple2<VertexKey, VertexValue> newVertexState = state.next();
            this.messagingFunction.set(edges, out);
            this.messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
        }

        /** Initializes the messaging function in the first superstep and runs its pre-superstep hook. */
        public void open(Configuration parameters) throws Exception {
            if (this.getIterationRuntimeContext().getSuperstepNumber() == 1) {
                // 'true' marks the edges as carrying values.
                this.messagingFunction.init(this.getIterationRuntimeContext(), true);
            }
            this.messagingFunction.preSuperstep();
        }

        /** Runs the post-superstep hook of the messaging function. */
        public void close() throws Exception {
            this.messagingFunction.postSuperstep();
        }
    }

    /**
     * Co-group function that wraps a {@link MessagingFunction} for edges without values. For
     * every key with a vertex state it hands the edges of that key and the collector to the
     * messaging function and lets it send its messages.
     */
    private static final class MessagingUdfNoEdgeValues<VertexKey extends Comparable<VertexKey>, VertexValue, Message>
    extends CoGroupFunction<Tuple2<VertexKey, VertexKey>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, Message>> {
        private static final long serialVersionUID = 1L;
        private final MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction;

        private MessagingUdfNoEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, ?> messagingFunction) {
            this.messagingFunction = messagingFunction;
        }

        /**
         * Sends the messages of one vertex. Keys without a vertex state are skipped; only the
         * first state element of a key is used.
         */
        public void coGroup(Iterator<Tuple2<VertexKey, VertexKey>> edges, Iterator<Tuple2<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out) throws Exception {
            if (!state.hasNext()) {
                return;
            }
            Tuple2<VertexKey, VertexValue> newVertexState = state.next();
            this.messagingFunction.set(edges, out);
            this.messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
        }

        /** Initializes the messaging function in the first superstep and runs its pre-superstep hook. */
        public void open(Configuration parameters) throws Exception {
            if (this.getIterationRuntimeContext().getSuperstepNumber() == 1) {
                // 'false' marks the edges as carrying no values.
                this.messagingFunction.init(this.getIterationRuntimeContext(), false);
            }
            this.messagingFunction.preSuperstep();
        }

        /** Runs the post-superstep hook of the messaging function. */
        public void close() throws Exception {
            this.messagingFunction.postSuperstep();
        }
    }

    /**
     * Co-group function that wraps a {@link VertexUpdateFunction}. For every key it passes
     * the vertex state and the messages addressed to that vertex to the update function.
     */
    private static final class VertexUpdateUdf<VertexKey extends Comparable<VertexKey>, VertexValue, Message>
    extends CoGroupFunction<Tuple2<VertexKey, Message>, Tuple2<VertexKey, VertexValue>, Tuple2<VertexKey, VertexValue>> {
        private static final long serialVersionUID = 1L;
        private final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
        /** Reused across invocations; re-pointed at the current message iterator on each call. */
        private final MessageIterator<Message> messageIter = new MessageIterator<Message>();

        private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction) {
            this.vertexUpdateFunction = vertexUpdateFunction;
        }

        /**
         * Updates one vertex from the messages sent to it.
         *
         * @throws Exception if there is no vertex state for the key, i.e. messages were sent
         *         to a vertex id that does not exist, or if the update function fails
         */
        public void coGroup(Iterator<Tuple2<VertexKey, Message>> messages, Iterator<Tuple2<VertexKey, VertexValue>> vertex, Collector<Tuple2<VertexKey, VertexValue>> out) throws Exception {
            if (vertex.hasNext()) {
                Tuple2<VertexKey, VertexValue> vertexState = vertex.next();
                // The message iterator consumes the (id, message) tuples through a raw view.
                @SuppressWarnings({"unchecked", "rawtypes"})
                Iterator downcastIter = messages;
                this.messageIter.setSource(downcastIter);
                this.vertexUpdateFunction.setOutput(vertexState, out);
                this.vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, this.messageIter);
                return;
            }

            if (!messages.hasNext()) {
                throw new Exception("Vertex update was invoked with neither a vertex state nor any messages.");
            }

            String message = "Target vertex does not exist!.";
            try {
                Tuple2<VertexKey, Message> next = messages.next();
                message = "Target vertex '" + next.f0 + "' does not exist!.";
            }
            catch (Exception ignored) {
                // Best effort only: if the offending id cannot be read or printed, the
                // generic message is reported instead.
            }
            throw new Exception(message);
        }

        /** Initializes the update function in the first superstep and runs its pre-superstep hook. */
        public void open(Configuration parameters) throws Exception {
            if (this.getIterationRuntimeContext().getSuperstepNumber() == 1) {
                this.vertexUpdateFunction.init(this.getIterationRuntimeContext());
            }
            this.vertexUpdateFunction.preSuperstep();
        }

        /** Runs the post-superstep hook of the update function. */
        public void close() throws Exception {
            this.vertexUpdateFunction.postSuperstep();
        }
    }
}

