Skip to content

Commit

Permalink
apacheGH-2930: Support for query cancelation for spatial property fun…
Browse files Browse the repository at this point in the history
…ctions by creating iterators lazily.
  • Loading branch information
Lorenz Buehmann authored and Aklakan committed Feb 9, 2025
1 parent ac26a8a commit 00e19cd
Show file tree
Hide file tree
Showing 4 changed files with 286 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ protected QueryExecDataset(Query query, String queryString, DatasetGraph dataset
// See also query substitution handled in QueryExecBuilder
this.initialBinding = initialToEngine;

// Cancel signal may originate from an e.c. an update execution.
// Cancel signal may originate from e.g. an update execution.
this.cancelSignal = Context.getOrSetCancelSignal(context);

init();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ Canonical implementation of toSet().
public static <T> Set<T> asSet( ExtendedIterator<T> it )
{
Set<T> result = new HashSet<>();
it.forEachRemaining(result::add);
try { it.forEachRemaining(result::add); }
finally { it.close(); }
return result;
}

Expand All @@ -238,7 +239,8 @@ that iterator. Canonical implementation of toList().
public static <T> List<T> asList( ExtendedIterator<T> it )
{
List<T> result = new ArrayList<>();
it.forEachRemaining(result::add);
try { it.forEachRemaining(result::add); }
finally { it.close(); }
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
*/
package org.apache.jena.geosparql.geo.topological;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;

import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.geosparql.geof.topological.GenericFilterFunction;
import org.apache.jena.geosparql.implementation.GeometryWrapper;
import org.apache.jena.geosparql.implementation.index.QueryRewriteIndex;
Expand All @@ -37,8 +38,10 @@
import org.apache.jena.sparql.engine.QueryIterator;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.engine.binding.BindingFactory;
import org.apache.jena.sparql.engine.iterator.QueryIter;
import org.apache.jena.sparql.engine.iterator.QueryIterConcat;
import org.apache.jena.sparql.engine.iterator.QueryIterNullIterator;
import org.apache.jena.sparql.engine.iterator.QueryIterPlainWrapper;
import org.apache.jena.sparql.engine.iterator.QueryIterSingleton;
import org.apache.jena.sparql.expr.ExprEvalException;
import org.apache.jena.sparql.pfunction.PFuncSimple;
Expand Down Expand Up @@ -83,14 +86,18 @@ public QueryIterator execEvaluated(Binding binding, Node subject, Node predicate
//One bound and one unbound.
return oneBound(binding, subject, predicate, object, execCxt);
}
}

private QueryIterator bothBound(Binding binding, boolean isSubjectBound, Node subject, Node predicate, Node object, ExecutionContext execCxt) {
QueryIterator iter = isSubjectBound
? bothBound(binding, subject, predicate, object, execCxt)
: bothBound(binding, object, predicate, subject, execCxt);
return iter;
}

private QueryIterator bothBound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) {

Graph graph = execCxt.getActiveGraph();
QueryRewriteIndex queryRewriteIndex = QueryRewriteIndex.retrieve(execCxt);

Boolean isPositiveResult = queryRewrite(graph, subject, predicate, object, queryRewriteIndex);
if (isPositiveResult) {
//Filter function test succeded so retain binding.
Expand All @@ -99,47 +106,33 @@ private QueryIterator bothBound(Binding binding, Node subject, Node predicate, N
//Filter function test failed so null result.
return QueryIterNullIterator.create(execCxt);
}

}

private QueryIterator bothUnbound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) {

QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt);
Var subjectVar = Var.alloc(subject.getName());

Graph graph = execCxt.getActiveGraph();

