Skip to content

Commit 74e4674

Browse files
Observability Support for SQS
Resolves awspring#1367
1 parent 2439fe9 commit 74e4674

File tree

44 files changed

+4363
-55
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+4363
-55
lines changed

docs/src/main/asciidoc/sqs.adoc

+169
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,18 @@ Such attributes are available as `MessageHeaders` in received messages.
219219
#AUTO
220220
|Set the ContentBasedDeduplication queue attribute value of the queues the template is sending messages to.
221221
With `ContentBasedDeduplication#AUTO`, the queue attribute value will be resolved automatically.
222+
223+
|`observationRegistry`
224+
|ObservationRegistry
225+
|ObservationRegistry.NOOP
226+
|Set the `ObservationRegistry` to be used for recording observations during message sending and receiving operations.
227+
See <<Observability Support>> for more information.
228+
229+
|`observationConvention`
230+
|ObservationConvention
231+
|null
232+
|Sets a custom `ObservationConvention` to be used for customizing observation key-value pairs.
233+
See <<Observability Support>> for more information.
222234
|===
223235

224236
[[sqs-send-message]]
@@ -817,6 +829,7 @@ The Spring Boot Starter for SQS provides the following auto-configuration proper
817829
| <<maxMessagesPerPoll, `spring.cloud.aws.sqs.listener.max-messages-per-poll`>> | Maximum number of messages to be received per poll. | No | 10
818830
| <<pollTimeout, `spring.cloud.aws.sqs.listener.poll-timeout`>> | Maximum amount of time to wait for messages in a poll. | No | 10 seconds
819831
| `spring.cloud.aws.sqs.queue-not-found-strategy` | The strategy to be used by SqsTemplate and SqsListeners when a queue does not exist. | No | CREATE
832+
| `spring.cloud.aws.sqs.observation-enabled` | Enables observability support for SQS operations. | No | false
820833
|===
821834

822835

@@ -974,6 +987,18 @@ For `FIFO` queues, visibility is extended for all messages in a message group be
974987
See <<FIFO Support>>.
975988
Otherwise, visibility is specified once when polling SQS.
976989

990+
|`observationRegistry`
991+
|`ObservationRegistry`
992+
|`ObservationRegistry.NOOP`
993+
|Sets the `ObservationRegistry` to be used for recording observations during message processing.
994+
See <<Observability Support>>.
995+
996+
|`observationConvention`
997+
|`ObservationConvention`
998+
|`null`
999+
|Sets a custom `ObservationConvention` to be used for customizing observation key-value pairs.
1000+
See <<Observability Support>>.
1001+
9771002
|`queueNotFoundStrategy`
9781003
|`FAIL`, `CREATE`
9791004
|`CREATE`
@@ -1724,6 +1749,150 @@ There can be multiple `SqsAsyncClientCustomizer` beans present in single applica
17241749
Note that `SqsAsyncClientCustomizer` beans are applied **after** `AwsAsyncClientCustomizer` beans and therefore can overwrite previously set configurations.
17251750

17261751

