Skip to content

Commit 5ec944d

Browse files
Observability Support for SQS (#1369)
* Observability Support for SQS Resolves #1367
1 parent 803c550 commit 5ec944d

File tree

44 files changed

+4416
-56
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+4416
-56
lines changed

docs/src/main/asciidoc/sqs.adoc

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,18 @@ Such attributes are available as `MessageHeaders` in received messages.
219219
#AUTO
220220
|Set the ContentBasedDeduplication queue attribute value of the queues the template is sending messages to.
221221
With `ContentBasedDeduplication#AUTO`, the queue attribute value will be resolved automatically.
222+
223+
|`observationRegistry`
224+
|ObservationRegistry
225+
|ObservationRegistry.NOOP
226+
|Set the `ObservationRegistry` to be used for recording observations during message sending and receiving operations.
227+
See <<Observability Support>> for more information.
228+
229+
|`observationConvention`
230+
|ObservationConvention
231+
|null
232+
|Sets a custom `ObservationConvention` to be used for customizing observation key-value pairs.
233+
See <<Observability Support>> for more information.
222234
|===
223235

224236
[[sqs-send-message]]
@@ -819,6 +831,7 @@ The Spring Boot Starter for SQS provides the following auto-configuration proper
819831
| <<pollTimeout, `spring.cloud.aws.sqs.listener.poll-timeout`>> | Maximum amount of time to wait for messages in a poll. | No | 10 seconds
820832
| <<maxDelayBetweenPolls, `spring.cloud.aws.sqs.listener.max-delay-between-polls`>> | Maximum amount of time to wait between polls. | No | 10 seconds
821833
| `spring.cloud.aws.sqs.queue-not-found-strategy` | The strategy to be used by SqsTemplate and SqsListeners when a queue does not exist. | No | CREATE
834+
| `spring.cloud.aws.sqs.observation-enabled` | Enables observability support for SQS operations. | No | false
822835
|===
823836

824837

@@ -976,6 +989,18 @@ For `FIFO` queues, visibility is extended for all messages in a message group be
976989
See <<FIFO Support>>.
977990
Otherwise, visibility is specified once when polling SQS.
978991

992+
|`observationRegistry`
993+
|`ObservationRegistry`
994+
|`ObservationRegistry.NOOP`
995+
|Sets the `ObservationRegistry` to be used for recording observations during message processing.
996+
See <<Observability Support>>.
997+
998+
|`observationConvention`
999+
|`ObservationConvention`
1000+
|`null`
1001+
|Sets a custom `ObservationConvention` to be used for customizing observation key-value pairs.
1002+
See <<Observability Support>>.
1003+
9791004
|`queueNotFoundStrategy`
9801005
|`FAIL`, `CREATE`
9811006
|`CREATE`
@@ -1803,6 +1828,152 @@ There can be multiple `SqsAsyncClientCustomizer` beans present in single applica
18031828
Note that `SqsAsyncClientCustomizer` beans are applied **after** `AwsAsyncClientCustomizer` beans and therefore can overwrite previously set configurations.
18041829

18051830

1831+
=== Observability Support
1832+
1833+
Spring Cloud AWS SQS supports observability through Micrometer's Observation API. This integration provides the ability to monitor and trace SQS operations throughout your application.
1834+
1835+
==== Enabling Observability
1836+
1837+
Observability can be enabled in a Spring Boot application by:
1838+
1839+
1. Setting the `spring.cloud.aws.sqs.observation-enabled` property to `true`
1840+
2. Having a `ObservationRegistry` bean in your application context
1841+
1842+
When using direct SQS component configuration, observability can be enabled by:
1843+
1844+
1. Setting an `ObservationRegistry` in the container options or template options
1845+
2. Optionally providing a custom `ObservationConvention` to customize the key-value pairs
1846+
1847+
```java
1848+
@Bean
1849+
SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObservationRegistry observationRegistry) {
1850+
return SqsTemplate.builder()
1851+
.sqsAsyncClient(sqsAsyncClient)
1852+
.configure(options -> options.observationRegistry(observationRegistry))
1853+
.build();
1854+
}
1855+
1856+
@Bean
1857+
SqsMessageListenerContainerFactory<Object> sqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient, ObservationRegistry observationRegistry) {
1858+
return SqsMessageListenerContainerFactory.builder()
1859+
.sqsAsyncClient(sqsAsyncClient)
1860+
.configure(options -> options.observationRegistry(observationRegistry))
1861+
.build();
1862+
}
1863+
```
1864+
1865+
==== Available Observations
1866+
1867+
Spring Cloud AWS SQS provides the following observations:
1868+
1869+
1. `spring.aws.sqs.template` - Records SQS operations performed through the `SqsTemplate`
1870+
2. `spring.aws.sqs.listener` - Records message processing through the `SqsMessageListenerContainer`
1871+
1872+
Both observations include the following common tags:
1873+
1874+
Low cardinality tags:
1875+
- `messaging.system`: "sqs"
1876+
- `messaging.operation`: "publish" (template) or "receive" (listener)
1877+
- `messaging.destination.name` or `messaging.source.name`: The queue name
1878+
- `messaging.destination.kind` or `messaging.source.kind`: "queue"
1879+
1880+
High cardinality tags:
1881+
- `messaging.message.id`: The SQS message ID from AWS.
1882+
1883+
For FIFO queues, the following additional high cardinality tags are included:
1884+
- `messaging.message.message-group.id`: The message group ID
1885+
- `messaging.message.message-deduplication.id`: The message deduplication ID
1886+
1887+
==== Customizing Observations
1888+
1889+
Custom observation conventions can be provided to add custom tags or replace the default ones with custom ones.
1890+
1891+
===== Adding Custom Tags
1892+
1893+
To add custom tags while preserving all default tags, the `DefaultConvention` classes can be extended and the `getCustomLowCardinalityKeyValues` and / or getCustomHighCardinalityKeyValues method overridden:
1894+
1895+
```java
1896+
@Bean
1897+
SqsTemplateObservation.Convention sqsTemplateObservationConvention() {
1898+
return new SqsTemplateObservation.DefaultConvention() {
1899+
@Override
1900+
protected KeyValues getCustomHighCardinalityKeyValues(SqsTemplateObservation.Context context) {
1901+
String paymentId = MessageHeaderUtils.getHeaderAsString(context.getMessage(), "payment-id-header-name");
1902+
return KeyValues.of("payment.id", paymentId);
1903+
}
1904+
};
1905+
}
1906+
```
1907+
1908+
===== Replacing Default Tags
1909+
1910+
For complete control over the observation tags, the `Convention` interfaces can be implemented directly:
1911+
1912+
```java
1913+
@Bean
1914+
SqsListenerObservation.Convention sqsListenerObservationConvention() {
1915+
return new SqsListenerObservation.Convention() {
1916+
1917+
@Override
1918+
public KeyValues getLowCardinalityKeyValues(SqsListenerObservation.Context context) {
1919+
return KeyValues.of("messaging.system", "sqs")
1920+
.and("messaging.operation", "receive")
1921+
.and("custom.tag", "custom-value");
1922+
}
1923+
1924+
@Override
1925+
public KeyValues getHighCardinalityKeyValues(SqsListenerObservation.Context context) {
1926+
String paymentId = MessageHeaderUtils.getHeaderAsString(context.getMessage(), "payment-id-header-name");
1927+
return KeyValues.of("payment.id", paymentId);
1928+
}
1929+
};
1930+
}
1931+
```
1932+
1933+
Custom convention beans defined in the application context will be automatically wired by Spring Boot auto-configuration. For manual configuration:
1934+
1935+
```java
1936+
@Bean
1937+
SqsTemplate sqsTemplateWithCustomConvention(SqsAsyncClient sqsAsyncClient,
1938+
ObservationRegistry observationRegistry,
1939+
SqsTemplateObservation.Convention convention) {
1940+
return SqsTemplate.builder()
1941+
.sqsAsyncClient(sqsAsyncClient)
1942+
.configure(options -> options
1943+
.observationRegistry(observationRegistry)
1944+
.observationConvention(convention))
1945+
.build();
1946+
}
1947+
```
1948+
1949+
==== Context Propagation
1950+
1951+
For regular blocking components such as message listeners, interceptors, and error handlers, observation scopes are automatically managed, and no further action is required.
1952+
1953+
Remote baggage propagation is supported through the "baggage" message header.
1954+
1955+
For asynchronous variants of these components, the observation context is not automatically propagated between threads.
1956+
However, the Observation object is injected in the message headers under the key `ObservationThreadLocalAccessor.KEY`.
1957+
A scope can be manually opened in the new thread with the following approach:
1958+
1959+
```java
1960+
// Set up the context registry with the appropriate accessors
1961+
ContextRegistry registry = new ContextRegistry();
1962+
registry.registerContextAccessor(new MessageHeaderContextAccessor());
1963+
registry.registerThreadLocalAccessor(new ObservationThreadLocalAccessor());
1964+
1965+
// Create a scope and ensure it's closed when done
1966+
try (Scope scope = ContextSnapshotFactory.builder()
1967+
.contextRegistry(registry)
1968+
.build()
1969+
.captureFrom(message.getHeaders())
1970+
.setThreadLocals()) {
1971+
// Your logic here - the observation context is now available in this thread
1972+
}
1973+
```
1974+
1975+
IMPORTANT: The scope should be closed in the same thread where it was opened to prevent thread local leakage.
1976+
18061977
=== IAM Permissions
18071978
Following IAM permissions are required by Spring Cloud AWS SQS:
18081979

spring-cloud-aws-autoconfigure/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,5 +176,10 @@
176176
<artifactId>amazon-s3-encryption-client-java</artifactId>
177177
<optional>true</optional>
178178
</dependency>
179+
<dependency>
180+
<groupId>io.micrometer</groupId>
181+
<artifactId>micrometer-observation-test</artifactId>
182+
<scope>test</scope>
183+
</dependency>
179184
</dependencies>
180185
</project>

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsAutoConfiguration.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@
3434
import io.awspring.cloud.sqs.operations.SqsTemplateBuilder;
3535
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
3636
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
37+
import io.awspring.cloud.sqs.support.observation.SqsListenerObservation;
38+
import io.awspring.cloud.sqs.support.observation.SqsTemplateObservation;
39+
import io.micrometer.observation.ObservationRegistry;
3740
import org.springframework.beans.factory.ObjectProvider;
3841
import org.springframework.boot.autoconfigure.AutoConfiguration;
3942
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
@@ -87,10 +90,18 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder
8790
@ConditionalOnMissingBean
8891
@Bean
8992
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<ObjectMapper> objectMapperProvider,
93+
ObjectProvider<ObservationRegistry> observationRegistryProvider,
94+
ObjectProvider<SqsTemplateObservation.Convention> observationConventionProvider,
9095
MessagingMessageConverter<Message> messageConverter) {
9196
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient)
9297
.messageConverter(messageConverter);
9398
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messageConverter, om));
99+
if (this.sqsProperties.isObservationEnabled()) {
100+
observationRegistryProvider
101+
.ifAvailable(registry -> builder.configure(options -> options.observationRegistry(registry)));
102+
observationConventionProvider
103+
.ifAvailable(convention -> builder.configure(options -> options.observationConvention(convention)));
104+
}
94105
if (sqsProperties.getQueueNotFoundStrategy() != null) {
95106
builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy()));
96107
}
@@ -103,6 +114,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
103114
ObjectProvider<SqsAsyncClient> sqsAsyncClient, ObjectProvider<AsyncErrorHandler<Object>> asyncErrorHandler,
104115
ObjectProvider<ErrorHandler<Object>> errorHandler,
105116
ObjectProvider<AsyncMessageInterceptor<Object>> asyncInterceptors,
117+
ObjectProvider<ObservationRegistry> observationRegistry,
118+
ObjectProvider<SqsListenerObservation.Convention> observationConventionProvider,
106119
ObjectProvider<MessageInterceptor<Object>> interceptors, ObjectProvider<ObjectMapper> objectMapperProvider,
107120
MessagingMessageConverter<?> messagingMessageConverter) {
108121

@@ -114,6 +127,12 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
114127
interceptors.forEach(factory::addMessageInterceptor);
115128
asyncInterceptors.forEach(factory::addMessageInterceptor);
116129
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messagingMessageConverter, om));
130+
if (this.sqsProperties.isObservationEnabled()) {
131+
observationRegistry
132+
.ifAvailable(registry -> factory.configure(options -> options.observationRegistry(registry)));
133+
observationConventionProvider
134+
.ifAvailable(convention -> factory.configure(options -> options.observationConvention(convention)));
135+
}
117136
factory.configure(options -> options.messageConverter(messagingMessageConverter));
118137
return factory;
119138
}

spring-cloud-aws-autoconfigure/src/main/java/io/awspring/cloud/autoconfigure/sqs/SqsProperties.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ public void setListener(Listener listener) {
4949
@Nullable
5050
private QueueNotFoundStrategy queueNotFoundStrategy;
5151

52+
private Boolean observationEnabled = false;
53+
5254
/**
5355
* Return the strategy to use if the queue is not found.
5456
* @return the {@link QueueNotFoundStrategy}
@@ -66,6 +68,14 @@ public void setQueueNotFoundStrategy(QueueNotFoundStrategy queueNotFoundStrategy
6668
this.queueNotFoundStrategy = queueNotFoundStrategy;
6769
}
6870

71+
public Boolean isObservationEnabled() {
72+
return this.observationEnabled;
73+
}
74+
75+
public void setObservationEnabled(Boolean observationEnabled) {
76+
this.observationEnabled = observationEnabled;
77+
}
78+
6979
public static class Listener {
7080

7181
/**

0 commit comments

Comments
 (0)