@@ -1678,4 +1678,129 @@ class HTTPClientTests: XCTestCase {
1678
1678
}
1679
1679
XCTAssertNoThrow ( try promise. futureResult. wait ( ) )
1680
1680
}
1681
+
1682
+ func testUploadsReallyStream( ) {
1683
+ final class HTTPServer : ChannelInboundHandler {
1684
+ typealias InboundIn = HTTPServerRequestPart
1685
+ typealias OutboundOut = HTTPServerResponsePart
1686
+
1687
+ private let headPromise : EventLoopPromise < HTTPRequestHead >
1688
+ private let bodyPromises : [ EventLoopPromise < ByteBuffer > ]
1689
+ private let endPromise : EventLoopPromise < Void >
1690
+ private var bodyPartsSeenSoFar = 0
1691
+
1692
+ init ( headPromise: EventLoopPromise < HTTPRequestHead > ,
1693
+ bodyPromises: [ EventLoopPromise < ByteBuffer > ] ,
1694
+ endPromise: EventLoopPromise < Void > ) {
1695
+ self . headPromise = headPromise
1696
+ self . bodyPromises = bodyPromises
1697
+ self . endPromise = endPromise
1698
+ }
1699
+
1700
+ func channelRead( context: ChannelHandlerContext , data: NIOAny ) {
1701
+ switch self . unwrapInboundIn ( data) {
1702
+ case . head( let head) :
1703
+ XCTAssert ( self . bodyPartsSeenSoFar == 0 )
1704
+ self . headPromise. succeed ( head)
1705
+ case . body( let bytes) :
1706
+ let myNumber = self . bodyPartsSeenSoFar
1707
+ self . bodyPartsSeenSoFar += 1
1708
+ self . bodyPromises. dropFirst ( myNumber) . first? . succeed ( bytes) ?? XCTFail ( " ouch, too many chunks " )
1709
+ case . end:
1710
+ context. write ( self . wrapOutboundOut ( . head( . init( version: . init( major: 1 , minor: 1 ) , status: . ok) ) ) ,
1711
+ promise: nil )
1712
+ context. writeAndFlush ( self . wrapOutboundOut ( . end( nil ) ) , promise: self . endPromise)
1713
+ }
1714
+ }
1715
+
1716
+ func handlerRemoved( context: ChannelHandlerContext ) {
1717
+ struct NotFulfilledError : Error { }
1718
+
1719
+ self . headPromise. fail ( NotFulfilledError ( ) )
1720
+ self . bodyPromises. forEach {
1721
+ $0. fail ( NotFulfilledError ( ) )
1722
+ }
1723
+ self . endPromise. fail ( NotFulfilledError ( ) )
1724
+ }
1725
+ }
1726
+
1727
+ let group = MultiThreadedEventLoopGroup ( numberOfThreads: 2 )
1728
+ defer {
1729
+ XCTAssertNoThrow ( try group. syncShutdownGracefully ( ) )
1730
+ }
1731
+ let client = HTTPClient ( eventLoopGroupProvider: . shared( group) )
1732
+ defer {
1733
+ XCTAssertNoThrow ( try client. syncShutdown ( ) )
1734
+ }
1735
+ let headPromise = group. next ( ) . makePromise ( of: HTTPRequestHead . self)
1736
+ let bodyPromises = ( 0 ..< 16 ) . map { _ in group. next ( ) . makePromise ( of: ByteBuffer . self) }
1737
+ let endPromise = group. next ( ) . makePromise ( of: Void . self)
1738
+ let sentOffAllBodyPartsPromise = group. next ( ) . makePromise ( of: Void . self)
1739
+ // Because of https://github.com/swift-server/async-http-client/issues/200 we also need to pull off a terrible
1740
+ // hack and get the internal EventLoop out :(. Once the bug is fixed, this promise should only get the
1741
+ // StreamWriter.
1742
+ let streamWriterPromise = group. next ( ) . makePromise ( of: ( EventLoop, HTTPClient . Body. StreamWriter) . self)
1743
+
1744
+ func makeServer( ) -> Channel ? {
1745
+ return try ? ServerBootstrap ( group: group)
1746
+ . childChannelInitializer { channel in
1747
+ channel. pipeline. configureHTTPServerPipeline ( ) . flatMap {
1748
+ channel. pipeline. addHandler ( HTTPServer ( headPromise: headPromise,
1749
+ bodyPromises: bodyPromises,
1750
+ endPromise: endPromise) )
1751
+ }
1752
+ }
1753
+ . serverChannelOption ( ChannelOptions . socket ( . init( SOL_SOCKET) , . init( SO_REUSEADDR) ) , value: 1 )
1754
+ . bind ( host: " 127.0.0.1 " , port: 0 )
1755
+ . wait ( )
1756
+ }
1757
+
1758
+ func makeRequest( server: Channel ) -> Request ? {
1759
+ guard let localAddress = server. localAddress else {
1760
+ return nil
1761
+ }
1762
+
1763
+ return try ? HTTPClient . Request ( url: " http:// \( localAddress. ipAddress!) : \( localAddress. port!) " ,
1764
+ method: . POST,
1765
+ headers: [ " transfer-encoding " : " chunked " ] ,
1766
+ body: . stream { streamWriter in
1767
+ // Due to https://github.com/swift-server/async-http-client/issues/200
1768
+ // we also need to pull off a terrible hack and get the internal
1769
+ // EventLoop out :(. Once the bug is fixed, this promise should only get
1770
+ // the StreamWriter.
1771
+ let currentEL = MultiThreadedEventLoopGroup . currentEventLoop // HACK!!
1772
+ streamWriterPromise. succeed ( ( currentEL, streamWriter) )
1773
+ return sentOffAllBodyPartsPromise. futureResult
1774
+ } )
1775
+ }
1776
+
1777
+ guard let server = makeServer ( ) , let request = makeRequest ( server: server) else {
1778
+ XCTFail ( " couldn't make a server Channel and a matching Request... " )
1779
+ return
1780
+ }
1781
+ defer {
1782
+ XCTAssertNoThrow ( try server. close ( ) . wait ( ) )
1783
+ }
1784
+
1785
+ var buffer = ByteBufferAllocator ( ) . buffer ( capacity: 1 )
1786
+ let runningRequest = client. execute ( request: request)
1787
+ guard let streamWriter = try ? streamWriterPromise. futureResult. wait ( ) else {
1788
+ XCTFail ( " didn't get StreamWriter " )
1789
+ return
1790
+ }
1791
+
1792
+ XCTAssertNoThrow ( XCTAssertEqual ( . POST, try headPromise. futureResult. wait ( ) . method) )
1793
+ for bodyChunkNumber in 0 ..< 16 {
1794
+ buffer. clear ( )
1795
+ buffer. writeString ( String ( bodyChunkNumber, radix: 16 ) )
1796
+ XCTAssertEqual ( 1 , buffer. readableBytes)
1797
+ XCTAssertNoThrow ( try streamWriter. 0 . flatSubmit {
1798
+ streamWriter. 1 . write ( . byteBuffer( buffer) )
1799
+ } . wait ( ) )
1800
+ XCTAssertNoThrow ( XCTAssertEqual ( buffer, try bodyPromises [ bodyChunkNumber] . futureResult. wait ( ) ) )
1801
+ }
1802
+ sentOffAllBodyPartsPromise. succeed ( ( ) )
1803
+ XCTAssertNoThrow ( try endPromise. futureResult. wait ( ) )
1804
+ XCTAssertNoThrow ( try runningRequest. wait ( ) )
1805
+ }
1681
1806
}
0 commit comments