Skip to content

Commit 31c728a

Browse files
mp911dechristophstrobl
authored andcommitted
Replace poll and assert with await for Pub/Sub tests.
We now await until a message has arrived in our pub/sub tests. Previously, we polled the message listener which could lead to false positives by delayed messages. We also exclude raw (byte-array) cases as byte arrays cannot be easily checked for equality without further instrumentation. Closes: #1834 Original Pull Request: #1978
1 parent 4faf12d commit 31c728a

File tree

3 files changed

+15
-62
lines changed

3 files changed

+15
-62
lines changed

src/test/java/org/springframework/data/redis/listener/PubSubResubscribeTests.java

+10-47
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,17 @@
1515
*/
1616
package org.springframework.data.redis.listener;
1717

18-
import static org.assertj.core.api.Assertions.*;
18+
import static org.awaitility.Awaitility.*;
1919
import static org.junit.Assume.*;
2020

21+
import java.time.Duration;
2122
import java.util.ArrayList;
2223
import java.util.Arrays;
2324
import java.util.Collection;
24-
import java.util.HashSet;
25-
import java.util.LinkedHashSet;
2625
import java.util.List;
27-
import java.util.Set;
2826
import java.util.UUID;
2927
import java.util.concurrent.BlockingDeque;
3028
import java.util.concurrent.LinkedBlockingDeque;
31-
import java.util.concurrent.TimeUnit;
3229
import java.util.stream.Collectors;
3330

3431
import org.junit.jupiter.api.AfterEach;
@@ -156,18 +153,7 @@ void testContainerPatternResubscribe() throws Exception {
156153
template.convertAndSend(CHANNEL, payload1);
157154
template.convertAndSend(ANOTHER_CHANNEL, payload2);
158155

159-
// anotherListener receives both messages
160-
List<String> msgs = new ArrayList<>();
161-
msgs.add(bag2.poll(500, TimeUnit.MILLISECONDS));
162-
msgs.add(bag2.poll(500, TimeUnit.MILLISECONDS));
163-
164-
assertThat(msgs.size()).isEqualTo(2);
165-
assertThat(msgs).contains(payload1);
166-
assertThat(msgs).contains(payload2);
167-
msgs.clear();
168-
169-
// unsubscribed adapter did not receive message
170-
assertThat(bag.poll(500, TimeUnit.MILLISECONDS)).isNull();
156+
await().atMost(Duration.ofSeconds(2)).until(() -> bag2.contains(payload1) && bag2.contains(payload2));
171157

172158
// bind original listener on another channel
173159
container.addMessageListener(adapter, new ChannelTopic(ANOTHER_CHANNEL));
@@ -178,21 +164,10 @@ void testContainerPatternResubscribe() throws Exception {
178164
template.convertAndSend(CHANNEL, payload1);
179165
template.convertAndSend(ANOTHER_CHANNEL, payload2);
180166

181-
// original listener received only one message on another channel
182-
msgs.clear();
183-
msgs.add(bag.poll(500, TimeUnit.MILLISECONDS));
184-
msgs.add(bag.poll(500, TimeUnit.MILLISECONDS));
185-
186-
assertThat(msgs).contains(payload2);
187-
assertThat(msgs.contains(null)).isTrue();
167+
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload2));
188168

189169
// another listener receives messages on both channels
190-
msgs.clear();
191-
msgs.add(bag2.poll(500, TimeUnit.MILLISECONDS));
192-
msgs.add(bag2.poll(500, TimeUnit.MILLISECONDS));
193-
assertThat(msgs.size()).isEqualTo(2);
194-
assertThat(msgs).contains(payload1);
195-
assertThat(msgs).contains(payload2);
170+
await().atMost(Duration.ofSeconds(2)).until(() -> bag2.contains(payload1) && bag2.contains(payload2));
196171
}
197172

198173
@ParameterizedRedisTest
@@ -222,15 +197,7 @@ void testContainerChannelResubscribe() throws Exception {
222197
template.convertAndSend(ANOTHER_CHANNEL, anotherPayload1);
223198
template.convertAndSend(ANOTHER_CHANNEL, anotherPayload2);
224199

225-
Set<String> set = new LinkedHashSet<>();
226-
set.add(bag.poll(500, TimeUnit.MILLISECONDS));
227-
set.add(bag.poll(500, TimeUnit.MILLISECONDS));
228-
229-
assertThat(set.contains(payload1)).isFalse();
230-
assertThat(set.contains(payload2)).isFalse();
231-
232-
assertThat(set.contains(anotherPayload1)).isTrue();
233-
assertThat(set.contains(anotherPayload2)).isTrue();
200+
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(anotherPayload1) && bag.contains(anotherPayload2));
234201
}
235202