//Search for both Features and Geometry in the Graph. Reliant upon consistent usage of SpatialObject (which is base class of Feature and Geometry) if present.
ExtendedIterator<Triple> subjectTriples;
if (graph.contains(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE)) {
subjectTriples = graph.find(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE);
} else if (graph.contains(null, RDF.type.asNode(), Geo.FEATURE_NODE) || graph.contains(null, RDF.type.asNode(), Geo.GEOMETRY_NODE)) {
ExtendedIterator<Triple> featureTriples = graph.find(null, RDF.type.asNode(), Geo.FEATURE_NODE);
ExtendedIterator<Triple> geometryTriples = graph.find(null, RDF.type.asNode(), Geo.GEOMETRY_NODE);
subjectTriples = featureTriples.andThen(geometryTriples);
} else {
//Check for Geo Predicate Features in the Graph if no GeometryLiterals found.
subjectTriples = graph.find(null, SpatialExtension.GEO_LAT_NODE, null);
}

//Bind all the Spatial Objects or Geo Predicates once as the subject and search for corresponding Objects.
while (subjectTriples.hasNext()) {
Triple subjectTriple = subjectTriples.next();
Node boundSubject = subjectTriple.getSubject();
Binding subjectBind = BindingFactory.binding(binding, subjectVar, boundSubject);
QueryIterator queryIter = oneBound(subjectBind, boundSubject, predicate, object, execCxt);
queryIterConcat.add(queryIter);
}

return queryIterConcat;
ExtendedIterator<Triple> spatialTriples = findSpatialTriples(graph);
ExtendedIterator<Binding> iterator = spatialTriples
.mapWith(Triple::getSubject)
.mapWith(node -> BindingFactory.binding(binding, subjectVar, node));

QueryIter queryIter = QueryIter.flatMap(
QueryIterPlainWrapper.create(iterator, execCxt),
b -> oneBound(b, b.get(subjectVar), predicate, object, execCxt),
execCxt
);
return queryIter;
}

