
Kafka: Configuration

ronimizy edited this page Feb 3, 2024 · 21 revisions

General

To add platform Kafka to the DI container, call the AddPlatformKafka extension method on an IServiceCollection instance. This method accepts a delegate that configures a builder. The builder lets you specify configuration and add consumers and producers.

collection.AddPlatformKafka(builder => builder
    .ConfigureOptions(configuration.GetSection("Presentation:Kafka")));

Configuration schema

{
  "Host": string,
  "SecurityProtocol": string enum,
  "SslCaPem": string,
  "SaslMechanism": string enum,
  "SaslUsername": string,
  "SaslPassword": string
}
  • Host Bootstrap servers of Kafka cluster.
  • SecurityProtocol Enum that specifies the security protocol used to connect to the Kafka cluster. Allowed values: Plaintext (default), Ssl, SaslPlaintext, SaslSsl. This value determines which of the other configuration values are required.
  • SslCaPem String representation of CA certificate. Required if SecurityProtocol is set to Ssl or SaslSsl.
  • SaslMechanism Enum that specifies the SASL mechanism used to connect to the Kafka cluster. Allowed values: Gssapi, Plain, ScramSha256, ScramSha512, OAuthBearer. Used if SecurityProtocol is set to SaslPlaintext or SaslSsl, but not required (default value = null).
  • SaslUsername Username used to authenticate when connecting to the Kafka cluster. Required if SecurityProtocol is set to SaslPlaintext or SaslSsl.
  • SaslPassword Password used to authenticate when connecting to the Kafka cluster. Required if SecurityProtocol is set to SaslPlaintext or SaslSsl.
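
The dependency between SecurityProtocol and the other values can be summarised as a small check. The sketch below is only an illustration of the rules listed above; KafkaSecurityProtocol, KafkaOptionsSketch and KafkaOptionsRules are hypothetical names and are not types from the platform library, which may validate its options differently.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical mirror of the configuration schema; not the library's own types.
public enum KafkaSecurityProtocol { Plaintext, Ssl, SaslPlaintext, SaslSsl }

public sealed record KafkaOptionsSketch(
    string Host,
    KafkaSecurityProtocol SecurityProtocol = KafkaSecurityProtocol.Plaintext,
    string? SslCaPem = null,
    string? SaslMechanism = null, // optional even when SASL is used
    string? SaslUsername = null,
    string? SaslPassword = null);

public static class KafkaOptionsRules
{
    // Returns the names of values that the chosen SecurityProtocol requires
    // but that are missing from the options.
    public static IReadOnlyList<string> MissingValues(KafkaOptionsSketch options)
    {
        var missing = new List<string>();

        bool usesSsl = options.SecurityProtocol
            is KafkaSecurityProtocol.Ssl or KafkaSecurityProtocol.SaslSsl;
        bool usesSasl = options.SecurityProtocol
            is KafkaSecurityProtocol.SaslPlaintext or KafkaSecurityProtocol.SaslSsl;

        if (usesSsl && string.IsNullOrWhiteSpace(options.SslCaPem))
            missing.Add(nameof(options.SslCaPem));

        if (usesSasl && string.IsNullOrWhiteSpace(options.SaslUsername))
            missing.Add(nameof(options.SaslUsername));

        if (usesSasl && string.IsNullOrWhiteSpace(options.SaslPassword))
            missing.Add(nameof(options.SaslPassword));

        return missing;
    }
}
```

With these rules, a Plaintext configuration needs nothing beyond Host, while SaslSsl needs SslCaPem, SaslUsername and SaslPassword.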

Configuration example

{
  "Presentation": {
    "Kafka": {
      "Host": "localhost:8001",
      "SecurityProtocol": "SaslSsl",
      "SslCaPem": "...",
      "SaslMechanism": "ScramSha512",
      "SaslUsername": "username",
      "SaslPassword": "password"
    }
  }
}

Consumer

To register a consumer, use the same builder that is provided in the AddPlatformKafka extension method.

Call AddConsumer in the builder's call chain, after the ConfigureOptions call. It accepts a delegate that configures the consumer's builder.

You must configure the message's key and value types, the configuration, the key and value deserialisers, and the message handler. These methods can only be called in a strict order, which is shown in the snippet below.

string group = Assembly.GetExecutingAssembly().GetName().Name ?? string.Empty;

collection.AddPlatformKafka(builder => builder
    .ConfigureOptions(...)
    .AddConsumer(b => b
        .WithKey<MessageKey>()
        .WithValue<MessageValue>()
        .WithConfiguration(configuration.GetSection("Presentation:Kafka:Consumers:MessageName"), c => c.WithGroup(group))
        .DeserializeKeyWithNewtonsoft()
        .DeserializeValueWithNewtonsoft()
        .HandleWith<MessageHandler>()));

The WithConfiguration method can accept a delegate as a second parameter for additional configuration. It lets you configure the consumer group and the instance id for the consumer via the WithGroup and WithInstanceId methods respectively.

Configuration schema

{
    "IsDisabled": bool,
    "Topic": string,
    "Group": string,
    "InstanceId": string,
    "ParallelismDegree": int,
    "BufferSize": int,
    "BufferWaitLimit": string timespan,
    "ReadLatest": bool
}
  • IsDisabled Specifies whether the consumer is disabled. As consumer options are used as IOptionsMonitor<>, the consumer reacts in real time when this option changes.
  • Topic Name of the topic to consume. Required. Changing this option during application execution leads to undefined behaviour.
  • Group Consumer group name used when connecting to Kafka cluster. Not required, default is empty string.
  • InstanceId Id of instance in specified consumer group. Not required, default is empty string.
  • ParallelismDegree Number of threads that will handle consumed messages. Not required, default is 1, must be >= 1.
  • BufferSize The consumer handler receives messages in batches; this parameter configures the maximum batch size. Not required, default is 1, must be >= 1.
  • BufferWaitLimit To avoid consumer starvation when batching, you can configure the maximum time to wait for a single batch to accumulate. If the wait time for a single batch exceeds this threshold, the batch is yielded with the items that were already collected. Not required when BufferSize = 1, otherwise must be > TimeSpan.Zero.
  • ReadLatest Configures the offset selection strategy for the consumer's first subscription to the topic. When true, the offset defaults to the latest message in the topic; when false, it is set to the first message in the topic. Not required, default = false.
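
The interplay of BufferSize and BufferWaitLimit can be illustrated with a small batching loop. This is a minimal sketch and not the library's implementation: BatchingSketch and ReadBatchesAsync are hypothetical names, the message source is modelled with System.Threading.Channels, and it assumes the wait limit starts counting when the first item of a batch arrives.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BatchingSketch
{
    // Yields batches of at most bufferSize items. A partially filled batch is
    // yielded once bufferWaitLimit has elapsed or the channel has completed.
    public static async IAsyncEnumerable<IReadOnlyList<T>> ReadBatchesAsync<T>(
        ChannelReader<T> reader, int bufferSize, TimeSpan bufferWaitLimit)
    {
        while (await reader.WaitToReadAsync())
        {
            // Take the first item without a time limit, so a batch is never empty.
            if (!reader.TryRead(out var first))
                continue;

            var batch = new List<T>(bufferSize) { first };

            if (batch.Count < bufferSize)
            {
                // Assumption: the wait limit is measured from the first item.
                using var timeout = new CancellationTokenSource(bufferWaitLimit);

                try
                {
                    while (batch.Count < bufferSize)
                        batch.Add(await reader.ReadAsync(timeout.Token));
                }
                catch (OperationCanceledException)
                {
                    // Wait limit reached: yield what has been collected so far.
                }
                catch (ChannelClosedException)
                {
                    // No more items will arrive: yield the final partial batch.
                }
            }

            yield return batch;
        }
    }
}
```

With BufferSize = 1 the timeout branch is never entered, which matches the rule that BufferWaitLimit is only needed for larger batches.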

Configuration example

{
  "Presentation": {
    "Kafka": {
      "Consumers": {
        "MessageName": {
          "IsDisabled": false,
          "Topic": "my_topic",
          "ParallelismDegree": 2,
          "BufferSize": 100,
          "BufferWaitLimit": "00:00:01.500",
          "ReadLatest": false
        }
      }
    }
  }
}

Inbox

Producer

Outbox