Skip to content

Commit c85a410

Browse files
storeMessageOffset: ignore state error (#129)
Motivation: Previously, we failed the entire `KafkaConsumer` if storing a message offset through `RDKafkaClient.storeMessageOffset` failed because the partition the offset should be committed to was unassigned (which can happen during rebalance). We should not fail the consumer when committing during rebalance. The worst thing that could happen here is that storing the offset fails and we re-read a message, which is fine since KafkaConsumers with automatic commits are designed for at-least-once processing: https://docs.confluent.io/platform/current/clients/consumer.html#offset-management Modifications: * `RDKafkaClient.storeMessageOffset`: don't throw when receiving error `RD_KAFKA_RESP_ERR__STATE`
1 parent 8592c61 commit c85a410

File tree

1 file changed

+8
-0
lines changed

1 file changed

+8
-0
lines changed

Sources/Kafka/RDKafka/RDKafkaClient.swift

+8
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,14 @@ final class RDKafkaClient: Sendable {
520520
}
521521

522522
if error != RD_KAFKA_RESP_ERR_NO_ERROR {
523+
// Ignore RD_KAFKA_RESP_ERR__STATE error.
524+
// RD_KAFKA_RESP_ERR__STATE indicates an attempt to commit to an unassigned partition,
525+
// which can occur during rebalancing or when the consumer is shutting down.
526+
// See "Upgrade considerations" for more details: https://github.com/confluentinc/librdkafka/releases/tag/v1.9.0
527+
// Since Kafka Consumers are designed for at-least-once processing, failing to commit here is acceptable.
528+
if error != RD_KAFKA_RESP_ERR__STATE {
529+
return
530+
}
523531
throw KafkaError.rdKafkaError(wrapping: error)
524532
}
525533
}

0 commit comments

Comments
 (0)