/*
 * Decompiled with CFR 0.152.
 */
package net.sansa_stack.spark.rdd.op.rdf;

import java.io.Serializable;
import net.sansa_stack.spark.rdd.function.JavaRddFunction;
import org.aksw.commons.lambda.serializable.SerializableFunction;
import org.aksw.commons.util.string.StringUtils;
import org.aksw.jena_sparql_api.io.json.RDFNodeJsonUtils;
import org.apache.jena.graph.Graph;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.riot.out.NodeFmtLib;
import org.apache.jena.sparql.core.Quad;
import org.apache.jena.sparql.graph.GraphFactory;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class JavaRddOfTriplesOps {
    public static <K> JavaPairRDD<K, Model> groupTriplesIntoModels(JavaPairRDD<K, Triple> rdd) {
        return rdd.combineByKey((Function & Serializable)triple -> ModelFactory.createDefaultModel(), (Function2 & Serializable)(model, triple) -> {
            model.getGraph().add(triple);
            return model;
        }, (Function2 & Serializable)(m1, m2) -> {
            m1.add(m2);
            return m1;
        });
    }

    public static <K> JavaPairRDD<K, Model> groupBy(JavaRDD<Triple> rdd, SerializableFunction<? super Triple, K> tripleToKey) {
        JavaPairRDD tmp = rdd.mapToPair((PairFunction & Serializable)t -> new Tuple2(tripleToKey.apply(t), t));
        return JavaRddOfTriplesOps.groupTriplesIntoModels(tmp);
    }

    public static JavaPairRDD<Node, Model> groupBySubjectNodes(JavaRDD<Triple> rdd) {
        return JavaRddOfTriplesOps.groupBy(rdd, Triple::getSubject);
    }

    public static JavaPairRDD<Node, Model> groupByObjectNodes(JavaRDD<Triple> rdd) {
        return JavaRddOfTriplesOps.groupBy(rdd, Triple::getObject);
    }

    public static JavaPairRDD<Node, Model> groupByPredicateNodes(JavaRDD<Triple> rdd) {
        return JavaRddOfTriplesOps.groupBy(rdd, Triple::getPredicate);
    }

    public static JavaPairRDD<String, Model> groupBySubjects(JavaRDD<Triple> rdd) {
        return JavaRddOfTriplesOps.groupBy(rdd, (SerializableFunction & Serializable)t -> JavaRddOfTriplesOps.toGraphName(t.getSubject()));
    }

    public static JavaPairRDD<String, Model> groupByObjects(JavaRDD<Triple> rdd) {
        return JavaRddOfTriplesOps.groupBy(rdd, (SerializableFunction & Serializable)t -> JavaRddOfTriplesOps.toGraphName(t.getObject()));
    }

    public static JavaPairRDD<String, Model> groupByPredicates(JavaRDD<Triple> rdd) {
        return JavaRddOfTriplesOps.groupBy(rdd, (SerializableFunction & Serializable)t -> JavaRddOfTriplesOps.toGraphName(t.getPredicate()));
    }

    public static String toGraphName(Node node) {
        if (node.isURI()) {
            return node.getURI();
        }
        if (node.isBlank()) {
            return "_:" + node.getBlankNodeLabel();
        }
        if (node.isNodeTriple()) {
            return "x-rdfstar:" + StringUtils.urlEncode((String)RDFNodeJsonUtils.nodeToStr((Node)node));
        }
        if (node.isLiteral()) {
            return "x-literal:" + StringUtils.urlEncode((String)RDFNodeJsonUtils.nodeToStr((Node)node));
        }
        throw new RuntimeException("Unknown node: " + node);
    }

    public static JavaRDD<Model> mapToModel(JavaRDD<Triple> rdd) {
        return rdd.map((Function & Serializable)triple -> {
            Graph g = GraphFactory.createDefaultGraph();
            g.add(triple);
            Model r = ModelFactory.createModelForGraph((Graph)g);
            return r;
        });
    }

    public static JavaRddFunction<Triple, Quad> mapIntoGraph(Node graphNode) {
        return (JavaRddFunction & Serializable)rdd -> rdd.map((Function & Serializable)triple -> new Quad(graphNode, triple));
    }

    public static JavaRDD<Triple> postProcess(JavaRDD<Triple> rdd, boolean sort, boolean ascending, boolean distinct, int numPartitions) {
        if (distinct) {
            rdd = rdd.distinct();
        }
        if (sort) {
            rdd = rdd.sortBy(NodeFmtLib::str, ascending, numPartitions);
        }
        return rdd;
    }
}

