Kafka

Kafka is an open-source, distributed streaming platform with three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way.
  • Process streams of records as they occur.

The Kafka project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. It integrates very well with Apache Storm and Spark for real-time streaming data analysis.

Installation

To start building Kafka-based microservices, first install the required package:

$ npm i --save kafkajs

Overview

Like other Nest microservice transport layer implementations, you select the Kafka transporter mechanism using the transport property of the options object passed to the createMicroservice() method, along with an optional options property, as shown below:

@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    }
  }
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    }
  }
});

info Hint The Transport enum is imported from the @nestjs/microservices package.

Options

The options property is specific to the chosen transporter. The Kafka transporter exposes the properties described below.

  • client: Client configuration options (read more here)
  • consumer: Consumer configuration options (read more here)
  • run: Run configuration options (read more here)
  • subscribe: Subscribe configuration options (read more here)
  • producer: Producer configuration options (read more here)
  • send: Send configuration options (read more here)
  • producerOnlyMode: Feature flag to skip consumer group registration and only act as a producer (boolean)
  • postfixId: Change suffix of clientId value (string)
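For illustration, several of these properties can be combined in a single configuration. The following is a minimal sketch; the broker address and identifiers are placeholders:

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero',
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer',
      allowAutoTopicCreation: true,
    },
    subscribe: {
      // Consume each subscribed topic from the beginning instead of only new messages
      fromBeginning: true,
    },
  },
});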

Client

There is a small difference in Kafka compared to other microservice transporters. Instead of the ClientProxy class, we use the ClientKafkaProxy class.

Like other microservice transporters, you have several options for creating a ClientKafkaProxy instance.

One method for creating an instance is to use the ClientsModule. To create a client instance with the ClientsModule, import it and use the register() method to pass an options object with the same properties shown above in the createMicroservice() method, as well as a name property to be used as the injection token. Read more about ClientsModule here.

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'HERO_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        }
      },
    ]),
  ]
  ...
})
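Once registered, the client can be injected anywhere in your application using the name as the injection token. A minimal sketch (the service class below is illustrative):

@Injectable()
export class HeroesService {
  constructor(
    @Inject('HERO_SERVICE') private readonly client: ClientKafkaProxy,
  ) {}
}

info Hint @Injectable() and @Inject() are imported from the @nestjs/common package.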

Other options to create a client (either ClientProxyFactory or @Client()) can be used as well. You can read about them here.

Use the @Client() decorator as follows:

@Client({
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero',
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer'
    }
  }
})
client: ClientKafkaProxy;

Message pattern

The Kafka microservice message pattern utilizes two topics for the request and reply channels. The ClientKafkaProxy#send() method sends messages with a return address by associating a correlation id, reply topic, and reply partition with the request message. This requires the ClientKafkaProxy instance to be subscribed to the reply topic and assigned to at least one partition before sending a message.

Consequently, you need at least one reply topic partition for every running Nest application. For example, if you are running 4 Nest applications but the reply topic has only 3 partitions, then 1 of the applications will error out when trying to send a message.

When new ClientKafkaProxy instances are launched they join the consumer group and subscribe to their respective topics. This process triggers a rebalance of topic partitions assigned to consumers of the consumer group.

Normally, topic partitions are assigned using the round-robin partitioner, which assigns partitions to a collection of consumers sorted by consumer names that are randomly set on application launch. However, when a new consumer joins the consumer group, it can be positioned anywhere within that collection. When a pre-existing consumer ends up positioned after the new consumer, it is assigned different partitions and consequently loses the response messages of requests sent before the rebalance.

To prevent the ClientKafkaProxy consumers from losing response messages, a Nest-specific built-in custom partitioner is utilized. This custom partitioner assigns partitions to a collection of consumers sorted by high-resolution timestamps (process.hrtime()) that are set on application launch.

Message response subscription

warning Note This section is only relevant if you use request-response message style (with the @MessagePattern decorator and the ClientKafkaProxy#send method). Subscribing to the response topic is not necessary for the event-based communication (@EventPattern decorator and ClientKafkaProxy#emit method).

The ClientKafkaProxy class provides the subscribeToResponseOf() method. The subscribeToResponseOf() method takes a request's topic name as an argument and adds the derived reply topic name to a collection of reply topics. This method is required when implementing the message pattern.

@@filename(heroes.controller)
onModuleInit() {
  this.client.subscribeToResponseOf('hero.kill.dragon');
}

If the ClientKafkaProxy instance is created asynchronously, the subscribeToResponseOf() method must be called before calling the connect() method.

@@filename(heroes.controller)
async onModuleInit() {
  this.client.subscribeToResponseOf('hero.kill.dragon');
  await this.client.connect();
}
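With the subscription in place, calling send() returns an Observable that emits the reply. A minimal sketch, where the payload shape mirrors the KillDragonMessage examples below:

@@filename(heroes.controller)
killDragon(heroId: number, dragonId: number) {
  return this.client.send('hero.kill.dragon', { heroId, dragonId });
}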

Incoming

Nest receives incoming Kafka messages as an object with key, value, and headers properties that have values of type Buffer. Nest then parses these values by transforming the buffers into strings. If the string is "object like", Nest attempts to parse the string as JSON. The value is then passed to its associated handler.
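Conceptually, parsing an incoming value resembles the following simplified sketch (an illustration only, not the actual parser implementation):

// Simplified illustration of how an incoming value buffer is parsed
function parseValue(value: Buffer): any {
  const text = value.toString();
  // If the string is "object like", attempt to parse it as JSON
  if (text.startsWith('{') || text.startsWith('[')) {
    try {
      return JSON.parse(text);
    } catch {
      return text;
    }
  }
  return text;
}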

Outgoing

Nest sends outgoing Kafka messages after a serialization process when publishing events or sending messages. This occurs on arguments passed to the ClientKafkaProxy emit() and send() methods or on values returned from a @MessagePattern method. This serialization "stringifies" objects that are not strings or buffers by using JSON.stringify() or the toString() prototype method.

@@filename(heroes.controller)
@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const dragonId = message.dragonId;
    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];
    return items;
  }
}

info Hint @Payload() is imported from the @nestjs/microservices package.

Outgoing messages can also be keyed by passing an object with the key and value properties. Keying messages is important for meeting the co-partitioning requirement.

@@filename(heroes.controller)
@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const realm = 'Nest';
    const heroId = message.heroId;
    const dragonId = message.dragonId;

    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];

    return {
      headers: {
        realm
      },
      key: heroId,
      value: items
    }
  }
}

Messages passed in this format can also contain custom headers set in the headers hash property. Header hash property values must be either of type string or type Buffer.

@@filename(heroes.controller)
@Controller()
export class HeroesController {
  @MessagePattern('hero.kill.dragon')
  killDragon(@Payload() message: KillDragonMessage): any {
    const realm = 'Nest';
    const heroId = message.heroId;
    const dragonId = message.dragonId;

    const items = [
      { id: 1, name: 'Mythical Sword' },
      { id: 2, name: 'Key to Dungeon' },
    ];

    return {
      headers: {
        kafka_nestRealm: realm
      },
      key: heroId,
      value: items
    }
  }
}

Event-based

While the request-response method is ideal for exchanging messages between services, it is less suitable when your message style is event-based (which in turn is ideal for Kafka): when you just want to publish events without waiting for a response, you don't want the overhead that request-response requires to maintain two topics.

Check out these two sections to learn more about this: Overview: Event-based and Overview: Publishing events.
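For reference, publishing an event with the Kafka transporter is a single call; the topic name and payload below are illustrative:

this.client.emit('hero.killed.dragon', { heroId: 1, dragonId: 3 });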

Context

In more complex scenarios, you may need to access additional information about the incoming request. When using the Kafka transporter, you can access the KafkaContext object.

@@filename()
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  console.log(`Topic: ${context.getTopic()}`);
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('hero.kill.dragon')
killDragon(message, context) {
  console.log(`Topic: ${context.getTopic()}`);
}

info Hint @Payload(), @Ctx() and KafkaContext are imported from the @nestjs/microservices package.

To access the original Kafka IncomingMessage object, use the getMessage() method of the KafkaContext object, as follows:

@@filename()
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  const originalMessage = context.getMessage();
  const partition = context.getPartition();
  const { headers, timestamp } = originalMessage;
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('hero.kill.dragon')
killDragon(message, context) {
  const originalMessage = context.getMessage();
  const partition = context.getPartition();
  const { headers, timestamp } = originalMessage;
}

The IncomingMessage object fulfills the following interface:

interface IncomingMessage {
  topic: string;
  partition: number;
  timestamp: string;
  size: number;
  attributes: number;
  offset: string;
  key: any;
  value: any;
  headers: Record<string, any>;
}

If your handler involves slow processing of each received message, consider using the heartbeat callback. To retrieve the heartbeat function, use the getHeartbeat() method of the KafkaContext, as follows:

@@filename()
@MessagePattern('hero.kill.dragon')
async killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  const heartbeat = context.getHeartbeat();

  // Do some slow processing
  await doWorkPart1();

  // Send heartbeat to not exceed the sessionTimeout
  await heartbeat();

  // Do some slow processing again
  await doWorkPart2();
}

Naming conventions

The Kafka microservice components append a description of their respective role onto the client.clientId and consumer.groupId options to prevent collisions between Nest microservice client and server components. By default the ClientKafkaProxy components append -client and the ServerKafka components append -server to both of these options. Note how the provided values below are transformed in that way (as shown in the comments).

@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero', // hero-server
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer' // hero-consumer-server
    },
  }
});

And for the client:

@@filename(heroes.controller)
@Client({
  transport: Transport.KAFKA,
  options: {
    client: {
      clientId: 'hero', // hero-client
      brokers: ['localhost:9092'],
    },
    consumer: {
      groupId: 'hero-consumer' // hero-consumer-client
    }
  }
})
client: ClientKafkaProxy;

info Hint Kafka client and consumer naming conventions can be customized by extending ClientKafkaProxy and KafkaServer in your own custom provider and overriding the constructor.

Since the Kafka microservice message pattern utilizes two topics for the request and reply channels, a reply pattern should be derived from the request topic. By default, the name of the reply topic is the composite of the request topic name with .reply appended.

@@filename(heroes.controller)
onModuleInit() {
  this.client.subscribeToResponseOf('hero.get'); // hero.get.reply
}

info Hint Kafka reply topic naming conventions can be customized by extending ClientKafkaProxy in your own custom provider and overriding the getResponsePatternName method.
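A minimal sketch of such an override, assuming the subclass is wired up as a custom provider:

import { ClientKafkaProxy } from '@nestjs/microservices';

export class CustomReplyClientKafka extends ClientKafkaProxy {
  // Derive reply topics with a ".response" suffix instead of the default ".reply"
  protected getResponsePatternName(pattern: string): string {
    return `${pattern}.response`;
  }
}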

Retriable exceptions

Similar to other transporters, all unhandled exceptions are automatically wrapped into an RpcException and converted to a "user-friendly" format. However, there are edge cases when you might want to bypass this mechanism and let exceptions be consumed by the kafkajs driver instead. Throwing an exception while processing a message instructs kafkajs to retry it (redeliver it), which means that even though the message (or event) handler was triggered, the offset won't be committed to Kafka.

warning Warning For event handlers (event-based communication), all unhandled exceptions are considered retriable exceptions by default.

For this, you can use a dedicated class called KafkaRetriableException, as follows:

throw new KafkaRetriableException('...');

info Hint KafkaRetriableException class is exported from the @nestjs/microservices package.
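For example, an event handler might rethrow only transient failures as retriable and handle everything else itself. The service call and error class below are hypothetical:

@EventPattern('hero.kill.dragon')
async handleKillDragon(@Payload() message: KillDragonMessage) {
  try {
    await this.heroesService.killDragon(message); // hypothetical service call
  } catch (err) {
    if (err instanceof TransientDatabaseError) {
      // Let kafkajs redeliver the message; the offset won't be committed
      throw new KafkaRetriableException(err.message);
    }
    // Non-transient errors are handled here, so the offset is committed
  }
}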

Custom exception handling

Nest provides two levels of retry handling for Kafka microservices:

  1. Connection-level retry: Controls how the Kafka client retries connection attempts
  2. Message-level retry: Controls how individual messages are retried with a circuit breaker pattern

Connection-level retry

To configure connection retry behavior, you can use the retry option in the client configuration. This allows you to specify the retry strategy for connection attempts:

const microserviceOptions: MicroserviceOptions = {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
      retry: {
        // Initial retry delay in milliseconds
        initialRetryTime: 300,
        // Maximum number of retries
        retries: 4,
        // Multiplier for increasing wait time between retries (e.g., 2x)
        factor: 2,
      },
    },
  },
};

app.connectMicroservice<MicroserviceOptions>(microserviceOptions);

info Hint The retry configuration is part of the Kafka client options and follows the KafkaJS retry configuration pattern.

Message-level retry with Circuit Breaker

For more sophisticated message retry handling, you can implement a circuit breaker pattern using a custom exception filter. This pattern helps prevent cascading failures and provides better control over message processing.

The following example demonstrates how to create a custom exception filter that implements the circuit breaker pattern:

import { Catch, ArgumentsHost, Logger, ExceptionFilter } from '@nestjs/common';
import { KafkaContext } from '@nestjs/microservices';

@Catch()
export class KafkaMaxRetryExceptionFilter implements ExceptionFilter {
  private readonly logger = new Logger(KafkaMaxRetryExceptionFilter.name);
  private failureCount = 0;
  private circuitOpen = false;
  private openTimestamp = 0;
  private readonly circuitResetTimeout = 30000;

  constructor(
    private readonly maxRetries: number,
    private readonly skipHandler?: (message: any) => Promise<void>,
  ) {}

  async catch(exception: unknown, host: ArgumentsHost) {
    const kafkaContext = host.switchToRpc().getContext<KafkaContext>();
    const message = kafkaContext.getMessage();

    if (this.circuitOpen) {
      if (Date.now() - this.openTimestamp >= this.circuitResetTimeout) {
        this.logger.log('Circuit breaker timeout elapsed. Resetting breaker.');
        this.circuitOpen = false;
        this.failureCount = 0;
      } else {
        this.logger.warn('Circuit breaker open. Skipping message processing.');
        await this.commitOffset(kafkaContext);
        return;
      }
    }

    this.failureCount++;
    this.logger.log(`Failure count: ${this.failureCount}`);

    if (this.failureCount >= this.maxRetries) {
      this.logger.warn(`Circuit breaker triggered after ${this.failureCount} failures. Skipping message processing.`);
      if (this.skipHandler) {
        try {
          await this.skipHandler(message);
        } catch (err) {
          this.logger.error('Error in skipHandler:', err);
        }
      }
      try {
        await this.commitOffset(kafkaContext);
      } catch (commitError) {
        this.logger.error('Failed to commit offset:', commitError);
      }
      this.circuitOpen = true;
      this.openTimestamp = Date.now();
      return;
    }

    throw exception;
  }

  private async commitOffset(context: KafkaContext): Promise<void> {
    const consumer = context.getConsumer && context.getConsumer();
    if (!consumer) {
      throw new Error('Consumer instance is not available from KafkaContext.');
    }
    const topic = context.getTopic && context.getTopic();
    const partition = context.getPartition && context.getPartition();
    const message = context.getMessage();
    const offset = message.offset;
    if (!topic || partition === undefined || offset === undefined) {
      throw new Error('Incomplete Kafka message context for committing offset.');
    }
    await consumer.commitOffsets([
      {
        topic,
        partition,
        offset: (Number(offset) + 1).toString(),
      },
    ]);
  }
}

To use this filter, apply it to your event handlers using the @UseFilters() decorator:

@UseFilters(new KafkaMaxRetryExceptionFilter(5))
export class MyEventHandler {
  @EventPattern('your-topic')
  async handleEvent(@Payload() data: any, @Ctx() context: KafkaContext) {
    // Your event processing logic...
  }
}

info Hint The circuit breaker pattern helps prevent cascading failures by temporarily stopping message processing when a certain number of failures occur, and automatically resuming after a configured timeout period.

You can also apply different retry settings for each event handler within the same class. This allows you to have fine-grained control over retry behavior based on the specific event:

export class MyEventHandler {
  @UseFilters(new KafkaMaxRetryExceptionFilter(2))
  @EventPattern('test')
  handleEvent() {
    throw new Error('test');
  }

  @UseFilters(new KafkaMaxRetryExceptionFilter(4))
  @EventPattern('test2')
  handleGetEvent() {
    throw new Error('test');
  }
}

info Hint When applying filters at the method level, each event handler can have its own retry configuration, allowing for different retry strategies based on the importance or nature of the event being processed.

This implementation provides several benefits:

  • Circuit breaker pattern to prevent cascading failures
  • Configurable retry limits
  • Automatic circuit reset after a timeout period
  • Optional custom handler for skipped messages
  • Detailed logging of retry attempts and circuit breaker state

Commit offsets

Committing offsets is essential when working with Kafka. By default, messages are automatically committed after a specific interval. For more information, visit the KafkaJS docs. KafkaContext offers a way to access the active consumer for manually committing offsets. The consumer is the native KafkaJS consumer and behaves exactly as it does in KafkaJS.

@@filename()
@EventPattern('user.created')
async handleUserCreated(@Payload() data: IncomingMessage, @Ctx() context: KafkaContext) {
  // business logic

  const { offset } = context.getMessage();
  const partition = context.getPartition();
  const topic = context.getTopic();
  const consumer = context.getConsumer();
  await consumer.commitOffsets([{ topic, partition, offset }])
}
@@switch
@Bind(Payload(), Ctx())
@EventPattern('user.created')
async handleUserCreated(data, context) {
  // business logic

  const { offset } = context.getMessage();
  const partition = context.getPartition();
  const topic = context.getTopic();
  const consumer = context.getConsumer();
  await consumer.commitOffsets([{ topic, partition, offset }])
}

To disable auto-committing of messages, set autoCommit: false in the run configuration, as follows:

@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    run: {
      autoCommit: false
    }
  }
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    run: {
      autoCommit: false
    }
  }
});

Instance status updates

To get real-time updates on the connection and the state of the underlying driver instance, you can subscribe to the status stream. This stream provides status updates specific to the chosen driver. For the Kafka driver, the status stream emits connected, disconnected, rebalancing, crashed, and stopped events.

this.client.status.subscribe((status: KafkaStatus) => {
  console.log(status);
});

info Hint The KafkaStatus type is imported from the @nestjs/microservices package.

Similarly, you can subscribe to the server's status stream to receive notifications about the server's status.

const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: KafkaStatus) => {
  console.log(status);
});

Underlying producer and consumer

For more advanced use cases, you may need to access the underlying producer and consumer instances. This can be useful for scenarios like manually closing the connection or using driver-specific methods. However, keep in mind that for most cases, you shouldn't need to access the driver directly.

To do so, use the producer and consumer getters exposed by the ClientKafkaProxy instance.

const producer = this.client.producer;
const consumer = this.client.consumer;
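For example, with the underlying kafkajs producer you can send a raw message directly; the topic and payload below are illustrative:

const producer = this.client.producer;
await producer.send({
  topic: 'hero.notifications',
  messages: [{ key: 'hero-1', value: JSON.stringify({ heroId: 1 }) }],
});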