236203
/**
@@ -246,8 +213,8 @@ void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exceptio
246213

247214
container.stop();
248215

249-
String uniqueChannel = "random-" + UUID.randomUUID().toString();
250-
PubSubAwaitUtil.runAndAwaitChannelSubscription(template.getConnectionFactory(), uniqueChannel, () -> {
216+
String uniqueChannel = "random-" + UUID.randomUUID();
217+
PubSubAwaitUtil.runAndAwaitPatternSubscription(template.getConnectionFactory(), () -> {
251218

252219
container.addMessageListener(adapter,
253220
Arrays.asList(new Topic[] { new ChannelTopic(uniqueChannel), new PatternTopic("s*") }));
@@ -256,16 +223,12 @@ void testInitializeContainerWithMultipleTopicsIncludingPattern() throws Exceptio
256223

257224
// timing: There's currently no other way to synchronize
258225
// than to hope the subscribe/unsubscribe are executed within the time.
259-
Thread.sleep(50);
226+
Thread.sleep(250);
260227

261228
template.convertAndSend("somechannel", "HELLO");
262229
template.convertAndSend(uniqueChannel, "WORLD");
263230

264-
Set<String> set = new LinkedHashSet<>();
265-
set.add(bag.poll(500, TimeUnit.MILLISECONDS));
266-
set.add(bag.poll(500, TimeUnit.MILLISECONDS));
267-
268-
assertThat(set).isEqualTo(new HashSet<>(Arrays.asList(new String[] { "HELLO", "WORLD" })));
231+
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains("HELLO") && bag.contains("WORLD"));
269232
}
270233

271234
private class MessageHandler {

src/test/java/org/springframework/data/redis/listener/PubSubTestParams.java

-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.springframework.data.redis.ObjectFactory;
2222
import org.springframework.data.redis.Person;
2323
import org.springframework.data.redis.PersonObjectFactory;
24-
import org.springframework.data.redis.RawObjectFactory;
2524
import org.springframework.data.redis.StringObjectFactory;
2625
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
2726
import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension;
@@ -44,7 +43,6 @@ public static Collection<Object[]> testParams() {
4443
// create Jedis Factory
4544
ObjectFactory<String> stringFactory = new StringObjectFactory();
4645
ObjectFactory<Person> personFactory = new PersonObjectFactory();
47-
ObjectFactory<byte[]> rawFactory = new RawObjectFactory();
4846

4947
JedisConnectionFactory jedisConnFactory = JedisConnectionFactoryExtension
5048
.getNewConnectionFactory(RedisStanalone.class);
@@ -78,7 +76,6 @@ public static Collection<Object[]> testParams() {
7876
parameters.add(new Object[] { personFactory, personTemplate });
7977
parameters.add(new Object[] { stringFactory, stringTemplateLtc });
8078
parameters.add(new Object[] { personFactory, personTemplateLtc });
81-
parameters.add(new Object[] { rawFactory, rawTemplateLtc });
8279

8380
if (clusterAvailable()) {
8481

src/test/java/org/springframework/data/redis/listener/PubSubTests.java

+5-12
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717

1818
import static org.assertj.core.api.Assertions.*;
1919
import static org.assertj.core.api.Assumptions.*;
20+
import static org.awaitility.Awaitility.*;
2021

22+
import java.time.Duration;
2123
import java.util.Arrays;
2224
import java.util.Collection;
2325
import java.util.Collections;
24-
import java.util.LinkedHashSet;
25-
import java.util.Set;
2626
import java.util.concurrent.BlockingDeque;
2727
import java.util.concurrent.LinkedBlockingDeque;
2828
import java.util.concurrent.Phaser;
@@ -108,7 +108,7 @@ protected void doExecute(Runnable task) {
108108
container.start();
109109

110110
phaser.arriveAndAwaitAdvance();
111-
Thread.sleep(50);
111+
Thread.sleep(250);
112112
}
113113

114114
@AfterEach
@@ -134,11 +134,7 @@ void testContainerSubscribe() throws Exception {
134134
template.convertAndSend(CHANNEL, payload1);
135135
template.convertAndSend(CHANNEL, payload2);
136136

137-
Set<T> set = new LinkedHashSet<>();
138-
set.add((T) bag.poll(1, TimeUnit.SECONDS));
139-
set.add((T) bag.poll(1, TimeUnit.SECONDS));
140-
141-
assertThat(set).contains(payload1, payload2);
137+
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload1) && bag.contains(payload2));
142138
}
143139

144140
@ParameterizedRedisTest
@@ -192,10 +188,7 @@ void testStartListenersToNoSpecificChannelTest() throws InterruptedException {
192188

193189
template.convertAndSend(CHANNEL, payload);
194190

195-
Set<T> set = new LinkedHashSet<>();
196-
set.add((T) bag.poll(3, TimeUnit.SECONDS));
197-
198-
assertThat(set).contains(payload);
191+
await().atMost(Duration.ofSeconds(2)).until(() -> bag.contains(payload));
199192
}
200193

201194
private static boolean isClusterAware(RedisConnectionFactory connectionFactory) {

0 commit comments

Comments
 (0)