Skip to content

Observability Support for SQS #1369

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,18 @@ Such attributes are available as `MessageHeaders` in received messages.
#AUTO
|Set the ContentBasedDeduplication queue attribute value of the queues the template is sending messages to.
With `ContentBasedDeduplication#AUTO`, the queue attribute value will be resolved automatically.

|`observationRegistry`
|ObservationRegistry
|ObservationRegistry.NOOP
|Set the `ObservationRegistry` to be used for recording observations during message sending and receiving operations.
See <<Observability Support>> for more information.

|`observationConvention`
|ObservationConvention
|null
|Sets a custom `ObservationConvention` to be used for customizing observation key-value pairs.
See <<Observability Support>> for more information.
|===

[[sqs-send-message]]
Expand Down Expand Up @@ -817,6 +829,7 @@ The Spring Boot Starter for SQS provides the following auto-configuration proper
| <<maxMessagesPerPoll, `spring.cloud.aws.sqs.listener.max-messages-per-poll`>> | Maximum number of messages to be received per poll. | No | 10
| <<pollTimeout, `spring.cloud.aws.sqs.listener.poll-timeout`>> | Maximum amount of time to wait for messages in a poll. | No | 10 seconds
| `spring.cloud.aws.sqs.queue-not-found-strategy` | The strategy to be used by SqsTemplate and SqsListeners when a queue does not exist. | No | CREATE
| `spring.cloud.aws.sqs.observation-enabled` | Enables observability support for SQS operations. | No | false
|===


Expand Down Expand Up @@ -974,6 +987,18 @@ For `FIFO` queues, visibility is extended for all messages in a message group be
See <<FIFO Support>>.
Otherwise, visibility is specified once when polling SQS.

|`observationRegistry`
|`ObservationRegistry`
|`ObservationRegistry.NOOP`
|Sets the `ObservationRegistry` to be used for recording observations during message processing.
See <<Observability Support>>.

|`observationConvention`
|`ObservationConvention`
|`null`
|Sets a custom `ObservationConvention` to be used for customizing observation key-value pairs.
See <<Observability Support>>.

|`queueNotFoundStrategy`
|`FAIL`, `CREATE`
|`CREATE`
Expand Down Expand Up @@ -1724,6 +1749,152 @@ There can be multiple `SqsAsyncClientCustomizer` beans present in single applica
Note that `SqsAsyncClientCustomizer` beans are applied **after** `AwsAsyncClientCustomizer` beans and therefore can overwrite previously set configurations.


=== Observability Support

Spring Cloud AWS SQS supports observability through Micrometer's Observation API. This integration provides the ability to monitor and trace SQS operations throughout your application.

==== Enabling Observability

Observability can be enabled in a Spring Boot application by:

1. Setting the `spring.cloud.aws.sqs.observation-enabled` property to `true`
2. Having a `ObservationRegistry` bean in your application context

When using direct SQS component configuration, observability can be enabled by:

1. Setting an `ObservationRegistry` in the container options or template options
2. Optionally providing a custom `ObservationConvention` to customize the key-value pairs

```java
@Bean
SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObservationRegistry observationRegistry) {
return SqsTemplate.builder()
.sqsAsyncClient(sqsAsyncClient)
.configure(options -> options.observationRegistry(observationRegistry))
.build();
}

@Bean
SqsMessageListenerContainerFactory<Object> sqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient, ObservationRegistry observationRegistry) {
return SqsMessageListenerContainerFactory.builder()
.sqsAsyncClient(sqsAsyncClient)
.configure(options -> options.observationRegistry(observationRegistry))
.build();
}
```

==== Available Observations

Spring Cloud AWS SQS provides the following observations:

1. `spring.aws.sqs.template` - Records SQS operations performed through the `SqsTemplate`
2. `spring.aws.sqs.listener` - Records message processing through the `SqsMessageListenerContainer`

Both observations include the following common tags:

Low cardinality tags:
- `messaging.system`: "sqs"
- `messaging.operation`: "publish" (template) or "receive" (listener)
- `messaging.destination.name` or `messaging.source.name`: The queue name
- `messaging.destination.kind` or `messaging.source.kind`: "queue"

High cardinality tags:
- `messaging.message.id`: The SQS message ID from AWS.

For FIFO queues, the following additional high cardinality tags are included:
- `messaging.message.message-group.id`: The message group ID
- `messaging.message.message-deduplication.id`: The message deduplication ID

==== Customizing Observations

Custom observation conventions can be provided to add custom tags or replace the default ones with custom ones.

===== Adding Custom Tags

To add custom tags while preserving all default tags, the `DefaultConvention` classes can be extended and the `getCustomLowCardinalityKeyValues` and / or getCustomHighCardinalityKeyValues method overridden:

```java
@Bean
SqsTemplateObservation.Convention sqsTemplateObservationConvention() {
return new SqsTemplateObservation.DefaultConvention() {
@Override
protected KeyValues getCustomHighCardinalityKeyValues(SqsTemplateObservation.Context context) {
String paymentId = MessageHeaderUtils.getHeaderAsString(context.getMessage(), "payment-id-header-name");
return KeyValues.of("payment.id", paymentId);
}
};
}
```

===== Replacing Default Tags

For complete control over the observation tags, the `Convention` interfaces can be implemented directly:

