Closed
Description
Type: Bug
Component: SQS
Describe the bug
Error
s thrown in SQS listeners aren't picked up by io.awspring.cloud.sqs.listener.errorhandler.ErrorHandler
s. Instead an exception is thrown: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
. Because the methods defined by the interface accept Throwable
s I assume that it is intended for Error
s to be handleable.
I'm using io.awspring.cloud:spring-cloud-aws-starter-sqs
version 3.3.0.
java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:936)
at java.base/java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:950)
at java.base/java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2372)
at io.awspring.cloud.sqs.CompletableFutures.handleCompose(CompletableFutures.java:73)
at io.awspring.cloud.sqs.listener.pipeline.AcknowledgementHandlerExecutionStage.process(AcknowledgementHandlerExecutionStage.java:47)
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104)
at io.awspring.cloud.sqs.listener.sink.AbstractMessageProcessingPipelineSink.lambda$execute$0(AbstractMessageProcessingPipelineSink.java:99)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
at io.awspring.cloud.sqs.listener.ListenerExecutionFailedException.wrapAndRethrowError(ListenerExecutionFailedException.java:115)
at io.awspring.cloud.sqs.listener.ListenerExecutionFailedException.unwrapMessage(ListenerExecutionFailedException.java:84)
at io.awspring.cloud.sqs.listener.pipeline.AcknowledgementHandlerExecutionStage.lambda$process$2(AcknowledgementHandlerExecutionStage.java:50)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:934)
... 10 common frames omitted
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:936)
at java.base/java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:950)
at java.base/java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2372)
at io.awspring.cloud.sqs.CompletableFutures.handleCompose(CompletableFutures.java:73)
at io.awspring.cloud.sqs.listener.pipeline.AbstractAfterProcessingInterceptorExecutionStage.process(AbstractAfterProcessingInterceptorExecutionStage.java:43)
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104)
... 6 common frames omitted
Caused by: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
at io.awspring.cloud.sqs.listener.ListenerExecutionFailedException.wrapAndRethrowError(ListenerExecutionFailedException.java:115)
at io.awspring.cloud.sqs.listener.ListenerExecutionFailedException.unwrapMessage(ListenerExecutionFailedException.java:84)
Caused by: java.lang.IllegalArgumentException: No ListenerExecutionFailedException found to unwrap messages.
Sample
@Component
public class Boom {
@SqsListener("https://sqs.us-west-2.amazonaws.com/...")
public void boom(JsonNode message) {
// When an exception is thrown the error handler is invoked as expected
// throw new RuntimeException("exception");
// This does not invoke the error handler
throw new Error("error");
}
}
@Component
public class AsyncSqsErrorHandler implements AsyncErrorHandler<Object> {
public CompletableFuture<Void> handle(Message<Object> message, Throwable throwable) {
return CompletableFuture.runAsync(() -> {
System.out.println("Handling error");
});
}
public CompletableFuture<Void> handle(Collection<Message<Object>> messages, Throwable throwable) {
return CompletableFuture.runAsync(() -> {
System.out.println("Handling error");
});
}
}
@Configuration
public class SqsConfig {
@Bean
public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsClient, AsyncSqsErrorHandler asyncErrorHandler) {
return SqsMessageListenerContainerFactory.builder()
.sqsAsyncClient(sqsClient)
.errorHandler(asyncErrorHandler)
.build();
}
}
Thank you!