private QueryIterator oneBound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) {

Graph graph = execCxt.getActiveGraph();
Node boundNode;
Node unboundNode;
Boolean isSubjectBound;
boolean isSubjectBound;
if (subject.isConcrete()) {
//Subject is bound, object is unbound.
boundNode = subject;
Expand All @@ -152,33 +145,51 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No
isSubjectBound = false;
}

if (!(boundNode.isLiteral() || graph.contains(boundNode, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE) || graph.contains(boundNode, RDF.type.asNode(), Geo.FEATURE_NODE) || graph.contains(boundNode, RDF.type.asNode(), Geo.GEOMETRY_NODE))) {
if (!(boundNode.isLiteral() ||
graph.contains(boundNode, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE) ||
graph.contains(boundNode, RDF.type.asNode(), Geo.FEATURE_NODE) ||
graph.contains(boundNode, RDF.type.asNode(), Geo.GEOMETRY_NODE))) {
if (!graph.contains(boundNode, SpatialExtension.GEO_LAT_NODE, null)) {
//Bound node is not a Feature or a Geometry or has Geo predicates so exit.
return QueryIterNullIterator.create(execCxt);
}
}

boolean isSpatialIndex = SpatialIndex.isDefined(execCxt);
QueryIterConcat queryIterConcat;
QueryIterator result;
if (!isSpatialIndex || filterFunction.isDisjoint() || filterFunction.isDisconnected()) {
//Disjointed so retrieve all cases.
queryIterConcat = findAll(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt);
result = findAll(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt);
} else {
//Only retrieve those in the spatial index which are within same bounding box.
queryIterConcat = findIndex(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt);
result = findIndex(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt);
}

return queryIterConcat;
return result;
}

private QueryIterConcat findAll(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) {
private QueryIterator findAll(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) {

//Prepare the results.
Var unboundVar = Var.alloc(unboundNode.getName());
QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt);

//Search for both Features and Geometry in the Graph. Reliant upon consistent usage of SpatialObject (which is base class of Feature and Geometry) if present.
ExtendedIterator<Triple> spatialTriples = findSpatialTriples(graph);

ExtendedIterator<Binding> iterator = spatialTriples
.mapWith(Triple::getSubject)
.mapWith(node -> BindingFactory.binding(binding, unboundVar, node));

return QueryIter.flatMap(
QueryIterPlainWrapper.create(iterator, execCxt),
b -> {
Node spatialNode = b.get(unboundVar);
QueryIterator iter = bothBound(b, isSubjectBound, boundNode, predicate, spatialNode, execCxt);
return iter;
},
execCxt);
}

private static ExtendedIterator<Triple> findSpatialTriples(Graph graph) {
ExtendedIterator<Triple> spatialTriples;
if (graph.contains(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE)) {
spatialTriples = graph.find(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE);
Expand All @@ -190,45 +201,33 @@ private QueryIterConcat findAll(Graph graph, Node boundNode, Node unboundNode, B
//Check for Geo Predicate Features in the Graph if no GeometryLiterals found.
spatialTriples = graph.find(null, SpatialExtension.GEO_LAT_NODE, null);
}

while (spatialTriples.hasNext()) {
Triple spatialTriple = spatialTriples.next();
Node spatialNode = spatialTriple.getSubject();
Binding newBind = BindingFactory.binding(binding, unboundVar, spatialNode);
QueryIterator queryIter;
if (isSubjectBound) {
queryIter = bothBound(newBind, boundNode, predicate, spatialNode, execCxt);
} else {
queryIter = bothBound(newBind, spatialNode, predicate, boundNode, execCxt);
}
queryIterConcat.add(queryIter);
}

return queryIterConcat;
return spatialTriples;
}

private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) throws ExprEvalException {

private QueryIterator findIndex(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) throws ExprEvalException {
try {
//Prepare for results.
Var unboundVar = Var.alloc(unboundNode.getName());
QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt);
Var unboundVar = Var.alloc(unboundNode);

//Find the asserted triples.
List<Node> assertedNodes = !isSubjectBound || !boundNode.isLiteral() ? findAsserted(graph, boundNode, isSubjectBound, predicate) : Collections.emptyList();
for (Node node : assertedNodes) {
Binding newBind = BindingFactory.binding(binding, unboundVar, node);
QueryIterator queryIter = QueryIterSingleton.create(newBind, execCxt);
queryIterConcat.add(queryIter);
}
Collection<Node> assertedNodes = !isSubjectBound || !boundNode.isLiteral()
? findAsserted(graph, boundNode, isSubjectBound, predicate)
: List.of();

QueryIterator assertedNodesIter = QueryIterPlainWrapper.create(
Iter.map(assertedNodes.iterator(), node -> BindingFactory.binding(binding, unboundVar, node)),
execCxt);

//Find the GeometryLiteral of the Bound Node.
SpatialObjectGeometryLiteral boundGeometryLiteral = SpatialObjectGeometryLiteral.retrieve(graph, boundNode);
if (!boundGeometryLiteral.isValid()) {
//Bound Node is not a Feature or a Geometry or there is no GeometryLiteral so exit.
return queryIterConcat;
return assertedNodesIter;
}

QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt);
queryIterConcat.add(assertedNodesIter);

Node geometryLiteral = boundGeometryLiteral.getGeometryLiteral();

//Perform the search of the Spatial Index of the Dataset.
Expand All @@ -238,55 +237,66 @@ private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode,
Envelope searchEnvelope = transformedGeom.getEnvelope();
HashSet<Resource> features = spatialIndex.query(searchEnvelope);

//Check each of the Features that match the search.
for (Resource feature : features) {
Node featureNode = feature.asNode();

//Ensure not already an asserted node.
if (!assertedNodes.contains(featureNode)) {

Binding newBind = BindingFactory.binding(binding, unboundVar, featureNode);
QueryIterator queryIter;
if (isSubjectBound) {
queryIter = bothBound(newBind, boundNode, predicate, featureNode, execCxt);
} else {
queryIter = bothBound(newBind, featureNode, predicate, boundNode, execCxt);
}
queryIterConcat.add(queryIter);
}

//Also test all Geometry of the Features. All, some or one Geometry may have matched.
List<Node> featureGeometryTriples = G.listSP(graph, feature.asNode(), Geo.HAS_GEOMETRY_NODE);
for ( Node geomNode : featureGeometryTriples) {
//Ensure not already an asserted node.
if (!assertedNodes.contains(geomNode)) {
Binding newBind = BindingFactory.binding(binding, unboundVar, geomNode);
QueryIterator queryIter;
if (isSubjectBound) {
queryIter = bothBound(newBind, boundNode, predicate, geomNode, execCxt);
} else {
queryIter = bothBound(newBind, geomNode, predicate, boundNode, execCxt);
}
queryIterConcat.add(queryIter);
}
}
}
// Check each of the Features that match the search.
QueryIterator featuresIter = QueryIterPlainWrapper.create(
Iter.map(features.iterator(), feature -> BindingFactory.binding(binding, unboundVar, feature.asNode())),
execCxt);

QueryIterator queryIterator = QueryIter.flatMap(featuresIter,
featureBinding -> {
return findByFeature(graph, binding, featureBinding,
isSubjectBound, boundNode, predicate, unboundVar,
execCxt, assertedNodes);
},
execCxt);
queryIterConcat.add(queryIterator);

return queryIterConcat;
} catch (MismatchedDimensionException | TransformException | FactoryException | SpatialIndexException ex) {
throw new ExprEvalException(ex.getMessage() + ": " + FmtUtils.stringForNode(boundNode) + ", " + FmtUtils.stringForNode(unboundNode) + ", " + FmtUtils.stringForNode(predicate), ex);
}
}

