Skip to content

Commit e471d09

Browse files
committed
Merge branch '1.4.x'
2 parents 092bd01 + d7779ac commit e471d09

File tree

5 files changed

+38
-44
lines changed

5 files changed

+38
-44
lines changed

spring-graphql/src/main/java/org/springframework/graphql/execution/ContextDataFetcherDecorator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ private ContextDataFetcherDecorator(
131131
value = ReactiveAdapterRegistryHelper.toMonoIfReactive(value);
132132

133133
if (value instanceof Mono<?> mono) {
134-
value = ContextPropagationHelper.bindCancelFrom(mono, graphQlContext).contextWrite(snapshot::updateContext).toFuture();
134+
value = mono.contextWrite(snapshot::updateContext).toFuture();
135135
}
136136

137137
return value;

spring-graphql/src/main/java/org/springframework/graphql/execution/ContextPropagationHelper.java

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.graphql.execution;
1818

19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
1921
import graphql.GraphQLContext;
2022
import io.micrometer.context.ContextSnapshot;
2123
import io.micrometer.context.ContextSnapshotFactory;
@@ -39,7 +41,9 @@ public abstract class ContextPropagationHelper {
3941

4042
private static final String CONTEXT_SNAPSHOT_FACTORY_KEY = ContextPropagationHelper.class.getName() + ".KEY";
4143

42-
private static final String CANCEL_PUBLISHER_KEY = ContextPropagationHelper.class.getName() + ".cancelled";
44+
private static final String CANCELED_KEY = ContextPropagationHelper.class.getName() + ".canceled";
45+
46+
private static final String CANCELED_PUBLISHER_KEY = ContextPropagationHelper.class.getName() + ".canceledPublisher";
4347

4448

4549
/**
@@ -119,50 +123,37 @@ public static ContextSnapshot captureFrom(GraphQLContext context) {
119123
}
120124

121125
/**
122-
* Create a publisher and store it into the given {@link GraphQLContext}.
123-
* This publisher can then be used to propagate cancel signals to upstream publishers.
126+
* Create an atomic boolean and store it into the given {@link GraphQLContext}.
127+
* This boolean value can then be checked by upstream publishers to know whether the request is canceled.
124128
* @param context the current GraphQL context
125-
* @since 1.3.5
129+
* @since 1.3.6
126130
*/
127-
public static Sinks.Empty<Void> createCancelPublisher(GraphQLContext context) {
128-
Sinks.Empty<Void> requestCancelled = Sinks.empty();
129-
context.put(CANCEL_PUBLISHER_KEY, requestCancelled.asMono());
130-
return requestCancelled;
131+
public static Runnable createCancelSignal(GraphQLContext context) {
132+
AtomicBoolean requestCancelled = new AtomicBoolean();
133+
Sinks.Empty<Void> cancelSignal = Sinks.empty();
134+
context.put(CANCELED_KEY, requestCancelled);
135+
context.put(CANCELED_PUBLISHER_KEY, cancelSignal.asMono());
136+
return () -> {
137+
requestCancelled.set(true);
138+
cancelSignal.tryEmitEmpty();
139+
};
131140
}
132141

133142
/**
134143
* Return {@code true} if the current request has been cancelled, {@code false} otherwise.
135-
* This checks whether a {@link #createCancelPublisher(GraphQLContext) cancellation publisher is present}
144+
* This checks whether a {@link #createCancelSignal(GraphQLContext) cancellation publisher is present}
136145
* in the given context and the cancel signal has fired already.
137146
* @param context the current GraphQL context
138147
* @since 1.4.0
139148
*/
140149
public static boolean isCancelled(GraphQLContext context) {
141-
Mono<Void> cancelSignal = context.get(CANCEL_PUBLISHER_KEY);
142-
if (cancelSignal != null) {
143-
return cancelSignal.toFuture().isDone();
150+
AtomicBoolean requestCancelled = context.get(CANCELED_KEY);
151+
if (requestCancelled != null) {
152+
return requestCancelled.get();
144153
}
145154
return false;
146155
}
147156

148-
/**
149-
* Bind the source {@link Mono} to the publisher from the given {@link GraphQLContext}.
150-
* The returned {@code Mono} will be cancelled when this publisher completes.
151-
* Subscribers must use the returned {@code Mono} instance.
152-
* @param source the source {@code Mono}
153-
* @param context the current GraphQL context
154-
* @param <T> the type of published elements
155-
* @return the new {@code Mono} that will be cancelled when notified
156-
* @since 1.3.5
157-
*/
158-
public static <T> Mono<T> bindCancelFrom(Mono<T> source, GraphQLContext context) {
159-
Mono<Void> cancelSignal = context.get(CANCEL_PUBLISHER_KEY);
160-
if (cancelSignal != null) {
161-
return source.takeUntilOther(cancelSignal);
162-
}
163-
return source;
164-
}
165-
166157
/**
167158
* Bind the source {@link Flux} to the publisher from the given {@link GraphQLContext}.
168159
* The returned {@code Flux} will be cancelled when this publisher completes.
@@ -174,7 +165,7 @@ public static <T> Mono<T> bindCancelFrom(Mono<T> source, GraphQLContext context)
174165
* @since 1.3.5
175166
*/
176167
public static <T> Flux<T> bindCancelFrom(Flux<T> source, GraphQLContext context) {
177-
Mono<Void> cancelSignal = context.get(CANCEL_PUBLISHER_KEY);
168+
Mono<Void> cancelSignal = context.get(CANCELED_PUBLISHER_KEY);
178169
if (cancelSignal != null) {
179170
return source.takeUntilOther(cancelSignal);
180171
}

spring-graphql/src/main/java/org/springframework/graphql/execution/DefaultExecutionGraphQlService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.dataloader.DataLoaderRegistry;
3232
import org.jspecify.annotations.Nullable;
3333
import reactor.core.publisher.Mono;
34-
import reactor.core.publisher.Sinks;
3534

3635
import org.springframework.graphql.ExecutionGraphQlRequest;
3736
import org.springframework.graphql.ExecutionGraphQlResponse;
@@ -92,13 +91,13 @@ public final Mono<ExecutionGraphQlResponse> execute(ExecutionGraphQlRequest requ
9291
factory.captureFrom(contextView).updateContext(graphQLContext);
9392

9493
ExecutionInput executionInputToUse = registerDataLoaders(executionInput);
95-
Sinks.Empty<Void> cancelPublisher = ContextPropagationHelper.createCancelPublisher(graphQLContext);
94+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(graphQLContext);
9695

9796
return Mono.fromFuture(this.graphQlSource.graphQl().executeAsync(executionInputToUse))
9897
.onErrorResume((ex) -> ex instanceof GraphQLError, (ex) ->
9998
Mono.just(ExecutionResult.newExecutionResult().addError((GraphQLError) ex).build()))
10099
.map((result) -> new DefaultExecutionGraphQlResponse(executionInputToUse, result))
101-
.doOnCancel(cancelPublisher::tryEmitEmpty);
100+
.doOnCancel(cancelSignal::run);
102101
});
103102
}
104103

spring-graphql/src/test/java/org/springframework/graphql/execution/ContextDataFetcherDecoratorTests.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@
4242
import io.micrometer.context.ContextRegistry;
4343
import io.micrometer.context.ContextSnapshot;
4444
import io.micrometer.context.ContextSnapshotFactory;
45+
import org.junit.jupiter.api.Disabled;
4546
import org.junit.jupiter.api.Test;
4647
import reactor.core.publisher.Flux;
4748
import reactor.core.publisher.Mono;
48-
import reactor.core.publisher.Sinks;
4949
import reactor.test.StepVerifier;
5050

5151
import org.springframework.graphql.GraphQlSetup;
@@ -292,6 +292,7 @@ void trivialDataFetcherIsNotDecorated() {
292292
}
293293

294294
@Test
295+
@Disabled("until https://github.com/spring-projects/spring-graphql/issues/1171")
295296
void cancelMonoDataFetcherWhenRequestCancelled() {
296297
AtomicBoolean dataFetcherCancelled = new AtomicBoolean();
297298
GraphQL graphQl = GraphQlSetup.schemaContent(SCHEMA_CONTENT)
@@ -303,14 +304,15 @@ void cancelMonoDataFetcherWhenRequestCancelled() {
303304
.toGraphQl();
304305

305306
ExecutionInput input = ExecutionInput.newExecutionInput().query("{ greeting }").build();
306-
Sinks.Empty<Void> requestCancelled = ContextPropagationHelper.createCancelPublisher(input.getGraphQLContext());
307+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
307308

308309
CompletableFuture<ExecutionResult> asyncResult = graphQl.executeAsync(input);
309-
requestCancelled.tryEmitEmpty();
310+
cancelSignal.run();
310311
await().atMost(Duration.ofSeconds(2)).until(dataFetcherCancelled::get);
311312
}
312313

313314
@Test
315+
@Disabled("until https://github.com/spring-projects/spring-graphql/issues/1171")
314316
void cancelFluxDataFetcherWhenRequestCancelled() {
315317
AtomicBoolean dataFetcherCancelled = new AtomicBoolean();
316318
GraphQL graphQl = GraphQlSetup.schemaContent(SCHEMA_CONTENT)
@@ -322,10 +324,10 @@ void cancelFluxDataFetcherWhenRequestCancelled() {
322324
.toGraphQl();
323325

324326
ExecutionInput input = ExecutionInput.newExecutionInput().query("{ greeting }").build();
325-
Sinks.Empty<Void> requestCancelled = ContextPropagationHelper.createCancelPublisher(input.getGraphQLContext());
327+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
326328

327329
CompletableFuture<ExecutionResult> asyncResult = graphQl.executeAsync(input);
328-
requestCancelled.tryEmitEmpty();
330+
cancelSignal.run();
329331
await().atMost(Duration.ofSeconds(2)).until(dataFetcherCancelled::get);
330332
}
331333

@@ -336,8 +338,8 @@ void returnAbortExecutionForBlockingDataFetcherWhenRequestCancelled() throws Exc
336338
.toGraphQl();
337339

338340
ExecutionInput input = ExecutionInput.newExecutionInput().query("{ greeting }").build();
339-
Sinks.Empty<Void> requestCancelled = ContextPropagationHelper.createCancelPublisher(input.getGraphQLContext());
340-
requestCancelled.tryEmitEmpty();
341+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
342+
cancelSignal.run();
341343
ExecutionResult result = graphQl.executeAsync(input).get();
342344

343345
assertThat(result.getErrors()).hasSize(1);
@@ -357,11 +359,11 @@ void cancelFluxDataFetcherSubscriptionWhenRequestCancelled() throws Exception {
357359
.toGraphQl();
358360

359361
ExecutionInput input = ExecutionInput.newExecutionInput().query("subscription { greetings }").build();
360-
Sinks.Empty<Void> requestCancelled = ContextPropagationHelper.createCancelPublisher(input.getGraphQLContext());
362+
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
361363

362364
ExecutionResult executionResult = graphQl.executeAsync(input).get();
363365
ResponseHelper.forSubscription(executionResult).subscribe();
364-
requestCancelled.tryEmitEmpty();
366+
cancelSignal.run();
365367

366368
await().atMost(Duration.ofSeconds(2)).until(dataFetcherCancelled::get);
367369
assertThat(dataFetcherCancelled).isTrue();

spring-graphql/src/test/java/org/springframework/graphql/execution/DefaultExecutionGraphQlServiceTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import graphql.ErrorType;
2424
import org.dataloader.DataLoaderRegistry;
25+
import org.junit.jupiter.api.Disabled;
2526
import org.junit.jupiter.api.Test;
2627
import reactor.core.publisher.Flux;
2728
import reactor.core.publisher.Mono;
@@ -82,6 +83,7 @@ void shouldHandleGraphQlErrors() {
8283
}
8384

8485
@Test
86+
@Disabled("until https://github.com/spring-projects/spring-graphql/issues/1171")
8587
void cancellationSupport() {
8688
AtomicBoolean cancelled = new AtomicBoolean();
8789
Mono<String> greetingMono = Mono.just("hi")

0 commit comments

Comments
 (0)