@@ -806,4 +806,81 @@ class HTTPClientInternalTests: XCTestCase {
806
806
}
807
807
}
808
808
}
809
+
810
+ func testUploadStreamingIsCalledOnTaskEL( ) throws {
811
+ let group = getDefaultEventLoopGroup ( numberOfThreads: 4 )
812
+ defer {
813
+ XCTAssertNoThrow ( try group. syncShutdownGracefully ( ) )
814
+ }
815
+
816
+ let httpBin = HTTPBin ( )
817
+ let httpClient = HTTPClient ( eventLoopGroupProvider: . shared( group) )
818
+ defer {
819
+ XCTAssertNoThrow ( try httpClient. syncShutdown ( ) )
820
+ XCTAssertNoThrow ( try httpBin. shutdown ( ) )
821
+ }
822
+
823
+ let el1 = group. next ( )
824
+ let el2 = group. next ( )
825
+ XCTAssert ( el1 !== el2)
826
+
827
+ let body : HTTPClient . Body = . stream( length: 8 ) { writer in
828
+ XCTAssert ( el1. inEventLoop)
829
+ let buffer = ByteBuffer . of ( string: " 1234 " )
830
+ return writer. write ( . byteBuffer( buffer) ) . flatMap {
831
+ XCTAssert ( el1. inEventLoop)
832
+ let buffer = ByteBuffer . of ( string: " 4321 " )
833
+ return writer. write ( . byteBuffer( buffer) )
834
+ }
835
+ }
836
+ let request = try HTTPClient . Request ( url: " http://localhost: \( httpBin. port) /post " , method: . POST, body: body)
837
+ let response = httpClient. execute ( request: request,
838
+ delegate: ResponseAccumulator ( request: request) ,
839
+ eventLoop: HTTPClient . EventLoopPreference ( . testOnly_exact( channelOn: el2,
840
+ delegateOn: el1) ) )
841
+ XCTAssert ( el1 === response. eventLoop)
842
+ XCTAssertNoThrow ( try response. wait ( ) )
843
+ }
844
+
845
+ func testWeCanActuallyExactlySetTheEventLoops( ) throws {
846
+ let group = getDefaultEventLoopGroup ( numberOfThreads: 3 )
847
+ defer {
848
+ XCTAssertNoThrow ( try group. syncShutdownGracefully ( ) )
849
+ }
850
+
851
+ let httpBin = HTTPBin ( )
852
+ let httpClient = HTTPClient ( eventLoopGroupProvider: . shared( group) )
853
+ defer {
854
+ XCTAssertNoThrow ( try httpClient. syncShutdown ( ) )
855
+ XCTAssertNoThrow ( try httpBin. shutdown ( ) )
856
+ }
857
+
858
+ let el1 = group. next ( )
859
+ let el2 = group. next ( )
860
+ XCTAssert ( el1 !== el2)
861
+
862
+ let taskPromise = group. next ( ) . makePromise ( of: HTTPClient . Task< HTTPClient . Response> . self )
863
+ let body : HTTPClient . Body = . stream( length: 8 ) { writer in
864
+ XCTAssert ( el1. inEventLoop)
865
+ let buffer = ByteBuffer . of ( string: " 1234 " )
866
+ return writer. write ( . byteBuffer( buffer) ) . flatMap {
867
+ XCTAssert ( el1. inEventLoop)
868
+ let buffer = ByteBuffer . of ( string: " 4321 " )
869
+ return taskPromise. futureResult. map { ( task: HTTPClient . Task < HTTPClient . Response > ) -> Void in
870
+ XCTAssertNotNil ( task. connection)
871
+ XCTAssert ( task. connection? . channel. eventLoop === el2)
872
+ } . flatMap {
873
+ writer. write ( . byteBuffer( buffer) )
874
+ }
875
+ }
876
+ }
877
+ let request = try HTTPClient . Request ( url: " http://localhost: \( httpBin. port) /post " , method: . POST, body: body)
878
+ let response = httpClient. execute ( request: request,
879
+ delegate: ResponseAccumulator ( request: request) ,
880
+ eventLoop: HTTPClient . EventLoopPreference ( . testOnly_exact( channelOn: el2,
881
+ delegateOn: el1) ) )
882
+ taskPromise. succeed ( response)
883
+ XCTAssert ( el1 === response. eventLoop)
884
+ XCTAssertNoThrow ( try response. wait ( ) )
885
+ }
809
886
}
0 commit comments