private List<Node> findAsserted(Graph graph, Node boundNode, boolean isSubjectBound, Node predicate) {
List<Node> assertedNodes = new ArrayList<>();
if (isSubjectBound) {
List<Node> x = G.listSP(graph, boundNode, predicate);
assertedNodes.addAll(x);
} else {
List<Node> x = G.listPO(graph, predicate, boundNode);
assertedNodes.addAll(x);
private QueryIterator findByFeature(Graph graph, Binding binding, Binding featureBinding,
boolean isSubjectBound, Node boundNode, Node predicate, Var unboundVar,
ExecutionContext execCxt, Collection<Node> assertedNodes) {

Node featureNode = featureBinding.get(unboundVar);
QueryIterConcat featureIterConcat = new QueryIterConcat(execCxt);

// Check Features directly if not already asserted
if (!assertedNodes.contains(featureNode)) {
QueryIterator tmpIter = bothBound(featureBinding, isSubjectBound, boundNode, predicate, featureNode, execCxt);
featureIterConcat.add(tmpIter);
}

// Also test all Geometry of the Features. All, some or one Geometry may have matched.
ExtendedIterator<Node> featureGeometries = G.iterSP(graph, featureNode, Geo.HAS_GEOMETRY_NODE);
QueryIterator geometriesQueryIterator = QueryIterPlainWrapper.create(
Iter.map(
Iter.filter( // omit asserted
featureGeometries,
geometry -> !assertedNodes.contains(geometry)
),
geometryNode -> BindingFactory.binding(binding, unboundVar, geometryNode)),
execCxt);

geometriesQueryIterator = QueryIter.flatMap(
geometriesQueryIterator,
b2 -> {
Node geomNode = b2.get(unboundVar);
return bothBound(b2, isSubjectBound, boundNode, predicate, geomNode, execCxt);
},
execCxt);

featureIterConcat.add(geometriesQueryIterator);
return featureIterConcat;
}

private List<Node> findAsserted(Graph graph, Node boundNode, boolean isSubjectBound, Node predicate) {
List<Node> assertedNodes = isSubjectBound
? G.listSP(graph, boundNode, predicate)
: G.listPO(graph, predicate, boundNode);
return assertedNodes;
}

Expand Down Expand Up @@ -323,5 +333,4 @@ protected final Boolean queryRewrite(Graph graph, Node subject, Node predicate,
public Boolean testFilterFunction(Node subjectGeometryLiteral, Node objectGeometryLiteral) {
return filterFunction.exec(subjectGeometryLiteral, objectGeometryLiteral);
}

}
Loading

0 comments on commit 00e19cd

Please sign in to comment.