Skip to content

Commit 1608c4a

Browse files
KafkaConsumer: change locality of message error (#122)
Motivation: If the `KafkaConsumer` receives an errorneous message, it should not throw in the `func run()` method but rather in the `KafkaConsumer.messages` `ThrowingSequence`. Modifications: * make `KafkaConsumerMessages` `AsyncSequence` wrap a sequence that yields a `Result<>` type -> `KafkaConsumerMessages` unwraps this result type and either yields or throws
1 parent 838ebb3 commit 1608c4a

File tree

1 file changed

+22
-22
lines changed

1 file changed

+22
-22
lines changed

Sources/Kafka/KafkaConsumer.swift

+22-22
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
6767
public typealias Element = KafkaConsumerMessage
6868
typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
6969
typealias WrappedSequence = NIOThrowingAsyncSequenceProducer<
70-
Element,
70+
Result<KafkaConsumerMessage, Error>,
7171
Error,
7272
BackPressureStrategy,
7373
KafkaConsumerCloseOnTerminate
@@ -80,24 +80,30 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
8080
var wrappedIterator: WrappedSequence.AsyncIterator?
8181

8282
public mutating func next() async throws -> Element? {
83-
guard let element = try await self.wrappedIterator?.next() else {
83+
guard let result = try await self.wrappedIterator?.next() else {
8484
self.deallocateIterator()
8585
return nil
8686
}
8787

88-
let action = self.stateMachine.withLockedValue { $0.storeOffset() }
89-
switch action {
90-
case .storeOffset(let client):
91-
do {
92-
try client.storeMessageOffset(element)
93-
} catch {
88+
switch result {
89+
case .success(let message):
90+
let action = self.stateMachine.withLockedValue { $0.storeOffset() }
91+
switch action {
92+
case .storeOffset(let client):
93+
do {
94+
try client.storeMessageOffset(message)
95+
} catch {
96+
self.deallocateIterator()
97+
throw error
98+
}
99+
return message
100+
case .terminateConsumerSequence:
94101
self.deallocateIterator()
95-
throw error
102+
return nil
96103
}
97-
return element
98-
case .terminateConsumerSequence:
104+
case .failure(let error):
99105
self.deallocateIterator()
100-
return nil
106+
throw error
101107
}
102108
}
103109

@@ -119,7 +125,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
119125
/// A ``KafkaConsumer `` can be used to consume messages from a Kafka cluster.
120126
public final class KafkaConsumer: Sendable, Service {
121127
typealias Producer = NIOThrowingAsyncSequenceProducer<
122-
KafkaConsumerMessage,
128+
Result<KafkaConsumerMessage, Error>,
123129
Error,
124130
NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
125131
KafkaConsumerCloseOnTerminate
@@ -156,7 +162,7 @@ public final class KafkaConsumer: Sendable, Service {
156162
self.logger = logger
157163

158164
let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
159-
elementType: KafkaConsumerMessage.self,
165+
elementType: Result<KafkaConsumerMessage, Error>.self,
160166
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
161167
delegate: KafkaConsumerCloseOnTerminate(stateMachine: self.stateMachine)
162168
)
@@ -333,14 +339,8 @@ public final class KafkaConsumer: Sendable, Service {
333339
for event in events {
334340
switch event {
335341
case .consumerMessages(let result):
336-
switch result {
337-
case .success(let message):
338-
// We do not support back pressure, we can ignore the yield result
339-
_ = source.yield(message)
340-
case .failure(let error):
341-
source.finish()
342-
throw error
343-
}
342+
// We do not support back pressure, we can ignore the yield result
343+
_ = source.yield(result)
344344
default:
345345
break // Ignore
346346
}

0 commit comments

Comments
 (0)