docs(microservices): enhance Kafka retry handling documentation #3246

132 changes: 96 additions & 36 deletions content/microservices/kafka.md
@@ -435,95 +435,126 @@ throw new KafkaRetriableException('...');

### Custom exception handling

Nest provides two levels of retry handling for Kafka microservices:

1. **Connection-level retry**: Controls how the Kafka client retries connection attempts
2. **Message-level retry**: Controls how individual messages are retried, using a circuit breaker pattern

#### Connection-level retry

To configure connection retry behavior, use the `retry` option in the client configuration:

```typescript
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

const microserviceOptions: MicroserviceOptions = {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
      retry: {
        // Initial retry delay in milliseconds
        initialRetryTime: 300,
        // Maximum number of retries
        retries: 4,
        // Exponential factor by which the wait time grows between retries (e.g., 2x)
        multiplier: 2,
      },
    },
  },
};

app.connectMicroservice<MicroserviceOptions>(microserviceOptions);
```
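
With the settings above, the client waits roughly 300 ms before the first retry and, with a multiplier of 2, doubles the delay on each subsequent attempt (approximately 300 ms, 600 ms, 1200 ms, then 2400 ms), giving up after four retries.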

> info **Hint** The `retry` configuration is part of the Kafka client options and follows the KafkaJS retry configuration pattern.

#### Message-level retry with circuit breaker

For more sophisticated message retry handling, you can implement a circuit breaker pattern using a custom exception filter. This pattern helps prevent cascading failures and provides better control over message processing.

The following example demonstrates how to create a custom exception filter that implements the circuit breaker pattern:

```typescript
import { Catch, ArgumentsHost, Logger, ExceptionFilter } from '@nestjs/common';
import { KafkaContext } from '@nestjs/microservices';

@Catch()
export class KafkaMaxRetryExceptionFilter implements ExceptionFilter {
  private readonly logger = new Logger(KafkaMaxRetryExceptionFilter.name);
  private failureCount = 0;
  private circuitOpen = false;
  private openTimestamp = 0;
  private readonly circuitResetTimeout = 30000;

  constructor(
    private readonly maxRetries: number,
    // Optional custom function executed when max retries are exceeded
    private readonly skipHandler?: (message: any) => Promise<void>,
  ) {}

  async catch(exception: unknown, host: ArgumentsHost) {
    const kafkaContext = host.switchToRpc().getContext<KafkaContext>();
    const message = kafkaContext.getMessage();

    // While the circuit is open, skip processing entirely until the reset
    // timeout has elapsed
    if (this.circuitOpen) {
      if (Date.now() - this.openTimestamp >= this.circuitResetTimeout) {
        this.logger.log('Circuit breaker timeout elapsed. Resetting breaker.');
        this.circuitOpen = false;
        this.failureCount = 0;
      } else {
        this.logger.warn('Circuit breaker open. Skipping message processing.');
        await this.commitOffset(kafkaContext);
        return;
      }
    }

    this.failureCount++;
    this.logger.log(`Current failure count: ${this.failureCount}`);

    if (this.failureCount >= this.maxRetries) {
      this.logger.warn(
        `Circuit breaker triggered after ${this.failureCount} failures. Skipping message processing.`,
      );
      if (this.skipHandler) {
        try {
          await this.skipHandler(message);
        } catch (err) {
          this.logger.error('Error in skipHandler:', err);
        }
      }

      try {
        await this.commitOffset(kafkaContext);
      } catch (commitError) {
        this.logger.error('Failed to commit offset:', commitError);
      }
      this.circuitOpen = true;
      this.openTimestamp = Date.now();
      return; // Stop propagating the exception
    }

    // Below the threshold, rethrow so the default retry mechanism kicks in
    throw exception;
  }

  private async commitOffset(context: KafkaContext): Promise<void> {
    const consumer = context.getConsumer && context.getConsumer();
    if (!consumer) {
      throw new Error('Consumer instance is not available from KafkaContext.');
    }

    const topic = context.getTopic && context.getTopic();
    const partition = context.getPartition && context.getPartition();
    const message = context.getMessage();
    const offset = message.offset;

    if (!topic || partition === undefined || offset === undefined) {
      throw new Error('Incomplete Kafka message context for committing offset.');
    }

    await consumer.commitOffsets([
      {
        topic,
        partition,
        // When committing an offset, commit the next number (i.e., current offset + 1)
        offset: (Number(offset) + 1).toString(),
      },
    ]);
  }
}
```

This filter tracks consecutive processing failures across events. Once the configured maximum is reached, it triggers the custom `skipHandler` (if provided), commits the offset to skip the problematic event, and opens the circuit breaker. While the circuit is open, incoming events are skipped (their offsets are committed) until the reset timeout elapses, preventing a stream of failures from blocking the consumer.
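
As an illustration of the `skipHandler` parameter, the following sketch forwards skipped messages to a dead-letter topic. The injection token `'KAFKA_SERVICE'` and the topic name `'my-topic.dlq'` are hypothetical names chosen for this example:

```typescript
import { Inject, Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';

@Injectable()
export class DeadLetterService {
  constructor(
    // 'KAFKA_SERVICE' is an assumed injection token registered via ClientsModule
    @Inject('KAFKA_SERVICE') private readonly kafkaClient: ClientKafka,
  ) {}

  async sendToDeadLetter(message: any): Promise<void> {
    // Forward the raw value of the skipped message to a hypothetical
    // dead-letter topic so it can be inspected or replayed later
    await lastValueFrom(
      this.kafkaClient.emit('my-topic.dlq', {
        value: message.value?.toString(),
      }),
    );
  }
}
```

Such a service method can then be passed to the filter, e.g. `new KafkaMaxRetryExceptionFilter(5, (msg) => deadLetterService.sendToDeadLetter(msg))`.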

To use this filter, apply it to your event handlers using the `@UseFilters()` decorator:

```typescript
@UseFilters(new KafkaMaxRetryExceptionFilter(5))
export class MyEventHandler {
  @EventPattern('your-topic')
  async handleEvent(@Payload() data: any, @Ctx() context: KafkaContext) {
    // Your event processing logic...
  }
}
```

> info **Hint** The circuit breaker pattern helps prevent cascading failures by temporarily stopping message processing when a certain number of failures occur, and automatically resuming after a configured timeout period.

You can also apply different retry settings to each event handler within the same class, giving you fine-grained control over retry behavior based on the specific event. Since each `new KafkaMaxRetryExceptionFilter(...)` creates a separate filter instance, every handler also gets its own independent failure count and circuit state:

```typescript
export class MyEventHandler {
  @UseFilters(new KafkaMaxRetryExceptionFilter(2))
  @EventPattern('test')
  handleEvent() {
    throw new Error('test');
  }

  @UseFilters(new KafkaMaxRetryExceptionFilter(4))
  @EventPattern('test2')
  handleGetEvent() {
    throw new Error('test');
  }
}
```

> info **Hint** When applying filters at the method level, each event handler can have its own retry configuration, allowing for different retry strategies based on the importance or nature of the event being processed.

This implementation provides several benefits:
- Circuit breaker pattern to prevent cascading failures
- Configurable retry limits
- Automatic circuit reset after a timeout period
- Optional custom handler for skipped messages
- Detailed logging of retry attempts and circuit breaker state

#### Commit offsets

Committing offsets is essential when working with Kafka. By default, messages are automatically committed after a specific interval. For more information, visit the [KafkaJS docs](https://kafka.js.org/docs/consuming#autocommit). `KafkaContext` offers a way to access the active consumer for manually committing offsets. The consumer is the KafkaJS consumer and works like the [native KafkaJS implementation](https://kafka.js.org/docs/consuming#manual-committing).
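
As a brief sketch of manual committing (the topic name, controller, and payload shape are illustrative), a handler can commit the offset of the message it just processed through the consumer exposed by `KafkaContext`:

```typescript
import { Controller } from '@nestjs/common';
import { Ctx, EventPattern, KafkaContext, Payload } from '@nestjs/microservices';

@Controller()
export class UserEventsController {
  @EventPattern('user.created')
  async handleUserCreated(@Payload() data: any, @Ctx() context: KafkaContext) {
    // ... business logic ...

    const { offset } = context.getMessage();
    const partition = context.getPartition();
    const topic = context.getTopic();
    const consumer = context.getConsumer();

    // Commit offset + 1 so the consumer resumes after this message on restart
    await consumer.commitOffsets([
      { topic, partition, offset: (Number(offset) + 1).toString() },
    ]);
  }
}
```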