diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index 6beba8938..384db82d8 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -541,6 +541,13 @@ internal final class HttpBinHandler: ChannelInboundHandler { } context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) return + case "/zeros/150000": + let buf = context.channel.allocator.buffer(repeating: 0, count: 150_000) + context.write(wrapOutboundOut(.head(HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok))), promise: nil) + context.writeAndFlush(wrapOutboundOut(.body(.byteBuffer(buf))), promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + return + default: context.write(wrapOutboundOut(.head(HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .notFound))), promise: nil) context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index 5fe7b2417..51da398a9 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -119,6 +119,7 @@ extension HTTPClientTests { ("testDelegateCallinsTolerateRandomEL", testDelegateCallinsTolerateRandomEL), ("testContentLengthTooLongFails", testContentLengthTooLongFails), ("testContentLengthTooShortFails", testContentLengthTooShortFails), + ("testDownloadBackpressure", testDownloadBackpressure), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index df144b70b..8c0506c80 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -2533,4 +2533,61 @@ class HTTPClientTests: XCTestCase { XCTAssertEqual(info.connectionNumber, 1) XCTAssertEqual(info.requestNumber, 1) } + + func testDownloadBackpressure() { + class BackpressureResponseDelegate: HTTPClientResponseDelegate { + typealias Response = Void + var count = 0 + var totalCount = 0 + var processingBodyPart = false + var didntWait = false + var lock = Lock() + + init() {} + + func didReceiveHead(task: HTTPClient.Task, _ head: HTTPResponseHead) -> EventLoopFuture { + return task.eventLoop.makeSucceededFuture(()) + } + + func didReceiveBodyPart(task: HTTPClient.Task, _ part: ByteBuffer) -> EventLoopFuture { + self.lock.withLock { + // if processingBodyPart is true then previous body part is still being processed + // XCTAssertEqual doesn't work here so store result to test later + if processingBodyPart == true { + didntWait = true + } + processingBodyPart = true + count += 1 + totalCount += 1 + } + // wait one second before returning a successful future + return task.eventLoop.scheduleTask(in: .milliseconds(200)) { + self.lock.withLock { + self.processingBodyPart = false + self.count -= 1 + } + }.futureResult + } + + func didReceiveError(task: HTTPClient.Task, _ error: Error) {} + func didFinishRequest(task: HTTPClient.Task) throws {} + } + + let elg = MultiThreadedEventLoopGroup(numberOfThreads: 3) + let client = HTTPClient(eventLoopGroupProvider: .shared(elg)) + defer { + XCTAssertNoThrow(try client.syncShutdown()) + XCTAssertNoThrow(try elg.syncShutdownGracefully()) + } + + let backpressureResponseDelegate = BackpressureResponseDelegate() + guard let request = try? HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "zeros/150000") else { + XCTFail("Failed to init Request") + return + } + XCTAssertNoThrow(try client.execute(request: request, delegate: backpressureResponseDelegate).wait()) + XCTAssertEqual(backpressureResponseDelegate.didntWait, false) + XCTAssertGreaterThan(backpressureResponseDelegate.totalCount, 2) + XCTAssertEqual(backpressureResponseDelegate.count, 0) + } }