Skip to content

fix #81 add async send methods to SQSMessageProducer #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/main/java/com/amazon/sqs/javamessaging/CompletionListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.amazon.sqs.javamessaging;

import javax.jms.Message;

/**
* Mimics {@link javax.jms.CompletionListener} from JMS 2.0 API
*/
public interface CompletionListener {

/**
* Notifies the application that the message has been successfully sent
*
* @param message
* the message that was sent.
*/
void onCompletion(Message message);

/**
* Notifies user that the specified exception was thrown while attempting to
* send the specified message. If an exception occurs it is undefined
* whether or not the message was successfully sent.
*
* @param message
* the message that was sent.
* @param exception
* the exception
*
*/
void onException(Message message, Exception exception);
}
142 changes: 118 additions & 24 deletions src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand All @@ -30,6 +31,8 @@
import javax.jms.Queue;
import javax.jms.QueueSender;

import com.amazonaws.handlers.AsyncHandler;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -90,13 +93,75 @@ public class SQSMessageProducer implements MessageProducer, QueueSender {

void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSException {
checkClosed();
String sqsMessageBody = null;
String messageType = null;
if (!(rawMessage instanceof SQSMessage)) {
throw new MessageFormatException(
"Unrecognized message type. Messages have to be one of: SQSBytesMessage, SQSObjectMessage, or SQSTextMessage");
"Unrecognized message type. Messages have to be one of: SQSBytesMessage, SQSObjectMessage, or SQSTextMessage");
}
SendMessageRequest sendMessageRequest = createSendMessageRequest(queue, rawMessage);
SQSMessage message = (SQSMessage) rawMessage;

SendMessageResult sendMessageResult = amazonSQSClient.sendMessage(sendMessageRequest);
String messageId = sendMessageResult.getMessageId();
LOG.info("Message sent to SQS with SQS-assigned messageId: " + messageId);
applySendMessageResult(sendMessageResult, message);
}

private void sendInternalAsync(SQSQueueDestination queue, Message rawMessage, final CompletionListener completionListener) throws JMSException {
checkClosed();
if (!(amazonSQSClient.getAmazonSQSClient() instanceof AmazonSQSAsync)) {
throw new UnsupportedOperationException("Expected instance of " + SQSMessageProducer.class.getName() + " to be backed by an instance of " +
AmazonSQSAsync.class.getName() +
" but was: " + amazonSQSClient.getAmazonSQSClient().getClass().getName());
}
if (!(rawMessage instanceof SQSMessage)) {
throw new MessageFormatException(
"Unrecognized message type. Messages have to be one of: SQSBytesMessage, SQSObjectMessage, or SQSTextMessage");
}
AmazonSQSAsync amazonSQSAsync = (AmazonSQSAsync) amazonSQSClient.getAmazonSQSClient();

SendMessageRequest sendMessageRequest = createSendMessageRequest(queue, rawMessage);
final SQSMessage message = (SQSMessage) rawMessage;

amazonSQSAsync.sendMessageAsync(sendMessageRequest, new AsyncHandler<SendMessageRequest, SendMessageResult>() {
@Override
public void onError(Exception e) {
if (completionListener != null) {
completionListener.onException(message, e);
}
}

@Override
public void onSuccess(SendMessageRequest request, SendMessageResult sendMessageResult) {
String messageId = sendMessageResult.getMessageId();
LOG.info("Message sent to SQS with SQS-assigned messageId: " + messageId);
try {
applySendMessageResult(sendMessageResult, message);
} catch (JMSException e) {
throw new RuntimeException(e);
}
if (completionListener != null) {
completionListener.onCompletion(message);
}
}
});
}

private void applySendMessageResult(SendMessageResult sendMessageResult, SQSMessage message) throws JMSException {
/** TODO: Do not support disableMessageID for now. */
message.setSQSMessageId(sendMessageResult.getMessageId());

// if the message was sent to FIFO queue, the sequence number will be
// set in the response
// pass it to JMS user through provider specific JMS property
if (sendMessageResult.getSequenceNumber() != null) {
message.setSequenceNumber(sendMessageResult.getSequenceNumber());
}

}

