Sample application (plain Java) of how to use the JMS client to send and receive messages via SAP Event Mesh.
-
Installed Java 8 → Java download
-
Installed Git → Git download
-
Installed Maven 3.x → Maven download
-
Optional: Installed Postman
-
An SAP BTP account with the Event Mesh service is required.
For more detailed information and help on getting started with SAP Event Mesh, see the SAP help page.
-
Optional: Installed CloudFoundry CLI → Installing the cf CLI
-
The cf CLI must also be fully configured for the corresponding Cloud Foundry landscape so that you can run cf push.
-
-
Created Event Mesh Service Instance
-
e.g. via cli:
cf cs enterprise-messaging default emjapi-samples-sapbtp -c
emjapi-samples-jms-p2p/config
-
The Service Descriptors can be found here
-
Remember to adjust the manifest file of the application
-
-
Create queues (e.g. NameOfQueue) via e.g. the MM API or the UI
-
When creating a queue with the MM API, you must provide the fully qualified queue name, including the namespace.
-
-
-
Installed IDE of choice (e.g. Visual Studio with installed Java language support plugin)
The point-to-point (p2p) sample is a small Spring Boot (version 2.0.6) application that provides two REST endpoints: one for sending messages to a specific queue and one for receiving messages from it.
-
Check that all prerequisites are fulfilled (see the Prerequisites section above)
-
Clone the repository via
git clone
-
Build the project with Maven (mvn clean install)
-
Push to Cloud Foundry via cf push (if the service was renamed, adapt the manifest accordingly)
-
The queue name has to be URL-encoded. The sample provides an endpoint that URL-encodes it:
-
curl -X POST -H "Content-Type: text/plain" -H "Cache-Control: no-cache" -d '<queue-name>' "https://<application-path>/encode"
(the body must contain the queue name)
-
-
Send a message with an HTTP POST via https://<application-path>/queue/<queue-name>/message (the body must contain the message)
-
curl -X POST -H "Content-Type: text/plain" -H "Cache-Control: no-cache" -d '<message>' "https://<application-path>/queue/<queue-name>/message"
-
-
Receive a message with an HTTP GET via https://<application-path>/queue/<queue-name>/message
-
Note that this is a blocking call. If the request is aborted, the consumer remains active on the server and will receive the next message.
-
curl -X GET -H "Content-Type: text/plain" -H "Cache-Control: no-cache" "https://<application-path>/queue/<queue-name>/message"
-
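The URL-encoding step and the endpoint layout described above can be sketched in plain Java. This is a minimal illustration, not code from the sample: the class name QueueEndpoints, the host my-app.example.com and the namespace my/namespace are hypothetical, and the sample's own /encode endpoint may encode some characters differently from java.net.URLEncoder (which, for instance, turns a space into "+").

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

/** Illustrative helper (hypothetical, not part of the sample) that builds the REST URLs for a queue. */
final class QueueEndpoints {

    private QueueEndpoints() {
    }

    /** URL-encodes a queue name so that it fits into a single path segment. */
    static String encodeQueueName(String queueName) {
        try {
            // Form encoding: '/' becomes %2F; letters, digits, '.', '-', '*' and '_' stay unchanged.
            return URLEncoder.encode(queueName, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed to be available on every Java platform.
            throw new IllegalStateException(e);
        }
    }

    /** Returns the URL used for both sending (POST) and receiving (GET) a message. */
    static String messageUrl(String applicationBaseUrl, String queueName) {
        return applicationBaseUrl + "/queue/" + encodeQueueName(queueName) + "/message";
    }

    public static void main(String[] args) {
        // Hypothetical host and fully qualified queue name, for illustration only.
        System.out.println(messageUrl("https://my-app.example.com", "my/namespace/NameOfQueue"));
    }
}
```

A fully qualified queue name contains slashes, which is why it cannot be placed into the URL path unencoded.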
ServiceConnectorConfig config = null; // no configuration options are currently supported for the MessagingServiceFactory
Cloud cloud = new CloudFactory().getCloud();
// get the messaging service via the service connector
MessagingService messagingService = cloud.getSingletonServiceConnector(MessagingService.class, config);
Create the MessagingServiceFactory object with the help of the MessagingServiceFactoryCreator and get a MessagingServiceJmsConnectionFactory. The connection factory can be configured with MessagingServiceJmsSettings. If the reconnection feature is not needed and an individual connection mechanism (e.g. a connection cache) is used, the settings can be skipped. The connection factory is built with messagingServiceFactory.createConnectionFactory(MessagingServiceJmsConnectionFactory.class, settings).
MessagingServiceJmsSettings settings = new MessagingServiceJmsSettings(); // settings are preset with default values (see JavaDoc)
settings.setMaxReconnectAttempts(5); // use -1 for unlimited attempts
settings.setInitialReconnectDelay(3000);
settings.setReconnectDelay(3000);
MessagingServiceFactory messagingServiceFactory = MessagingServiceFactoryCreator.createFactory(messagingService);
MessagingServiceJmsConnectionFactory connectionFactory = messagingServiceFactory.createConnectionFactory(MessagingServiceJmsConnectionFactory.class, settings);
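To get a feeling for what the three reconnect settings above might add up to, the following plain Java sketch computes a wait schedule. It rests on assumptions that are not confirmed by this README: that the values are milliseconds, that the initial delay applies before the first attempt and the reconnect delay before every later one, and that no back-off is applied. The class ReconnectSchedule is hypothetical; the client's JavaDoc remains the authoritative description of the actual behavior.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of a fixed-delay reconnect schedule; not the client's implementation. */
final class ReconnectSchedule {

    private ReconnectSchedule() {
    }

    /** Returns the assumed wait time (in milliseconds) before each reconnect attempt. */
    static List<Long> delays(int maxAttempts, long initialDelayMillis, long reconnectDelayMillis) {
        if (maxAttempts < 0) {
            // -1 means unlimited attempts, so there is no finite schedule to compute.
            throw new IllegalArgumentException("unlimited attempts have no finite schedule");
        }
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Assumption: only the first attempt uses the initial delay.
            delays.add(attempt == 1 ? initialDelayMillis : reconnectDelayMillis);
        }
        return delays;
    }

    /** Sums up the schedule to estimate how long the client keeps trying before giving up. */
    static long totalWaitMillis(List<Long> delays) {
        long total = 0;
        for (long delay : delays) {
            total += delay;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Long> schedule = delays(5, 3000, 3000);
        System.out.println(schedule + " -> " + totalWaitMillis(schedule) + " ms in total");
    }
}
```

Under these assumptions, the settings shown above would make the client give up after roughly 15 seconds of waiting, not counting the time each connection attempt itself takes.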
To send messages, a Connection and a Session are needed first. Note that these resources must be closed when they are no longer needed. Because both implement the AutoCloseable interface, they are closed automatically at the end of the try-with-resources block. Next, a BytesMessage is created and a queue is bound to a producer. The queue must already exist on the broker (created e.g. via the UI or the MM API). Note that the prefix "queue:" is mandatory. Finally, the message is sent to the queue.
try (Connection connection = connectionFactory.createConnection();
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
    connection.start();
    BytesMessage byteMessage = session.createBytesMessage();
    byteMessage.writeBytes(message.getBytes());
    Queue queue = session.createQueue("queue:" + "<queue-name>"); // even though the JMS API is "createQueue" the queue will not be created on the message broker
    MessageProducer producer = session.createProducer(queue);
    producer.send(byteMessage);
} catch (JMSException e) {
    LOG.error("Could not send message={}.", message, e);
}
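The automatic cleanup relied on above can be demonstrated without a broker. The sketch below uses a hypothetical Resource class as a stand-in for the JMS Connection and Session and records the order of events; it shows the standard Java behavior that resources declared in one try statement are closed in reverse order of declaration, so the session is closed before the connection.

```java
import java.util.ArrayList;
import java.util.List;

/** Demonstrates the close order of try-with-resources using stand-in resources. */
final class CloseOrderDemo {

    private CloseOrderDemo() {
    }

    /** Hypothetical stand-in for a JMS Connection or Session; logs when it is opened and closed. */
    static final class Resource implements AutoCloseable {
        private final String name;
        private final List<String> log;

        Resource(String name, List<String> log) {
            this.name = name;
            this.log = log;
            log.add("open " + name);
        }

        @Override
        public void close() {
            log.add("close " + name);
        }
    }

    /** Opens two resources, does some work, and returns the recorded sequence of events. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        try (Resource connection = new Resource("connection", log);
             Resource session = new Resource("session", log)) {
            log.add("send message");
        }
        return log;
    }

    public static void main(String[] args) {
        // Prints: open connection, open session, send message, close session, close connection
        run().forEach(System.out::println);
    }
}
```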
In this example a consumer listens to a queue. Again, a Connection and a Session are needed. Note that these resources must be closed when they are no longer needed. First, a queue with the mandatory prefix "queue:" is bound to a consumer. Because the messages are sent as a BytesMessage, the payload needs to be converted to e.g. a String.
try (Connection connection = connectionFactory.createConnection();
     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
    connection.start();
    Queue queue = session.createQueue(QUEUE_PREFIX + queueName); // QUEUE_PREFIX is "queue:"; the queue must already exist on the broker
    MessageConsumer consumer = session.createConsumer(queue);
    BytesMessage message = (BytesMessage) consumer.receive(); // Blocking call. Define a timeout or use a Message Listener
    byte[] byteData = new byte[(int) message.getBodyLength()];
    message.readBytes(byteData);
    String text = new String(byteData); // convert the payload to a String (platform default charset, matching getBytes() on the sending side)
} catch (JMSException e) {
    LOG.error("Could not receive message.", e);
}
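The conversion between a String and the bytes carried by a BytesMessage depends on the charset used on both sides. The snippets above rely on the platform default; the following sketch shows a variant with an explicit charset, which gives the same result regardless of where sender and receiver run. The class PayloadCodec and its methods are hypothetical helpers for illustration, and choosing UTF-8 is an assumption, not something the sample prescribes.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that keeps the queue prefix and payload conversion in one place. */
final class PayloadCodec {

    /** Mandatory prefix for queue destinations, as described in the text. */
    static final String QUEUE_PREFIX = "queue:";

    private PayloadCodec() {
    }

    /** Builds the destination name that would be passed to session.createQueue(...). */
    static String destinationName(String queueName) {
        return QUEUE_PREFIX + queueName;
    }

    /** Encodes a text message into the bytes written to a BytesMessage. */
    static byte[] toBytes(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes the bytes read from a BytesMessage back into text. */
    static String toText(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = toBytes("Gr\u00fc\u00dfe"); // non-ASCII text to show that the charset matters
        System.out.println(destinationName("NameOfQueue") + " <- " + toText(payload)
                + " (" + payload.length + " bytes)");
    }
}
```

If both sides use the same explicit charset, the round trip is lossless even when the sender and the receiver run with different default charsets.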
The Messaging Management API (MM API) provides functionality for creating, deleting and updating queues and queue subscriptions. Furthermore, it provides APIs to get information on queues and queue subscriptions. The MM API documentation can be found here. The MM API has to be enabled in the service descriptor. A description of how to enable the MM API can be found here.
Queues can be created through the SAP Business Technology Platform Cockpit UI. More information regarding the creation of queues through the UI can be found here.
Examples for the different service descriptors can be found here on the help site and in the config folder of this project.
This project is provided 'as-is', with no support and no changes being made.
You are welcome to make changes to improve it, but we are not available for questions or support of any kind.
Copyright (c) 2017 SAP SE or an SAP affiliate company. All rights reserved. This file is licensed under the SAP SAMPLE CODE LICENSE AGREEMENT, v1.0-071618 except as noted otherwise in the LICENSE file.