Skip to content

Commit f83f038

Browse files
committed
Fix Race in Test
1 parent 50ce5c2 commit f83f038

File tree

1 file changed

+7
-4
lines changed

1 file changed

+7
-4
lines changed

spring-kafka/src/test/java/org/springframework/kafka/listener/PauseContainerWhileErrorHandlerIsRetryingTests.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858

5959
/**
6060
* @author Antonio Tomac
61+
* @author Gary Russell
6162
* @since 2.9
6263
*/
6364
@SpringJUnitConfig
@@ -80,7 +81,7 @@ public void provokeRetriesTriggerPauseThenResume() throws InterruptedException {
8081
await("for first 2 records")
8182
.atMost(Duration.ofSeconds(10))
8283
.untilAsserted(() -> assertThat(setup.received).as("received").contains("1", "2"));
83-
assertThat(setup.processed).as("processed").contains("1", "2");
84+
await().untilAsserted(() -> assertThat(setup.processed).as("processed").contains("1", "2"));
8485

8586
setup.triggerPause.set(true);
8687
log("enable listener throwing");
@@ -92,7 +93,7 @@ public void provokeRetriesTriggerPauseThenResume() throws InterruptedException {
9293
.untilAsserted(() -> assertThat(setup.received)
9394
.as("received")
9495
.hasSizeGreaterThan(2));
95-
assertThat(setup.processed).as("processed").hasSize(2);
96+
await().untilAsserted(() -> assertThat(setup.processed).as("processed").hasSize(2));
9697

9798
setup.triggerPause.set(false);
9899
setup.resumeContainer();
@@ -106,9 +107,9 @@ public void provokeRetriesTriggerPauseThenResume() throws InterruptedException {
106107
.untilAsserted(() -> assertThat(setup.received)
107108
.as("received - all")
108109
.contains("1", "2", "3", "4", "5", "6", "7", "8", "9"));
109-
assertThat(setup.processed)
110+
await().untilAsserted(() -> assertThat(setup.processed)
110111
.as("processed all - not loosing 3, 4, 5")
111-
.contains("1", "2", "3", "4", "5", "6", "7", "8", "9");
112+
.contains("1", "2", "3", "4", "5", "6", "7", "8", "9"));
112113
}
113114

114115
@Configuration
@@ -122,9 +123,11 @@ public static class Config {
122123
EmbeddedKafkaBroker embeddedKafkaBroker;
123124

124125
final Set<String> received = new LinkedHashSet<>();
126+
125127
final Set<String> processed = new LinkedHashSet<>();
126128

127129
final AtomicBoolean failing = new AtomicBoolean(false);
130+
128131
final AtomicBoolean triggerPause = new AtomicBoolean(false);
129132

130133
void resumeContainer() {

0 commit comments

Comments
 (0)