diff --git a/content/microservices/kafka.md b/content/microservices/kafka.md index f6ab7a9dce..9a264dee87 100644 --- a/content/microservices/kafka.md +++ b/content/microservices/kafka.md @@ -435,37 +435,82 @@ throw new KafkaRetriableException('...'); ### Custom exception handling -Along with the default error handling mechanisms, you can create a custom Exception Filter for Kafka events to manage retry logic. For instance, the example below demonstrates how to skip a problematic event after a configurable number of retries: +Nest provides two levels of retry handling for Kafka microservices: + +1. **Connection-level retry**: Controls how the Kafka client retries connection attempts +2. **Message-level retry**: Controls how individual messages are retried with circuit breaker pattern + +#### Connection-level retry + +To configure connection retry behavior, you can use the `retry` option in the client configuration. This allows you to specify the retry strategy for connection attempts: ```typescript -import { Catch, ArgumentsHost, Logger } from '@nestjs/common'; -import { BaseExceptionFilter } from '@nestjs/core'; -import { KafkaContext } from '../ctx-host'; +const microserviceOptions: MicroserviceOptions = { + transport: Transport.KAFKA, + options: { + client: { + brokers: ['localhost:9092'], + retry: { + // Initial retry delay in milliseconds + initialRetryTime: 300, + // Maximum number of retries + retries: 4, + // Multiplier for increasing wait time between retries (e.g., 2x) + factor: 2, + }, + }, + }, +}; + +app.connectMicroservice(microserviceOptions); +``` + +> info **Hint** The `retry` configuration is part of the Kafka client options and follows the KafkaJS retry configuration pattern. + +#### Message-level retry with Circuit Breaker + +For more sophisticated message retry handling, you can implement a circuit breaker pattern using a custom exception filter. This pattern helps prevent cascading failures and provides better control over message processing. + +The following example demonstrates how to create a custom exception filter that implements the circuit breaker pattern: + +```typescript +import { Catch, ArgumentsHost, Logger, ExceptionFilter } from '@nestjs/common'; +import { KafkaContext } from '@nestjs/microservices'; @Catch() -export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter { +export class KafkaMaxRetryExceptionFilter implements ExceptionFilter { private readonly logger = new Logger(KafkaMaxRetryExceptionFilter.name); + private failureCount = 0; + private circuitOpen = false; + private openTimestamp = 0; + private readonly circuitResetTimeout = 30000; constructor( private readonly maxRetries: number, - // Optional custom function executed when max retries are exceeded private readonly skipHandler?: (message: any) => Promise, - ) { - super(); - } + ) {} async catch(exception: unknown, host: ArgumentsHost) { const kafkaContext = host.switchToRpc().getContext(); const message = kafkaContext.getMessage(); - const currentRetryCount = this.getRetryCountFromContext(kafkaContext); - if (currentRetryCount >= this.maxRetries) { - this.logger.warn( - `Max retries (${ - this.maxRetries - }) exceeded for message: ${JSON.stringify(message)}`, - ); + if (this.circuitOpen) { + if (Date.now() - this.openTimestamp >= this.circuitResetTimeout) { + this.logger.log('Circuit breaker timeout elapsed. Resetting breaker.'); + this.circuitOpen = false; + this.failureCount = 0; + } else { + this.logger.warn('Circuit breaker open. Skipping message processing.'); + await this.commitOffset(kafkaContext); + return; + } + } + this.failureCount++; + this.logger.log('failureCount', { failureCount: this.failureCount }); + + if (this.failureCount >= this.maxRetries) { + this.logger.warn(`Circuit breaker triggered after ${this.failureCount} failures. Skipping message processing.`); if (this.skipHandler) { try { await this.skipHandler(message); @@ -473,23 +518,17 @@ export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter { this.logger.error('Error in skipHandler:', err); } } - try { await this.commitOffset(kafkaContext); } catch (commitError) { this.logger.error('Failed to commit offset:', commitError); } - return; // Stop propagating the exception + this.circuitOpen = true; + this.openTimestamp = Date.now(); + return; } - // If retry count is below the maximum, proceed with the default Exception Filter logic - super.catch(exception, host); - } - - private getRetryCountFromContext(context: KafkaContext): number { - const headers = context.getMessage().headers || {}; - const retryHeader = headers['retryCount'] || headers['retry-count']; - return retryHeader ? Number(retryHeader) : 0; + throw exception; } private async commitOffset(context: KafkaContext): Promise { @@ -497,23 +536,17 @@ export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter { if (!consumer) { throw new Error('Consumer instance is not available from KafkaContext.'); } - const topic = context.getTopic && context.getTopic(); const partition = context.getPartition && context.getPartition(); const message = context.getMessage(); const offset = message.offset; - if (!topic || partition === undefined || offset === undefined) { - throw new Error( - 'Incomplete Kafka message context for committing offset.', - ); + throw new Error('Incomplete Kafka message context for committing offset.'); } - await consumer.commitOffsets([ { topic, partition, - // When committing an offset, commit the next number (i.e., current offset + 1) offset: (Number(offset) + 1).toString(), }, ]); @@ -521,9 +554,7 @@ export class KafkaMaxRetryExceptionFilter extends BaseExceptionFilter { } ``` -This filter offers a way to retry processing a Kafka event up to a configurable number of times. Once the maximum retries are reached, it triggers a custom `skipHandler` (if provided) and commits the offset, effectively skipping the problematic event. This allows subsequent events to be processed without interruption. - -You can integrate this filter by adding it to your event handlers: +To use this filter, apply it to your event handlers using the `@UseFilters()` decorator: ```typescript @UseFilters(new KafkaMaxRetryExceptionFilter(5)) @@ -535,6 +566,35 @@ export class MyEventHandler { } ``` +> info **Hint** The circuit breaker pattern helps prevent cascading failures by temporarily stopping message processing when a certain number of failures occur, and automatically resuming after a configured timeout period. + +You can also apply different retry settings for each event handler within the same class. This allows you to have fine-grained control over retry behavior based on the specific event: + +```typescript +export class MyEventHandler { + @UseFilters(new KafkaMaxRetryExceptionFilter(2)) + @EventPattern('test') + handleEvent() { + throw new Error('test'); + } + + @UseFilters(new KafkaMaxRetryExceptionFilter(4)) + @EventPattern('test2') + handleGetEvent() { + throw new Error('test'); + } +} +``` + +> info **Hint** When applying filters at the method level, each event handler can have its own retry configuration, allowing for different retry strategies based on the importance or nature of the event being processed. + +This implementation provides several benefits: +- Circuit breaker pattern to prevent cascading failures +- Configurable retry limits +- Automatic circuit reset after a timeout period +- Optional custom handler for skipped messages +- Detailed logging of retry attempts and circuit breaker state + #### Commit offsets Committing offsets is essential when working with Kafka. Per default, messages will be automatically committed after a specific time. For more information visit [KafkaJS docs](https://kafka.js.org/docs/consuming#autocommit). `KafkaContext` offers a way to access the active consumer for manually committing offsets. The consumer is the KafkaJS consumer and works as the [native KafkaJS implementation](https://kafka.js.org/docs/consuming#manual-committing).