
Commit ffb5deb

Fix: Kafka consumer leaves group after 5 mins (#113)
* Fix: Kafka consumer leaves group after 5 mins

  Motivation:

  This PR fixes issue #110. `KafkaConsumer`: by polling the `rd_kafka_queue_get_main` queue instead of the `rd_kafka_queue_get_consumer` queue, the timer for `max.poll.interval.ms` did not get reset, which eventually resulted in a timeout despite polling (see the [`librdkafka` documentation](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#acacdb55ae7cb6abfbde89621e512b078) and the sketch below).

  Modifications:

  * `RDKafkaClient`:
    * rename `mainQueue` to `queue`
    * use `rd_kafka_queue_get_consumer` instead of `rd_kafka_queue_get_main` for `KafkaConsumer` clients -> this resets the timer for `max.poll.interval.ms` so that the consumer does not time out despite polling
    * invoke `rd_kafka_queue_destroy(self.queue)` in `RDKafkaClient.deinit` to lose the reference to the queue

* Review blindspot

  Modifications:

  * update the comment at the invocation of `RDKafkaClient.pollSetConsumer`
  * don't fail softly when `rd_kafka_queue_get_consumer` returns `nil`
  * don't create a new reference to the consumer queue in `RDKafkaClient.consumerClose()`
1 parent 5925e64 commit ffb5deb
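To make the motivation concrete, here is a minimal sketch (not part of this commit) of the queue distinction at the `librdkafka` level. The helper name `pollConsumerQueue`, its parameters, and the `Crdkafka` import are assumptions for illustration; only the `rd_kafka_*` calls are real `librdkafka` API.

```swift
import Crdkafka

/// Illustrative helper (hypothetical name): polls the *consumer* queue, which librdkafka
/// counts as a consumer poll and therefore resets the `max.poll.interval.ms` timer.
/// Polling the main queue (`rd_kafka_queue_get_main`) serves events too, but does not
/// reset that timer, which is why the consumer eventually left the group (#110).
func pollConsumerQueue(kafkaHandle: OpaquePointer, iterations: Int) {
    // Returns nil if the handle was not set up as a consumer (e.g. no `group.id`).
    guard let queue = rd_kafka_queue_get_consumer(kafkaHandle) else { return }
    // Drop our reference to the queue once we are done with it.
    defer { rd_kafka_queue_destroy(queue) }

    for _ in 0..<iterations {
        // Wait up to 100 ms for an event; nil means no event was ready.
        if let event = rd_kafka_queue_poll(queue, 100) {
            rd_kafka_event_destroy(event)
        }
    }
}
```

This mirrors the design choice in the diff below: each `RDKafkaClient` holds exactly one event queue, selected once at initialization depending on whether the client is a producer or a consumer.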

File tree

2 files changed, +30 -16 lines changed


Sources/Kafka/KafkaConsumer.swift

Lines changed: 1 addition & 2 deletions
@@ -170,8 +170,7 @@ public final class KafkaConsumer: Sendable, Service {
             )
         }
 
-        // Events that would be triggered by ``RDKafkaClient/poll(timeout:)``
-        // are now triggered by ``RDKafkaClient/consumerPoll``.
+        // Forward main queue events to the consumer queue.
         try client.pollSetConsumer()
 
         switch configuration.consumptionStrategy._internal {
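The updated comment points at `RDKafkaClient/pollSetConsumer`. As a hedged sketch of what such a wrapper presumably does, assuming it forwards to `librdkafka`'s `rd_kafka_poll_set_consumer` (which redirects main-queue events to the consumer queue):

```swift
// Hedged sketch of the wrapper referenced by the comment above; it would live inside
// `RDKafkaClient`, since it uses `self.kafkaHandle` and the package's `KafkaError`.
func pollSetConsumer() throws {
    // Redirect events from librdkafka's main queue to the consumer queue, so a single
    // poll loop on the consumer queue observes rebalances, logs and errors.
    let result = rd_kafka_poll_set_consumer(self.kafkaHandle)
    if result != RD_KAFKA_RESP_ERR_NO_ERROR {
        throw KafkaError.rdKafkaError(wrapping: result)
    }
}
```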

Sources/Kafka/RDKafka/RDKafkaClient.swift

Lines changed: 29 additions & 14 deletions
@@ -19,37 +19,54 @@ import Logging
 /// Base class for ``KafkaProducer`` and ``KafkaConsumer``,
 /// which is used to handle the connection to the Kafka ecosystem.
 final class RDKafkaClient: Sendable {
+    // Default size for Strings returned from C API
+    static let stringSize = 1024
+
     /// Determines if client is a producer or a consumer.
     enum ClientType {
         case producer
         case consumer
     }
 
-    // Default size for Strings returned from C API
-    static let stringSize = 1024
-
     /// Handle for the C library's Kafka instance.
     private let kafkaHandle: OpaquePointer
     /// A logger.
     private let logger: Logger
 
-    /// `librdkafka`'s main `rd_kafka_queue_t`.
-    private let mainQueue: OpaquePointer
+    /// `librdkafka`'s `rd_kafka_queue_t` that events are received on.
+    private let queue: OpaquePointer
 
     // Use factory method to initialize
     private init(
+        type: ClientType,
         kafkaHandle: OpaquePointer,
         logger: Logger
     ) {
         self.kafkaHandle = kafkaHandle
         self.logger = logger
 
-        self.mainQueue = rd_kafka_queue_get_main(self.kafkaHandle)
+        if type == .consumer {
+            if let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle) {
+                // (Important)
+                // Polling the queue counts as a consumer poll, and will reset the timer for `max.poll.interval.ms`.
+                self.queue = consumerQueue
+            } else {
+                fatalError("""
+                    Internal error: failed to get consumer queue. \
+                    A group.id should be set even when the client is not part of a consumer group. \
+                    See https://github.com/edenhill/librdkafka/issues/3261 for more information.
+                    """)
+            }
+        } else {
+            self.queue = rd_kafka_queue_get_main(self.kafkaHandle)
+        }
 
-        rd_kafka_set_log_queue(self.kafkaHandle, self.mainQueue)
+        rd_kafka_set_log_queue(self.kafkaHandle, self.queue)
     }
 
     deinit {
+        // Lose reference to librdkafka's event queue
+        rd_kafka_queue_destroy(self.queue)
         rd_kafka_destroy(kafkaHandle)
     }
 
@@ -60,8 +77,6 @@ final class RDKafkaClient: Sendable {
         events: [RDKafkaEvent],
         logger: Logger
     ) throws -> RDKafkaClient {
-        let clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER
-
         let rdConfig = try RDKafkaConfig.createFrom(configDictionary: configDictionary)
         // Manually override some of the configuration options
         // Handle logs in event queue
@@ -75,6 +90,7 @@ final class RDKafkaClient: Sendable {
         let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: RDKafkaClient.stringSize)
         defer { errorChars.deallocate() }
 
+        let clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER
         guard let handle = rd_kafka_new(
             clientType,
             rdConfig,
@@ -88,7 +104,7 @@ final class RDKafkaClient: Sendable {
             throw KafkaError.client(reason: errorString)
         }
 
-        return RDKafkaClient(kafkaHandle: handle, logger: logger)
+        return RDKafkaClient(type: type, kafkaHandle: handle, logger: logger)
     }
 
     /// Produce a message to the Kafka cluster.
@@ -163,7 +179,7 @@ final class RDKafkaClient: Sendable {
         events.reserveCapacity(maxEvents)
 
         for _ in 0..<maxEvents {
-            let event = rd_kafka_queue_poll(self.mainQueue, 0)
+            let event = rd_kafka_queue_poll(self.queue, 0)
             defer { rd_kafka_event_destroy(event) }
 
             let rdEventType = rd_kafka_event_type(event)
@@ -399,7 +415,7 @@ final class RDKafkaClient: Sendable {
         rd_kafka_commit_queue(
             self.kafkaHandle,
             listPointer,
-            self.mainQueue,
+            self.queue,
             nil,
             opaquePointer
         )
@@ -433,8 +449,7 @@ final class RDKafkaClient: Sendable {
     ///
     /// Make sure to run poll loop until ``RDKafkaClient/consumerIsClosed`` returns `true`.
     func consumerClose() throws {
-        let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
-        let result = rd_kafka_consumer_close_queue(self.kafkaHandle, consumerQueue)
+        let result = rd_kafka_consumer_close_queue(self.kafkaHandle, self.queue)
         let kafkaError = rd_kafka_error_code(result)
         if kafkaError != RD_KAFKA_RESP_ERR_NO_ERROR {
             throw KafkaError.rdKafkaError(wrapping: kafkaError)
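The doc comment above asks callers to keep polling until ``RDKafkaClient/consumerIsClosed`` returns `true`. A hedged sketch of that shutdown pattern, written directly against `librdkafka` and assuming `consumerIsClosed` maps to `rd_kafka_consumer_closed`; the helper name and `Crdkafka` import are illustrative:

```swift
import Crdkafka

/// Illustrative shutdown sequence (hypothetical helper name): initiate a non-blocking
/// close on the consumer queue, then keep serving that same queue until librdkafka
/// reports the close as complete.
func closeAndDrain(kafkaHandle: OpaquePointer, queue: OpaquePointer) {
    // Enqueue the close; the rebalance/commit events it triggers arrive on `queue`.
    let error = rd_kafka_consumer_close_queue(kafkaHandle, queue)
    guard rd_kafka_error_code(error) == RD_KAFKA_RESP_ERR_NO_ERROR else { return }

    // Keep polling so the close can make progress, as the doc comment above requires.
    while rd_kafka_consumer_closed(kafkaHandle) == 0 {
        if let event = rd_kafka_queue_poll(queue, 100) {
            rd_kafka_event_destroy(event)
        }
    }
}
```

Reusing `self.queue` here (instead of taking a fresh reference from `rd_kafka_queue_get_consumer`) is the "don't create new reference" point from the commit message: the single queue reference is released exactly once, in `deinit`.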