private SendMessageRequest createSendMessageRequest(SQSQueueDestination queue, Message rawMessage) throws JMSException {
String sqsMessageBody = null;
String messageType = null;

SQSMessage message = (SQSMessage)rawMessage;
message.setJMSDestination(queue);
if (message instanceof SQSBytesMessage) {
Expand All @@ -105,11 +170,11 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep
} else if (message instanceof SQSObjectMessage) {
sqsMessageBody = ((SQSObjectMessage) message).getMessageBody();
messageType = SQSMessage.OBJECT_MESSAGE_TYPE;
} else if (message instanceof SQSTextMessage) {
} else if (message instanceof SQSTextMessage) {
sqsMessageBody = ((SQSTextMessage) message).getText();
messageType = SQSMessage.TEXT_MESSAGE_TYPE;
}

if (sqsMessageBody == null || sqsMessageBody.isEmpty()) {
throw new JMSException("Message body cannot be null or empty");
}
Expand Down Expand Up @@ -139,19 +204,7 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep
sendMessageRequest.setMessageGroupId(message.getSQSMessageGroupId());
sendMessageRequest.setMessageDeduplicationId(message.getSQSMessageDeduplicationId());
}

SendMessageResult sendMessageResult = amazonSQSClient.sendMessage(sendMessageRequest);
String messageId = sendMessageResult.getMessageId();
LOG.info("Message sent to SQS with SQS-assigned messageId: " + messageId);
/** TODO: Do not support disableMessageID for now. */
message.setSQSMessageId(messageId);

// if the message was sent to FIFO queue, the sequence number will be
// set in the response
// pass it to JMS user through provider specific JMS property
if (sendMessageResult.getSequenceNumber() != null) {
message.setSequenceNumber(sendMessageResult.getSequenceNumber());
}
return sendMessageRequest;
}

@Override
Expand Down Expand Up @@ -187,6 +240,15 @@ public void send(Queue queue, Message message) throws JMSException {
sendInternal((SQSQueueDestination)queue, message);
}

public void sendAsync(Queue queue, Message message, CompletionListener completionListener) throws JMSException {
if (!(queue instanceof SQSQueueDestination)) {
throw new InvalidDestinationException(
"Incompatible implementation of Queue. Please use SQSQueueDestination implementation.");
}
checkIfDestinationAlreadySet();
sendInternalAsync((SQSQueueDestination)queue, message, completionListener);
}

/**
* Not verified on the client side, but SQS Attribute names must be valid
* letter or digit on the basic multilingual plane in addition to allowing
Expand Down Expand Up @@ -314,7 +376,12 @@ public void send(Queue queue, Message message, int deliveryMode, int priority, l
throws JMSException {
send(queue, message);
}


public void sendAsync(Queue queue, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener)
throws JMSException {
sendAsync(queue, message, completionListener);
}

/**
* Gets the destination associated with this MessageProducer.
*
Expand Down Expand Up @@ -358,19 +425,27 @@ public void send(Message message) throws JMSException {
}
sendInternal(sqsDestination, message);
}


public void sendAsync(Message message, CompletionListener completionListener) throws JMSException {
if (sqsDestination == null) {
throw new UnsupportedOperationException(
"MessageProducer has to specify a destination at creation time.");
}
sendInternalAsync(sqsDestination, message, completionListener);
}

/**
* Sends a message to a destination created during the creation time of this
* message producer.
* <P>
* Send does not support deliveryMode, priority, and timeToLive. It will
* ignore anything in deliveryMode, priority, and timeToLive.
*
*
* @param message
* the message to send
* @param deliveryMode
* @param priority
* @param timeToLive
* @param timeToLive
* @throws MessageFormatException
* If an invalid message is specified.
* @throws UnsupportedOperationException
Expand All @@ -383,7 +458,11 @@ public void send(Message message) throws JMSException {
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
send(message);
}


public void sendAsync(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException {
sendAsync(message, completionListener);
}

/**
* Sends a message to a queue destination.
*
Expand Down Expand Up @@ -414,6 +493,17 @@ public void send(Destination destination, Message message) throws JMSException {
}
}

public void sendAsync(Destination destination, Message message, CompletionListener completionListener) throws JMSException {
if (destination == null) {
throw new InvalidDestinationException("Destination cannot be null");
}
if (destination instanceof SQSQueueDestination) {
sendAsync((Queue) destination, message, completionListener);
} else {
throw new InvalidDestinationException("Incompatible implementation of Destination. Please use SQSQueueDestination implementation.");
}
}

/**
* Sends a message to a queue destination.
* <P>
Expand Down Expand Up @@ -443,6 +533,10 @@ public void send(Destination destination, Message message, int deliveryMode, int
send(destination, message);
}

public void sendAsync(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener completionListener) throws JMSException {
sendAsync(destination, message, completionListener);
}

/** This method is not supported. */
@Override
public void setDisableMessageID(boolean value) throws JMSException {
Expand Down