```java
@Bean
SqsListenerObservation.Convention sqsListenerObservationConvention() {
return new SqsListenerObservation.Convention() {

@Override
public KeyValues getLowCardinalityKeyValues(SqsListenerObservation.Context context) {
return KeyValues.of("messaging.system", "sqs")
.and("messaging.operation", "receive")
.and("custom.tag", "custom-value");
}

@Override
public KeyValues getHighCardinalityKeyValues(SqsListenerObservation.Context context) {
String paymentId = MessageHeaderUtils.getHeaderAsString(context.getMessage(), "payment-id-header-name");
return KeyValues.of("payment.id", paymentId);
}
};
}
```

Custom convention beans defined in the application context will be automatically wired by Spring Boot auto-configuration. For manual configuration:

```java
@Bean
SqsTemplate sqsTemplateWithCustomConvention(SqsAsyncClient sqsAsyncClient,
ObservationRegistry observationRegistry,
SqsTemplateObservation.Convention convention) {
return SqsTemplate.builder()
.sqsAsyncClient(sqsAsyncClient)
.configure(options -> options
.observationRegistry(observationRegistry)
.observationConvention(convention))
.build();
}
```

==== Context Propagation

For regular blocking components such as message listeners, interceptors, and error handlers, observation scopes are automatically managed, and no further action is required.

Remote baggage propagation is supported through the "baggage" message header.

For asynchronous variants of these components, the observation context is not automatically propagated between threads.
However, the Observation object is injected in the message headers under the key `ObservationThreadLocalAccessor.KEY`.
A scope can be manually opened in the new thread with the following approach:

```java
// Set up the context registry with the appropriate accessors
ContextRegistry registry = new ContextRegistry();
registry.registerContextAccessor(new MessageHeaderContextAccessor());
registry.registerThreadLocalAccessor(new ObservationThreadLocalAccessor());

// Create a scope and ensure it's closed when done
try (Scope scope = ContextSnapshotFactory.builder()
.contextRegistry(registry)
.build()
.captureFrom(message.getHeaders())
.setThreadLocals()) {
// Your logic here - the observation context is now available in this thread
}
```

IMPORTANT: The scope should be closed in the same thread where it was opened to prevent thread local leakage.

=== IAM Permissions
Following IAM permissions are required by Spring Cloud AWS SQS:

Expand Down
5 changes: 5 additions & 0 deletions spring-cloud-aws-autoconfigure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,10 @@
<artifactId>amazon-s3-encryption-client-java</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-observation-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import io.awspring.cloud.sqs.operations.SqsTemplateBuilder;
import io.awspring.cloud.sqs.support.converter.MessagingMessageConverter;
import io.awspring.cloud.sqs.support.converter.SqsMessagingMessageConverter;
import io.awspring.cloud.sqs.support.observation.SqsListenerObservation;
import io.awspring.cloud.sqs.support.observation.SqsTemplateObservation;
import io.micrometer.observation.ObservationRegistry;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
Expand Down Expand Up @@ -87,10 +90,18 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder
@ConditionalOnMissingBean
@Bean
public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<ObjectMapper> objectMapperProvider,
ObjectProvider<ObservationRegistry> observationRegistryProvider,
ObjectProvider<SqsTemplateObservation.Convention> observationConventionProvider,
MessagingMessageConverter<Message> messageConverter) {
SqsTemplateBuilder builder = SqsTemplate.builder().sqsAsyncClient(sqsAsyncClient)
.messageConverter(messageConverter);
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messageConverter, om));
if (this.sqsProperties.isObservationEnabled()) {
observationRegistryProvider
.ifAvailable(registry -> builder.configure(options -> options.observationRegistry(registry)));
observationConventionProvider
.ifAvailable(convention -> builder.configure(options -> options.observationConvention(convention)));
}
if (sqsProperties.getQueueNotFoundStrategy() != null) {
builder.configure((options) -> options.queueNotFoundStrategy(sqsProperties.getQueueNotFoundStrategy()));
}
Expand All @@ -103,6 +114,8 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
ObjectProvider<SqsAsyncClient> sqsAsyncClient, ObjectProvider<AsyncErrorHandler<Object>> asyncErrorHandler,
ObjectProvider<ErrorHandler<Object>> errorHandler,
ObjectProvider<AsyncMessageInterceptor<Object>> asyncInterceptors,
ObjectProvider<ObservationRegistry> observationRegistry,
ObjectProvider<SqsListenerObservation.Convention> observationConventionProvider,
ObjectProvider<MessageInterceptor<Object>> interceptors, ObjectProvider<ObjectMapper> objectMapperProvider,
MessagingMessageConverter<?> messagingMessageConverter) {

Expand All @@ -114,6 +127,12 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
interceptors.forEach(factory::addMessageInterceptor);
asyncInterceptors.forEach(factory::addMessageInterceptor);
objectMapperProvider.ifAvailable(om -> setMapperToConverter(messagingMessageConverter, om));
if (this.sqsProperties.isObservationEnabled()) {
observationRegistry
.ifAvailable(registry -> factory.configure(options -> options.observationRegistry(registry)));
observationConventionProvider
.ifAvailable(convention -> factory.configure(options -> options.observationConvention(convention)));
}
factory.configure(options -> options.messageConverter(messagingMessageConverter));
return factory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public void setListener(Listener listener) {
@Nullable
private QueueNotFoundStrategy queueNotFoundStrategy;

private Boolean observationEnabled = false;

/**
* Return the strategy to use if the queue is not found.
* @return the {@link QueueNotFoundStrategy}
Expand All @@ -66,6 +68,14 @@ public void setQueueNotFoundStrategy(QueueNotFoundStrategy queueNotFoundStrategy
this.queueNotFoundStrategy = queueNotFoundStrategy;
}

public Boolean isObservationEnabled() {
return this.observationEnabled;
}

public void setObservationEnabled(Boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

public static class Listener {

/**
Expand Down
Loading