20
20
import static org .hamcrest .Matchers .instanceOf ;
21
21
import static org .junit .Assert .assertFalse ;
22
22
import static org .junit .Assert .assertThat ;
23
+ import static org .junit .Assert .assertTrue ;
23
24
import static org .junit .Assert .fail ;
24
25
26
+ import java .util .concurrent .CountDownLatch ;
27
+ import java .util .concurrent .TimeUnit ;
28
+
25
29
import org .junit .After ;
26
30
import org .junit .Rule ;
27
31
import org .junit .Test ;
30
34
import org .springframework .amqp .core .Queue ;
31
35
import org .springframework .amqp .rabbit .connection .CachingConnectionFactory ;
32
36
import org .springframework .amqp .rabbit .connection .ConnectionFactory ;
37
+ import org .springframework .amqp .rabbit .connection .RabbitUtils ;
38
+ import org .springframework .amqp .rabbit .connection .ShutDownChannelListener ;
33
39
import org .springframework .amqp .rabbit .core .RabbitAdmin ;
34
40
import org .springframework .amqp .rabbit .junit .BrokerRunning ;
35
41
import org .springframework .amqp .rabbit .listener .adapter .MessageListenerAdapter ;
36
42
import org .springframework .amqp .rabbit .listener .exception .FatalListenerStartupException ;
43
+ import org .springframework .context .ApplicationContext ;
37
44
import org .springframework .context .ApplicationContextException ;
38
45
import org .springframework .context .annotation .AnnotationConfigApplicationContext ;
39
46
import org .springframework .context .annotation .Bean ;
@@ -87,9 +94,12 @@ public void testMismatchedQueue() {
87
94
@ Test
88
95
public void testMismatchedQueueDuringRestart () throws Exception {
89
96
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext (Config2 .class );
97
+ CountDownLatch [] latches = setUpChannelLatches (context );
90
98
RabbitAdmin admin = context .getBean (RabbitAdmin .class );
91
99
admin .deleteQueue (TEST_MISMATCH );
100
+ assertTrue (latches [0 ].await (20 , TimeUnit .SECONDS ));
92
101
admin .declareQueue (new Queue (TEST_MISMATCH , false , false , true ));
102
+ assertTrue (latches [1 ].await (20 , TimeUnit .SECONDS ));
93
103
SimpleMessageListenerContainer container = context .getBean (SimpleMessageListenerContainer .class );
94
104
int n = 0 ;
95
105
while (n ++ < 200 && container .isRunning ()) {
@@ -102,9 +112,12 @@ public void testMismatchedQueueDuringRestart() throws Exception {
102
112
@ Test
103
113
public void testMismatchedQueueDuringRestartMultiQueue () throws Exception {
104
114
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext (Config3 .class );
115
+ CountDownLatch [] latches = setUpChannelLatches (context );
105
116
RabbitAdmin admin = context .getBean (RabbitAdmin .class );
106
117
admin .deleteQueue (TEST_MISMATCH );
118
+ assertTrue (latches [0 ].await (20 , TimeUnit .SECONDS ));
107
119
admin .declareQueue (new Queue (TEST_MISMATCH , false , false , true ));
120
+ assertTrue (latches [1 ].await (20 , TimeUnit .SECONDS ));
108
121
SimpleMessageListenerContainer container = context .getBean (SimpleMessageListenerContainer .class );
109
122
int n = 0 ;
110
123
while (n ++ < 200 && container .isRunning ()) {
@@ -114,6 +127,21 @@ public void testMismatchedQueueDuringRestartMultiQueue() throws Exception {
114
127
context .close ();
115
128
}
116
129
130
+ private CountDownLatch [] setUpChannelLatches (ApplicationContext context ) {
131
+ CachingConnectionFactory cf = context .getBean (CachingConnectionFactory .class );
132
+ final CountDownLatch cancelLatch = new CountDownLatch (1 );
133
+ final CountDownLatch mismatchLatch = new CountDownLatch (1 );
134
+ cf .addChannelListener ((ShutDownChannelListener ) s -> {
135
+ if (RabbitUtils .isNormalChannelClose (s )) {
136
+ cancelLatch .countDown ();
137
+ }
138
+ else if (RabbitUtils .isMismatchedQueueArgs (s )) {
139
+ mismatchLatch .countDown ();
140
+ }
141
+ });
142
+ return new CountDownLatch [] { cancelLatch , mismatchLatch };
143
+ }
144
+
117
145
@ Configuration
118
146
static class Config0 {
119
147
0 commit comments