Skip to content

Commit 4858537

Browse files
committed
Add checkClosed() to all three receive*() methods
As per awslabs#77, closed consumers should only return null from concurrent in-flight receive calls, not future ones, and returning null doesn't play nice with SpringJMS. Checking in SQSMessageConsumer is more consistent than in SQSMessageConsumerPrefetch, and easier to test to boot.
1 parent 04b2881 commit 4858537

File tree

3 files changed

+89
-4
lines changed

3 files changed

+89
-4
lines changed

src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumer.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,13 @@ public void setMessageListener(MessageListener listener) throws JMSException {
140140
* paused.
141141
*
142142
* @return the next message produced for this message consumer, or null if
143-
* this message consumer is closed
143+
* this message consumer is closed during the receive call
144144
* @throws JMSException
145145
* On internal error
146146
*/
147147
@Override
148148
public Message receive() throws JMSException {
149+
checkClosed();
149150
return sqsMessageConsumerPrefetch.receive();
150151
}
151152

@@ -157,12 +158,13 @@ public Message receive() throws JMSException {
157158
* @param timeout
158159
* the timeout value (in milliseconds)
159160
* @return the next message produced for this message consumer, or null if
160-
* the timeout expires or this message consumer is closed
161+
* the timeout expires or this message consumer is closed during the receive call
161162
* @throws JMSException
162163
* On internal error
163164
*/
164165
@Override
165166
public Message receive(long timeout) throws JMSException {
167+
checkClosed();
166168
return sqsMessageConsumerPrefetch.receive(timeout);
167169
}
168170

@@ -176,6 +178,7 @@ public Message receive(long timeout) throws JMSException {
176178
*/
177179
@Override
178180
public Message receiveNoWait() throws JMSException {
181+
checkClosed();
179182
return sqsMessageConsumerPrefetch.receiveNoWait();
180183
}
181184

src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -590,11 +590,11 @@ private javax.jms.Message messageHandler(MessageManager messageManager) throws J
590590

591591
private boolean cannotDeliver() throws JMSException {
592592
if (!running) {
593-
return true;
593+
return true;
594594
}
595595

596596
if (isClosed()) {
597-
throw new JMSException("Cannot receive messages when the consumer is closed");
597+
throw new JMSException("Cannot receive messages when the consumer is closed");
598598
}
599599

600600
if (messageListener != null) {

src/test/java/com/amazon/sqs/javamessaging/SQSMessageConsumerTest.java

+82
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,88 @@ public void testSetMessageListenerAlreadyClosed() throws InterruptedException, J
347347
}
348348
}
349349

350+
/**
351+
* Test receive fails when consumer is already closed
352+
*/
353+
@Test
354+
public void testReceiveAlreadyClosed() throws InterruptedException, JMSException {
355+
356+
/*
357+
* Set up consumer
358+
*/
359+
consumer = spy(new SQSMessageConsumer(sqsConnection, sqsSession, sqsSessionRunnable,
360+
destination, acknowledger, negativeAcknowledger, threadFactory, sqsMessageConsumerPrefetch));
361+
362+
consumer.close();
363+
364+
/*
365+
* Call receive
366+
*/
367+
try {
368+
consumer.receive();
369+
fail();
370+
} catch (JMSException ex) {
371+
assertEquals("Consumer is closed", ex.getMessage());
372+
}
373+
374+
375+
}
376+
377+
/**
378+
* Test set message listener fails when consumer is already closed
379+
*/
380+
@Test
381+
public void testReceiveWithTimeoutAlreadyClosed() throws InterruptedException, JMSException {
382+
383+
/*
384+
* Set up consumer
385+
*/
386+
consumer = spy(new SQSMessageConsumer(sqsConnection, sqsSession, sqsSessionRunnable,
387+
destination, acknowledger, negativeAcknowledger, threadFactory, sqsMessageConsumerPrefetch));
388+
389+
consumer.close();
390+
391+
long timeout = 10;
392+
393+
/*
394+
* Call receive with timeout
395+
*/
396+
try {
397+
consumer.receive(timeout);
398+
fail();
399+
} catch (JMSException ex) {
400+
assertEquals("Consumer is closed", ex.getMessage());
401+
}
402+
403+
404+
}
405+
406+
/**
407+
* Test set message listener fails when consumer is already closed
408+
*/
409+
@Test
410+
public void testReceiveNoWaitAlreadyClosed() throws InterruptedException, JMSException {
411+
412+
/*
413+
* Set up consumer
414+
*/
415+
consumer = spy(new SQSMessageConsumer(sqsConnection, sqsSession, sqsSessionRunnable,
416+
destination, acknowledger, negativeAcknowledger, threadFactory, sqsMessageConsumerPrefetch));
417+
418+
consumer.close();
419+
420+
/*
421+
* Call receive no wait
422+
*/
423+
try {
424+
425+
consumer.receiveNoWait();
426+
fail();
427+
} catch (JMSException ex) {
428+
assertEquals("Consumer is closed", ex.getMessage());
429+
}
430+
}
431+
350432
/**
351433
* Test set message listener
352434
*/

0 commit comments

Comments
 (0)