Skip to content

Commit da19168

Browse files
committed
fix awslabs#81 add async send methods to SQSMessageProducer
1 parent b462bdc commit da19168

File tree

2 files changed

+148
-24
lines changed

2 files changed

+148
-24
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.amazon.sqs.javamessaging;
2+
3+
import javax.jms.Message;
4+
5+
/**
6+
* Mimics {@link javax.jms.CompletionListener} from JMS 2.0 API
7+
*/
8+
public interface CompletionListener {
9+
10+
/**
11+
* Notifies the application that the message has been successfully sent
12+
*
13+
* @param message
14+
* the message that was sent.
15+
*/
16+
void onCompletion(Message message);
17+
18+
/**
19+
* Notifies user that the specified exception was thrown while attempting to
20+
* send the specified message. If an exception occurs it is undefined
21+
* whether or not the message was successfully sent.
22+
*
23+
* @param message
24+
* the message that was sent.
25+
* @param exception
26+
* the exception
27+
*
28+
*/
29+
void onException(Message message, Exception exception);
30+
}

src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java

+118-24
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.Enumeration;
1818
import java.util.HashMap;
1919
import java.util.Map;
20+
import java.util.concurrent.Future;
2021
import java.util.concurrent.TimeUnit;
2122
import java.util.concurrent.atomic.AtomicBoolean;
2223

@@ -30,6 +31,8 @@
3031
import javax.jms.Queue;
3132
import javax.jms.QueueSender;
3233

34+
import com.amazonaws.handlers.AsyncHandler;
35+
import com.amazonaws.services.sqs.AmazonSQSAsync;
3336
import org.apache.commons.logging.Log;
3437
import org.apache.commons.logging.LogFactory;
3538

@@ -90,13 +93,75 @@ public class SQSMessageProducer implements MessageProducer, QueueSender {
9093

9194
void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSException {
9295
checkClosed();
93-
String sqsMessageBody = null;
94-
String messageType = null;
9596
if (!(rawMessage instanceof SQSMessage)) {
9697
throw new MessageFormatException(
97-
"Unrecognized message type. Messages have to be one of: SQSBytesMessage, SQSObjectMessage, or SQSTextMessage");
98+
"Unrecognized message type. Messages have to be one of: SQSBytesMessage, SQSObjectMessage, or SQSTextMessage");
99+
}
100+
SendMessageRequest sendMessageRequest = createSendMessageRequest(queue, rawMessage);
101+
SQSMessage message = (SQSMessage) rawMessage;
102+
103+
SendMessageResult sendMessageResult = amazonSQSClient.sendMessage(sendMessageRequest);
104+
String messageId = sendMessageResult.getMessageId();
105+
LOG.info("Message sent to SQS with SQS-assigned messageId: " + messageId);
106+
applySendMessageResult(sendMessageResult, message);
107+
}
108+
109+
private void sendInternalAsync(SQSQueueDestination queue, Message rawMessage, final CompletionListener completionListener) throws JMSException {
110+
checkClosed();
111+
if (!(amazonSQSClient.getAmazonSQSClient() instanceof AmazonSQSAsync)) {
112+
throw new UnsupportedOperationException("Expected instance of " + SQSMessageProducer.class.getName() + " to be backed by an instance of " +
113+
AmazonSQSAsync.class.getName() +
114+
" but was: " + amazonSQSClient.getAmazonSQSClient().getClass().getName());
115+
}
116+
if (!(rawMessage instanceof SQSMessage)) {
117+
throw new MessageFormatException(
118+
"Unrecognized message type. Messages have to be one of: SQSBytesMessage, SQSObjectMessage, or SQSTextMessage");
119+
}
120+
AmazonSQSAsync amazonSQSAsync = (AmazonSQSAsync) amazonSQSClient.getAmazonSQSClient();
121+
122+
SendMessageRequest sendMessageRequest = createSendMessageRequest(queue, rawMessage);
123+
final SQSMessage message = (SQSMessage) rawMessage;
124+
125+
amazonSQSAsync.sendMessageAsync(sendMessageRequest, new AsyncHandler<SendMessageRequest, SendMessageResult>() {
126+
@Override
127+
public void onError(Exception e) {
128+
if (completionListener != null) {
129+
completionListener.onException(message, e);
130+
}
131+
}
132+
133+
@Override
134+
public void onSuccess(SendMessageRequest request, SendMessageResult sendMessageResult) {
135+
String messageId = sendMessageResult.getMessageId();
136+
LOG.info("Message sent to SQS with SQS-assigned messageId: " + messageId);
137+
try {
138+
applySendMessageResult(sendMessageResult, message);
139+
} catch (JMSException e) {
140+
throw new RuntimeException(e);
141+
}
142+
if (completionListener != null) {
143+
completionListener.onCompletion(message);
144+
}
145+
}
146+
});
147+
}
148+
149+
private void applySendMessageResult(SendMessageResult sendMessageResult, SQSMessage message) throws JMSException {
150+
/** TODO: Do not support disableMessageID for now. */
151+
message.setSQSMessageId(sendMessageResult.getMessageId());
152+
153+
// if the message was sent to FIFO queue, the sequence number will be
154+
// set in the response
155+
// pass it to JMS user through provider specific JMS property
156+
if (sendMessageResult.getSequenceNumber() != null) {
157+
message.setSequenceNumber(sendMessageResult.getSequenceNumber());
98158
}
99-
159+
}
160+
161+
private SendMessageRequest createSendMessageRequest(SQSQueueDestination queue, Message rawMessage) throws JMSException {
162+
String sqsMessageBody = null;
163+
String messageType = null;
164+
100165
SQSMessage message = (SQSMessage)rawMessage;
101166
message.setJMSDestination(queue);
102167
if (message instanceof SQSBytesMessage) {
@@ -105,11 +170,11 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep
105170
} else if (message instanceof SQSObjectMessage) {
106171
sqsMessageBody = ((SQSObjectMessage) message).getMessageBody();
107172
messageType = SQSMessage.OBJECT_MESSAGE_TYPE;
108-
} else if (message instanceof SQSTextMessage) {
173+
} else if (message instanceof SQSTextMessage) {
109174
sqsMessageBody = ((SQSTextMessage) message).getText();
110175
messageType = SQSMessage.TEXT_MESSAGE_TYPE;
111176
}
112-
177+
113178
if (sqsMessageBody == null || sqsMessageBody.isEmpty()) {
114179
throw new JMSException("Message body cannot be null or empty");
115180
}
@@ -139,19 +204,7 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep
139204
sendMessageRequest.setMessageGroupId(message.getSQSMessageGroupId());
140205
sendMessageRequest.setMessageDeduplicationId(message.getSQSMessageDeduplicationId());
141206
}
142-
143-
SendMessageResult sendMessageResult = amazonSQSClient.sendMessage(sendMessageRequest);
144-
String messageId = sendMessageResult.getMessageId();
145-
LOG.info("Message sent to SQS with SQS-assigned messageId: " + messageId);
146-
/** TODO: Do not support disableMessageID for now. */
147-
message.setSQSMessageId(messageId);
148-
149-
// if the message was sent to FIFO queue, the sequence number will be
150-
// set in the response
151-
// pass it to JMS user through provider specific JMS property
152-
if (sendMessageResult.getSequenceNumber() != null) {
153-
message.setSequenceNumber(sendMessageResult.getSequenceNumber());
154-
}
207+
return sendMessageRequest;
155208
}
156209

