@@ -2533,4 +2533,59 @@ class HTTPClientTests: XCTestCase {
2533
2533
XCTAssertEqual ( info. connectionNumber, 1 )
2534
2534
XCTAssertEqual ( info. requestNumber, 1 )
2535
2535
}
2536
+
2537
+ func testBackpressue( ) {
2538
+ class BackpressureResponseDelegate : HTTPClientResponseDelegate {
2539
+ typealias Response = Void
2540
+ var count = 0
2541
+ var processingBodyPart = false
2542
+ var didntWait = false
2543
+ var lock = Lock ( )
2544
+
2545
+ init ( ) { }
2546
+
2547
+ func didReceiveHead( task: HTTPClient . Task < Response > , _ head: HTTPResponseHead ) -> EventLoopFuture < Void > {
2548
+ return task. eventLoop. makeSucceededFuture ( ( ) )
2549
+ }
2550
+
2551
+ func didReceiveBodyPart( task: HTTPClient . Task < Response > , _ part: ByteBuffer ) -> EventLoopFuture < Void > {
2552
+ lock. withLock {
2553
+ // if processingBodyPart is true then previous body part is still being processed
2554
+ // XCTAssertEqual doesn't work here so store result to test later
2555
+ if processingBodyPart == true {
2556
+ didntWait = true
2557
+ }
2558
+ processingBodyPart = true
2559
+ count += 1
2560
+ }
2561
+ // wait one second before returning a successful future
2562
+ return task. eventLoop. scheduleTask ( in: . milliseconds( 1000 ) ) {
2563
+ self . lock. withLock {
2564
+ self . processingBodyPart = false
2565
+ self . count -= 1
2566
+ }
2567
+ } . futureResult
2568
+ }
2569
+
2570
+ func didReceiveError( task: HTTPClient . Task < Response > , _ error: Error ) { }
2571
+ func didFinishRequest( task: HTTPClient . Task < Response > ) throws { }
2572
+ }
2573
+
2574
+ let elg = MultiThreadedEventLoopGroup ( numberOfThreads: 5 )
2575
+ let client = HTTPClient ( eventLoopGroupProvider: . shared( elg) )
2576
+ defer {
2577
+ XCTAssertNoThrow ( try client. syncShutdown ( ) )
2578
+ XCTAssertNoThrow ( try elg. syncShutdownGracefully ( ) )
2579
+ }
2580
+
2581
+ let data = Data ( count: 65273 )
2582
+ let backpressureResponseDelegate = BackpressureResponseDelegate ( )
2583
+ guard let request = try ? HTTPClient . Request ( url: self . defaultHTTPBinURLPrefix + " get " , body: . data( data) ) else {
2584
+ XCTFail ( " Failed to init Request " )
2585
+ return
2586
+ }
2587
+ XCTAssertNoThrow ( try client. execute ( request: request, delegate: backpressureResponseDelegate) . wait ( ) )
2588
+ XCTAssertEqual ( backpressureResponseDelegate. didntWait, false )
2589
+ XCTAssertEqual ( backpressureResponseDelegate. count, 0 )
2590
+ }
2536
2591
}
0 commit comments