Skip to content

Kafka: Consumers

ronimizy edited this page Feb 3, 2024 · 16 revisions

Direct consumers

To handle consumer messages, you have to implement IKafkaConsumer<,> interface. First generic parameter denotes a message key type, while second – message value type.

This interface requires you to implement a singe HandleAsync method.

public ValueTask HandleAsync(
    IEnumerable<IKafkaConsumerMessage<MessageKey, MessageValue>> messages,
    CancellationToken cancellationToken)

After that, you need to register your consumer handler, see consumer configuration Wiki for registration documentation.

Message data

IKafkaConsumerMessage<,> contains several properties:

  • Key
    Key of the message
  • Value
    Value of the message
  • Timestamp
    Time (DateTimeOffset) when message was produced to Kafka
  • Topic
    Name of the topic from which this message was consumed
  • Partition
    Partition from which this message was consumed
  • Offset
    Offset of consumed message within it's partition

Samples

Inbox consumers

To use inbox pattern for consumer message handling, you have to implement IKafkaInboxHandler<,> interface, it's generic parameters correspond to IKafkaConsumerHandler<,> generic parameters.

This interface requires you to implement a singe HandleAsync method.

public ValueTask HandleAsync(
    IEnumerable<IKafkaInboxMessage<MessageKey, MessageValue>> messages,
    CancellationToken cancellationToken)

IKafkaInboxMessage<,> interface allows for granular control over inbox message handling result. You can use its SetResult(MessageHandleResult) method for specifying this result.

MessageHandleResult enum has a number of values:

  • Success
    Marks message as successfully handled. It will no longer be handled by your application.
  • Failure
    Marks message as failed to handle. It will no longer be handled by your application.
  • Ignored
    Marks message as not handled. It will be handled by your application at the next inbox execution until it receives Success or Failure result.

Default result, used when SetResult method was not called on instance of IKafkaInboxMessage<,>, is configurable.

IKafkaInboxMessage<,> inherits from IKafkaConsumerMessage<,> thus it does have all it's properties.

After implementing inbox handler you have to register it, see inbox confiuration Wiki for registration and configuration documentation.

Samples