14
14
*/
15
15
package com .amazon .sqs .javamessaging ;
16
16
17
+ import java .net .URI ;
17
18
import java .util .ArrayDeque ;
18
19
import java .util .ArrayList ;
19
20
import java .util .Iterator ;
20
21
import java .util .List ;
21
22
import java .util .Set ;
22
23
import java .util .UUID ;
23
24
25
+ import javax .jms .Destination ;
24
26
import javax .jms .JMSException ;
25
27
import javax .jms .MessageListener ;
26
28
@@ -92,7 +94,14 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
92
94
* Counter on how many messages are prefetched into internal messageQueue.
93
95
*/
94
96
protected int messagesPrefetched = 0 ;
95
-
97
+
98
+ /**
99
+ * Counter on how many messages have been explicitly requested.
100
+ * TODO: Consider renaming this class and several other variables now that
101
+ * this logic factors in message requests as well as prefetching.
102
+ */
103
+ protected int messagesRequested = 0 ;
104
+
96
105
/**
97
106
* States of the prefetch thread
98
107
*/
@@ -163,9 +172,24 @@ protected void setMessageListener(MessageListener messageListener) {
163
172
List <MessageManager > allPrefetchedMessages = new ArrayList <MessageManager >(messageQueue );
164
173
sqsSessionRunnable .scheduleCallBacks (messageListener , allPrefetchedMessages );
165
174
messageQueue .clear ();
175
+
176
+ // This will request the first message if necessary.
177
+ // TODO: This may overfetch if setMessageListener is being called multiple
178
+ // times, as the session callback scheduler may already have entries for this consumer.
179
+ messageListenerReady ();
166
180
}
167
181
}
168
182
183
+ /**
184
+ * Determine the number of messages we should attempt to fetch from SQS.
185
+ * Returns the difference between the number of messages needed (either for
186
+ * prefetching or by request) and the number currently fetched.
187
+ */
188
+ private int numberOfMessagesToFetch () {
189
+ int numberOfMessagesNeeded = Math .max (numberOfMessagesToPrefetch , messagesRequested );
190
+ return Math .max (numberOfMessagesNeeded - messagesPrefetched , 0 );
191
+ }
192
+
169
193
/**
170
194
* Runs until the message consumer is closed and in-progress SQS
171
195
* <code>receiveMessage</code> call returns.
@@ -190,8 +214,7 @@ public void run() {
190
214
synchronized (stateLock ) {
191
215
waitForStart ();
192
216
waitForPrefetch ();
193
- prefetchBatchSize = Math .min (
194
- (numberOfMessagesToPrefetch - messagesPrefetched ), SQSMessagingClientConstants .MAX_BATCH );
217
+ prefetchBatchSize = Math .min (numberOfMessagesToFetch (), SQSMessagingClientConstants .MAX_BATCH );
195
218
}
196
219
197
220
if (!isClosed ()) {
@@ -290,7 +313,7 @@ protected void processReceivedMessages(List<Message> messages) {
290
313
291
314
protected void waitForPrefetch () throws InterruptedException {
292
315
synchronized (stateLock ) {
293
- while (messagesPrefetched >= numberOfMessagesToPrefetch && !isClosed ()) {
316
+ while (numberOfMessagesToFetch () <= 0 && !isClosed ()) {
294
317
try {
295
318
stateLock .wait ();
296
319
} catch (InterruptedException e ) {
@@ -332,7 +355,20 @@ protected javax.jms.Message convertToJMSMessage(Message message) throws JMSExcep
332
355
throw new JMSException ("Not a supported JMS message type" );
333
356
}
334
357
}
358
+
335
359
jmsMessage .setJMSDestination (sqsDestination );
360
+
361
+ MessageAttributeValue replyToQueueNameAttribute = message .getMessageAttributes ().get (
362
+ SQSMessage .JMS_SQS_REPLY_TO_QUEUE_NAME );
363
+ MessageAttributeValue replyToQueueUrlAttribute = message .getMessageAttributes ().get (
364
+ SQSMessage .JMS_SQS_REPLY_TO_QUEUE_URL );
365
+ if (replyToQueueNameAttribute != null && replyToQueueUrlAttribute != null ) {
366
+ String replyToQueueUrl = replyToQueueUrlAttribute .getStringValue ();
367
+ String replyToQueueName = replyToQueueNameAttribute .getStringValue ();
368
+ Destination replyToQueue = new SQSQueueDestination (replyToQueueName , replyToQueueUrl );
369
+ jmsMessage .setJMSReplyTo (replyToQueue );
370
+ }
371
+
336
372
return jmsMessage ;
337
373
}
338
374
@@ -366,12 +402,38 @@ protected void waitForStart() throws InterruptedException {
366
402
public void messageDispatched () {
367
403
synchronized (stateLock ) {
368
404
messagesPrefetched --;
369
- if (messagesPrefetched < numberOfMessagesToPrefetch ) {
405
+ messagesRequested --;
406
+ if (numberOfMessagesToFetch () > 0 ) {
370
407
notifyStateChange ();
371
408
}
372
409
}
373
410
}
374
411
412
+ @ Override
413
+ public void messageListenerReady () {
414
+ synchronized (stateLock ) {
415
+ // messagesRequested may still be more than zero if there were pending receive()
416
+ // calls when the message listener was set.
417
+ if (messagesRequested <= 0 && !isClosed () && messageListener != null ) {
418
+ requestMessage ();
419
+ }
420
+ }
421
+ }
422
+
423
+ void requestMessage () {
424
+ synchronized (stateLock ) {
425
+ messagesRequested ++;
426
+ notifyStateChange ();
427
+ }
428
+ }
429
+
430
+ private void unrequestMessage () {
431
+ synchronized (stateLock ) {
432
+ messagesRequested --;
433
+ notifyStateChange ();
434
+ }
435
+ }
436
+
375
437
public static class MessageManager {
376
438
377
439
private final PrefetchManager prefetchManager ;
@@ -405,28 +467,35 @@ javax.jms.Message receive(long timeout) throws JMSException {
405
467
timeout = 0 ;
406
468
}
407
469
408
- MessageManager messageManager ;
470
+ MessageManager messageManager = null ;
409
471
synchronized (stateLock ) {
410
472
// If message exists in queue poll.
411
473
if (!messageQueue .isEmpty ()) {
412
474
messageManager = messageQueue .pollFirst ();
413
475
} else {
414
- long startTime = System .currentTimeMillis ();
415
-
416
- long waitTime = 0 ;
417
- while (messageQueue .isEmpty () && !isClosed () &&
418
- (timeout == 0 || (waitTime = getWaitTime (timeout , startTime )) > 0 )) {
419
- try {
420
- stateLock .wait (waitTime );
421
- } catch (InterruptedException e ) {
422
- Thread .currentThread ().interrupt ();
476
+ requestMessage ();
477
+ try {
478
+ long startTime = System .currentTimeMillis ();
479
+
480
+ long waitTime = 0 ;
481
+ while (messageQueue .isEmpty () && !isClosed () &&
482
+ (timeout == 0 || (waitTime = getWaitTime (timeout , startTime )) > 0 )) {
483
+ try {
484
+ stateLock .wait (waitTime );
485
+ } catch (InterruptedException e ) {
486
+ Thread .currentThread ().interrupt ();
487
+ return null ;
488
+ }
489
+ }
490
+ if (messageQueue .isEmpty () || isClosed ()) {
423
491
return null ;
424
492
}
425
- }
426
- if (messageQueue .isEmpty () || isClosed ()) {
427
- return null ;
428
- }
429
- messageManager = messageQueue .pollFirst ();
493
+ messageManager = messageQueue .pollFirst ();
494
+ } finally {
495
+ if (messageManager == null ) {
496
+ unrequestMessage ();
497
+ }
498
+ }
430
499
}
431
500
}
432
501
return messageHandler (messageManager );
0 commit comments