Skip to content

Commit 7422ac2

Browse files
Lorenz Buehmannafs
Lorenz Buehmann
authored andcommitted
GH-2930: Support for query cancelation for spatial property functions by creating iterators lazily.
1 parent d90f86f commit 7422ac2

File tree

4 files changed

+286
-109
lines changed

4 files changed

+286
-109
lines changed

jena-arq/src/main/java/org/apache/jena/sparql/exec/QueryExecDataset.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ protected QueryExecDataset(Query query, String queryString, DatasetGraph dataset
111111
// See also query substitution handled in QueryExecBuilder
112112
this.initialBinding = initialToEngine;
113113

114-
// Cancel signal may originate from an e.c. an update execution.
114+
// Cancel signal may originate from e.g. an update execution.
115115
this.cancelSignal = Context.getOrSetCancelSignal(context);
116116

117117
init();

jena-core/src/main/java/org/apache/jena/util/iterator/NiceIterator.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,8 @@ Canonical implementation of toSet().
227227
public static <T> Set<T> asSet( ExtendedIterator<T> it )
228228
{
229229
Set<T> result = new HashSet<>();
230-
it.forEachRemaining(result::add);
230+
try { it.forEachRemaining(result::add); }
231+
finally { it.close(); }
231232
return result;
232233
}
233234

@@ -238,7 +239,8 @@ that iterator. Canonical implementation of toList().
238239
public static <T> List<T> asList( ExtendedIterator<T> it )
239240
{
240241
List<T> result = new ArrayList<>();
241-
it.forEachRemaining(result::add);
242+
try { it.forEachRemaining(result::add); }
243+
finally { it.close(); }
242244
return result;
243245
}
244246
}

jena-geosparql/src/main/java/org/apache/jena/geosparql/geo/topological/GenericPropertyFunction.java

+115-106
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717
*/
1818
package org.apache.jena.geosparql.geo.topological;
1919

20-
import java.util.ArrayList;
21-
import java.util.Collections;
20+
import java.util.Collection;
2221
import java.util.HashSet;
2322
import java.util.List;
23+
24+
import org.apache.jena.atlas.iterator.Iter;
2425
import org.apache.jena.geosparql.geof.topological.GenericFilterFunction;
2526
import org.apache.jena.geosparql.implementation.GeometryWrapper;
2627
import org.apache.jena.geosparql.implementation.index.QueryRewriteIndex;
@@ -37,8 +38,10 @@
3738
import org.apache.jena.sparql.engine.QueryIterator;
3839
import org.apache.jena.sparql.engine.binding.Binding;
3940
import org.apache.jena.sparql.engine.binding.BindingFactory;
41+
import org.apache.jena.sparql.engine.iterator.QueryIter;
4042
import org.apache.jena.sparql.engine.iterator.QueryIterConcat;
4143
import org.apache.jena.sparql.engine.iterator.QueryIterNullIterator;
44+
import org.apache.jena.sparql.engine.iterator.QueryIterPlainWrapper;
4245
import org.apache.jena.sparql.engine.iterator.QueryIterSingleton;
4346
import org.apache.jena.sparql.expr.ExprEvalException;
4447
import org.apache.jena.sparql.pfunction.PFuncSimple;
@@ -83,14 +86,18 @@ public QueryIterator execEvaluated(Binding binding, Node subject, Node predicate
8386
//One bound and one unbound.
8487
return oneBound(binding, subject, predicate, object, execCxt);
8588
}
89+
}
8690

91+
private QueryIterator bothBound(Binding binding, boolean isSubjectBound, Node subject, Node predicate, Node object, ExecutionContext execCxt) {
92+
QueryIterator iter = isSubjectBound
93+
? bothBound(binding, subject, predicate, object, execCxt)
94+
: bothBound(binding, object, predicate, subject, execCxt);
95+
return iter;
8796
}
8897

