Skip to content

Commit

Permalink
SQS ack and failure handlers (#2903)
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp authored Jan 27, 2025
1 parent dbaa3a6 commit 0c212c1
Show file tree
Hide file tree
Showing 21 changed files with 675 additions and 45 deletions.
32 changes: 30 additions & 2 deletions documentation/src/main/docs/sqs/receiving-aws-sqs-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,38 @@ SQS message attributes can be accessed from the metadata either by name or by th
{{ insert('sqs/inbound/SqsMetadataExample.java') }}
```

## Acknowledgement
## Acknowledgement Strategies

The default strategy for acknowledging AWS SQS Message is to *delete* the message from the queue.
With `ack.delete` set to `false`, the message is not deleted from the queue.
You can set the `ack-strategy` attribute to `ignore` if you want to ignore the message.

[NOTE] Deprecated
`ack.delete` attribute is deprecated and will be removed in a future release.

You can implement a custom strategy by implementing the {{ javadoc('io.smallrye.reactive.messaging.aws.sqs.SqsAckHandler', False, 'io.smallrye.reactive/smallrye-reactive-messaging-aws-sqs') }},
interface with a `Factory` class and registering it as a CDI bean with an `@Identifier`.

``` java
{{ insert('sqs/inbound/SqsCustomAckStrategy.java') }}
```

## Failure Strategies

The default strategy for handling message processing failures is `ignore`.
It lets the visibility timeout of the message consumer to expire and reconsume the message.

Other possible strategies are:

- `fail`: the failure is logged and the channel fail-stops.
- `delete`: the message is removed from the queue.
- `visibility`: the message visibility timeout is reset to 0.

You can implement a custom strategy by implementing the {{ javadoc('io.smallrye.reactive.messaging.aws.sqs.SqsFailureHandler', False, 'io.smallrye.reactive/smallrye-reactive-messaging-aws-sqs') }},
interface with a `Factory` class and registering it as a CDI bean with an `@Identifier`.

``` java
{{ insert('sqs/inbound/SqsCustomNackStrategy.java') }}
```

## Configuration Reference

Expand Down
36 changes: 36 additions & 0 deletions documentation/src/main/java/sqs/inbound/SqsCustomAckStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package sqs.inbound;

import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.SqsAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.SqsConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.aws.sqs.SqsMessage;
import io.vertx.mutiny.core.Vertx;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public class SqsCustomAckStrategy implements SqsAckHandler {

@ApplicationScoped
@Identifier("custom")
public static class Factory implements SqsAckHandler.Factory {

@Override
public SqsAckHandler create(SqsConnectorIncomingConfiguration conf,
Vertx vertx,
SqsAsyncClient client,
Uni<String> queueUrlUni,
BiConsumer<Throwable, Boolean> reportFailure) {
return new SqsCustomAckStrategy();
}
}

@Override
public Uni<Void> handle(SqsMessage<?> message) {
return Uni.createFrom().voidItem()
.emitOn(message::runOnMessageContext);
}
}
33 changes: 33 additions & 0 deletions documentation/src/main/java/sqs/inbound/SqsCustomNackStrategy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package sqs.inbound;

import java.util.function.BiConsumer;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.SqsFailureHandler;
import io.smallrye.reactive.messaging.aws.sqs.SqsMessage;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public class SqsCustomNackStrategy implements SqsFailureHandler {

@ApplicationScoped
@Identifier("custom")
public static class Factory implements SqsFailureHandler.Factory {

@Override
public SqsFailureHandler create(String channel, SqsAsyncClient client, Uni<String> queueUrlUni,
BiConsumer<Throwable, Boolean> reportFailure) {
return new SqsCustomNackStrategy();
}
}

@Override
public Uni<Void> handle(SqsMessage<?> message, Metadata metadata, Throwable throwable) {
return Uni.createFrom().voidItem()
.emitOn(message::runOnMessageContext);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,32 @@
package io.smallrye.reactive.messaging.aws.sqs;

import java.util.function.BiConsumer;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.core.Vertx;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public interface SqsAckHandler {

Uni<Void> handle(SqsMessage message);
interface Strategy {
String DELETE = "delete";
String IGNORE = "ignore";
}

interface Factory {
SqsAckHandler create(SqsConnectorIncomingConfiguration conf, Vertx vertx, SqsAsyncClient client,
Uni<String> queueUrlUni,
BiConsumer<Throwable, Boolean> reportFailure);
}

default Uni<SqsMessage<?>> received(SqsMessage<?> message) {
return Uni.createFrom().item(message);
}

Uni<Void> handle(SqsMessage<?> message);

default void close(boolean graceful) {

}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package io.smallrye.reactive.messaging.aws.sqs;

import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING;
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow.Publisher;
Expand Down Expand Up @@ -32,26 +36,28 @@

@ApplicationScoped
@Connector(SqsConnector.CONNECTOR_NAME)
@ConnectorAttribute(name = "queue", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The name of the SQS queue, defaults to channel name if not provided")
@ConnectorAttribute(name = "queue.url", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The url of the SQS queue")
@ConnectorAttribute(name = "region", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The name of the SQS region")
@ConnectorAttribute(name = "endpoint-override", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The endpoint override")
@ConnectorAttribute(name = "credentials-provider", type = "string", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "The credential provider to be used in the client")
@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")
@ConnectorAttribute(name = "queue", type = "string", direction = INCOMING_AND_OUTGOING, description = "The name of the SQS queue, defaults to channel name if not provided")
@ConnectorAttribute(name = "queue.url", type = "string", direction = INCOMING_AND_OUTGOING, description = "The url of the SQS queue")
@ConnectorAttribute(name = "region", type = "string", direction = INCOMING_AND_OUTGOING, description = "The name of the SQS region")
@ConnectorAttribute(name = "endpoint-override", type = "string", direction = INCOMING_AND_OUTGOING, description = "The endpoint override")
@ConnectorAttribute(name = "credentials-provider", type = "string", direction = INCOMING_AND_OUTGOING, description = "The credential provider to be used in the client")
@ConnectorAttribute(name = "health-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether health reporting is enabled (default) or disabled", defaultValue = "true")

@ConnectorAttribute(name = "group.id", type = "string", direction = ConnectorAttribute.Direction.OUTGOING, description = "When set, sends messages with the specified group id")
@ConnectorAttribute(name = "batch", type = "boolean", direction = ConnectorAttribute.Direction.OUTGOING, description = "When set, sends messages in batches of maximum 10 messages", defaultValue = "false")
@ConnectorAttribute(name = "batch-size", type = "int", direction = ConnectorAttribute.Direction.OUTGOING, description = "In batch send mode, the maximum number of messages to include in batch, currently SQS maximum is 10 messages", defaultValue = "10")
@ConnectorAttribute(name = "batch-delay", type = "int", direction = ConnectorAttribute.Direction.OUTGOING, description = "In batch send mode, the maximum delay in milliseconds to wait for messages to be included in the batch", defaultValue = "3000")

@ConnectorAttribute(name = "wait-time-seconds", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The maximum amount of time in seconds to wait for messages to be received", defaultValue = "1")
@ConnectorAttribute(name = "max-number-of-messages", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The maximum number of messages to receive", defaultValue = "10")
@ConnectorAttribute(name = "visibility-timeout", type = "int", direction = ConnectorAttribute.Direction.INCOMING, description = "The duration in seconds that the received messages are hidden from subsequent retrieve requests after being retrieved by a receive request")
@ConnectorAttribute(name = "receive.request.message-attribute-names", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "The message attribute names to retrieve when receiving messages.")
@ConnectorAttribute(name = "receive.request.customizer", type = "string", direction = ConnectorAttribute.Direction.INCOMING, description = "The identifier for the bean implementing a customizer to receive requests, defaults to channel name if not provided")
@ConnectorAttribute(name = "receive.request.retries", type = "long", direction = ConnectorAttribute.Direction.INCOMING, description = "If set to a positive number, the connector will try to retry the request that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled.", defaultValue = "2147483647")
@ConnectorAttribute(name = "receive.request.pause.resume", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused.", defaultValue = "true")
@ConnectorAttribute(name = "ack.delete", type = "boolean", direction = ConnectorAttribute.Direction.INCOMING, description = "Whether the acknowledgement deletes the message from the queue", defaultValue = "true")
@ConnectorAttribute(name = "wait-time-seconds", type = "int", direction = INCOMING, description = "The maximum amount of time in seconds to wait for messages to be received", defaultValue = "1")
@ConnectorAttribute(name = "max-number-of-messages", type = "int", direction = INCOMING, description = "The maximum number of messages to receive", defaultValue = "10")
@ConnectorAttribute(name = "visibility-timeout", type = "int", direction = INCOMING, description = "The duration in seconds that the received messages are hidden from subsequent retrieve requests after being retrieved by a receive request")
@ConnectorAttribute(name = "receive.request.message-attribute-names", type = "string", direction = INCOMING, description = "The message attribute names to retrieve when receiving messages.")
@ConnectorAttribute(name = "receive.request.customizer", type = "string", direction = INCOMING, description = "The identifier for the bean implementing a customizer to receive requests, defaults to channel name if not provided")
@ConnectorAttribute(name = "receive.request.retries", type = "long", direction = INCOMING, description = "If set to a positive number, the connector will try to retry the request that was not delivered successfully (with a potentially transient error) until the number of retries is reached. If set to 0, retries are disabled.", defaultValue = "2147483647")
@ConnectorAttribute(name = "receive.request.pause.resume", type = "boolean", direction = INCOMING, description = "Whether the polling must be paused when the application does not request items and resume when it does. This allows implementing back-pressure based on the application capacity. Note that polling is not stopped, but will not retrieve any records when paused.", defaultValue = "true")
@ConnectorAttribute(name = "ack.delete", type = "boolean", direction = INCOMING, description = "Whether the acknowledgement deletes the message from the queue. Deprecated, use ack-strategy instead", deprecated = true)
@ConnectorAttribute(name = "ack-strategy", type = "string", direction = INCOMING, description = "The identifier for the bean implementing ack strategy factory. Strategies: 'delete', 'ignore'", defaultValue = "delete")
@ConnectorAttribute(name = "failure-strategy", type = "string", direction = INCOMING, description = "The identifier for the bean implementing failure strategy factory. Strategies: 'ignore', 'fail', 'visibility', 'delete'", defaultValue = "ignore")
public class SqsConnector implements InboundConnector, OutboundConnector, HealthReporter {

@Inject
Expand All @@ -67,6 +73,14 @@ public class SqsConnector implements InboundConnector, OutboundConnector, Health
@Inject
Instance<JsonMapping> jsonMappers;

@Inject
@Any
Instance<SqsAckHandler.Factory> ackHandlerFactories;

@Inject
@Any
Instance<SqsFailureHandler.Factory> failureHandlerFactories;

Vertx vertx;

private static final List<SqsInboundChannel> INBOUND_CHANNELS = new CopyOnWriteArrayList<>();
Expand All @@ -93,7 +107,8 @@ public Publisher<? extends Message<?>> getPublisher(Config config) {
var conf = new SqsConnectorIncomingConfiguration(config);
var customizer = CDIUtils.getInstanceById(customizers, conf.getReceiveRequestCustomizer().orElse(conf.getChannel()),
() -> null);
var channel = new SqsInboundChannel(conf, vertx, sqsManager, customizer, jsonMapping);
var channel = new SqsInboundChannel(conf, vertx, sqsManager, customizer, jsonMapping, ackHandlerFactories,
failureHandlerFactories);
INBOUND_CHANNELS.add(channel);
return channel.getStream();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.smallrye.reactive.messaging.aws.sqs;

import java.util.function.BiConsumer;

import org.eclipse.microprofile.reactive.messaging.Metadata;

import io.smallrye.mutiny.Uni;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;

public interface SqsFailureHandler {

interface Strategy {
String DELETE = "delete";
String VISIBILITY = "visibility";
String FAIL = "fail";
String IGNORE = "ignore";
}

interface Factory {
SqsFailureHandler create(String channel, SqsAsyncClient client, Uni<String> queueUrlUni,
BiConsumer<Throwable, Boolean> reportFailure);
}

Uni<Void> handle(SqsMessage<?> message, Metadata metadata, Throwable throwable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import jakarta.enterprise.inject.Instance;

import org.eclipse.microprofile.reactive.messaging.Message;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsDeleteAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsNothingAckHandler;
import io.smallrye.reactive.messaging.aws.sqs.ack.SqsIgnoreAckHandler;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.json.JsonMapping;
import io.smallrye.reactive.messaging.providers.helpers.CDIUtils;
import io.smallrye.reactive.messaging.providers.helpers.PausablePollingStream;
import io.vertx.core.impl.VertxInternal;
import io.vertx.mutiny.core.Context;
Expand Down Expand Up @@ -47,7 +49,9 @@ public class SqsInboundChannel {
private final Integer visibilityTimeout;

public SqsInboundChannel(SqsConnectorIncomingConfiguration conf, Vertx vertx, SqsManager sqsManager,
SqsReceiveMessageRequestCustomizer customizer, JsonMapping jsonMapper) {
SqsReceiveMessageRequestCustomizer customizer, JsonMapping jsonMapper,
Instance<SqsAckHandler.Factory> ackHandlerFactories,
Instance<SqsFailureHandler.Factory> failureHandlerFactories) {
this.channel = conf.getChannel();
this.healthEnabled = conf.getHealthEnabled();
this.retries = conf.getReceiveRequestRetries();
Expand All @@ -62,8 +66,8 @@ public SqsInboundChannel(SqsConnectorIncomingConfiguration conf, Vertx vertx, Sq
this.messageAttributeNames = getMessageAttributeNames(conf);
this.customizer = customizer;

SqsAckHandler ackHandler = conf.getAckDelete() ? new SqsDeleteAckHandler(client, queueUrlUni)
: new SqsNothingAckHandler();
SqsAckHandler ackHandler = createAckHandler(ackHandlerFactories, conf, vertx, client, queueUrlUni);
SqsFailureHandler failureHandler = createFailureHandler(failureHandlerFactories, conf, client, queueUrlUni);
PausablePollingStream<List<software.amazon.awssdk.services.sqs.model.Message>, software.amazon.awssdk.services.sqs.model.Message> pollingStream = new PausablePollingStream<>(
channel, request(null, 0), (messages, processor) -> {
if (messages != null) {
Expand All @@ -74,14 +78,32 @@ channel, request(null, 0), (messages, processor) -> {
}, requestExecutor, maxNumberOfMessages * 2, conf.getReceiveRequestPauseResume());
this.stream = Multi.createFrom()
.deferred(() -> queueUrlUni.onItem().transformToMulti(queueUrl -> pollingStream.getStream()))
.emitOn(r -> context.runOnContext(r))
.onItem().transform(message -> new SqsMessage<>(message, jsonMapper, ackHandler))
.emitOn(context::runOnContext)
.onItem().transform(message -> new SqsMessage<>(message, jsonMapper, ackHandler, failureHandler))
.onFailure().invoke(throwable -> {
log.errorReceivingMessage(channel, throwable);
reportFailure(throwable, false);
});
}

private SqsFailureHandler createFailureHandler(Instance<SqsFailureHandler.Factory> failureHandlerFactories,
SqsConnectorIncomingConfiguration conf,
SqsAsyncClient client, Uni<String> queueUrlUni) {
return CDIUtils.getInstanceById(failureHandlerFactories, conf.getFailureStrategy())
.get()
.create(channel, client, queueUrlUni, this::reportFailure);
}

private SqsAckHandler createAckHandler(Instance<SqsAckHandler.Factory> ackHandlerFactories,
SqsConnectorIncomingConfiguration conf, Vertx vertx, SqsAsyncClient client, Uni<String> queueUrlUni) {
if (!conf.getAckDelete().orElse(true)) {
// nothing to do
return new SqsIgnoreAckHandler();
}
return CDIUtils.getInstanceById(ackHandlerFactories, conf.getAckStrategy()).get()
.create(conf, vertx, client, queueUrlUni, this::reportFailure);
}

private List<String> getMessageAttributeNames(SqsConnectorIncomingConfiguration conf) {
List<String> names = new ArrayList<>();
names.add(SqsConnector.CLASS_NAME_ATTRIBUTE);
Expand Down
Loading

0 comments on commit 0c212c1

Please sign in to comment.