Skip to content

Commit d561c91

Browse files
authored
Introduce cluster operations.
Original Pull Request: #1768 Closes #1390
1 parent 58bca88 commit d561c91

20 files changed

+842
-70
lines changed

src/main/java/org/springframework/data/elasticsearch/client/reactive/DefaultReactiveElasticsearchClient.java

+18-58
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
import org.apache.http.util.EntityUtils;
4646
import org.elasticsearch.ElasticsearchException;
4747
import org.elasticsearch.ElasticsearchStatusException;
48+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
49+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
4850
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
4951
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
5052
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@@ -105,6 +107,7 @@
105107
import org.springframework.data.elasticsearch.client.ElasticsearchHost;
106108
import org.springframework.data.elasticsearch.client.NoReachableHostException;
107109
import org.springframework.data.elasticsearch.client.reactive.HostProvider.Verification;
110+
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Cluster;
108111
import org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient.Indices;
109112
import org.springframework.data.elasticsearch.client.util.NamedXContents;
110113
import org.springframework.data.elasticsearch.client.util.ScrollState;
@@ -142,7 +145,7 @@
142145
* @see ClientConfiguration
143146
* @see ReactiveRestClients
144147
*/
145-
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices {
148+
public class DefaultReactiveElasticsearchClient implements ReactiveElasticsearchClient, Indices, Cluster {
146149

147150
private final HostProvider<?> hostProvider;
148151
private final RequestCreator requestCreator;
@@ -297,10 +300,6 @@ public void setHeadersSupplier(Supplier<HttpHeaders> headersSupplier) {
297300
this.headersSupplier = headersSupplier;
298301
}
299302

300-
/*
301-
* (non-Javadoc)
302-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders)
303-
*/
304303
@Override
305304
public Mono<Boolean> ping(HttpHeaders headers) {
306305

@@ -309,21 +308,13 @@ public Mono<Boolean> ping(HttpHeaders headers) {
309308
.onErrorResume(NoReachableHostException.class, error -> Mono.just(false)).next();
310309
}
311310

312-
/*
313-
* (non-Javadoc)
314-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#info(org.springframework.http.HttpHeaders)
315-
*/
316311
@Override
317312
public Mono<MainResponse> info(HttpHeaders headers) {
318313

319314
return sendRequest(new MainRequest(), requestCreator.info(), MainResponse.class, headers) //
320315
.next();
321316
}
322317

323-
/*
324-
* (non-Javadoc)
325-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#get(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest)
326-
*/
327318
@Override
328319
public Mono<GetResult> get(HttpHeaders headers, GetRequest getRequest) {
329320

@@ -341,10 +332,6 @@ public Flux<MultiGetItemResponse> multiGet(HttpHeaders headers, MultiGetRequest
341332
.flatMap(Flux::fromArray); //
342333
}
343334

344-
/*
345-
* (non-Javadoc)
346-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#exists(org.springframework.http.HttpHeaders, org.elasticsearch.action.get.GetRequest)
347-
*/
348335
@Override
349336
public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
350337

@@ -353,48 +340,33 @@ public Mono<Boolean> exists(HttpHeaders headers, GetRequest getRequest) {
353340
.next();
354341
}
355342

356-
/*
357-
* (non-Javadoc)
358-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.index.IndexRequest)
359-
*/
360343
@Override
361344
public Mono<IndexResponse> index(HttpHeaders headers, IndexRequest indexRequest) {
362345
return sendRequest(indexRequest, requestCreator.index(), IndexResponse.class, headers).next();
363346
}
364347

365-
/*
366-
* (non-Javadoc)
367-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#indices()
368-
*/
369348
@Override
370349
public Indices indices() {
371350
return this;
372351
}
373352

374-
/*
375-
* (non-Javadoc)
376-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.update.UpdateRequest)
377-
*/
353+
@Override
354+
public Cluster cluster() {
355+
return this;
356+
}
357+
378358
@Override
379359
public Mono<UpdateResponse> update(HttpHeaders headers, UpdateRequest updateRequest) {
380360
return sendRequest(updateRequest, requestCreator.update(), UpdateResponse.class, headers).next();
381361
}
382362

383-
/*
384-
* (non-Javadoc)
385-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.delete.DeleteRequest)
386-
*/
387363
@Override
388364
public Mono<DeleteResponse> delete(HttpHeaders headers, DeleteRequest deleteRequest) {
389365

390366
return sendRequest(deleteRequest, requestCreator.delete(), DeleteResponse.class, headers) //
391367
.next();
392368
}
393369

394-
/*
395-
* (non-Javadoc)
396-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#count(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
397-
*/
398370
@Override
399371
public Mono<Long> count(HttpHeaders headers, SearchRequest searchRequest) {
400372
searchRequest.source().trackTotalHits(true);
@@ -412,10 +384,6 @@ public Flux<SearchHit> searchTemplate(HttpHeaders headers, SearchTemplateRequest
412384
.map(response -> response.getResponse().getHits()).flatMap(Flux::fromIterable);
413385
}
414386

415-
/*
416-
* (non-Javadoc)
417-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
418-
*/
419387
@Override
420388
public Flux<SearchHit> search(HttpHeaders headers, SearchRequest searchRequest) {
421389

@@ -435,10 +403,6 @@ public Flux<Suggest> suggest(HttpHeaders headers, SearchRequest searchRequest) {
435403
.map(SearchResponse::getSuggest);
436404
}
437405

438-
/*
439-
* (non-Javadoc)
440-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#aggregate(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
441-
*/
442406
@Override
443407
public Flux<Aggregation> aggregate(HttpHeaders headers, SearchRequest searchRequest) {
444408

@@ -453,10 +417,6 @@ public Flux<Aggregation> aggregate(HttpHeaders headers, SearchRequest searchRequ
453417
.flatMap(Flux::fromIterable);
454418
}
455419

456-
/*
457-
* (non-Javadoc)
458-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#scroll(org.springframework.http.HttpHeaders, org.elasticsearch.action.search.SearchRequest)
459-
*/
460420
@Override
461421
public Flux<SearchHit> scroll(HttpHeaders headers, SearchRequest searchRequest) {
462422

@@ -506,10 +466,6 @@ private Publisher<?> cleanupScroll(HttpHeaders headers, ScrollState state) {
506466
return sendRequest(clearScrollRequest, requestCreator.clearScroll(), ClearScrollResponse.class, headers);
507467
}
508468

509-
/*
510-
* (non-Javadoc)
511-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#ping(org.springframework.http.HttpHeaders, org.elasticsearch.index.reindex.DeleteByQueryRequest)
512-
*/
513469
@Override
514470
public Mono<BulkByScrollResponse> deleteBy(HttpHeaders headers, DeleteByQueryRequest deleteRequest) {
515471

@@ -524,10 +480,6 @@ public Mono<ByQueryResponse> updateBy(HttpHeaders headers, UpdateByQueryRequest
524480
.map(ByQueryResponse::of);
525481
}
526482

527-
/*
528-
* (non-Javadoc)
529-
* @see org.springframework.data.elasticsearch.client.reactive.ReactiveElasticsearchClient#bulk(org.springframework.http.HttpHeaders, org.elasticsearch.action.bulk.BulkRequest)
530-
*/
531483
@Override
532484
public Mono<BulkResponse> bulk(HttpHeaders headers, BulkRequest bulkRequest) {
533485
return sendRequest(bulkRequest, requestCreator.bulk(), BulkResponse.class, headers) //
@@ -812,6 +764,14 @@ public Mono<GetIndexResponse> getIndex(HttpHeaders headers, GetIndexRequest getI
812764

813765
// endregion
814766

767+
// region cluster operations
768+
@Override
769+
public Mono<ClusterHealthResponse> health(HttpHeaders headers, ClusterHealthRequest clusterHealthRequest) {
770+
return sendRequest(clusterHealthRequest, requestCreator.clusterHealth(), ClusterHealthResponse.class, headers)
771+
.next();
772+
}
773+
// endregion
774+
815775
// region helper functions
816776
private <T> Publisher<? extends T> readResponseBody(String logId, Request request, ClientResponse response,
817777
Class<T> responseType) {
@@ -965,7 +925,7 @@ private static ElasticsearchException getElasticsearchException(String content,
965925
} while (token == XContentParser.Token.FIELD_NAME);
966926

967927
return null;
968-
} catch (IOException e) {
928+
} catch (Exception e) {
969929
return new ElasticsearchStatusException(content, status);
970930
}
971931
}

src/main/java/org/springframework/data/elasticsearch/client/reactive/ReactiveElasticsearchClient.java

+51
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import java.util.Collection;
2323
import java.util.function.Consumer;
2424

25+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
26+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
2527
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
2628
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
2729
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@@ -269,6 +271,14 @@ default Mono<IndexResponse> index(IndexRequest indexRequest) {
269271
*/
270272
Indices indices();
271273

274+
/**
275+
* Gain Access to cluster related commands.
276+
*
277+
* @return Cluster implementations
278+
* @since 4.2
279+
*/
280+
Cluster cluster();
281+
272282
/**
273283
* Execute an {@link UpdateRequest} against the {@literal update} API to alter a document.
274284
*
@@ -1678,4 +1688,45 @@ default Mono<GetIndexResponse> getIndex(GetIndexRequest getIndexRequest) {
16781688
*/
16791689
Mono<GetIndexResponse> getIndex(HttpHeaders headers, GetIndexRequest getIndexRequest);
16801690
}
1691+
1692+
/**
1693+
* Encapsulation of methods for accessing the Cluster API.
1694+
*
1695+
* @author Peter-Josef Meisch
1696+
* @since 4.2
1697+
*/
1698+
interface Cluster {
1699+
1700+
/**
1701+
* Execute the given {{@link ClusterHealthRequest}} against the {@literal cluster} API.
1702+
*
1703+
* @param consumer never {@literal null}.
1704+
* @return Mono emitting the {@link ClusterHealthResponse}.
1705+
*/
1706+
default Mono<ClusterHealthResponse> health(Consumer<ClusterHealthRequest> consumer) {
1707+
1708+
ClusterHealthRequest clusterHealthRequest = new ClusterHealthRequest();
1709+
consumer.accept(clusterHealthRequest);
1710+
return health(clusterHealthRequest);
1711+
}
1712+
1713+
/**
1714+
* Execute the given {{@link ClusterHealthRequest}} against the {@literal cluster} API.
1715+
*
1716+
* @param clusterHealthRequest must not be {@literal null} // * @return Mono emitting the
1717+
* {@link ClusterHealthResponse}.
1718+
*/
1719+
default Mono<ClusterHealthResponse> health(ClusterHealthRequest clusterHealthRequest) {
1720+
return health(HttpHeaders.EMPTY, clusterHealthRequest);
1721+
}
1722+
1723+
/**
1724+
* Execute the given {{@link ClusterHealthRequest}} against the {@literal cluster} API.
1725+
*
1726+
* @param headers Use {@link HttpHeaders} to provide eg. authentication data. Must not be {@literal null}.
1727+
* @param clusterHealthRequest must not be {@literal null} // * @return Mono emitting the
1728+
* {@link ClusterHealthResponse}.
1729+
*/
1730+
Mono<ClusterHealthResponse> health(HttpHeaders headers, ClusterHealthRequest clusterHealthRequest);
1731+
}
16811732
}

src/main/java/org/springframework/data/elasticsearch/client/reactive/RequestCreator.java

+8
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.IOException;
44
import java.util.function.Function;
55

6+
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
67
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
78
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
89
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
@@ -266,4 +267,11 @@ default Function<GetFieldMappingsRequest, Request> getFieldMapping() {
266267
default Function<GetIndexRequest, Request> getIndex() {
267268
return RequestConverters::getIndex;
268269
}
270+
271+
/**
272+
* @since 4.2
273+
*/
274+
default Function<ClusterHealthRequest, Request> clusterHealth() {
275+
return RequestConverters::clusterHealth;
276+
}
269277
}

src/main/java/org/springframework/data/elasticsearch/client/util/RequestConverters.java

+23-2
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,9 @@
107107

108108
/**
109109
* <p>
110-
* Original implementation source {@link org.elasticsearch.client.RequestConverters} and
111-
* {@link org.elasticsearch.client.IndicesRequestConverters} by {@literal Elasticsearch}
110+
* Original implementation source {@link org.elasticsearch.client.RequestConverters},
111+
* {@link org.elasticsearch.client.IndicesRequestConverters} and
112+
* {@link org.elasticsearch.client.ClusterRequestConverters} by {@literal Elasticsearch}
112113
* (<a href="https://www.elastic.co">https://www.elastic.co</a>) licensed under the Apache License, Version 2.0.
113114
* </p>
114115
* Modified for usage with {@link ReactiveElasticsearchClient}.
@@ -1003,6 +1004,26 @@ public static Request getFieldMapping(GetFieldMappingsRequest getFieldMappingsRe
10031004
return request;
10041005
}
10051006

1007+
public static Request clusterHealth(ClusterHealthRequest healthRequest) {
1008+
String[] indices = healthRequest.indices() == null ? Strings.EMPTY_ARRAY : healthRequest.indices();
1009+
String endpoint = new EndpointBuilder().addPathPartAsIs(new String[] { "_cluster/health" })
1010+
.addCommaSeparatedPathParts(indices).build();
1011+
1012+
Request request = new Request("GET", endpoint);
1013+
1014+
RequestConverters.Params parameters = new Params(request);
1015+
parameters.withWaitForStatus(healthRequest.waitForStatus());
1016+
parameters.withWaitForNoRelocatingShards(healthRequest.waitForNoRelocatingShards());
1017+
parameters.withWaitForNoInitializingShards(healthRequest.waitForNoInitializingShards());
1018+
parameters.withWaitForActiveShards(healthRequest.waitForActiveShards(), ActiveShardCount.NONE);
1019+
parameters.withWaitForNodes(healthRequest.waitForNodes());
1020+
parameters.withWaitForEvents(healthRequest.waitForEvents());
1021+
parameters.withTimeout(healthRequest.timeout());
1022+
parameters.withMasterTimeout(healthRequest.masterNodeTimeout());
1023+
parameters.withLocal(healthRequest.local()).withLevel(healthRequest.level());
1024+
return request;
1025+
}
1026+
10061027
static HttpEntity createEntity(ToXContent toXContent, XContentType xContentType) {
10071028

10081029
try {

src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java

+14-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.Objects;
1919

20+
import org.springframework.data.elasticsearch.core.cluster.ClusterOperations;
2021
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
2122
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
2223
import org.springframework.data.elasticsearch.core.routing.RoutingResolver;
@@ -39,25 +40,34 @@ public interface ElasticsearchOperations extends DocumentOperations, SearchOpera
3940

4041
/**
4142
* get an {@link IndexOperations} that is bound to the given class
42-
*
43+
*
4344
* @return IndexOperations
4445
*/
4546
IndexOperations indexOps(Class<?> clazz);
4647

4748
/**
4849
* get an {@link IndexOperations} that is bound to the given index
49-
*
50+
*
5051
* @return IndexOperations
5152
*/
5253
IndexOperations indexOps(IndexCoordinates index);
5354

55+
/**
56+
* return a {@link ClusterOperations} instance that uses the same client communication setup as this
57+
* ElasticsearchOperations instance.
58+
*
59+
* @return ClusterOperations implementation
60+
* @since 4.2
61+
*/
62+
ClusterOperations cluster();
63+
5464
ElasticsearchConverter getElasticsearchConverter();
5565

5666
IndexCoordinates getIndexCoordinatesFor(Class<?> clazz);
5767

5868
/**
5969
* gets the routing for an entity which might be defined by a join-type relation
60-
*
70+
*
6171
* @param entity the entity
6272
* @return the routing, may be null if not set.
6373
* @since 4.1
@@ -68,7 +78,7 @@ public interface ElasticsearchOperations extends DocumentOperations, SearchOpera
6878
// region helper
6979
/**
7080
* gets the String representation for an id.
71-
*
81+
*
7282
* @param id
7383
* @return
7484
* @since 4.0

0 commit comments

Comments
 (0)