diff --git a/pom.xml b/pom.xml index b03573e..619ab99 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ net.sansa-stack sansa-parent - 0.6.0 + 0.7.1 sansa-inference-parent_2.11 diff --git a/sansa-inference-common/pom.xml b/sansa-inference-common/pom.xml index 48be0a4..15329d7 100644 --- a/sansa-inference-common/pom.xml +++ b/sansa-inference-common/pom.xml @@ -4,7 +4,7 @@ sansa-inference-parent_2.11 net.sansa-stack - 0.6.0 + 0.7.1 ../pom.xml sansa-inference-common_${scala.binary.version} @@ -79,11 +79,6 @@ org.apache.calcite calcite-core - - com.google.protobuf - protobuf-java - 3.5.0 - diff --git a/sansa-inference-flink/pom.xml b/sansa-inference-flink/pom.xml index a6d3b45..ee978ef 100644 --- a/sansa-inference-flink/pom.xml +++ b/sansa-inference-flink/pom.xml @@ -23,7 +23,7 @@ under the License. net.sansa-stack sansa-inference-parent_2.11 - 0.6.0 + 0.7.1 ../pom.xml sansa-inference-flink_${scala.binary.version} diff --git a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/forwardchaining/ForwardRuleReasonerRDFS.scala b/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/forwardchaining/ForwardRuleReasonerRDFS.scala index aca22a1..0a0e08f 100644 --- a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/forwardchaining/ForwardRuleReasonerRDFS.scala +++ b/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/forwardchaining/ForwardRuleReasonerRDFS.scala @@ -318,7 +318,7 @@ class ForwardRuleReasonerRDFS(env: ExecutionEnvironment) extends ForwardRuleReas val additionalTripleRDDs = mutable.Seq(rdfs4, rdfs6, rdfs8_10, rdfs12) - allTriples = env.union(Seq(allTriples) ++ additionalTripleRDDs).distinct() + allTriples = env.union(Seq(allTriples) ++ additionalTripleRDDs).distinct(_.hashCode) } logger.info( diff --git a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/forwardchaining/TransitiveReasoner.scala b/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/forwardchaining/TransitiveReasoner.scala index 9ce67ec..b4d664d 100644 --- a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/forwardchaining/TransitiveReasoner.scala +++ b/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/forwardchaining/TransitiveReasoner.scala @@ -7,10 +7,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.{DataSet, _} import org.apache.flink.util.Collector import org.apache.jena.graph.{Node, Triple} -import org.apache.jena.sparql.util.NodeComparator -import net.sansa_stack.inference.flink.utils.NodeKey import net.sansa_stack.inference.utils.Profiler +import net.sansa_stack.rdf.flink.utils.NodeKey /** * An engine to compute the transitive closure (TC) for a set of triples given in several datastructures. diff --git a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/NodeKey.scala b/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/NodeKey.scala deleted file mode 100644 index 3aadb99..0000000 --- a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/NodeKey.scala +++ /dev/null @@ -1,35 +0,0 @@ -package net.sansa_stack.inference.flink.utils - -import org.apache.jena.graph.Node -import org.apache.jena.sparql.util.NodeComparator - -/** - * Key type wrapper for Jena `Node` objects. - * It basically makes Node comparable which is necessary to be handles as key in Flink. - * - * @author Lorenz Buehmann - */ -class NodeKey(val node: Node) extends Comparable[NodeKey] with Equals { - - override def compareTo(o: NodeKey): Int = { - val other = o.node - if (node == null) - if (other == null) 0 else -1 - else - if (other == null) 1 else new NodeComparator().compare(node, other) - } - - override def canEqual(that: Any): Boolean = that.isInstanceOf[NodeKey] - - override def hashCode(): Int = 31 * node.## - - override def equals(that: Any): Boolean = - that match { - case key: NodeKey => (this eq key) || (key.canEqual(this) && hashCode == key.hashCode) - case _ => false - } -} - -object NodeKey { - def apply(node: Node): NodeKey = new NodeKey(node) -} diff --git a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/key/Key.scala b/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/key/Key.scala deleted file mode 100644 index aeada12..0000000 --- a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/key/Key.scala +++ /dev/null @@ -1,18 +0,0 @@ -package net.sansa_stack.inference.flink.utils.key - -/** - * Base of a tuple-like generic key. - * - * @tparam T The type of the concrete key type. - */ -abstract class Key[T <: Key[T]] extends Comparable[T] { - /** - * Gets the i-th element of the tuple-like key. - * - * @param pos The position. - * @return The element at that key position; - */ - def get(pos: Int): Any -} - - diff --git a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/key/Key1.scala b/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/key/Key1.scala deleted file mode 100644 index fcae711..0000000 --- a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/key/Key1.scala +++ /dev/null @@ -1,36 +0,0 @@ -package net.sansa_stack.inference.flink.utils.key - -/** - * A key with one key field. - * - * @tparam T1 The type of the field. - */ -class Key1[T1 <: Comparable[T1]](val value1: T1) extends Key[Key1[T1]] with Equals { - - def get(pos: Int): Any = pos match { - case 0 => - value1 - case _ => - throw new IndexOutOfBoundsException - } - - override def hashCode: Int = if (value1 == null) 0 else value1.hashCode - - override def canEqual(that: Any): Boolean = that.isInstanceOf[Key1[T1]] - - override def equals(obj: Any): Boolean = - obj match { - case that: Key1[T1] => (this eq that) || (this.canEqual(that) && (value1 == that.value1)) - case _ => false - } - - override def toString: String = s"Key1 ($value1)" - - def compareTo(o: Key1[T1]): Int = { - val other = o.value1 - if (value1 == null) - if (other == null) 0 else -1 - else - if (other == null) 1 else value1.compareTo(other) - } -} diff --git a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/key/Key2.scala b/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/key/Key2.scala deleted file mode 100644 index b8d808e..0000000 --- a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/key/Key2.scala +++ /dev/null @@ -1,53 +0,0 @@ -package net.sansa_stack.inference.flink.utils.key - -/** - * A key with two key fields. - * - * @tparam T1 The type of the first field. - * @tparam T2 The type of the second field. - */ -class Key2[T1 <: Comparable[T1], T2 <: Comparable[T2]](val value1: T1, val value2: T2) - extends Key[Key2[T1, T2]] - with Equals { - - def get(pos: Int): Any = pos match { - case 0 => - value1 - case 1 => - value2 - case _ => - throw new IndexOutOfBoundsException - } - - override def hashCode: Int = { - val c1: Int = if (value1 == null) 0 else value1.hashCode - val c2: Int = if (value2 == null) 0 else value2.hashCode - c1 * 17 + c2 * 31 - } - - override def canEqual(that: Any): Boolean = that.isInstanceOf[Key2[T1, T2]] - - override def equals(obj: Any): Boolean = - obj match { - case that: Key2[T1, T2] => (this eq that) || (this.canEqual(that) && (value1 == that.value1) && (value2 == that.value2)) - case _ => false - } - - override def toString: String = s"Key2 ($value1, $value2)" - - def compareTo(o: Key2[T1, T2]): Int = { - val other1 = o.value1 - val other2 = o.value2 - - val c1 = if (value1 == null) - if (other1 == null) 0 else -1 - else - if (other1 == null) 1 else value1.compareTo(other1) - - if(c1 != 0) c1 else - if (value2 == null) - if (other2 == null) 0 else -1 - else - if (other2 == null) 1 else value2.compareTo(other2) - } -} diff --git a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/key/Key3.scala b/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/key/Key3.scala deleted file mode 100644 index e875b26..0000000 --- a/sansa-inference-flink/src/main/scala/net/sansa_stack/inference/flink/utils/key/Key3.scala +++ /dev/null @@ -1,69 +0,0 @@ -package net.sansa_stack.inference.flink.utils.key - -/** - * A key with two key fields. - * - * @tparam T1 The type of the first field. - * @tparam T2 The type of the second field. - * @tparam T3 The type of the third field. - */ -class Key3[T1 <: Comparable[T1], T2 <: Comparable[T2], T3 <: Comparable[T3]](val value1: T1, val value2: T2, val value3: T3) - extends Key[Key3[T1, T2, T3]] - with Equals { - - def get(pos: Int): Any = pos match { - case 0 => - value1 - case 1 => - value2 - case 2 => - value3 - case _ => - throw new IndexOutOfBoundsException - } - - override def hashCode: Int = { - val c1: Int = if (value1 == null) 0 else value1.hashCode - val c2: Int = if (value2 == null) 0 else value2.hashCode - val c3: Int = if (value3 == null) 0 else value3.hashCode - c1 * 17 + c2 * 31 + c3 * 47 - } - - override def canEqual(that: Any): Boolean = that.isInstanceOf[Key3[T1, T2, T3]] - - override def equals(obj: Any): Boolean = - obj match { - case that: Key3[T1, T2, T3] => - (this eq that) || (this.canEqual(that) && (value1 == that.value1) && (value2 == that.value2) && (value3 == that.value3)) - case _ => false - } - - override def toString: String = s"Key3 ($value1, $value2, $value3)" - - def compareTo(o: Key3[T1, T2, T3]): Int = { - val other1 = o.value1 - val other2 = o.value2 - val other3 = o.value3 - - val c1 = if (value1 == null) - if (other1 == null) 0 else -1 - else - if (other1 == null) 1 else value1.compareTo(other1) - - if(c1 != 0) c1 else { - val c2 = if (value2 == null) - if (other2 == null) 0 else -1 - else - if (other2 == null) 1 else value2.compareTo(other2) - - if(c2 != 0) c2 else { - if (value3 == null) - if (other3 == null) 0 else -1 - else - if (other3 == null) 1 else value3.compareTo(other3) - } - - } - - } -} diff --git a/sansa-inference-spark/pom.xml b/sansa-inference-spark/pom.xml index b5501f0..ec593aa 100644 --- a/sansa-inference-spark/pom.xml +++ b/sansa-inference-spark/pom.xml @@ -4,7 +4,7 @@ net.sansa-stack sansa-inference-parent_2.11 - 0.6.0 + 0.7.1 ../pom.xml sansa-inference-spark_${scala.binary.version} @@ -136,6 +136,7 @@ org.apache.hadoop hadoop-common + ${hadoop.version} @@ -170,8 +171,6 @@ com.holdenkarau spark-testing-base_${scala.binary.version} - - 2.3.0_0.9.0 test @@ -323,7 +322,6 @@ org.apache.maven.plugins maven-dependency-plugin - resource-dependencies diff --git a/sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/loader/RDFGraphLoader.scala b/sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/loader/RDFGraphLoader.scala index 6e85e96..dd144af 100644 --- a/sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/loader/RDFGraphLoader.scala +++ b/sansa-inference-spark/src/main/scala/net/sansa_stack/inference/spark/data/loader/RDFGraphLoader.scala @@ -4,10 +4,9 @@ import java.net.URI import net.sansa_stack.inference.data.{SQLSchema, SQLSchemaDefault} import net.sansa_stack.inference.spark.data.model.{RDFGraph, RDFGraphDataFrame, RDFGraphDataset, RDFGraphNative} -import net.sansa_stack.inference.utils.NTriplesStringToJenaTriple import org.apache.jena.graph.Triple import org.apache.jena.riot.Lang -import org.apache.spark.sql.{Dataset, Encoder, SaveMode, SparkSession} +import org.apache.spark.sql.{Dataset, Encoder, SparkSession} import org.apache.spark.{SparkConf, SparkContext} import org.slf4j.LoggerFactory import scala.language.implicitConversions @@ -222,7 +221,7 @@ object RDFGraphLoader { println(triples.count()) triples .filter("p == 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type'") - .write.mode(SaveMode.Append).rdf("/tmp/lubm/out") + .write.mode(org.apache.spark.sql.SaveMode.Append).rdf("/tmp/lubm/out") diff --git a/sansa-inference-tests/pom.xml b/sansa-inference-tests/pom.xml index 7868b86..35dfb9b 100644 --- a/sansa-inference-tests/pom.xml +++ b/sansa-inference-tests/pom.xml @@ -4,7 +4,7 @@ sansa-inference-parent_2.11 net.sansa-stack - 0.6.0 + 0.7.1 ../pom.xml sansa-inference-tests_${scala.binary.version}