Skip to content

Commit e4fded7

Browse files
authored
Better backpressure management. (#352)
Motivation: Users of the HTTPClientResponseDelegate expect that the event loop futures returned from didReceiveHead and didReceiveBodyPart can be used to exert backpressure. To be fair to them, they somewhat can. However, the TaskHandler has a bit of a misunderstanding about how NIO backpressure works, and does not correctly manage the buffer of inbound data. The result of this misunderstanding is that multiple calls to didReceiveBodyPart and didReceiveHead can be outstanding at once. This would likely lead to severe bugs in most delegates, as they do not expect it. We should make things work the way delegate implementers believe it works. Modifications: - Added a buffer to the TaskHandler to avoid delivering data that the delegate is not ready for. - Added a new "pending close" state that keeps track of a state where the TaskHandler has received .end but not yet delivered it to the delegate. This allows better error management. - Added some more tests. - Documented our backpressure commitments. Result: Better respect for backpressure. Resolves #348
1 parent b075d19 commit e4fded7

File tree

6 files changed

+281
-29
lines changed

6 files changed

+281
-29
lines changed

Sources/AsyncHTTPClient/HTTPHandler.swift

+104-28
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,26 @@ public class ResponseAccumulator: HTTPClientResponseDelegate {
419419
/// `HTTPClientResponseDelegate` allows an implementation to receive notifications about request processing and to control how response parts are processed.
420420
/// You can implement this protocol if you need fine-grained control over an HTTP request/response, for example, if you want to inspect the response
421421
/// headers before deciding whether to accept a response body, or if you want to stream your request body. Pass an instance of your conforming
422-
/// class to the `HTTPClient.execute()` method and this package will call each delegate method appropriately as the request takes place.
422+
/// class to the `HTTPClient.execute()` method and this package will call each delegate method appropriately as the request takes place./
423+
///
424+
/// ### Backpressure
425+
///
426+
/// A `HTTPClientResponseDelegate` can be used to exert backpressure on the server response. This is achieved by way of the futures returned from
427+
/// `didReceiveHead` and `didReceiveBodyPart`. The following functions are part of the "backpressure system" in the delegate:
428+
///
429+
/// - `didReceiveHead`
430+
/// - `didReceiveBodyPart`
431+
/// - `didFinishRequest`
432+
/// - `didReceiveError`
433+
///
434+
/// The first three methods are strictly _exclusive_, with that exclusivity managed by the futures returned by `didReceiveHead` and
435+
/// `didReceiveBodyPart`. What this means is that until the returned future is completed, none of these three methods will be called
436+
/// again. This allows delegates to rate limit the server to a capacity it can manage. `didFinishRequest` does not return a future,
437+
/// as we are expecting no more data from the server at this time.
438+
///
439+
/// `didReceiveError` is somewhat special: it signals the end of this regime. `didRecieveError` is not exclusive: it may be called at
440+
/// any time, even if a returned future is not yet completed. `didReceiveError` is terminal, meaning that once it has been called none
441+
/// of these four methods will be called again. This can be used as a signal to abandon all outstanding work.
423442
///
424443
/// - note: This delegate is strongly held by the `HTTPTaskHandler`
425444
/// for the duration of the `Request` processing and will be
@@ -463,6 +482,11 @@ public protocol HTTPClientResponseDelegate: AnyObject {
463482
/// You must return an `EventLoopFuture<Void>` that you complete when you have finished processing the body part.
464483
/// You can create an already succeeded future by calling `task.eventLoop.makeSucceededFuture(())`.
465484
///
485+
/// This function will not be called until the future returned by `didReceiveHead` has completed.
486+
///
487+
/// This function will not be called for subsequent body parts until the previous future returned by a
488+
/// call to this function completes.
489+
///
466490
/// - parameters:
467491
/// - task: Current request context.
468492
/// - buffer: Received body `Part`.
@@ -471,13 +495,20 @@ public protocol HTTPClientResponseDelegate: AnyObject {
471495

472496
/// Called when error was thrown during request execution. Will be called zero or one time only. Request processing will be stopped after that.
473497
///
498+
/// This function may be called at any time: it does not respect the backpressure exerted by `didReceiveHead` and `didReceiveBodyPart`.
499+
/// All outstanding work may be cancelled when this is received. Once called, no further calls will be made to `didReceiveHead`, `didReceiveBodyPart`,
500+
/// or `didFinishRequest`.
501+
///
474502
/// - parameters:
475503
/// - task: Current request context.
476504
/// - error: Error that occured during response processing.
477505
func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error)
478506

479507
/// Called when the complete HTTP request is finished. You must return an instance of your `Response` associated type. Will be called once, except if an error occurred.
480508
///
509+
/// This function will not be called until all futures returned by `didReceiveHead` and `didReceiveBodyPart` have completed. Once called,
510+
/// no further calls will be made to `didReceiveHead`, `didReceiveBodyPart`, or `didReceiveError`.
511+
///
481512
/// - parameters:
482513
/// - task: Current request context.
483514
/// - returns: Result of processing.
@@ -678,6 +709,7 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
678709
case bodySentWaitingResponseHead
679710
case bodySentResponseHeadReceived
680711
case redirected(HTTPResponseHead, URL)
712+
case bufferedEnd
681713
case endOrError
682714
}
683715

@@ -688,10 +720,11 @@ internal class TaskHandler<Delegate: HTTPClientResponseDelegate>: RemovableChann
688720
let logger: Logger // We are okay to store the logger here because a TaskHandler is just for one request.
689721

690722
var state: State = .idle
723+
var responseReadBuffer: ResponseReadBuffer = ResponseReadBuffer()
691724
var expectedBodyLength: Int?
692725
var actualBodyLength: Int = 0
693726
var pendingRead = false
694-
var mayRead = true
727+
var outstandingDelegateRead = false
695728
var closing = false {
696729
didSet {
697730
assert(self.closing || !oldValue,
@@ -861,10 +894,12 @@ extension TaskHandler: ChannelDuplexHandler {
861894
preconditionFailure("should not happen, state is \(self.state)")
862895
case .redirected:
863896
break
864-
case .endOrError:
897+
case .bufferedEnd, .endOrError:
865898
// If the state is .endOrError, it means that request was failed and there is nothing to do here:
866899
// we cannot write .end since channel is most likely closed, and we should not fail the future,
867900
// since the task would already be failed, no need to fail the writer too.
901+
// If the state is .bufferedEnd the issue is the same, we just haven't fully delivered the response to
902+
// the user yet.
868903
return context.eventLoop.makeSucceededFuture(())
869904
}
870905

@@ -937,19 +972,19 @@ extension TaskHandler: ChannelDuplexHandler {
937972
}
938973
self.actualBodyLength += part.readableBytes
939974
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
940-
case .bodySentWaitingResponseHead, .bodySentResponseHeadReceived, .endOrError:
975+
case .bodySentWaitingResponseHead, .bodySentResponseHeadReceived, .bufferedEnd, .endOrError:
941976
let error = HTTPClientError.writeAfterRequestSent
942977
self.errorCaught(context: context, error: error)
943978
promise.fail(error)
944979
}
945980
}
946981

947982
public func read(context: ChannelHandlerContext) {
948-
if self.mayRead {
983+
if self.outstandingDelegateRead {
984+
self.pendingRead = true
985+
} else {
949986
self.pendingRead = false
950987
context.read()
951-
} else {
952-
self.pendingRead = true
953988
}
954989
}
955990

@@ -968,7 +1003,7 @@ extension TaskHandler: ChannelDuplexHandler {
9681003
case .sendingBodyResponseHeadReceived, .bodySentResponseHeadReceived, .redirected:
9691004
// should be prevented by NIO HTTP1 pipeline, aee testHTTPResponseDoubleHead
9701005
preconditionFailure("should not happen")
971-
case .endOrError:
1006+
case .bufferedEnd, .endOrError:
9721007
return
9731008
}
9741009

@@ -979,45 +1014,86 @@ extension TaskHandler: ChannelDuplexHandler {
9791014
if let redirectURL = self.redirectHandler?.redirectTarget(status: head.status, headers: head.headers) {
9801015
self.state = .redirected(head, redirectURL)
9811016
} else {
982-
self.mayRead = false
983-
self.callOutToDelegate(value: head, channelEventLoop: context.eventLoop, self.delegate.didReceiveHead)
984-
.whenComplete { result in
985-
self.handleBackpressureResult(context: context, result: result)
986-
}
1017+
self.handleReadForDelegate(response, context: context)
9871018
}
988-
case .body(let body):
1019+
case .body:
9891020
switch self.state {
990-
case .redirected, .endOrError:
1021+
case .redirected, .bufferedEnd, .endOrError:
9911022
break
9921023
default:
993-
self.mayRead = false
994-
self.callOutToDelegate(value: body, channelEventLoop: context.eventLoop, self.delegate.didReceiveBodyPart)
995-
.whenComplete { result in
996-
self.handleBackpressureResult(context: context, result: result)
997-
}
1024+
self.handleReadForDelegate(response, context: context)
9981025
}
9991026
case .end:
10001027
switch self.state {
1001-
case .endOrError:
1028+
case .bufferedEnd, .endOrError:
10021029
break
10031030
case .redirected(let head, let redirectURL):
10041031
self.state = .endOrError
10051032
self.task.releaseAssociatedConnection(delegateType: Delegate.self, closing: self.closing).whenSuccess {
10061033
self.redirectHandler?.redirect(status: head.status, to: redirectURL, promise: self.task.promise)
10071034
}
10081035
default:
1009-
self.state = .endOrError
1010-
self.callOutToDelegate(promise: self.task.promise, self.delegate.didFinishRequest)
1036+
self.state = .bufferedEnd
1037+
self.handleReadForDelegate(response, context: context)
1038+
}
1039+
}
1040+
}
1041+
1042+
private func handleReadForDelegate(_ read: HTTPClientResponsePart, context: ChannelHandlerContext) {
1043+
if self.outstandingDelegateRead {
1044+
self.responseReadBuffer.appendPart(read)
1045+
return
1046+
}
1047+
1048+
// No outstanding delegate read, so we can deliver this directly.
1049+
self.deliverReadToDelegate(read, context: context)
1050+
}
1051+
1052+
private func deliverReadToDelegate(_ read: HTTPClientResponsePart, context: ChannelHandlerContext) {
1053+
precondition(!self.outstandingDelegateRead)
1054+
self.outstandingDelegateRead = true
1055+
1056+
if case .endOrError = self.state {
1057+
// No further read delivery should occur, we already delivered an error.
1058+
return
1059+
}
1060+
1061+
switch read {
1062+
case .head(let head):
1063+
self.callOutToDelegate(value: head, channelEventLoop: context.eventLoop, self.delegate.didReceiveHead)
1064+
.whenComplete { result in
1065+
self.handleBackpressureResult(context: context, result: result)
1066+
}
1067+
case .body(let body):
1068+
self.callOutToDelegate(value: body, channelEventLoop: context.eventLoop, self.delegate.didReceiveBodyPart)
1069+
.whenComplete { result in
1070+
self.handleBackpressureResult(context: context, result: result)
1071+
}
1072+
case .end:
1073+
self.state = .endOrError
1074+
self.outstandingDelegateRead = false
1075+
1076+
if self.pendingRead {
1077+
// We must read here, as we will be removed from the channel shortly.
1078+
self.pendingRead = false
1079+
context.read()
10111080
}
1081+
1082+
self.callOutToDelegate(promise: self.task.promise, self.delegate.didFinishRequest)
10121083
}
10131084
}
10141085

10151086
private func handleBackpressureResult(context: ChannelHandlerContext, result: Result<Void, Error>) {
10161087
context.eventLoop.assertInEventLoop()
1088+
self.outstandingDelegateRead = false
1089+
10171090
switch result {
10181091
case .success:
1019-
self.mayRead = true
1020-
if self.pendingRead {
1092+
if let nextRead = self.responseReadBuffer.nextRead() {
1093+
// We can deliver this directly.
1094+
self.deliverReadToDelegate(nextRead, context: context)
1095+
} else if self.pendingRead {
1096+
self.pendingRead = false
10211097
context.read()
10221098
}
10231099
case .failure(let error):
@@ -1046,7 +1122,7 @@ extension TaskHandler: ChannelDuplexHandler {
10461122
switch self.state {
10471123
case .idle, .sendingBodyWaitingResponseHead, .sendingBodyResponseHeadReceived, .bodySentWaitingResponseHead, .bodySentResponseHeadReceived, .redirected:
10481124
self.errorCaught(context: context, error: HTTPClientError.remoteConnectionClosed)
1049-
case .endOrError:
1125+
case .bufferedEnd, .endOrError:
10501126
break
10511127
}
10521128
context.fireChannelInactive()
@@ -1056,7 +1132,7 @@ extension TaskHandler: ChannelDuplexHandler {
10561132
switch error {
10571133
case NIOSSLError.uncleanShutdown:
10581134
switch self.state {
1059-
case .endOrError:
1135+
case .bufferedEnd, .endOrError:
10601136
/// Some HTTP Servers can 'forget' to respond with CloseNotify when client is closing connection,
10611137
/// this could lead to incomplete SSL shutdown. But since request is already processed, we can ignore this error.
10621138
break
@@ -1070,7 +1146,7 @@ extension TaskHandler: ChannelDuplexHandler {
10701146
}
10711147
default:
10721148
switch self.state {
1073-
case .idle, .sendingBodyWaitingResponseHead, .sendingBodyResponseHeadReceived, .bodySentWaitingResponseHead, .bodySentResponseHeadReceived, .redirected:
1149+
case .idle, .sendingBodyWaitingResponseHead, .sendingBodyResponseHeadReceived, .bodySentWaitingResponseHead, .bodySentResponseHeadReceived, .redirected, .bufferedEnd:
10741150
self.state = .endOrError
10751151
self.failTaskAndNotifyDelegate(error: error, self.delegate.didReceiveError)
10761152
case .endOrError:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import NIO
16+
import NIOHTTP1
17+
18+
struct ResponseReadBuffer {
19+
private var responseParts: CircularBuffer<HTTPClientResponsePart>
20+
21+
init() {
22+
self.responseParts = CircularBuffer(initialCapacity: 16)
23+
}
24+
25+
mutating func appendPart(_ part: HTTPClientResponsePart) {
26+
self.responseParts.append(part)
27+
}
28+
29+
mutating func nextRead() -> HTTPClientResponsePart? {
30+
return self.responseParts.popFirst()
31+
}
32+
}

Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift

+75
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import NIOHTTP1
2121
import NIOHTTPCompression
2222
import NIOSSL
2323
import NIOTransportServices
24+
import XCTest
2425

2526
/// Are we testing NIO Transport services
2627
func isTestingNIOTS() -> Bool {
@@ -100,6 +101,52 @@ class CountingDelegate: HTTPClientResponseDelegate {
100101
}
101102
}
102103

104+
class DelayOnHeadDelegate: HTTPClientResponseDelegate {
105+
typealias Response = ByteBuffer
106+
107+
let promise: EventLoopPromise<Void>
108+
109+
private var data: ByteBuffer
110+
111+
private var mayReceiveData = false
112+
113+
private var expectError = false
114+
115+
init(promise: EventLoopPromise<Void>) {
116+
self.promise = promise
117+
self.data = ByteBuffer()
118+
119+
self.promise.futureResult.whenSuccess {
120+
self.mayReceiveData = true
121+
}
122+
self.promise.futureResult.whenFailure { (_: Error) in
123+
self.expectError = true
124+
}
125+
}
126+
127+
func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
128+
XCTAssertFalse(self.expectError)
129+
return self.promise.futureResult.hop(to: task.eventLoop)
130+
}
131+
132+
func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ buffer: ByteBuffer) -> EventLoopFuture<Void> {
133+
XCTAssertTrue(self.mayReceiveData)
134+
XCTAssertFalse(self.expectError)
135+
self.data.writeImmutableBuffer(buffer)
136+
return self.promise.futureResult.hop(to: task.eventLoop)
137+
}
138+
139+
func didFinishRequest(task: HTTPClient.Task<Response>) throws -> Response {
140+
XCTAssertTrue(self.mayReceiveData)
141+
XCTAssertFalse(self.expectError)
142+
return self.data
143+
}
144+
145+
func didReceiveError(task: HTTPClient.Task<ByteBuffer>, _ error: Error) {
146+
XCTAssertTrue(self.expectError)
147+
}
148+
}
149+
103150
internal final class RecordingHandler<Input, Output>: ChannelDuplexHandler {
104151
typealias InboundIn = Input
105152
typealias OutboundIn = Output
@@ -464,6 +511,21 @@ internal final class HttpBinHandler: ChannelInboundHandler {
464511
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
465512
}
466513

514+
func writeChunked(context: ChannelHandlerContext) {
515+
// This tests receiving chunks very fast: please do not insert delays here!
516+
let headers = HTTPHeaders([("Transfer-Encoding", "chunked")])
517+
518+
context.write(self.wrapOutboundOut(.head(HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok, headers: headers))), promise: nil)
519+
for i in 0..<10 {
520+
let msg = "id: \(i)"
521+
var buf = context.channel.allocator.buffer(capacity: msg.count)
522+
buf.writeString(msg)
523+
context.write(wrapOutboundOut(.body(.byteBuffer(buf))), promise: nil)
524+
}
525+
526+
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
527+
}
528+
467529
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
468530
self.isServingRequest = true
469531
switch self.unwrapInboundIn(data) {
@@ -579,6 +641,19 @@ internal final class HttpBinHandler: ChannelInboundHandler {
579641
return
580642
case "/events/10/content-length":
581643
self.writeEvents(context: context, isContentLengthRequired: true)
644+
case "/chunked":
645+
self.writeChunked(context: context)
646+
return
647+
case "/close-on-response":
648+
var headers = self.responseHeaders
649+
headers.replaceOrAdd(name: "connection", value: "close")
650+
651+
var builder = HTTPResponseBuilder(.http1_1, status: .ok, headers: headers)
652+
builder.body = ByteBuffer(string: "some body content")
653+
654+
// We're forcing this closed now.
655+
self.shouldClose = true
656+
self.resps.append(builder)
582657
default:
583658
context.write(wrapOutboundOut(.head(HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .notFound))), promise: nil)
584659
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)

0 commit comments

Comments
 (0)