8998
private QueryIterator bothBound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) {
90-
9199
Graph graph = execCxt.getActiveGraph();
92100
QueryRewriteIndex queryRewriteIndex = QueryRewriteIndex.retrieve(execCxt);
93-
94101
Boolean isPositiveResult = queryRewrite(graph, subject, predicate, object, queryRewriteIndex);
95102
if (isPositiveResult) {
96103
//Filter function test succeded so retain binding.
@@ -99,47 +106,33 @@ private QueryIterator bothBound(Binding binding, Node subject, Node predicate, N
99106
//Filter function test failed so null result.
100107
return QueryIterNullIterator.create(execCxt);
101108
}
102-
103109
}
104110

105111
private QueryIterator bothUnbound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) {
106-
107-
QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt);
108112
Var subjectVar = Var.alloc(subject.getName());
109113

110114
Graph graph = execCxt.getActiveGraph();
111115

112116
//Search for both Features and Geometry in the Graph. Reliant upon consistent usage of SpatialObject (which is base class of Feature and Geometry) if present.
113-
ExtendedIterator<Triple> subjectTriples;
114-
if (graph.contains(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE)) {
115-
subjectTriples = graph.find(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE);
116-
} else if (graph.contains(null, RDF.type.asNode(), Geo.FEATURE_NODE) || graph.contains(null, RDF.type.asNode(), Geo.GEOMETRY_NODE)) {
117-
ExtendedIterator<Triple> featureTriples = graph.find(null, RDF.type.asNode(), Geo.FEATURE_NODE);
118-
ExtendedIterator<Triple> geometryTriples = graph.find(null, RDF.type.asNode(), Geo.GEOMETRY_NODE);
119-
subjectTriples = featureTriples.andThen(geometryTriples);
120-
} else {
121-
//Check for Geo Predicate Features in the Graph if no GeometryLiterals found.
122-
subjectTriples = graph.find(null, SpatialExtension.GEO_LAT_NODE, null);
123-
}
124-
125-
//Bind all the Spatial Objects or Geo Predicates once as the subject and search for corresponding Objects.
126-
while (subjectTriples.hasNext()) {
127-
Triple subjectTriple = subjectTriples.next();
128-
Node boundSubject = subjectTriple.getSubject();
129-
Binding subjectBind = BindingFactory.binding(binding, subjectVar, boundSubject);
130-
QueryIterator queryIter = oneBound(subjectBind, boundSubject, predicate, object, execCxt);
131-
queryIterConcat.add(queryIter);
132-
}
133-
134-
return queryIterConcat;
117+
ExtendedIterator<Triple> spatialTriples = findSpatialTriples(graph);
118+
ExtendedIterator<Binding> iterator = spatialTriples
119+
.mapWith(Triple::getSubject)
120+
.mapWith(node -> BindingFactory.binding(binding, subjectVar, node));
121+
122+
QueryIter queryIter = QueryIter.flatMap(
123+
QueryIterPlainWrapper.create(iterator, execCxt),
124+
b -> oneBound(b, b.get(subjectVar), predicate, object, execCxt),
125+
execCxt
126+
);
127+
return queryIter;
135128
}
136129

