Skip to content

Commit 788c1e0

Browse files
committed
Fix high memory usage in ContextPropagationHelper
Prior to this commit, gh-1149 added support for cancellation detection at the transport level and the propagation of the CANCEL signal to data fetcher `Publisher`. `ContextPropagationHelper` stores in the context a `Sink` that emits the cancel signal. All upstream publishers are then subscribing to this and canceling themselves if they receive a signal. In the case of high field count queries + high request/sec services, this would create a significant memory overhead. This commit reduces the support to the essential parts: * avoid further calls to data fetchers if the request is canceled * cancel subscription publishers This means other in-flight data fetchers that are publisher-based are not canceled anymore. This should be a good compromise for high RPS services, until this feature is natively supported in graphql-java. See gh-1246
1 parent 2b41166 commit 788c1e0

File tree

5 files changed

+37
-41
lines changed

5 files changed

+37
-41
lines changed

spring-graphql/src/main/java/org/springframework/graphql/execution/ContextDataFetcherDecorator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public Object get(DataFetchingEnvironment env) throws Exception {
121121
value = ReactiveAdapterRegistryHelper.toMonoIfReactive(value);
122122

123123
if (value instanceof Mono<?> mono) {
124-
value = ContextPropagationHelper.bindCancelFrom(mono, graphQlContext).contextWrite(snapshot::updateContext).toFuture();
124+
value = mono.contextWrite(snapshot::updateContext).toFuture();
125125
}
126126

127127
return value;

spring-graphql/src/main/java/org/springframework/graphql/execution/ContextPropagationHelper.java

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.graphql.execution;
1818

19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
1921
import graphql.GraphQLContext;
2022
import io.micrometer.context.ContextSnapshot;
2123
import io.micrometer.context.ContextSnapshotFactory;
@@ -40,7 +42,9 @@ public abstract class ContextPropagationHelper {
4042

4143
private static final String CONTEXT_SNAPSHOT_FACTORY_KEY = ContextPropagationHelper.class.getName() + ".KEY";
4244

43-
private static final String CANCEL_PUBLISHER_KEY = ContextPropagationHelper.class.getName() + ".cancelled";
45+
private static final String CANCELED_KEY = ContextPropagationHelper.class.getName() + ".canceled";
46+
47+
private static final String CANCELED_PUBLISHER_KEY = ContextPropagationHelper.class.getName() + ".canceledPublisher";
4448

4549

4650
/**
@@ -120,33 +124,20 @@ public static ContextSnapshot captureFrom(GraphQLContext context) {
120124
}
121125

122126
/**
123-
* Create a publisher and store it into the given {@link GraphQLContext}.
124-
* This publisher can then be used to propagate cancel signals to upstream publishers.
127+
* Create an atomic boolean and store it into the given {@link GraphQLContext}.
128+
* This boolean value can then be checked by upstream publishers to know whether the request is canceled.
125129
* @param context the current GraphQL context
126-
* @since 1.3.5
130+
* @since 1.3.6
127131
*/
128-
public static Sinks.Empty<Void> createCancelPublisher(GraphQLContext context) {
129-
Sinks.Empty<Void> requestCancelled = Sinks.empty();
130-
context.put(CANCEL_PUBLISHER_KEY, requestCancelled.asMono());
131-
return requestCancelled;
132-
}
133-
134-
/**
135-
* Bind the source {@link Mono} to the publisher from the given {@link GraphQLContext}.
136-
* The returned {@code Mono} will be cancelled when this publisher completes.
137-
* Subscribers must use the returned {@code Mono} instance.
138-
* @param source the source {@code Mono}
139-
* @param context the current GraphQL context
140-
* @param <T> the type of published elements
141-
* @return the new {@code Mono} that will be cancelled when notified
142-
* @since 1.3.5
143-
*/
144-
public static <T> Mono<T> bindCancelFrom(Mono<T> source, GraphQLContext context) {
145-
Mono<Void> cancelSignal = context.get(CANCEL_PUBLISHER_KEY);
146-
if (cancelSignal != null) {
147-
return source.takeUntilOther(cancelSignal);
148-
}
149-
return source;
132+
public static Runnable createCancelSignal(GraphQLContext context) {
133+
AtomicBoolean requestCancelled = new AtomicBoolean();
134+
Sinks.Empty<Void> cancelSignal = Sinks.empty();
135+
context.put(CANCELED_KEY, requestCancelled);
136+
context.put(CANCELED_PUBLISHER_KEY, cancelSignal.asMono());
137+
return () -> {
138+
requestCancelled.set(true);
139+
cancelSignal.tryEmitEmpty();
140+
};
150141
}
151142

152143
/**
@@ -160,7 +151,7 @@ public static <T> Mono<T> bindCancelFrom(Mono<T> source, GraphQLContext context)
160151
* @since 1.3.5
161152
*/
162153
public static <T> Flux<T> bindCancelFrom(Flux<T> source, GraphQLContext context) {
163-
Mono<Void> cancelSignal = context.get(CANCEL_PUBLISHER_KEY);
154+
Mono<Void> cancelSignal = context.get(CANCELED_PUBLISHER_KEY);
164155
if (cancelSignal != null) {
165156
return source.takeUntilOther(cancelSignal);
166157
}

spring-graphql/src/main/java/org/springframework/graphql/execution/DefaultExecutionGraphQlService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import io.micrometer.context.ContextSnapshotFactory;
3232
import org.dataloader.DataLoaderRegistry;
3333
import reactor.core.publisher.Mono;
34-
import reactor.core.publisher.Sinks;
3534

3635
import org.springframework.graphql.ExecutionGraphQlRequest;
3736
import org.springframework.graphql.ExecutionGraphQlResponse;
@@ -103,13 +102,13 @@ public final Mono<ExecutionGraphQlResponse> execute(ExecutionGraphQlRequest requ
103102
factory.captureFrom(contextView).updateContext(graphQLContext);
104103

105104
ExecutionInput executionInputToUse = registerDataLoaders(executionInput);
106-
Sinks.Empty<Void> cancelPublisher = ContextPropagationHelper.createCancelPublisher(graphQLContext);
105+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(graphQLContext);
107106

108107
return Mono.fromFuture(this.graphQlSource.graphQl().executeAsync(executionInputToUse))
109108
.onErrorResume((ex) -> ex instanceof GraphQLError, (ex) ->
110109
Mono.just(ExecutionResult.newExecutionResult().addError((GraphQLError) ex).build()))
111110
.map((result) -> new DefaultExecutionGraphQlResponse(executionInputToUse, result))
112-
.doOnCancel(cancelPublisher::tryEmitEmpty);
111+
.doOnCancel(cancelSignal::run);
113112
});
114113
}
115114

spring-graphql/src/test/java/org/springframework/graphql/execution/ContextDataFetcherDecoratorTests.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@
4141
import io.micrometer.context.ContextRegistry;
4242
import io.micrometer.context.ContextSnapshot;
4343
import io.micrometer.context.ContextSnapshotFactory;
44+
import org.junit.jupiter.api.Disabled;
4445
import org.junit.jupiter.api.Test;
4546
import reactor.core.publisher.Flux;
4647
import reactor.core.publisher.Mono;
47-
import reactor.core.publisher.Sinks;
4848
import reactor.test.StepVerifier;
4949

5050
import org.springframework.graphql.GraphQlSetup;
@@ -56,10 +56,12 @@
5656

5757
/**
5858
* Tests for {@link ContextDataFetcherDecorator}.
59+
*
5960
* @author Rossen Stoyanchev
61+
* @author Brian Clozel
6062
*/
6163
@SuppressWarnings("ReactiveStreamsUnusedPublisher")
62-
public class ContextDataFetcherDecoratorTests {
64+
class ContextDataFetcherDecoratorTests {
6365

6466
private static final String SCHEMA_CONTENT = """
6567
directive @UpperCase on FIELD_DEFINITION \
@@ -289,7 +291,8 @@ void trivialDataFetcherIsNotDecorated() {
289291
}
290292

291293
@Test
292-
void cancelMonoDataFetcherWhenRequestCancelled() throws Exception {
294+
@Disabled("until https://github.com/spring-projects/spring-graphql/issues/1171")
295+
void cancelMonoDataFetcherWhenRequestCancelled() {
293296
AtomicBoolean dataFetcherCancelled = new AtomicBoolean();
294297
GraphQL graphQl = GraphQlSetup.schemaContent(SCHEMA_CONTENT)
295298
.queryFetcher("greeting", (env) ->
@@ -300,15 +303,16 @@ void cancelMonoDataFetcherWhenRequestCancelled() throws Exception {
300303
.toGraphQl();
301304

302305
ExecutionInput input = ExecutionInput.newExecutionInput().query("{ greeting }").build();
303-
Sinks.Empty<Void> requestCancelled = ContextPropagationHelper.createCancelPublisher(input.getGraphQLContext());
306+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
304307

305308
CompletableFuture<ExecutionResult> asyncResult = graphQl.executeAsync(input);
306-
requestCancelled.tryEmitEmpty();
309+
cancelSignal.run();
307310
await().atMost(Duration.ofSeconds(2)).until(dataFetcherCancelled::get);
308311
}
309312

310313
@Test
311-
void cancelFluxDataFetcherWhenRequestCancelled() throws Exception {
314+
@Disabled("until https://github.com/spring-projects/spring-graphql/issues/1171")
315+
void cancelFluxDataFetcherWhenRequestCancelled() {
312316
AtomicBoolean dataFetcherCancelled = new AtomicBoolean();
313317
GraphQL graphQl = GraphQlSetup.schemaContent(SCHEMA_CONTENT)
314318
.queryFetcher("greeting", (env) ->
@@ -319,10 +323,10 @@ void cancelFluxDataFetcherWhenRequestCancelled() throws Exception {
319323
.toGraphQl();
320324

321325
ExecutionInput input = ExecutionInput.newExecutionInput().query("{ greeting }").build();
322-
Sinks.Empty<Void> requestCancelled = ContextPropagationHelper.createCancelPublisher(input.getGraphQLContext());
326+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
323327

324328
CompletableFuture<ExecutionResult> asyncResult = graphQl.executeAsync(input);
325-
requestCancelled.tryEmitEmpty();
329+
cancelSignal.run();
326330
await().atMost(Duration.ofSeconds(2)).until(dataFetcherCancelled::get);
327331
}
328332

@@ -338,11 +342,11 @@ void cancelFluxDataFetcherSubscriptionWhenRequestCancelled() throws Exception {
338342
.toGraphQl();
339343

340344
ExecutionInput input = ExecutionInput.newExecutionInput().query("subscription { greetings }").build();
341-
Sinks.Empty<Void> requestCancelled = ContextPropagationHelper.createCancelPublisher(input.getGraphQLContext());
345+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
342346

343347
ExecutionResult executionResult = graphQl.executeAsync(input).get();
344348
ResponseHelper.forSubscription(executionResult).subscribe();
345-
requestCancelled.tryEmitEmpty();
349+
cancelSignal.run();
346350

347351
await().atMost(Duration.ofSeconds(2)).until(dataFetcherCancelled::get);
348352
assertThat(dataFetcherCancelled).isTrue();

spring-graphql/src/test/java/org/springframework/graphql/execution/DefaultExecutionGraphQlServiceTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import graphql.ErrorType;
2424
import org.dataloader.DataLoaderRegistry;
25+
import org.junit.jupiter.api.Disabled;
2526
import org.junit.jupiter.api.Test;
2627
import reactor.core.publisher.Flux;
2728
import reactor.core.publisher.Mono;
@@ -82,6 +83,7 @@ void shouldHandleGraphQlErrors() {
8283
}
8384

8485
@Test
86+
@Disabled("until https://github.com/spring-projects/spring-graphql/issues/1171")
8587
void cancellationSupport() {
8688
AtomicBoolean cancelled = new AtomicBoolean();
8789
Mono<String> greetingMono = Mono.just("hi")

0 commit comments

Comments
 (0)