157210
@Override
@@ -187,6 +240,15 @@ public void send(Queue queue, Message message) throws JMSException {
187240
sendInternal((SQSQueueDestination)queue, message);
188241
}
189242

243+
public void sendAsync(Queue queue, Message message, CompletionListener completionListener) throws JMSException {
244+
if (!(queue instanceof SQSQueueDestination)) {
245+
throw new InvalidDestinationException(
246+
"Incompatible implementation of Queue. Please use SQSQueueDestination implementation.");
247+
}
248+
checkIfDestinationAlreadySet();
249+
sendInternalAsync((SQSQueueDestination)queue, message, completionListener);
250+
}
251+
190252
/**
191253
* Not verified on the client side, but SQS Attribute names must be valid
192254
* letter or digit on the basic multilingual plane in addition to allowing
@@ -314,7 +376,12 @@ public void send(Queue queue, Message message, int deliveryMode, int priority, l
314376
throws JMSException {
315377
send(queue, message);
316378
}
317-
379+
380+
public void sendAsync(Queue queue, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener)
381+
throws JMSException {
382+
sendAsync(queue, message, completionListener);
383+
}
384+
318385
/**
319386
* Gets the destination associated with this MessageProducer.
320387
*
@@ -358,19 +425,27 @@ public void send(Message message) throws JMSException {
358425
}
359426
sendInternal(sqsDestination, message);
360427
}
361-
428+
429+
public void sendAsync(Message message, CompletionListener completionListener) throws JMSException {
430+
if (sqsDestination == null) {
431+
throw new UnsupportedOperationException(
432+
"MessageProducer has to specify a destination at creation time.");
433+
}
434+
sendInternalAsync(sqsDestination, message, completionListener);
435+
}
436+
362437
/**
363438
* Sends a message to a destination created during the creation time of this
364439
* message producer.
365440
* <P>
366441
* Send does not support deliveryMode, priority, and timeToLive. It will
367442
* ignore anything in deliveryMode, priority, and timeToLive.
368-
*
443+
*
369444
* @param message
370445
* the message to send
371446
* @param deliveryMode
372447
* @param priority
373-
* @param timeToLive
448+
* @param timeToLive
374449
* @throws MessageFormatException
375450
* If an invalid message is specified.
376451
* @throws UnsupportedOperationException
@@ -383,7 +458,11 @@ public void send(Message message) throws JMSException {
383458
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
384459
send(message);
385460
}
386-
461+
462+
public void sendAsync(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException {
463+
sendAsync(message, completionListener);
464+
}
465+
387466
/**
388467
* Sends a message to a queue destination.
389468
*
@@ -414,6 +493,17 @@ public void send(Destination destination, Message message) throws JMSException {
414493
}
415494
}
416495

496+
public void sendAsync(Destination destination, Message message, CompletionListener completionListener) throws JMSException {
497+
if (destination == null) {
498+
throw new InvalidDestinationException("Destination cannot be null");
499+
}
500+
if (destination instanceof SQSQueueDestination) {
501+
sendAsync((Queue) destination, message, completionListener);
502+
} else {
503+
throw new InvalidDestinationException("Incompatible implementation of Destination. Please use SQSQueueDestination implementation.");
504+
}
505+
}
506+
417507
/**
418508
* Sends a message to a queue destination.
419509
* <P>
@@ -443,6 +533,10 @@ public void send(Destination destination, Message message, int deliveryMode, int
443533
send(destination, message);
444534
}
445535

536+
public void sendAsync(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException {
537+
sendAsync(destination, message, completionListener);
538+
}
539+
446540
/** This method is not supported. */
447541
@Override
448542
public void setDisableMessageID(boolean value) throws JMSException {

0 commit comments

Comments
 (0)