Skip to content

Fix error propagation in AbstractReactiveElasticsearchTemplate:save()#3232

Open
noel1155 wants to merge 4 commits intospring-projects:mainfrom
noel1155:main
Open

Fix error propagation in AbstractReactiveElasticsearchTemplate:save()#3232
noel1155 wants to merge 4 commits intospring-projects:mainfrom
noel1155:main

Conversation

@noel1155
Copy link

@noel1155 noel1155 commented Jan 27, 2026

  • You have read the Spring Data contribution guidelines.
  • There is a ticket in the bug tracker for the project in our issue tracker. Add the issue number to the Closes Reactive saveAll errors are swallowed in AbstractReactiveElasticsearchTemplate #3233 line below
  • You use the code formatters provided here and have them applied to your changes. Don’t submit any formatting related changes.
  • You submit test cases (unit or integration tests) that back your changes.
  • You added yourself as author in the headers of the classes you touched. Amend the date range in the Apache license header if needed. For new types, add the license header (copy from another file and set the current year only).

Closes #3233

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Jan 27, 2026
@noel1155 noel1155 force-pushed the main branch 2 times, most recently from 0defeb0 to 21f1a9e Compare January 27, 2026 23:44
@sothawo
Copy link
Collaborator

sothawo commented Jan 30, 2026

Thanks for that PR.
I checked that out and ran the test, but it fails:


java.lang.AssertionError: expectation "expectNextCount(2)" failed (expected: count = 2; actual: counted = 0; signal: onError(java.lang.RuntimeException: Simulated error during entity creation))

	at reactor.test.MessageFormatter.assertionError(MessageFormatter.java:115)
	at reactor.test.MessageFormatter.failPrefix(MessageFormatter.java:104)
	at reactor.test.MessageFormatter.fail(MessageFormatter.java:73)
	at reactor.test.MessageFormatter.failOptional(MessageFormatter.java:88)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.checkCountMismatch(DefaultStepVerifierBuilder.java:1396)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onSignalCount(DefaultStepVerifierBuilder.java:1638)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onExpectation(DefaultStepVerifierBuilder.java:1491)
	at reactor.test.DefaultStepVerifierBuilder$DefaultVerifySubscriber.onError(DefaultStepVerifierBuilder.java:1151)
	at reactor.core.publisher.SinkManyUnicast.checkTerminated(SinkManyUnicast.java:394)
	at reactor.core.publisher.SinkManyUnicast.drainRegular(SinkManyUnicast.java:281)
	at reactor.core.publisher.SinkManyUnicast.drain(SinkManyUnicast.java:371)
	at reactor.core.publisher.SinkManyUnicast.subscribe(SinkManyUnicast.java:429)
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:55)
	at reactor.core.publisher.Flux.subscribe(Flux.java:8888)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.toVerifierAndSubscribe(DefaultStepVerifierBuilder.java:904)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:844)
	at reactor.test.DefaultStepVerifierBuilder$DefaultStepVerifier.verify(DefaultStepVerifierBuilder.java:836)
	at org.springframework.data.elasticsearch.core.ReactiveElasticsearchIntegrationTests.shouldPropagateErrorsDuringFluxSaveOperations(ReactiveElasticsearchIntegrationTests.java:1243)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
	Suppressed: java.lang.RuntimeException: Simulated error during entity creation
		at org.springframework.data.elasticsearch.core.ReactiveElasticsearchIntegrationTests.shouldPropagateErrorsDuringFluxSaveOperations(ReactiveElasticsearchIntegrationTests.java:1228)
		... 3 more


xylos19 added 4 commits February 14, 2026 16:26
Previously, errors occurring during the saveAll operation within the reactive save method were swallowed because the inner subscriber did not have an error handler. This caused the Flux to hang indefinitely instead of terminating with an error.

This commit adds an error handler to the inner subscriber that:
1. Cancels the upstream subscription to prevent further processing.
2. Propagates the error to the sink, allowing the caller to receive the error signal.
3. Updates the map operation to return the entity for better debugging capability.

Signed-off-by: Noel F <noel@Noels-MacBook-Pro.local>
This test verifies that errors occurring during saveAll operations
with a Flux are properly propagated to the subscriber instead of
being swallowed. The test creates a Flux that emits valid entities
followed by an error, and confirms the error reaches the caller.

Signed-off-by: Noel F <noel@Noels-MacBook-Pro.local>
Signed-off-by: Noel F <noel@Noels-MacBook-Pro.local>
…race condition

The manual subscriber's onError fires before in-flight saveAll can push
results through tryEmitNext, so the caller sees 0 entities before the error.
Updated test expectation and added clarifying comment.

Signed-off-by: Noel F <noel@Noels-MacBook-Pro.local>
@noel1155
Copy link
Author

@sothawo Thanks for running the test! The original test had expectNextCount(2) which was incorrect for the current implementation. The test expected 2 entities to be emitted before the error, but the manual subscriber has a race condition: when the source flux errors after emitting 2 valid entities, onError fires and terminates the sink via tryEmitError() before the in-flight
saveAll can push its results back through tryEmitNext. So the caller sees 0 entities instead of 2. The entities are saved in Elasticsearch, but the acknowledgment back to the caller is lost. I've updated the test to expectNextCount(0) to match the current behavior. Could you please pull the latest and re-run?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

status: waiting-for-triage An issue we've not yet triaged

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Reactive saveAll errors are swallowed in AbstractReactiveElasticsearchTemplate

3 participants