1752+
=== Observability Support
1753+
1754+
Spring Cloud AWS SQS supports observability through Micrometer's Observation API. This integration provides the ability to monitor and trace SQS operations throughout your application.
1755+
1756+
==== Enabling Observability
1757+
1758+
Observability can be enabled in a Spring Boot application by:
1759+
1760+
1. Setting the `spring.cloud.aws.sqs.observation-enabled` property to `true`
1761+
2. Having a `ObservationRegistry` bean in your application context
1762+
1763+
When using direct SQS component configuration, observability can be enabled by:
1764+
1765+
1. Setting an `ObservationRegistry` in the container options or template options
1766+
2. Optionally providing a custom `ObservationConvention` to customize the key-value pairs
1767+
1768+
```java
1769+
@Bean
1770+
SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObservationRegistry observationRegistry) {
1771+
return SqsTemplate.builder()
1772+
.sqsAsyncClient(sqsAsyncClient)
1773+
.configure(options -> options.observationRegistry(observationRegistry))
1774+
.build();
1775+
}
1776+
1777+
@Bean
1778+
SqsMessageListenerContainerFactory<Object> sqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient, ObservationRegistry observationRegistry) {
1779+
return SqsMessageListenerContainerFactory.builder()
1780+
.sqsAsyncClient(sqsAsyncClient)
1781+
.configure(options -> options.observationRegistry(observationRegistry))
1782+
.build();
1783+
}
1784+
```
1785+
1786+
==== Available Observations
1787+
1788+
Spring Cloud AWS SQS provides the following observations:
1789+
1790+
1. `spring.aws.sqs.template` - Records SQS operations performed through the `SqsTemplate`
1791+
2. `spring.aws.sqs.listener` - Records message processing through the `SqsMessageListenerContainer`
1792+
1793+
Both observations include the following common tags:
1794+
1795+
Low cardinality tags:
1796+
- `messaging.system`: "sqs"
1797+
- `messaging.operation`: "publish" (template) or "receive" (listener)
1798+
- `messaging.destination.name` or `messaging.source.name`: The queue name
1799+
- `messaging.destination.kind` or `messaging.source.kind`: "queue"
1800+
1801+
High cardinality tags:
1802+
- `messaging.message.id`: The SQS message ID from AWS.
1803+
1804+
For FIFO queues, the following additional high cardinality tags are included:
1805+
- `messaging.message.message-group.id`: The message group ID
1806+
- `messaging.message.message-deduplication.id`: The message deduplication ID
1807+
1808+
==== Customizing Observations
1809+
1810+
Custom observation conventions can be provided to add custom tags or replace the default ones with custom ones.
1811+
1812+
===== Adding Custom Tags
1813+
1814+
To add custom tags while preserving all default tags, the `DefaultConvention` classes can be extended and the `getCustomLowCardinalityKeyValues` and / or getCustomHighCardinalityKeyValues method overridden:
1815+
1816+
```java
1817+
@Bean
1818+
SqsTemplateObservation.Convention sqsTemplateObservationConvention() {
1819+
return new SqsTemplateObservation.DefaultConvention() {
1820+
@Override
1821+
protected KeyValues getCustomHighCardinalityKeyValues(SqsTemplateObservation.Context context) {
1822+
String paymentId = MessageHeaderUtils.getHeaderAsString(context.getMessage(), "payment-id-header-name");
1823+
return KeyValues.of("payment.id", paymentId);
1824+
}
1825+
};
1826+
}
1827+
```
1828+
1829+
===== Replacing Default Tags
1830+
1831+
For complete control over the observation tags, the `Convention` interfaces can be implemented directly:
1832+
1833+
```java
1834+
@Bean
1835+
SqsListenerObservation.Convention sqsListenerObservationConvention() {
1836+
return new SqsListenerObservation.Convention() {
1837+
1838+
@Override
1839+
public KeyValues getLowCardinalityKeyValues(SqsListenerObservation.Context context) {
1840+
return KeyValues.of("messaging.system", "sqs")
1841+
.and("messaging.operation", "receive")
1842+
.and("custom.tag", "custom-value");
1843+
}
1844+
1845+
@Override
1846+
public KeyValues getHighCardinalityKeyValues(SqsListenerObservation.Context context) {
1847+
String paymentId = MessageHeaderUtils.getHeaderAsString(context.getMessage(), "payment-id-header-name");
1848+
return KeyValues.of("payment.id", paymentId);
1849+
}
1850+
};
1851+
}
1852+
```
1853+
1854+
Custom convention beans defined in the application context will be automatically wired by Spring Boot auto-configuration. For manual configuration:
1855+
1856+
```java
1857+
@Bean
1858+
SqsTemplate sqsTemplateWithCustomConvention(SqsAsyncClient sqsAsyncClient,
1859+
ObservationRegistry observationRegistry,
1860+
SqsTemplateObservation.Convention convention) {
1861+
return SqsTemplate.builder()
1862+
.sqsAsyncClient(sqsAsyncClient)
1863+
.configure(options -> options
1864+
.observationRegistry(observationRegistry)
1865+
.observationConvention(convention))
1866+
.build();
1867+
}
1868+
```
1869+
1870+
==== Context Propagation
1871+
1872+
For regular blocking components such as message listeners, interceptors, and error handlers, observation scopes are automatically managed, so no further action is required.
1873+
1874+
For asynchronous variants of these components, the observation context is not automatically propagated between threads.
1875+
However, the Observation object is injected in the message headers under the key `ObservationThreadLocalAccessor.KEY`.
1876+
A scope can be manually opened in the new thread with the following approach:
1877+
1878+
```java
1879+
// Set up the context registry with the appropriate accessors
1880+
ContextRegistry registry = new ContextRegistry();
1881+
registry.registerContextAccessor(new MessageHeaderContextAccessor());
1882+
registry.registerThreadLocalAccessor(new ObservationThreadLocalAccessor());
1883+
1884+
// Create a scope and ensure it's closed when done
1885+
try (Scope scope = ContextSnapshotFactory.builder()
1886+
.contextRegistry(registry)
1887+
.build()
1888+
.captureFrom(message.getHeaders())
1889+
.setThreadLocals()) {
1890+
// Your logic here - the observation context is now available in this thread
1891+
}
1892+
```
1893+
1894+
IMPORTANT: The scope should be closed in the same thread where it was opened to prevent thread local leakage.
1895+
17271896
=== IAM Permissions
17281897
Following IAM permissions are required by Spring Cloud AWS SQS:
17291898

