Skip to content

Commit dd156b9

Browse files
authored
Enable use of search_after with field_collapse.
Original Pull Request #2937 Closes #2935
1 parent 8d0ecf2 commit dd156b9

File tree

3 files changed

+70
-4
lines changed

3 files changed

+70
-4
lines changed

src/main/java/org/springframework/data/elasticsearch/client/elc/ReactiveElasticsearchTemplate.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,28 @@ private Flux<SearchDocument> doFindUnbounded(Query query, Class<?> clazz, IndexC
395395
Function<PitSearchAfter, Publisher<? extends ResponseBody<EntityAsMap>>> resourceClosure = psa -> {
396396

397397
baseQuery.setPointInTime(new Query.PointInTime(psa.getPit(), pitKeepAlive));
398-
baseQuery.addSort(Sort.by("_shard_doc"));
398+
399+
// only add _shard_doc if there is not a field_collapse and a sort with the same name
400+
boolean addShardDoc = true;
401+
402+
if (query instanceof NativeQuery nativeQuery && nativeQuery.getFieldCollapse() != null) {
403+
var field = nativeQuery.getFieldCollapse().field();
404+
405+
if (nativeQuery.getSortOptions().stream()
406+
.anyMatch(sortOptions -> sortOptions.isField() && sortOptions.field().field().equals(field))) {
407+
addShardDoc = false;
408+
}
409+
410+
if (query.getSort() != null
411+
&& query.getSort().stream().anyMatch(order -> order.getProperty().equals(field))) {
412+
addShardDoc = false;
413+
}
414+
}
415+
416+
if (addShardDoc) {
417+
baseQuery.addSort(Sort.by("_shard_doc"));
418+
}
419+
399420
SearchRequest firstSearchRequest = requestConverter.searchRequest(baseQuery, routingResolver.getRouting(),
400421
clazz, index, false, true);
401422

src/main/java/org/springframework/data/elasticsearch/client/elc/RequestConverter.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -1487,8 +1487,8 @@ private <T> void prepareSearchRequest(Query query, @Nullable String routing, @Nu
14871487
if (query instanceof NativeQuery nativeQuery) {
14881488
prepareNativeSearch(nativeQuery, builder);
14891489
}
1490-
// query.getSort() must be checked after prepareNativeSearch as this already might hav a sort set that must have
1491-
// higher priority
1490+
// query.getSort() must be checked after prepareNativeSearch as this already might have a sort set
1491+
// that must have higher priority
14921492
if (query.getSort() != null) {
14931493
List<SortOptions> sortOptions = getSortOptions(query.getSort(), persistentEntity);
14941494

@@ -1510,7 +1510,15 @@ private <T> void prepareSearchRequest(Query query, @Nullable String routing, @Nu
15101510
}
15111511

15121512
if (!isEmpty(query.getSearchAfter())) {
1513-
builder.searchAfter(query.getSearchAfter().stream().map(TypeUtils::toFieldValue).toList());
1513+
var fieldValues = query.getSearchAfter().stream().map(TypeUtils::toFieldValue).toList();
1514+
1515+
// when there is a field collapse on a native query, and we have a search_after, then the search_after
1516+
// must only have one entry
1517+
if (query instanceof NativeQuery nativeQuery && nativeQuery.getFieldCollapse() != null) {
1518+
builder.searchAfter(fieldValues.get(0));
1519+
} else {
1520+
builder.searchAfter(fieldValues);
1521+
}
15141522
}
15151523

15161524
query.getRescorerQueries().forEach(rescorerQuery -> builder.rescore(getRescore(rescorerQuery)));

src/test/java/org/springframework/data/elasticsearch/repository/support/SimpleReactiveElasticsearchRepositoryELCIntegrationTests.java

+37
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,22 @@
1515
*/
1616
package org.springframework.data.elasticsearch.repository.support;
1717

18+
import co.elastic.clients.elasticsearch.core.search.FieldCollapse;
19+
import org.junit.jupiter.api.DisplayName;
20+
import org.junit.jupiter.api.Test;
1821
import org.springframework.context.annotation.Bean;
1922
import org.springframework.context.annotation.Configuration;
2023
import org.springframework.context.annotation.Import;
24+
import org.springframework.data.domain.Pageable;
25+
import org.springframework.data.domain.Sort;
26+
import org.springframework.data.elasticsearch.client.elc.NativeQuery;
27+
import org.springframework.data.elasticsearch.client.elc.Queries;
2128
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchTemplateConfiguration;
2229
import org.springframework.data.elasticsearch.repositories.custommethod.QueryParameter;
2330
import org.springframework.data.elasticsearch.repository.config.EnableReactiveElasticsearchRepositories;
2431
import org.springframework.data.elasticsearch.utils.IndexNameProvider;
2532
import org.springframework.test.context.ContextConfiguration;
33+
import reactor.test.StepVerifier;
2634

2735
/**
2836
* @author Peter-Josef Meisch
@@ -51,4 +59,33 @@ QueryParameter queryParameter() {
5159
}
5260
}
5361

62+
/**
63+
* search_after is used by the reactive search operation, it normally always adds _shard_doc as a tiebreaker sort
64+
* parameter. This must not be done when a collapse field is used as sort field, as in that case the collapse field
65+
* must be the only sort field.
66+
*/
67+
@Test // #2935
68+
@DisplayName("should use collapse_field for search_after in pit search")
69+
void shouldUseCollapseFieldForSearchAfterI() {
70+
var entity = new SampleEntity();
71+
entity.setId("42");
72+
entity.setMessage("m");
73+
entity.setKeyword("kw");
74+
repository.save(entity).block();
75+
76+
var query = NativeQuery.builder()
77+
.withQuery(Queries.matchAllQueryAsQuery())
78+
.withPageable(Pageable.unpaged())
79+
.withFieldCollapse(FieldCollapse.of(fcb -> fcb
80+
.field("keyword")))
81+
.withSort(Sort.by("keyword"))
82+
.build();
83+
84+
operations.search(query, SampleEntity.class)
85+
.as(StepVerifier::create)
86+
.expectNextCount(1)
87+
.verifyComplete();
88+
}
89+
90+
5491
}

0 commit comments

Comments
 (0)