Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TestBackpressure test #273

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,13 @@ internal final class HttpBinHandler: ChannelInboundHandler {
}
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
return
case "/zeros/150000":
let buf = context.channel.allocator.buffer(repeating: 0, count: 150_000)
context.write(wrapOutboundOut(.head(HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .ok))), promise: nil)
context.writeAndFlush(wrapOutboundOut(.body(.byteBuffer(buf))), promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
return

default:
context.write(wrapOutboundOut(.head(HTTPResponseHead(version: HTTPVersion(major: 1, minor: 1), status: .notFound))), promise: nil)
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
Expand Down
1 change: 1 addition & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ extension HTTPClientTests {
("testDelegateCallinsTolerateRandomEL", testDelegateCallinsTolerateRandomEL),
("testContentLengthTooLongFails", testContentLengthTooLongFails),
("testContentLengthTooShortFails", testContentLengthTooShortFails),
("testDownloadBackpressure", testDownloadBackpressure),
]
}
}
57 changes: 57 additions & 0 deletions Tests/AsyncHTTPClientTests/HTTPClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2533,4 +2533,61 @@ class HTTPClientTests: XCTestCase {
XCTAssertEqual(info.connectionNumber, 1)
XCTAssertEqual(info.requestNumber, 1)
}

func testDownloadBackpressure() {
class BackpressureResponseDelegate: HTTPClientResponseDelegate {
typealias Response = Void
var count = 0
var totalCount = 0
var processingBodyPart = false
var didntWait = false
var lock = Lock()

init() {}

func didReceiveHead(task: HTTPClient.Task<Response>, _ head: HTTPResponseHead) -> EventLoopFuture<Void> {
return task.eventLoop.makeSucceededFuture(())
}

func didReceiveBodyPart(task: HTTPClient.Task<Response>, _ part: ByteBuffer) -> EventLoopFuture<Void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how are we making sure that didReceiveBodyPart is actually invoked multiple times? Because if it's only invoked once, then we don't know if download streaming works or not I think?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We aren’t at the moment. I can add a test to see if it did. Is there a way we can force it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adam-fowler From the NIO level: Yes, you could set ChannelOptions.maxMessagesPerRead to 1 and set the allocator to FixedSizeRecvAllocator(1) which means that NIO will read everything byte to byte. That'll cause AHC to call out multiple times.

The issue here is that through AHC's API, you can't easily mess with the underlying Channel... You could make it an internal test and force it that way.

The other (probably easier) option is:

  • Download say 1 MB of data. NIO's default allocator will never send you more than 64k. Given that AHC doesn't change it, you can't get 1 MB in one big chunk.
  • Add XCTAssertGreaterOrEqual(numberOfCalls, 2) or so.

self.lock.withLock {
// if processingBodyPart is true then previous body part is still being processed
// XCTAssertEqual doesn't work here so store result to test later
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of interest: What about XCTAssertEqual doesn't work here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

XCTAssertEqual doesn’t return an error while assert does not totally sure why

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adam-fowler yes, it continues the execution but it will still fail the test run once it's complete. So I think you can just use XCTAssert...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this situation it doesn't work.

if processingBodyPart == true {
didntWait = true
}
processingBodyPart = true
count += 1
totalCount += 1
}
// wait one second before returning a successful future
return task.eventLoop.scheduleTask(in: .milliseconds(200)) {
self.lock.withLock {
self.processingBodyPart = false
self.count -= 1
}
}.futureResult
}

func didReceiveError(task: HTTPClient.Task<Response>, _ error: Error) {}
func didFinishRequest(task: HTTPClient.Task<Response>) throws {}
}

let elg = MultiThreadedEventLoopGroup(numberOfThreads: 3)
let client = HTTPClient(eventLoopGroupProvider: .shared(elg))
defer {
XCTAssertNoThrow(try client.syncShutdown())
XCTAssertNoThrow(try elg.syncShutdownGracefully())
}

let backpressureResponseDelegate = BackpressureResponseDelegate()
guard let request = try? HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "zeros/150000") else {
XCTFail("Failed to init Request")
return
}
XCTAssertNoThrow(try client.execute(request: request, delegate: backpressureResponseDelegate).wait())
XCTAssertEqual(backpressureResponseDelegate.didntWait, false)
XCTAssertGreaterThan(backpressureResponseDelegate.totalCount, 2)
XCTAssertEqual(backpressureResponseDelegate.count, 0)
}
}