spring-cloud-aws-autoconfigure/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -176,5 +176,10 @@
176176
<artifactId>amazon-s3-encryption-client-java</artifactId>
177177
<optional>true</optional>
178178
</dependency>
179+
<dependency>
180+
<groupId>io.micrometer</groupId>
181+
<artifactId>micrometer-observation-test</artifactId>
182+
<scope>test</scope>
183+
</dependency>
179184
</dependencies>
180185
</project>

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java

+19
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
import io.awspring.cloud.sqs.operations.SqsTemplateBuilder;
3535
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
3636
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
37+
import io.awspring.cloud.sqs.support.observation.SqsListenerObservation;
38+
import io.awspring.cloud.sqs.support.observation.SqsTemplateObservation;
39+
import io.micrometer.observation.ObservationRegistry;
3740
import org.springframework.beans.factory.ObjectProvider;
3841
import org.springframework.boot.autoconfigure.AutoConfiguration;
3942
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
@@ -87,10 +90,18 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder
8790
@ConditionalOnMissingBean
8891
@Bean
8992
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<ObjectMapper> objectMapperProvider,
93+
ObjectProvider<ObservationRegistry> observationRegistryProvider,
94+
ObjectProvider<SqsTemplateObservation.Convention> observationConventionProvider,
9095
MessagingMessageConverter<Message> messageConverter) {
9196
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient)
9297
.messageConverter(messageConverter);
9398
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messageConverter, om));
99+
if (this.sqsProperties.isObservationEnabled()) {
100+
observationRegistryProvider
101+
.ifAvailable(registry -> builder.configure(options -> options.observationRegistry(registry)));
102+
observationConventionProvider
103+
.ifAvailable(convention -> builder.configure(options -> options.observationConvention(convention)));
104+
}
94105
if (sqsProperties.getQueueNotFoundStrategy() != null) {
95106
builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy()));
96107
}
@@ -103,6 +114,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
103114
ObjectProvider<SqsAsyncClient> sqsAsyncClient, ObjectProvider<AsyncErrorHandler<Object>> asyncErrorHandler,
104115
ObjectProvider<ErrorHandler<Object>> errorHandler,
105116
ObjectProvider<AsyncMessageInterceptor<Object>> asyncInterceptors,
117+
ObjectProvider<ObservationRegistry> observationRegistry,
118+
ObjectProvider<SqsListenerObservation.Convention> observationConventionProvider,
106119
ObjectProvider<MessageInterceptor<Object>> interceptors, ObjectProvider<ObjectMapper> objectMapperProvider,
107120
MessagingMessageConverter<?> messagingMessageConverter) {
108121

@@ -114,6 +127,12 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
114127
interceptors.forEach(factory::addMessageInterceptor);
115128
asyncInterceptors.forEach(factory::addMessageInterceptor);
116129
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messagingMessageConverter, om));
130+
if (this.sqsProperties.isObservationEnabled()) {
131+
observationRegistry
132+
.ifAvailable(registry -> factory.configure(options -> options.observationRegistry(registry)));
133+
observationConventionProvider
134+
.ifAvailable(convention -> factory.configure(options -> options.observationConvention(convention)));
135+
}
117136
factory.configure(options -> options.messageConverter(messagingMessageConverter));
118137
return factory;
119138
}

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java

+10
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public void setListener(Listener listener) {
4949
@Nullable
5050
private QueueNotFoundStrategy queueNotFoundStrategy;
5151

52+
private Boolean observationEnabled = false;
53+
5254
/**
5355
* Return the strategy to use if the queue is not found.
5456
* @return the {@link QueueNotFoundStrategy}
@@ -66,6 +68,14 @@ public void setQueueNotFoundStrategy(QueueNotFoundStrategy queueNotFoundStrategy
6668
this.queueNotFoundStrategy = queueNotFoundStrategy;
6769
}
6870

71+
public Boolean isObservationEnabled() {
72+
return this.observationEnabled;
73+
}
74+
75+
public void setObservationEnabled(Boolean observationEnabled) {
76+
this.observationEnabled = observationEnabled;
77+
}
78+
6979
public static class Listener {
7080

7181
/**

0 commit comments

Comments
 (0)