A processor may fail to process a task because of errors in other system components. When such a failure happens, you may want to hold the task for a while and try to process it again in the near future.
Decaton has a built-in mechanism for you to achieve this simply.
You may want to process a task again when something unexpected happens.
When your service needs to access other services, there is no guarantee that those services will always work as you expect. In a typical case, your consumer consumes a message from a topic, processes it, then delegates the remaining work to another service.
Whether that service is owned by your team or by a third-party company, transient failures and downtime should always be taken into consideration when designing your service.
You might not want to just throw away the failed message and process the next one. What’s worse, the upcoming messages will likely fail for the same reason, because the third-party service may not recover soon.
Instead, you might pause for a second and retry the message several times. But what if it keeps failing? You will probably want to record how many times the message has been attempted.
There are a few strategies you can take to implement this retry logic.
Since implementing it yourself can be cumbersome, Decaton provides Retry Queuing to help you retry failures with simple configuration.
To enable the retry function, you need to:
- Prepare a retry topic for Decaton
- Enable retry with the enableRetry method when configuring the Decaton processor
Decaton needs a retry topic for the retry functionality. By default, the name of the retry topic is the original topic name followed by -retry.
For example, if you have a topic called weather-task, then you need to create a topic called weather-task-retry for the retry function.
You can also override the default naming convention.
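If you create topics from a provisioning script, it can help to derive the retry topic name in one place. The following is a minimal sketch of the default convention described above. RetryTopicNames and defaultRetryTopic are hypothetical names used for illustration only and are not part of Decaton's API.

```java
// Hypothetical helper, not part of Decaton: it only mirrors the default
// naming convention described in the text (topic name followed by "-retry").
final class RetryTopicNames {
    private static final String DEFAULT_SUFFIX = "-retry";

    private RetryTopicNames() {}

    /** Returns the retry topic name expected by default for the given topic. */
    static String defaultRetryTopic(String topic) {
        return topic + DEFAULT_SUFFIX;
    }

    public static void main(String[] args) {
        // prints "weather-task-retry"
        System.out.println(defaultRetryTopic("weather-task"));
    }
}
```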
The following example demonstrates how to enable the retry function:
...
SubscriptionBuilder.newBuilder("testProcessor")
        .enableRetry(RetryConfig.withBackoff(Duration.ofMillis(100))) // (1)
        .processorsBuilder(
                ProcessorsBuilder.consuming(
                                "my-decaton-topic", // (2)
                                new ProtocolBuffersDeserializer<>(PrintMessageTask.parser()))
                        .thenProcess(new RetryingProcessorSync())
        )
...
(1) If the processor decides to retry a task at time T, the task will be processed again at T + 100 milliseconds or later.
(2) Because we do not override the name of the retry topic, it will be my-decaton-topic-retry.
If you are using distributed tracing and want to see retries as part of the same trace, make sure that RetryConfig contains a KafkaProducerSupplier that will propagate trace information.
After enabling the retry function, you can use it in your processor:
public class RetryingProcessorSync implements DecatonProcessor<PrintMessageTask> {
    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task)
            throws InterruptedException {
        // Do something...
        // Oops! Something unexpected happened... Let's retry it.
        if (context.metadata().retryCount() > 10) { // (1)
            log.error("Not recovered within {} times. Discard the task", context.metadata().retryCount());
        } else {
            context.retry(); // (2)
        }
    }
}
(1) Decaton records how many times a task has been retried. You can use this method to get that count.
(2) Simply invoke ProcessingContext#retry, and Decaton will handle the remaining work for you.
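To see what the retryCount() > 10 check in the processor above means in practice, the following sketch counts how often a task that always fails would be processed. It is a standalone model, not Decaton code: RetryBudget and its methods are hypothetical, and it assumes that the retry count is 0 on the first delivery and grows by one on every retry.

```java
// Standalone model of the retry-limit check; not part of Decaton.
// Assumption: retryCount is 0 on first delivery and increases by 1 per retry.
final class RetryBudget {
    static final long MAX_RETRY_COUNT = 10;

    private RetryBudget() {}

    /** Mirrors the processor's check: retry unless retryCount > 10. */
    static boolean shouldRetry(long retryCount) {
        return retryCount <= MAX_RETRY_COUNT;
    }

    /** Counts how many times an always-failing task is processed in total. */
    static int attemptsForAlwaysFailingTask() {
        int attempts = 0;
        long retryCount = 0;
        while (true) {
            attempts++; // the task is processed (and fails) once more
            if (!shouldRetry(retryCount)) {
                return attempts; // the task is discarded on this attempt
            }
            retryCount++; // the task is re-queued with an incremented count
        }
    }

    public static void main(String[] args) {
        // prints "12": the first attempt plus 11 retried attempts
        System.out.println(attemptsForAlwaysFailingTask());
    }
}
```

Under these assumptions the task is retried 11 times before it is discarded. If you want exactly 10 retries, the comparison in the processor would need to be >= 10 instead.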
Normally, when you are doing asynchronous processing, you have to complete the task yourself. But if you invoke ProcessingContext#retry in the processor, you do not have to, because Decaton takes care of completion for you.
public class RetryingProcessorAsync implements DecatonProcessor<PrintMessageTask> {
    @Override
    public void process(ProcessingContext<PrintMessageTask> context, PrintMessageTask task)
            throws InterruptedException {
        Completion completion = context.deferCompletion();
        CompletableFuture<String> userInfo = getUserInfoAsync("Test");
        userInfo.whenComplete((user, exception) -> {
            if (exception == null) {
                // Do something...
                completion.complete(); // (1)
            } else {
                try {
                    context.retry(); // (2)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        });
    }
}
(1) You still need to call #complete when the asynchronous job finishes normally.
(2) If you call ProcessingContext#retry, you do not need to call #complete explicitly.
In this section, we briefly explain how Retry Queuing is implemented.
When the user invokes ProcessingContext#retry, the following things happen:
- The current task is marked as completed.
- The task is produced to the retry topic with metadata that records when the task should be retried.
- The consumer inside Decaton actually subscribes to both the normal topic and the retry topic when you enable the retry function. When a polled task is about to be processed, Decaton first looks at its metadata to check whether it can be processed yet. If it is not yet time, Decaton waits until the task can be processed. As a result, the following tasks processed by the same thread are blocked for a while.
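The steps above can be sketched with a small in-memory model. This is not Decaton's implementation: RetryQueueModel, Task, and the simulated clock are hypothetical and exist only to illustrate how the "retry at" metadata delays a task and why a later task on the same thread has to wait behind it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// In-memory illustration of retry queuing; not Decaton's actual code.
final class RetryQueueModel {
    /** A task plus the metadata the retry mechanism relies on. */
    static final class Task {
        final String name;
        final int retryCount;
        final long retryAtMillis; // earliest time the task may be processed

        Task(String name, int retryCount, long retryAtMillis) {
            this.name = name;
            this.retryCount = retryCount;
            this.retryAtMillis = retryAtMillis;
        }
    }

    private final long backoffMillis;
    private final Queue<Task> retryTopic = new ArrayDeque<>(); // stands in for the retry topic
    private long nowMillis; // simulated clock, starts at 0
    final List<String> log = new ArrayList<>();

    RetryQueueModel(long backoffMillis) {
        this.backoffMillis = backoffMillis;
    }

    void advanceClock(long millis) {
        nowMillis += millis;
    }

    /** Models a retry request: re-enqueue the task with updated metadata. */
    void retry(Task task) {
        retryTopic.add(new Task(task.name, task.retryCount + 1, nowMillis + backoffMillis));
    }

    /** Processes the retry topic on a single simulated thread. */
    void drainRetryTopic() {
        Task task;
        while ((task = retryTopic.poll()) != null) {
            if (task.retryAtMillis > nowMillis) {
                // Too early: "wait" by moving the clock forward. Tasks queued
                // behind this one are blocked for the same duration.
                nowMillis = task.retryAtMillis;
            }
            log.add(task.name + "#" + task.retryCount + "@" + nowMillis);
        }
    }

    public static void main(String[] args) {
        RetryQueueModel model = new RetryQueueModel(100);
        model.retry(new Task("A", 0, 0)); // retry requested at t=0
        model.advanceClock(30);
        model.retry(new Task("B", 0, 0)); // retry requested at t=30
        model.drainRetryTopic();
        // prints "[A#1@100, B#1@130]"
        System.out.println(model.log);
    }
}
```

In this model each entry reads name#retryCount@processingTime, so task A is processed again at 100 ms and task B at 130 ms, each exactly one backoff after its retry was requested.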