Skip to content

Commit 838ebb3

Browse files
KafkaConsumer: store offsets when .finishing (#121)
Motivation: This PR addresses issue #119. The `KafkaConsumer` should not `fatalError` when storing offsets in state `.finishing`. See #119 (comment) for more information why. Modifications: * terminate `KafkaConsumerMessages` sequences when in state `.finishing` or `.finished` instead of storing offsets
1 parent d86522b commit 838ebb3

File tree

2 files changed

+72
-2
lines changed

2 files changed

+72
-2
lines changed

Sources/Kafka/KafkaConsumer.swift

+8-2
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,11 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
9494
self.deallocateIterator()
9595
throw error
9696
}
97+
return element
98+
case .terminateConsumerSequence:
99+
self.deallocateIterator()
100+
return nil
97101
}
98-
return element
99102
}
100103

101104
private mutating func deallocateIterator() {
@@ -572,6 +575,9 @@ extension KafkaConsumer {
572575
/// Store the message offset with the given `client`.
573576
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
574577
case storeOffset(client: RDKafkaClient)
578+
/// The consumer is in the process of `.finishing` or even `.finished`.
579+
/// Stop yielding new elements and terminate the asynchronous sequence.
580+
case terminateConsumerSequence
575581
}
576582

577583
/// Get action to take when wanting to store a message offset (to be auto-committed by `librdkafka`).
@@ -586,7 +592,7 @@ extension KafkaConsumer {
586592
case .consuming(let client, _):
587593
return .storeOffset(client: client)
588594
case .finishing, .finished:
589-
fatalError("\(#function) invoked while still in state \(self.state)")
595+
return .terminateConsumerSequence
590596
}
591597
}
592598

Tests/IntegrationTests/KafkaTests.swift

+64
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,70 @@ final class KafkaTests: XCTestCase {
272272
}
273273
}
274274

275+
func testNoNewConsumerMessagesAfterGracefulShutdown() async throws {
276+
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 2)
277+
let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: self.producerConfig, logger: .kafkaTest)
278+
279+
let uniqueGroupID = UUID().uuidString
280+
281+
var consumerConfig = KafkaConsumerConfiguration(
282+
consumptionStrategy: .group(
283+
id: uniqueGroupID,
284+
topics: [self.uniqueTestTopic]
285+
),
286+
bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
287+
)
288+
consumerConfig.autoOffsetReset = .beginning // Read topic from beginning
289+
consumerConfig.broker.addressFamily = .v4
290+
291+
let consumer = try KafkaConsumer(
292+
configuration: consumerConfig,
293+
logger: .kafkaTest
294+
)
295+
296+
let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer, consumer], logger: .kafkaTest)
297+
let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration)
298+
299+
try await withThrowingTaskGroup(of: Void.self) { group in
300+
// Run Task
301+
group.addTask {
302+
try await serviceGroup.run()
303+
}
304+
305+
// Producer Task
306+
group.addTask {
307+
try await Self.sendAndAcknowledgeMessages(
308+
producer: producer,
309+
events: acks,
310+
messages: testMessages
311+
)
312+
}
313+
314+
// Wait for Producer Task to complete
315+
try await group.next()
316+
317+
// Verify that we receive the first message
318+
var consumerIterator = consumer.messages.makeAsyncIterator()
319+
320+
let consumedMessage = try await consumerIterator.next()
321+
XCTAssertEqual(testMessages.first!.topic, consumedMessage!.topic)
322+
XCTAssertEqual(ByteBuffer(string: testMessages.first!.key!), consumedMessage!.key)
323+
XCTAssertEqual(ByteBuffer(string: testMessages.first!.value), consumedMessage!.value)
324+
325+
// Trigger a graceful shutdown
326+
await serviceGroup.triggerGracefulShutdown()
327+
328+
// Wait to ensure the KafkaConsumer's shutdown handler has
329+
// been invoked.
330+
try await Task.sleep(for: .seconds(2))
331+
332+
// We should not be able to read any new messages after the KafkaConsumer's
333+
// shutdown handler was invoked
334+
let stoppedConsumingMessage = try await consumerIterator.next()
335+
XCTAssertNil(stoppedConsumingMessage)
336+
}
337+
}
338+
275339
func testCommittedOffsetsAreCorrect() async throws {
276340
let testMessages = Self.createTestMessages(topic: self.uniqueTestTopic, count: 10)
277341
let firstConsumerOffset = testMessages.count / 2

0 commit comments

Comments
 (0)