From 6ed10e149621a6229b1b6137370640a24f945167 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=93mar=20Kjartan=20Yasin?= Date: Wed, 29 Nov 2023 12:50:41 -0800 Subject: [PATCH] Commit messages by topic, partition, offset Motivation: In some cases, for instance when messages are handed out to other processes, it is not reasonable to hold on to the whole Kafka message to be able to commit it at a later time. It consumes much less memory to hold onto only the details needed to commit the message. Modifications: Add new public methods that allow users to commit messages if they have a reference to the topic, partition, and offset instead of only the whole message. Result: Developers no longer need to keep the whole message around to commit it, only the topic, partition, and offset. --- Sources/Kafka/KafkaConsumer.swift | 54 +++++++++++++++++-- Sources/Kafka/RDKafka/RDKafkaClient.swift | 66 +++++++++++++---------- 2 files changed, 88 insertions(+), 32 deletions(-) diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index c5ec5226..b5914096 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -492,6 +492,28 @@ public final class KafkaConsumer: Sendable, Service { /// - message: Last received message that shall be marked as read. /// - Throws: A ``KafkaError`` if committing failed. public func scheduleCommit(_ message: KafkaConsumerMessage) throws { + try scheduleCommit( + topic: message.topic, + partition: message.partition, + offset: message.offset) + } + + /// Mark all messages up to and including the passed offset in the given topic and partition as read. + /// Schedules a commit and returns immediately. + /// Any errors encountered after scheduling the commit will be discarded. + /// + /// This method is only used for manual offset management. + /// + /// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default). 
+ /// + /// - Parameters: + /// - topic: Topic where the message that should be marked as read resides. + /// - partition: Partition where the message that should be marked as read resides. + /// - offset: Offset of the message that shall be marked as read. + /// - Throws: A ``KafkaError`` if committing failed. + public func scheduleCommit(topic: String, + partition: KafkaPartition, + offset: KafkaOffset) throws { let action = self.stateMachine.withLockedValue { $0.commit() } switch action { case .throwClosedError: @@ -500,8 +522,10 @@ public final class KafkaConsumer: Sendable, Service { guard self.configuration.isAutoCommitEnabled == false else { throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false") } - - try client.scheduleCommit(message) + try client.scheduleCommit( + topic: topic, + partition: partition, + offset: offset) } } @@ -521,6 +545,26 @@ public final class KafkaConsumer: Sendable, Service { /// - message: Last received message that shall be marked as read. /// - Throws: A ``KafkaError`` if committing failed. public func commit(_ message: KafkaConsumerMessage) async throws { + try await commit(topic: message.topic, + partition: message.partition, + offset: message.offset) + } + + /// Mark all messages up to and including the passed offset in the given topic and partition as read. + /// Awaits until the commit succeeds or an error is encountered. + /// + /// This method is only used for manual offset management. + /// + /// - Warning: This method fails if the ``KafkaConsumerConfiguration/isAutoCommitEnabled`` configuration property is set to `true` (default). + /// + /// - Parameters: + /// - topic: Topic where the message that should be marked as read resides. + /// - partition: Partition where the message that should be marked as read resides. + /// - offset: Offset of the message that shall be marked as read. + /// - Throws: A ``KafkaError`` if committing failed. 
+ public func commit(topic: String, + partition: KafkaPartition, + offset: KafkaOffset) async throws { let action = self.stateMachine.withLockedValue { $0.commit() } switch action { case .throwClosedError: @@ -530,10 +574,12 @@ public final class KafkaConsumer: Sendable, Service { throw KafkaError.config(reason: "Committing manually only works if isAutoCommitEnabled set to false") } - try await client.commit(message) + try await client.commit(topic: topic, + partition: partition, + offset: offset) } } - + /// This function is used to gracefully shut down a Kafka consumer client. /// /// - Note: Invoking this function is not always needed as the ``KafkaConsumer`` diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 014a3e8c..b9146286 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -526,41 +526,51 @@ final class RDKafkaClient: Sendable { } } - /// Non-blocking "fire-and-forget" commit of a `message`'s offset to Kafka. + /// Non-blocking "fire-and-forget" commit of a `topic`, `partition`, and `offset` to Kafka. /// Schedules a commit and returns immediately. /// Any errors encountered after scheduling the commit will be discarded. /// - /// - Parameter message: Last received message that shall be marked as read. + /// - Parameter topic: Topic to commit to + /// - Parameter partition: Partition to commit to + /// - Parameter offset: Offset to commit /// - Throws: A ``KafkaError`` if scheduling the commit failed. - func scheduleCommit(_ message: KafkaConsumerMessage) throws { - // The offset committed is always the offset of the next requested message. - // Thus, we increase the offset of the current message by one before committing it. 
- // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 - let changesList = RDKafkaTopicPartitionList() - changesList.setOffset( - topic: message.topic, - partition: message.partition, - offset: Int64(message.offset.rawValue + 1) - ) - - let error = changesList.withListPointer { listPointer in - return rd_kafka_commit( - self.kafkaHandle, - listPointer, - 1 // async = true + func scheduleCommit( + topic: String, + partition: KafkaPartition, + offset: KafkaOffset) throws { + // The offset committed is always the offset of the next requested message. + // Thus, we increase the offset of the current message by one before committing it. + // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 + let changesList = RDKafkaTopicPartitionList() + changesList.setOffset( + topic: topic, + partition: partition, + offset: Int64(offset.rawValue + 1) ) - } - if error != RD_KAFKA_RESP_ERR_NO_ERROR { - throw KafkaError.rdKafkaError(wrapping: error) - } + let error = changesList.withListPointer { listPointer in + return rd_kafka_commit( + self.kafkaHandle, + listPointer, + 1 // async = true + ) + } + + if error != RD_KAFKA_RESP_ERR_NO_ERROR { + throw KafkaError.rdKafkaError(wrapping: error) + } } /// Non-blocking **awaitable** commit of a `message`'s offset to Kafka. /// - /// - Parameter message: Last received message that shall be marked as read. + /// - Parameter topic: Topic to commit to + /// - Parameter partition: Partition to commit to + /// - Parameter offset: Offset to commit /// - Throws: A ``KafkaError`` if the commit failed. - func commit(_ message: KafkaConsumerMessage) async throws { + func commit( + topic: String, + partition: KafkaPartition, + offset: KafkaOffset) async throws { // Declare captured closure outside of withCheckedContinuation. 
// We do that because do an unretained pass of the captured closure to // librdkafka which means we have to keep a reference to the closure @@ -577,9 +587,9 @@ final class RDKafkaClient: Sendable { // See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945 let changesList = RDKafkaTopicPartitionList() changesList.setOffset( - topic: message.topic, - partition: message.partition, - offset: Int64(message.offset.rawValue + 1) + topic: topic, + partition: partition, + offset: Int64(offset.rawValue + 1) ) // Unretained pass because the reference that librdkafka holds to capturedClosure @@ -597,7 +607,7 @@ final class RDKafkaClient: Sendable { } } } - + /// Flush any outstanding produce requests. /// /// - Parameters: