diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index 49cac8454..863a377d9 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -96,6 +96,7 @@ extension HTTPClientTests { ("testRacePoolIdleConnectionsAndGet", testRacePoolIdleConnectionsAndGet), ("testAvoidLeakingTLSHandshakeCompletionPromise", testAvoidLeakingTLSHandshakeCompletionPromise), ("testAsyncShutdown", testAsyncShutdown), + ("testUploadsReallyStream", testUploadsReallyStream), ] } } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index e9e6a3c2b..4505f44ce 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -1678,4 +1678,129 @@ class HTTPClientTests: XCTestCase { } XCTAssertNoThrow(try promise.futureResult.wait()) } + + func testUploadsReallyStream() { + final class HTTPServer: ChannelInboundHandler { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + private let headPromise: EventLoopPromise + private let bodyPromises: [EventLoopPromise] + private let endPromise: EventLoopPromise + private var bodyPartsSeenSoFar = 0 + + init(headPromise: EventLoopPromise, + bodyPromises: [EventLoopPromise], + endPromise: EventLoopPromise) { + self.headPromise = headPromise + self.bodyPromises = bodyPromises + self.endPromise = endPromise + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + switch self.unwrapInboundIn(data) { + case .head(let head): + XCTAssert(self.bodyPartsSeenSoFar == 0) + self.headPromise.succeed(head) + case .body(let bytes): + let myNumber = self.bodyPartsSeenSoFar + self.bodyPartsSeenSoFar += 1 + self.bodyPromises.dropFirst(myNumber).first?.succeed(bytes) ?? XCTFail("ouch, too many chunks") + case .end: + context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))), + promise: nil) + context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: self.endPromise) + } + } + + func handlerRemoved(context: ChannelHandlerContext) { + struct NotFulfilledError: Error {} + + self.headPromise.fail(NotFulfilledError()) + self.bodyPromises.forEach { + $0.fail(NotFulfilledError()) + } + self.endPromise.fail(NotFulfilledError()) + } + } + + let group = MultiThreadedEventLoopGroup(numberOfThreads: 2) + defer { + XCTAssertNoThrow(try group.syncShutdownGracefully()) + } + let client = HTTPClient(eventLoopGroupProvider: .shared(group)) + defer { + XCTAssertNoThrow(try client.syncShutdown()) + } + let headPromise = group.next().makePromise(of: HTTPRequestHead.self) + let bodyPromises = (0..<16).map { _ in group.next().makePromise(of: ByteBuffer.self) } + let endPromise = group.next().makePromise(of: Void.self) + let sentOffAllBodyPartsPromise = group.next().makePromise(of: Void.self) + // Because of https://github.com/swift-server/async-http-client/issues/200 we also need to pull off a terrible + // hack and get the internal EventLoop out :(. Once the bug is fixed, this promise should only get the + // StreamWriter. + let streamWriterPromise = group.next().makePromise(of: (EventLoop, HTTPClient.Body.StreamWriter).self) + + func makeServer() -> Channel? { + return try? ServerBootstrap(group: group) + .childChannelInitializer { channel in + channel.pipeline.configureHTTPServerPipeline().flatMap { + channel.pipeline.addHandler(HTTPServer(headPromise: headPromise, + bodyPromises: bodyPromises, + endPromise: endPromise)) + } + } + .serverChannelOption(ChannelOptions.socket(.init(SOL_SOCKET), .init(SO_REUSEADDR)), value: 1) + .bind(host: "127.0.0.1", port: 0) + .wait() + } + + func makeRequest(server: Channel) -> Request? { + guard let localAddress = server.localAddress else { + return nil + } + + return try? HTTPClient.Request(url: "http://\(localAddress.ipAddress!):\(localAddress.port!)", + method: .POST, + headers: ["transfer-encoding": "chunked"], + body: .stream { streamWriter in + // Due to https://github.com/swift-server/async-http-client/issues/200 + // we also need to pull off a terrible hack and get the internal + // EventLoop out :(. Once the bug is fixed, this promise should only get + // the StreamWriter. + let currentEL = MultiThreadedEventLoopGroup.currentEventLoop! // HACK!! + streamWriterPromise.succeed((currentEL, streamWriter)) + return sentOffAllBodyPartsPromise.futureResult + }) + } + + guard let server = makeServer(), let request = makeRequest(server: server) else { + XCTFail("couldn't make a server Channel and a matching Request...") + return + } + defer { + XCTAssertNoThrow(try server.close().wait()) + } + + var buffer = ByteBufferAllocator().buffer(capacity: 1) + let runningRequest = client.execute(request: request) + guard let streamWriter = try? streamWriterPromise.futureResult.wait() else { + XCTFail("didn't get StreamWriter") + return + } + + XCTAssertNoThrow(XCTAssertEqual(.POST, try headPromise.futureResult.wait().method)) + for bodyChunkNumber in 0..<16 { + buffer.clear() + buffer.writeString(String(bodyChunkNumber, radix: 16)) + XCTAssertEqual(1, buffer.readableBytes) + XCTAssertNoThrow(try streamWriter.0.flatSubmit { + streamWriter.1.write(.byteBuffer(buffer)) + }.wait()) + XCTAssertNoThrow(XCTAssertEqual(buffer, try bodyPromises[bodyChunkNumber].futureResult.wait())) + } + sentOffAllBodyPartsPromise.succeed(()) + XCTAssertNoThrow(try endPromise.futureResult.wait()) + XCTAssertNoThrow(try runningRequest.wait()) + } }