Skip to content

Commit 8be7e2f

Browse files
omarkjÓmar Kjartan YasinblindspotbountyFranzBusch
authored
Allow groupID to be specified when assigning partition (#161)
* Allow groupID to be specified when assigning partition Motivation: A Consumer Group can provide a lot of benefits even if the dynamic loadbalancing features are not used. Modifications: Allow for an optional GroupID when creating a partition consumer. Result: Consumer Groups can now be used when manual assignment is used. * fix format --------- Co-authored-by: Ómar Kjartan Yasin <[email protected]> Co-authored-by: blindspotbounty <[email protected]> Co-authored-by: Franz Busch <[email protected]>
1 parent 1d32866 commit 8be7e2f

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

Diff for: Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift

+15-8
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public struct KafkaConsumerConfiguration {
5757
/// A struct representing the different Kafka message consumption strategies.
5858
public struct ConsumptionStrategy: Sendable, Hashable {
5959
enum _ConsumptionStrategy: Sendable, Hashable {
60-
case partition(topic: String, partition: KafkaPartition, offset: KafkaOffset)
60+
case partition(groupID: String?, topic: String, partition: KafkaPartition, offset: KafkaOffset)
6161
case group(groupID: String, topics: [String])
6262
}
6363

@@ -72,14 +72,16 @@ public struct KafkaConsumerConfiguration {
7272
///
7373
/// - Parameters:
7474
/// - partition: The partition of the topic to consume from.
75+
/// - groupID: The ID of the consumer group to commit to. Defaults to no group ID. Specifying a group ID is useful if partitions assignment is manually managed but committed offsets should still be tracked in a consumer group.
7576
/// - topic: The name of the Kafka topic.
7677
/// - offset: The offset to start consuming from. Defaults to the end of the Kafka partition queue (meaning wait for the next produced message).
7778
public static func partition(
7879
_ partition: KafkaPartition,
80+
groupID: String? = nil,
7981
topic: String,
8082
offset: KafkaOffset = .end
8183
) -> ConsumptionStrategy {
82-
return .init(consumptionStrategy: .partition(topic: topic, partition: partition, offset: offset))
84+
return .init(consumptionStrategy: .partition(groupID: groupID, topic: topic, partition: partition, offset: offset))
8385
}
8486

8587
/// A consumption strategy based on consumer group membership.
@@ -261,12 +263,17 @@ extension KafkaConsumerConfiguration {
261263
var resultDict: [String: String] = [:]
262264

263265
switch self.consumptionStrategy._internal {
264-
case .partition:
265-
// Although an assignment is not related to a consumer group,
266-
// librdkafka requires us to set a `group.id`.
267-
// This is a known issue:
268-
// https://github.com/edenhill/librdkafka/issues/3261
269-
resultDict["group.id"] = UUID().uuidString
266+
case .partition(groupID: let groupID, topic: _, partition: _, offset: _):
267+
if let groupID = groupID {
268+
resultDict["group.id"] = groupID
269+
} else {
270+
// Although an assignment is not related to a consumer group,
271+
// librdkafka requires us to set a `group.id`.
272+
// This is a known issue:
273+
// https://github.com/edenhill/librdkafka/issues/3261
274+
resultDict["group.id"] = UUID().uuidString
275+
}
276+
270277
case .group(groupID: let groupID, topics: _):
271278
resultDict["group.id"] = groupID
272279
}

Diff for: Sources/Kafka/KafkaConsumer.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ public final class KafkaConsumer: Sendable, Service {
359359

360360
private func _run() async throws {
361361
switch self.configuration.consumptionStrategy._internal {
362-
case .partition(topic: let topic, partition: let partition, offset: let offset):
362+
case .partition(groupID: _, topic: let topic, partition: let partition, offset: let offset):
363363
try self.assign(topic: topic, partition: partition, offset: offset)
364364
case .group(groupID: _, topics: let topics):
365365
try self.subscribe(topics: topics)

0 commit comments

Comments
 (0)