Skip to content

Commit b1d5cfa

Browse files
Add missing transition AsyncBackpressuredStream state machine (#27)
### Motivation Some of our bidirectional streaming tests were failing intermittently. When they failed, the symptom was that more bytes were received by the end user (in the test) than the server sent. For example, in the test `testStreamingDownload_1kChunk_10kChunks_100BDownloadWatermark`, we expect: - ✅ The server sends 10,000 chunks of 1024 bytes: ```console ❯ cat repro.txt | grep -i "server sent body chunk" | head -3 Server sent body chunk 1/10000 of 1024 Server sent body chunk 2/10000 of 1024 Server sent body chunk 3/10000 of 1024 ❯ cat repro.txt | grep -i "server sent body chunk" | wc -l 10000 ``` - ✅ URLSession `didReceive data` callback called a non-deterministic number of times because it may re-chunk the bytes internally, but the total number of bytes through the delegate calls is 10,240,000: ```console ❯ cat repro.txt | grep "didReceive data" | head -3 Task delegate: didReceive data (numBytes: 1024) Task delegate: didReceive data (numBytes: 2048) Task delegate: didReceive data (numBytes: 1024) ❯ cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \) | paste -sd+ | bc -l 10240000 ``` - ❌ The response body chunks emitted by the `AsyncBackpressuredStream` to the user (in the test) match 1:1 those received by the delegate callback: ```console ❯ cat repro.txt | grep "Client received some response body bytes" | head -3 Client received some response body bytes (numBytes: 1024) Client received some response body bytes (numBytes: 2048) Client received some response body bytes (numBytes: 1024) ❯ cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \) | wc -l 333 ❯ cat repro.txt | grep "Client received some response body bytes" | wc -l 334 ``` - ❌ The total number of bytes emitted by the `AsyncBackpressuredStream` to the user (in the test) match is 10,240,000 and it can then reconstruct 10,000 chunks of 1024 to match what the server sent: ```console ❯ cat repro.txt | grep "Client received some response body bytes" | awk '{ print $8 }' | tr -d \) | paste -sd+ | bc -l 10280960 ❯ cat repro.txt | grep "Client reconstructing" | tail -3 Client reconstructing and verifying chunk 10038/10000 Client reconstructing and verifying chunk 10039/10000 Client reconstructing and verifying chunk 10040/10000 ``` So we see that there was one more element emitted from the `AsyncBackpressuredStream` than the delegate callback wrote, which meant the test saw an additional 40960 bytes than it expected to and consequently reconstructed an additional 40 chunks of size 1024 over what the server sent. We can see that the `AsyncBackpressuredStream` duplicates an element along the way, of 40960 bytes, ```diff ❯ diff -u --label=received-in-delegate-callback <(cat repro.txt | grep "didReceive data" | awk '{ print $6 }' | tr -d \)) --label=received-through-async-sequence <(cat repro.txt | grep "Client received some response body bytes" | awk '{ print $8 }' | tr -d \)) --- received-in-delegate-callback +++ received-through-async-sequence @@ -305,6 +305,7 @@ 2048 1024 40960 +40960 24576 2048 34841 ``` After some investigation, it turned out there was a missing transition in the state machine that underlies the `AsyncBackpressuredStream`. When calling `suspendNext` when there are buffered elements, but we are above the watermark, we popped the first item from the buffer and returned it _without_ updating the state (with the new buffer, without the popped element). Consequently, this meant that the _next_ call to `next()` would return the same element again. ### Modifications The following modifications have been made in separate commits to aid review: - Add debug logging to the state machine functions, logging the old state, event, new state, and resulting action. - Add two tests which reproduce this error. - Add the missing state transition (which causes the newly added tests to reliably pass). ### Result Duplicate elements are no longer emitted from the response body. ### Test Plan - Unit tests were added that fail without the fix, that now pass reliably. ### Additional notes The implementation we are using for `AsyncBackpressuredStream` was taken from an early draft of SE-0406. We should probably move to using something closer matching that of the current PR to the Swift tree, or that used by swift-grpc, which has also adopted this code and cleaned it up to remove the dependencies on the standard library internals. Additionally, that implementation does not have this missing state transition and also adds an intermediate state to the state machine to avoid unintended copy-on-write.
1 parent 8464a53 commit b1d5cfa

File tree

2 files changed

+215
-4
lines changed

2 files changed

+215
-4
lines changed

Sources/OpenAPIURLSession/AsyncBackpressuredStream/AsyncBackpressuredStream.swift

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,20 @@ extension AsyncBackpressuredStream {
346346
}
347347

348348
func write<S: Sequence>(contentsOf sequence: S) throws -> Source.WriteResult where S.Element == Element {
349-
let action = self.lock.withLock { return self.stateMachine.write(sequence) }
349+
let action = self.lock.withLock {
350+
let stateBefore = self.stateMachine.state
351+
let action = self.stateMachine.write(sequence)
352+
let stateAfter = self.stateMachine.state
353+
debug("""
354+
---
355+
event: write
356+
state before: \(stateBefore)
357+
state after: \(stateAfter)
358+
action: \(action)
359+
---
360+
""")
361+
return action
362+
}
350363

351364
switch action {
352365
case .returnProduceMore: return .produceMore
@@ -385,7 +398,18 @@ extension AsyncBackpressuredStream {
385398
onProduceMore: @escaping @Sendable (Result<Void, any Error>) -> Void
386399
) {
387400
let action = self.lock.withLock {
388-
return self.stateMachine.enqueueProducer(writeToken: writeToken, onProduceMore: onProduceMore)
401+
let stateBefore = self.stateMachine.state
402+
let action = self.stateMachine.enqueueProducer(writeToken: writeToken, onProduceMore: onProduceMore)
403+
let stateAfter = self.stateMachine.state
404+
debug("""
405+
---
406+
event: \(#function)
407+
state before: \(stateBefore)
408+
state after: \(stateAfter)
409+
action: \(action)
410+
---
411+
""")
412+
return action
389413
}
390414

391415
switch action {
@@ -449,7 +473,20 @@ extension AsyncBackpressuredStream {
449473
}
450474

451475
func next() async throws -> Element? {
452-
let action = self.lock.withLock { return self.stateMachine.next() }
476+
let action = self.lock.withLock {
477+
let stateBefore = self.stateMachine.state
478+
let action = self.stateMachine.next()
479+
let stateAfter = self.stateMachine.state
480+
debug("""
481+
---
482+
event: next
483+
state before: \(stateBefore)
484+
state after: \(stateAfter)
485+
action: \(action)
486+
---
487+
""")
488+
return action
489+
}
453490

454491
switch action {
455492
case .returnElement(let element): return element
@@ -476,7 +513,20 @@ extension AsyncBackpressuredStream {
476513
func suspendNext() async throws -> Element? {
477514
return try await withTaskCancellationHandler {
478515
return try await withCheckedThrowingContinuation { continuation in
479-
let action = self.lock.withLock { return self.stateMachine.suspendNext(continuation: continuation) }
516+
let action = self.lock.withLock {
517+
let stateBefore = self.stateMachine.state
518+
let action = self.stateMachine.suspendNext(continuation: continuation)
519+
let stateAfter = self.stateMachine.state
520+
debug("""
521+
---
522+
event: \(#function)
523+
state before: \(stateBefore)
524+
state after: \(stateAfter)
525+
action: \(action)
526+
---
527+
""")
528+
return action
529+
}
480530

481531
switch action {
482532
case .resumeContinuationWithElement(let continuation, let element):
@@ -1239,6 +1289,16 @@ extension AsyncBackpressuredStream {
12391289

12401290
guard shouldProduceMore else {
12411291
// We don't have any new demand, so we can just return the element.
1292+
self.state = .streaming(
1293+
backPressureStrategy: backPressureStrategy,
1294+
buffer: buffer,
1295+
consumerContinuation: nil,
1296+
producerContinuations: producerContinuations,
1297+
cancelledAsyncProducers: cancelledAsyncProducers,
1298+
hasOutstandingDemand: hasOutstandingDemand,
1299+
iteratorInitialized: iteratorInitialized,
1300+
onTerminate: onTerminate
1301+
)
12421302
return .resumeContinuationWithElement(continuation, element)
12431303
}
12441304
let producers = Array(producerContinuations.map { $0.1 })

Tests/OpenAPIURLSessionTests/AsyncBackpressuredStreamTests/AsyncBackpressuredStreamTests.swift

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,120 @@ final class AsyncBackpressuredStreamTests: XCTestCase {
198198
XCTAssertEqual(strategy.didConsume(elements: Slice([])), true)
199199
XCTAssertEqual(strategy.currentWatermark, 0)
200200
}
201+
202+
func testWritingOverWatermark() async throws {
203+
try await withThrowingTaskGroup(of: Void.self) { group in
204+
let (stream, continuation) = AsyncBackpressuredStream<Int, any Error>
205+
.makeStream(backPressureStrategy: .highLowWatermark(lowWatermark: 1, highWatermark: 1))
206+
207+
group.addTask {
208+
for i in 1...10 {
209+
debug("Producer writing element \(i)...")
210+
let writeResult = try continuation.write(contentsOf: CollectionOfOne(i))
211+
debug("Producer wrote element \(i), result = \(writeResult)")
212+
// ignore backpressure result and write again anyway
213+
}
214+
debug("Producer finished")
215+
continuation.finish(throwing: nil)
216+
}
217+
218+
var iterator = stream.makeAsyncIterator()
219+
var numElementsConsumed = 0
220+
var expectedNextValue = 1
221+
while true {
222+
debug("Consumer reading element...")
223+
guard let element = try await iterator.next() else { break }
224+
XCTAssertEqual(element, expectedNextValue)
225+
debug("Consumer read element: \(element), expected: \(expectedNextValue)")
226+
numElementsConsumed += 1
227+
expectedNextValue += 1
228+
}
229+
XCTAssertEqual(numElementsConsumed, 10)
230+
231+
group.cancelAll()
232+
}
233+
}
234+
235+
func testStateMachineSuspendNext() async throws {
236+
typealias Stream = AsyncBackpressuredStream<Int, any Error>
237+
238+
var strategy = Stream.InternalBackPressureStrategy.highLowWatermark(.init(lowWatermark: 1, highWatermark: 1))
239+
_ = strategy.didYield(elements: Slice([1, 2, 3]))
240+
var stateMachine = Stream.StateMachine(backPressureStrategy: strategy, onTerminate: nil)
241+
stateMachine.state = .streaming(
242+
backPressureStrategy: strategy,
243+
buffer: [1, 2, 3],
244+
consumerContinuation: nil,
245+
producerContinuations: [],
246+
cancelledAsyncProducers: [],
247+
hasOutstandingDemand: false,
248+
iteratorInitialized: true,
249+
onTerminate: nil
250+
)
251+
252+
guard case .streaming(_, let buffer, let consumerContinuation, _, _, _, _, _) = stateMachine.state else {
253+
XCTFail("Unexpected state: \(stateMachine.state)")
254+
return
255+
}
256+
XCTAssertEqual(buffer, [1, 2, 3])
257+
XCTAssertNil(consumerContinuation)
258+
259+
_ = try await withCheckedThrowingContinuation { continuation in
260+
let action = stateMachine.suspendNext(continuation: continuation)
261+
262+
guard case .resumeContinuationWithElement(_, let element) = action else {
263+
XCTFail("Unexpected action: \(action)")
264+
return
265+
}
266+
XCTAssertEqual(element, 1)
267+
268+
guard case .streaming(_, let buffer, let consumerContinuation, _, _, _, _, _) = stateMachine.state else {
269+
XCTFail("Unexpected state: \(stateMachine.state)")
270+
return
271+
}
272+
XCTAssertEqual(buffer, [2, 3])
273+
XCTAssertNil(consumerContinuation)
274+
275+
continuation.resume(returning: element)
276+
}
277+
}
278+
}
279+
280+
extension AsyncBackpressuredStream.Source.WriteResult: CustomStringConvertible {
281+
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
282+
public var description: String {
283+
switch self {
284+
case .enqueueCallback: return "enqueueCallBack"
285+
case .produceMore: return "produceMore"
286+
}
287+
}
288+
}
289+
290+
extension AsyncBackpressuredStream.StateMachine.SuspendNextAction: CustomStringConvertible {
291+
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
292+
public var description: String {
293+
switch self {
294+
case .none: return "none"
295+
case .resumeContinuationWithElement: return "resumeContinuationWithElement"
296+
case .resumeContinuationWithElementAndProducers: return "resumeContinuationWithElementAndProducers"
297+
case .resumeContinuationWithFailureAndCallOnTerminate: return "resumeContinuationWithFailureAndCallOnTerminate"
298+
case .resumeContinuationWithNil: return "resumeContinuationWithNil"
299+
}
300+
}
301+
}
302+
303+
extension AsyncBackpressuredStream.StateMachine.State: CustomStringConvertible {
304+
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
305+
public var description: String {
306+
switch self {
307+
case .initial: return "initial"
308+
case .streaming(_, let buffer, let consumer, let producers, _, let demand, _, _):
309+
return
310+
"streaming(buffer.count: \(buffer.count), consumer: \(consumer != nil ? "yes" : "no"), producers: \(producers), demand: \(demand))"
311+
case .finished: return "finished"
312+
case .sourceFinished: return "sourceFinished"
313+
}
314+
}
201315
}
202316

203317
extension AsyncSequence {
@@ -206,3 +320,40 @@ extension AsyncSequence {
206320
try await self.reduce(into: []) { accumulated, next in accumulated.append(next) }
207321
}
208322
}
323+
324+
extension AsyncBackpressuredStream.StateMachine.NextAction: CustomStringConvertible {
325+
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
326+
public var description: String {
327+
switch self {
328+
case .returnNil: return "returnNil"
329+
case .returnElementAndResumeProducers: return "returnElementAndResumeProducers"
330+
case .returnFailureAndCallOnTerminate: return "returnFailureAndCallOnTerminate"
331+
case .returnElement: return "returnElement"
332+
case .suspendTask: return "suspendTask"
333+
}
334+
}
335+
}
336+
337+
extension AsyncBackpressuredStream.StateMachine.WriteAction: CustomStringConvertible {
338+
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
339+
public var description: String {
340+
switch self {
341+
case .returnProduceMore: return "returnProduceMore"
342+
case .returnEnqueue: return "returnEnqueue"
343+
case .resumeConsumerContinuationAndReturnProduceMore: return "resumeConsumerContinuationAndReturnProduceMore"
344+
case .resumeConsumerContinuationAndReturnEnqueue: return "resumeConsumerContinuationAndReturnEnqueue"
345+
case .throwFinishedError: return "throwFinishedError"
346+
}
347+
}
348+
}
349+
350+
extension AsyncBackpressuredStream.StateMachine.EnqueueProducerAction: CustomStringConvertible {
351+
// swift-format-ignore: AllPublicDeclarationsHaveDocumentation
352+
public var description: String {
353+
switch self {
354+
case .resumeProducer: return "resumeProducer"
355+
case .resumeProducerWithCancellationError: return "resumeProducerWithCancellationError"
356+
case .none: return "none"
357+
}
358+
}
359+
}

0 commit comments

Comments
 (0)