137130
private QueryIterator oneBound(Binding binding, Node subject, Node predicate, Node object, ExecutionContext execCxt) {
138131

139132
Graph graph = execCxt.getActiveGraph();
140133
Node boundNode;
141134
Node unboundNode;
142-
Boolean isSubjectBound;
135+
boolean isSubjectBound;
143136
if (subject.isConcrete()) {
144137
//Subject is bound, object is unbound.
145138
boundNode = subject;
@@ -152,33 +145,51 @@ private QueryIterator oneBound(Binding binding, Node subject, Node predicate, No
152145
isSubjectBound = false;
153146
}
154147

155-
if (!(boundNode.isLiteral() || graph.contains(boundNode, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE) || graph.contains(boundNode, RDF.type.asNode(), Geo.FEATURE_NODE) || graph.contains(boundNode, RDF.type.asNode(), Geo.GEOMETRY_NODE))) {
148+
if (!(boundNode.isLiteral() ||
149+
graph.contains(boundNode, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE) ||
150+
graph.contains(boundNode, RDF.type.asNode(), Geo.FEATURE_NODE) ||
151+
graph.contains(boundNode, RDF.type.asNode(), Geo.GEOMETRY_NODE))) {
156152
if (!graph.contains(boundNode, SpatialExtension.GEO_LAT_NODE, null)) {
157153
//Bound node is not a Feature or a Geometry or has Geo predicates so exit.
158154
return QueryIterNullIterator.create(execCxt);
159155
}
160156
}
161157

162158
boolean isSpatialIndex = SpatialIndex.isDefined(execCxt);
163-
QueryIterConcat queryIterConcat;
159+
QueryIterator result;
164160
if (!isSpatialIndex || filterFunction.isDisjoint() || filterFunction.isDisconnected()) {
165161
//Disjointed so retrieve all cases.
166-
queryIterConcat = findAll(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt);
162+
result = findAll(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt);
167163
} else {
168164
//Only retrieve those in the spatial index which are within same bounding box.
169-
queryIterConcat = findIndex(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt);
165+
result = findIndex(graph, boundNode, unboundNode, binding, isSubjectBound, predicate, execCxt);
170166
}
171-
172-
return queryIterConcat;
167+
return result;
173168
}
174169

175-
private QueryIterConcat findAll(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) {
170+
private QueryIterator findAll(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) {
176171

177172
//Prepare the results.
178173
Var unboundVar = Var.alloc(unboundNode.getName());
179-
QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt);
180174

181175
//Search for both Features and Geometry in the Graph. Reliant upon consistent usage of SpatialObject (which is base class of Feature and Geometry) if present.
176+
ExtendedIterator<Triple> spatialTriples = findSpatialTriples(graph);
177+
178+
ExtendedIterator<Binding> iterator = spatialTriples
179+
.mapWith(Triple::getSubject)
180+
.mapWith(node -> BindingFactory.binding(binding, unboundVar, node));
181+
182+
return QueryIter.flatMap(
183+
QueryIterPlainWrapper.create(iterator, execCxt),
184+
b -> {
185+
Node spatialNode = b.get(unboundVar);
186+
QueryIterator iter = bothBound(b, isSubjectBound, boundNode, predicate, spatialNode, execCxt);
187+
return iter;
188+
},
189+
execCxt);
190+
}
191+
192+
private static ExtendedIterator<Triple> findSpatialTriples(Graph graph) {
182193
ExtendedIterator<Triple> spatialTriples;
183194
if (graph.contains(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE)) {
184195
spatialTriples = graph.find(null, RDF.type.asNode(), Geo.SPATIAL_OBJECT_NODE);
@@ -190,45 +201,33 @@ private QueryIterConcat findAll(Graph graph, Node boundNode, Node unboundNode, B
190201
//Check for Geo Predicate Features in the Graph if no GeometryLiterals found.
191202
spatialTriples = graph.find(null, SpatialExtension.GEO_LAT_NODE, null);
192203
}
193-
194-
while (spatialTriples.hasNext()) {
195-
Triple spatialTriple = spatialTriples.next();
196-
Node spatialNode = spatialTriple.getSubject();
197-
Binding newBind = BindingFactory.binding(binding, unboundVar, spatialNode);
198-
QueryIterator queryIter;
199-
if (isSubjectBound) {
200-
queryIter = bothBound(newBind, boundNode, predicate, spatialNode, execCxt);
201-
} else {
202-
queryIter = bothBound(newBind, spatialNode, predicate, boundNode, execCxt);
203-
}
204-
queryIterConcat.add(queryIter);
205-
}
206-
207-
return queryIterConcat;
204+
return spatialTriples;
208205
}
209206

210-
private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) throws ExprEvalException {
211-
207+
private QueryIterator findIndex(Graph graph, Node boundNode, Node unboundNode, Binding binding, boolean isSubjectBound, Node predicate, ExecutionContext execCxt) throws ExprEvalException {
212208
try {
213209
//Prepare for results.
214-
Var unboundVar = Var.alloc(unboundNode.getName());
215-
QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt);
210+
Var unboundVar = Var.alloc(unboundNode);
216211

217212
//Find the asserted triples.
218-
List<Node> assertedNodes = !isSubjectBound || !boundNode.isLiteral() ? findAsserted(graph, boundNode, isSubjectBound, predicate) : Collections.emptyList();
219-
for (Node node : assertedNodes) {
220-
Binding newBind = BindingFactory.binding(binding, unboundVar, node);
221-
QueryIterator queryIter = QueryIterSingleton.create(newBind, execCxt);
222-
queryIterConcat.add(queryIter);
223-
}
213+
Collection<Node> assertedNodes = !isSubjectBound || !boundNode.isLiteral()
214+
? findAsserted(graph, boundNode, isSubjectBound, predicate)
215+
: List.of();
216+
217+
QueryIterator assertedNodesIter = QueryIterPlainWrapper.create(
218+
Iter.map(assertedNodes.iterator(), node -> BindingFactory.binding(binding, unboundVar, node)),
219+
execCxt);
224220

225221
//Find the GeometryLiteral of the Bound Node.
226222
SpatialObjectGeometryLiteral boundGeometryLiteral = SpatialObjectGeometryLiteral.retrieve(graph, boundNode);
227223
if (!boundGeometryLiteral.isValid()) {
228224
//Bound Node is not a Feature or a Geometry or there is no GeometryLiteral so exit.
229-
return queryIterConcat;
225+
return assertedNodesIter;
230226
}
231227

228+
QueryIterConcat queryIterConcat = new QueryIterConcat(execCxt);
229+
queryIterConcat.add(assertedNodesIter);
230+
232231
Node geometryLiteral = boundGeometryLiteral.getGeometryLiteral();
233232

234233
//Perform the search of the Spatial Index of the Dataset.
@@ -238,55 +237,66 @@ private QueryIterConcat findIndex(Graph graph, Node boundNode, Node unboundNode,
238237
Envelope searchEnvelope = transformedGeom.getEnvelope();
239238
HashSet<Resource> features = spatialIndex.query(searchEnvelope);
240239

241-
//Check each of the Features that match the search.
242-
for (Resource feature : features) {
243-
Node featureNode = feature.asNode();
244-
245-
//Ensure not already an asserted node.
246-
if (!assertedNodes.contains(featureNode)) {
247-
248-
Binding newBind = BindingFactory.binding(binding, unboundVar, featureNode);
249-
QueryIterator queryIter;
250-
if (isSubjectBound) {
251-
queryIter = bothBound(newBind, boundNode, predicate, featureNode, execCxt);
252-
} else {
253-
queryIter = bothBound(newBind, featureNode, predicate, boundNode, execCxt);
254-
}
255-
queryIterConcat.add(queryIter);
256-
}
257-
258-
//Also test all Geometry of the Features. All, some or one Geometry may have matched.
259-
List<Node> featureGeometryTriples = G.listSP(graph, feature.asNode(), Geo.HAS_GEOMETRY_NODE);
260-
for ( Node geomNode : featureGeometryTriples) {
261-
//Ensure not already an asserted node.
262-
if (!assertedNodes.contains(geomNode)) {
263-
Binding newBind = BindingFactory.binding(binding, unboundVar, geomNode);
264-
QueryIterator queryIter;
265-
if (isSubjectBound) {
266-
queryIter = bothBound(newBind, boundNode, predicate, geomNode, execCxt);
267-
} else {
268-
queryIter = bothBound(newBind, geomNode, predicate, boundNode, execCxt);
269-
}
270-
queryIterConcat.add(queryIter);
271-
}
272-
}
273-
}
240+
// Check each of the Features that match the search.
241+
QueryIterator featuresIter = QueryIterPlainWrapper.create(
242+
Iter.map(features.iterator(), feature -> BindingFactory.binding(binding, unboundVar, feature.asNode())),
243+
execCxt);
244+
245+
QueryIterator queryIterator = QueryIter.flatMap(featuresIter,
246+
featureBinding -> {
247+
return findByFeature(graph, binding, featureBinding,
248+
isSubjectBound, boundNode, predicate, unboundVar,
249+
execCxt, assertedNodes);
250+
},
251+
execCxt);
252+
queryIterConcat.add(queryIterator);
274253

275254
return queryIterConcat;
276255
} catch (MismatchedDimensionException | TransformException | FactoryException | SpatialIndexException ex) {
277256
throw new ExprEvalException(ex.getMessage() + ": " + FmtUtils.stringForNode(boundNode) + ", " + FmtUtils.stringForNode(unboundNode) + ", " + FmtUtils.stringForNode(predicate), ex);
278257
}
279258
}
280259

281-
private List<Node> findAsserted(Graph graph, Node boundNode, boolean isSubjectBound, Node predicate) {
282-
List<Node> assertedNodes = new ArrayList<>();
283-
if (isSubjectBound) {
284-
List<Node> x = G.listSP(graph, boundNode, predicate);
285-
assertedNodes.addAll(x);
286-
} else {
287-
List<Node> x = G.listPO(graph, predicate, boundNode);
288-
assertedNodes.addAll(x);
260+
private QueryIterator findByFeature(Graph graph, Binding binding, Binding featureBinding,
261+
boolean isSubjectBound, Node boundNode, Node predicate, Var unboundVar,
262+
ExecutionContext execCxt, Collection<Node> assertedNodes) {
263+
264+
Node featureNode = featureBinding.get(unboundVar);
265+
QueryIterConcat featureIterConcat = new QueryIterConcat(execCxt);
266+
267+
// Check Features directly if not already asserted
268+
if (!assertedNodes.contains(featureNode)) {
269+
QueryIterator tmpIter = bothBound(featureBinding, isSubjectBound, boundNode, predicate, featureNode, execCxt);
270+
featureIterConcat.add(tmpIter);
289271
}
272+
273+
// Also test all Geometry of the Features. All, some or one Geometry may have matched.
274+
ExtendedIterator<Node> featureGeometries = G.iterSP(graph, featureNode, Geo.HAS_GEOMETRY_NODE);
275+
QueryIterator geometriesQueryIterator = QueryIterPlainWrapper.create(
276+
Iter.map(
277+
Iter.filter( // omit asserted
278+
featureGeometries,
279+
geometry -> !assertedNodes.contains(geometry)
280+
),
281+
geometryNode -> BindingFactory.binding(binding, unboundVar, geometryNode)),
282+
execCxt);
283+
284+
geometriesQueryIterator = QueryIter.flatMap(
285+
geometriesQueryIterator,
286+
b2 -> {
287+
Node geomNode = b2.get(unboundVar);
288+
return bothBound(b2, isSubjectBound, boundNode, predicate, geomNode, execCxt);
289+
},
290+
execCxt);
291+
292+
featureIterConcat.add(geometriesQueryIterator);
293+
return featureIterConcat;
294+
}
295+
296+
private List<Node> findAsserted(Graph graph, Node boundNode, boolean isSubjectBound, Node predicate) {
297+
List<Node> assertedNodes = isSubjectBound
298+
? G.listSP(graph, boundNode, predicate)
299+
: G.listPO(graph, predicate, boundNode);
290300
return assertedNodes;
291301
}
292302

@@ -323,5 +333,4 @@ protected final Boolean queryRewrite(Graph graph, Node subject, Node predicate,
323333
public Boolean testFilterFunction(Node subjectGeometryLiteral, Node objectGeometryLiteral) {
324334
return filterFunction.exec(subjectGeometryLiteral, objectGeometryLiteral);
325335
}
326-
327336
}

0 commit comments

Comments
 (0)