From ae460040a804f3de2d06d5684a795daef59ef4cd Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Thu, 6 Aug 2020 16:58:04 +0100 Subject: [PATCH 1/2] implement new streamwriter api --- Sources/AsyncHTTPClient/HTTPHandler.swift | 116 ++++++++++++++++-- .../HTTPClientInternalTests.swift | 57 +++++---- .../HTTPClientTests+XCTest.swift | 1 + .../HTTPClientTests.swift | 78 +++++++----- .../RequestValidationTests.swift | 10 +- 5 files changed, 196 insertions(+), 66 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index b949cff21..d3fd1066b 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -32,32 +32,92 @@ extension HTTPClient { /// /// - parameters: /// - closure: function that will be called to write actual bytes to the channel. + @available(*, deprecated, message: "StreamWriter is deprecated, please use StreamWriter2") public init(closure: @escaping (IOData) -> EventLoopFuture) { self.closure = closure } + // This is needed so we don't have build warnings in the client itself + init(internalClosure: @escaping (IOData) -> EventLoopFuture) { + self.closure = internalClosure + } + /// Write data to server. /// /// - parameters: /// - data: `IOData` to write. + @available(*, deprecated, message: "") public func write(_ data: IOData) -> EventLoopFuture { return self.closure(data) } } + public struct StreamWriter2 { + public let allocator: ByteBufferAllocator + let onChunk: (IOData) -> EventLoopFuture + let onComplete: EventLoopPromise + + public init(allocator: ByteBufferAllocator, onChunk: @escaping (IOData) -> EventLoopFuture, onComplete: EventLoopPromise) { + self.allocator = allocator + self.onChunk = onChunk + self.onComplete = onComplete + } + + public func write(_ buffer: ByteBuffer) -> EventLoopFuture { + self.onChunk(.byteBuffer(buffer)) + } + + public func write(_ data: IOData) -> EventLoopFuture { + self.onChunk(data) + } + + public func write(_ buffer: ByteBuffer, promise: EventLoopPromise?) { + self.onChunk(.byteBuffer(buffer)).cascade(to: promise) + } + + public func write(_ data: IOData, promise: EventLoopPromise?) { + self.onChunk(data).cascade(to: promise) + } + + public func end() { + self.onComplete.succeed(()) + } + + public func fail(_ error: Error) { + self.onComplete.fail(error) + } + } + /// Body size. Request validation will be failed with `HTTPClientErrors.contentLengthMissing` if nil, /// unless `Trasfer-Encoding: chunked` header is set. public var length: Int? /// Body chunk provider. public var stream: (StreamWriter) -> EventLoopFuture + var stream2: ((StreamWriter2) -> Void)? + + @available(*, deprecated, message: "") + init(length: Int?, stream: @escaping (StreamWriter) -> EventLoopFuture) { + self.length = length + self.stream = stream + self.stream2 = nil + } + + init(length: Int?, stream: @escaping (StreamWriter2) -> Void) { + self.length = length + self.stream = { _ in + preconditionFailure("stream writer 2 was called") + } + self.stream2 = stream + } /// Create and stream body using `ByteBuffer`. /// /// - parameters: /// - buffer: Body `ByteBuffer` representation. public static func byteBuffer(_ buffer: ByteBuffer) -> Body { - return Body(length: buffer.readableBytes) { writer in - writer.write(.byteBuffer(buffer)) + return Body(length: buffer.readableBytes) { (writer: StreamWriter2) in + writer.write(.byteBuffer(buffer), promise: nil) + writer.end() } } @@ -67,17 +127,30 @@ extension HTTPClient { /// - length: Body size. Request validation will be failed with `HTTPClientErrors.contentLengthMissing` if nil, /// unless `Transfer-Encoding: chunked` header is set. /// - stream: Body chunk provider. + @available(*, deprecated, message: "StreamWriter is deprecated, please use StreamWriter2 instead") public static func stream(length: Int? = nil, _ stream: @escaping (StreamWriter) -> EventLoopFuture) -> Body { return Body(length: length, stream: stream) } + /// Create and stream body using `StreamWriter`. + /// + /// - parameters: + /// - length: Body size. Request validation will be failed with `HTTPClientErrors.contentLengthMissing` if nil, + /// unless `Transfer-Encoding: chunked` header is set. + /// - stream: Body chunk provider. + public static func stream2(length: Int? = nil, _ stream: @escaping (StreamWriter2) -> Void) -> Body { + return Body(length: length, stream: stream) + } + /// Create and stream body using `Data`. /// /// - parameters: /// - data: Body `Data` representation. public static func data(_ data: Data) -> Body { - return Body(length: data.count) { writer in - writer.write(.byteBuffer(ByteBuffer(bytes: data))) + return Body(length: data.count) { (writer: StreamWriter2) in + let buffer = writer.allocator.buffer(data: data) + writer.write(.byteBuffer(buffer), promise: nil) + writer.end() } } @@ -86,8 +159,10 @@ extension HTTPClient { /// - parameters: /// - string: Body `String` representation. public static func string(_ string: String) -> Body { - return Body(length: string.utf8.count) { writer in - writer.write(.byteBuffer(ByteBuffer(string: string))) + return Body(length: string.utf8.count) { (writer: StreamWriter2) in + let buffer = writer.allocator.buffer(string: string) + writer.write(.byteBuffer(buffer), promise: nil) + writer.end() } } } @@ -874,7 +949,32 @@ extension TaskHandler: ChannelDuplexHandler { let channel = context.channel func doIt() -> EventLoopFuture { - return body.stream(HTTPClient.Body.StreamWriter { part in + if let stream2 = body.stream2 { + let completion = channel.eventLoop.makePromise(of: Void.self) + stream2(HTTPClient.Body.StreamWriter2(allocator: channel.allocator, onChunk: { part in + let promise = self.task.eventLoop.makePromise(of: Void.self) + // All writes have to be switched to the channel EL if channel and task ELs differ + if channel.eventLoop.inEventLoop { + self.writeBodyPart(context: context, part: part, promise: promise) + } else { + channel.eventLoop.execute { + self.writeBodyPart(context: context, part: part, promise: promise) + } + } + + promise.futureResult.whenFailure { error in + completion.fail(error) + } + + return promise.futureResult.map { + self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart) + } + }, onComplete: completion)) + + return completion.futureResult + } + + return body.stream(HTTPClient.Body.StreamWriter(internalClosure: { part in let promise = self.task.eventLoop.makePromise(of: Void.self) // All writes have to be switched to the channel EL if channel and task ELs differ if channel.eventLoop.inEventLoop { @@ -888,7 +988,7 @@ extension TaskHandler: ChannelDuplexHandler { return promise.futureResult.map { self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart) } - }) + })) } // Callout to the user to start body streaming should be on task EL diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index 52348ba94..c4f6dd7ae 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -169,17 +169,19 @@ class HTTPClientInternalTests: XCTestCase { XCTAssertNoThrow(try httpBin.shutdown()) } - let body: HTTPClient.Body = .stream(length: 50) { writer in + let body: HTTPClient.Body = .stream2(length: 50) { writer in do { var request = try Request(url: "http://localhost:\(httpBin.port)/events/10/1") request.headers.add(name: "Accept", value: "text/event-stream") let delegate = HTTPClientCopyingDelegate { part in - writer.write(.byteBuffer(part)) + writer.write(part) + } + httpClient.execute(request: request, delegate: delegate).futureResult.whenComplete { _ in + writer.end() } - return httpClient.execute(request: request, delegate: delegate).futureResult } catch { - return httpClient.eventLoopGroup.next().makeFailedFuture(error) + writer.fail(error) } } @@ -198,13 +200,13 @@ class HTTPClientInternalTests: XCTestCase { XCTAssertNoThrow(try httpBin.shutdown()) } - var body: HTTPClient.Body = .stream(length: 50) { _ in - httpClient.eventLoopGroup.next().makeFailedFuture(HTTPClientError.invalidProxyResponse) + var body: HTTPClient.Body = .stream2(length: 50) { writer in + writer.fail(HTTPClientError.invalidProxyResponse) } XCTAssertThrowsError(try httpClient.post(url: "http://localhost:\(httpBin.port)/post", body: body).wait()) - body = .stream(length: 50) { _ in + body = .stream2(length: 50) { writer in do { var request = try Request(url: "http://localhost:\(httpBin.port)/events/10/1") request.headers.add(name: "Accept", value: "text/event-stream") @@ -212,9 +214,11 @@ class HTTPClientInternalTests: XCTestCase { let delegate = HTTPClientCopyingDelegate { _ in httpClient.eventLoopGroup.next().makeFailedFuture(HTTPClientError.invalidProxyResponse) } - return httpClient.execute(request: request, delegate: delegate).futureResult + httpClient.execute(request: request, delegate: delegate).futureResult.whenComplete { _ in + writer.end() + } } catch { - return httpClient.eventLoopGroup.next().makeFailedFuture(error) + writer.fail(error) } } @@ -432,11 +436,11 @@ class HTTPClientInternalTests: XCTestCase { XCTAssertNoThrow(try httpClient.syncShutdown(requiresCleanClose: true)) } - let body: HTTPClient.Body = .stream(length: 8) { writer in - let buffer = ByteBuffer(string: "1234") - return writer.write(.byteBuffer(buffer)).flatMap { - let buffer = ByteBuffer(string: "4321") - return writer.write(.byteBuffer(buffer)) + let body: HTTPClient.Body = .stream2(length: 8) { writer in + writer.write(writer.allocator.buffer(string: "1234")).whenComplete { _ in + writer.write(writer.allocator.buffer(string: "4321")).whenComplete { _ in + writer.end() + } } } @@ -885,13 +889,13 @@ class HTTPClientInternalTests: XCTestCase { let el2 = group.next() XCTAssert(el1 !== el2) - let body: HTTPClient.Body = .stream(length: 8) { writer in + let body: HTTPClient.Body = .stream2(length: 8) { writer in XCTAssert(el1.inEventLoop) - let buffer = ByteBuffer(string: "1234") - return writer.write(.byteBuffer(buffer)).flatMap { + return writer.write(writer.allocator.buffer(string: "1234")).whenComplete { _ in XCTAssert(el1.inEventLoop) - let buffer = ByteBuffer(string: "4321") - return writer.write(.byteBuffer(buffer)) + writer.write(writer.allocator.buffer(string: "4321")).whenComplete { _ in + writer.end() + } } } let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/post", method: .POST, body: body) @@ -921,17 +925,17 @@ class HTTPClientInternalTests: XCTestCase { XCTAssert(el1 !== el2) let taskPromise = group.next().makePromise(of: HTTPClient.Task.self) - let body: HTTPClient.Body = .stream(length: 8) { writer in + let body: HTTPClient.Body = .stream2(length: 8) { writer in XCTAssert(el1.inEventLoop) - let buffer = ByteBuffer(string: "1234") - return writer.write(.byteBuffer(buffer)).flatMap { + writer.write(writer.allocator.buffer(string: "1234")).whenComplete { _ in XCTAssert(el1.inEventLoop) - let buffer = ByteBuffer(string: "4321") return taskPromise.futureResult.map { (task: HTTPClient.Task) -> Void in XCTAssertNotNil(task.connection) XCTAssert(task.connection?.channel.eventLoop === el2) }.flatMap { - writer.write(.byteBuffer(buffer)) + writer.write(writer.allocator.buffer(string: "4321")) + }.whenComplete { _ in + writer.end() } } } @@ -1070,8 +1074,9 @@ class HTTPClientInternalTests: XCTestCase { try channel.pipeline.addHandler(handler).wait() var request = try Request(url: "http://localhost:8080/post") - request.body = .stream(length: 1) { writer in - writer.write(.byteBuffer(ByteBuffer(string: "1234"))) + request.body = .stream2(length: 1) { writer in + writer.write(writer.allocator.buffer(string: "1234"), promise: nil) + writer.end() } XCTAssertThrowsError(try channel.writeOutbound(request)) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index eb4dd7cb5..6b002d88c 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -56,6 +56,7 @@ extension HTTPClientTests { ("testProxyPlaintextWithCorrectlyAuthorization", testProxyPlaintextWithCorrectlyAuthorization), ("testProxyPlaintextWithIncorrectlyAuthorization", testProxyPlaintextWithIncorrectlyAuthorization), ("testUploadStreaming", testUploadStreaming), + ("testUploadStreaming2", testUploadStreaming2), ("testNoContentLengthForSSLUncleanShutdown", testNoContentLengthForSSLUncleanShutdown), ("testNoContentLengthWithIgnoreErrorForSSLUncleanShutdown", testNoContentLengthWithIgnoreErrorForSSLUncleanShutdown), ("testCorrectContentLengthForSSLUncleanShutdown", testCorrectContentLengthForSSLUncleanShutdown), diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 04287ef0d..0fdb37796 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -659,6 +659,23 @@ class HTTPClientTests: XCTestCase { XCTAssertEqual("12344321", data.data) } + func testUploadStreaming2() throws { + let body: HTTPClient.Body = .stream2(length: 8) { writer in + var buffer = ByteBuffer(string: "1234") + writer.write(.byteBuffer(buffer), promise: nil) + buffer = ByteBuffer(string: "4321") + writer.write(.byteBuffer(buffer), promise: nil) + writer.end() + } + + let response = try self.defaultClient.post(url: self.defaultHTTPBinURLPrefix + "post", body: body).wait() + let bytes = response.body.flatMap { $0.getData(at: 0, length: $0.readableBytes) } + let data = try JSONDecoder().decode(RequestInfo.self, from: bytes!) + + XCTAssertEqual(.ok, response.status) + XCTAssertEqual("12344321", data.data) + } + func testNoContentLengthForSSLUncleanShutdown() throws { // NIOTS deals with ssl unclean shutdown internally guard !isTestingNIOTS() else { return } @@ -1812,8 +1829,8 @@ class HTTPClientTests: XCTestCase { } func testValidationErrorsAreSurfaced() throws { - let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .TRACE, body: .stream { _ in - self.defaultClient.eventLoopGroup.next().makeSucceededFuture(()) + let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .TRACE, body: .stream2 { writer in + writer.end() }) let runningRequest = self.defaultClient.execute(request: request) XCTAssertThrowsError(try runningRequest.wait()) { error in @@ -1883,7 +1900,7 @@ class HTTPClientTests: XCTestCase { let bodyPromises = (0..<16).map { _ in group.next().makePromise(of: ByteBuffer.self) } let endPromise = group.next().makePromise(of: Void.self) let sentOffAllBodyPartsPromise = group.next().makePromise(of: Void.self) - let streamWriterPromise = group.next().makePromise(of: HTTPClient.Body.StreamWriter.self) + let writerPromise = group.next().makePromise(of: HTTPClient.Body.StreamWriter2.self) func makeServer() -> Channel? { return try? ServerBootstrap(group: group) @@ -1907,9 +1924,11 @@ class HTTPClientTests: XCTestCase { return try? HTTPClient.Request(url: "http://\(localAddress.ipAddress!):\(localAddress.port!)", method: .POST, headers: ["transfer-encoding": "chunked"], - body: .stream { streamWriter in - streamWriterPromise.succeed(streamWriter) - return sentOffAllBodyPartsPromise.futureResult + body: .stream2 { writer in + writerPromise.succeed(writer) + sentOffAllBodyPartsPromise.futureResult.whenComplete { _ in + writer.end() + } }) } @@ -1923,7 +1942,7 @@ class HTTPClientTests: XCTestCase { var buffer = ByteBufferAllocator().buffer(capacity: 1) let runningRequest = client.execute(request: request) - guard let streamWriter = try? streamWriterPromise.futureResult.wait() else { + guard let writer = try? writerPromise.futureResult.wait() else { XCTFail("didn't get StreamWriter") return } @@ -1933,7 +1952,7 @@ class HTTPClientTests: XCTestCase { buffer.clear() buffer.writeString(String(bodyChunkNumber, radix: 16)) XCTAssertEqual(1, buffer.readableBytes) - XCTAssertNoThrow(try streamWriter.write(.byteBuffer(buffer)).wait()) + XCTAssertNoThrow(try writer.write(.byteBuffer(buffer)).wait()) XCTAssertNoThrow(XCTAssertEqual(buffer, try bodyPromises[bodyChunkNumber].futureResult.wait())) } sentOffAllBodyPartsPromise.succeed(()) @@ -1942,15 +1961,13 @@ class HTTPClientTests: XCTestCase { } func testUploadStreamingCallinToleratedFromOtsideEL() throws { - let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .POST, body: .stream(length: 4) { writer in - let promise = self.defaultClient.eventLoopGroup.next().makePromise(of: Void.self) + let request = try HTTPClient.Request(url: self.defaultHTTPBinURLPrefix + "get", method: .POST, body: .stream2(length: 4) { writer in // We have to toleare callins from any thread DispatchQueue(label: "upload-streaming").async { - writer.write(.byteBuffer(ByteBuffer(string: "1234"))).whenComplete { _ in - promise.succeed(()) + writer.write(writer.allocator.buffer(string: "1234")).whenComplete { _ in + writer.end() } } - return promise.futureResult }) XCTAssertNoThrow(try self.defaultClient.execute(request: request).wait()) } @@ -2395,8 +2412,9 @@ class HTTPClientTests: XCTestCase { } var request = try HTTPClient.Request(url: "http://localhost:\(server.serverPort)/") - request.body = .stream { writer in - writer.write(.byteBuffer(ByteBuffer(string: "1234"))) + request.body = .stream2 { writer in + writer.write(writer.allocator.buffer(string: "1234"), promise: nil) + writer.end() } let future = client.execute(request: request) @@ -2496,12 +2514,12 @@ class HTTPClientTests: XCTestCase { XCTAssertThrowsError( try self.defaultClient.execute(request: Request(url: url, - body: .stream(length: 10) { streamWriter in - let promise = self.defaultClient.eventLoopGroup.next().makePromise(of: Void.self) + body: .stream2(length: 10) { writer in DispatchQueue(label: "content-length-test").async { - streamWriter.write(.byteBuffer(ByteBuffer(string: "1"))).cascade(to: promise) + writer.write(writer.allocator.buffer(string: "1")).whenComplete { _ in + writer.end() + } } - return promise.futureResult })).wait()) { error in XCTAssertEqual(error as! HTTPClientError, HTTPClientError.bodyLengthMismatch) } @@ -2526,8 +2544,9 @@ class HTTPClientTests: XCTestCase { XCTAssertThrowsError( try self.defaultClient.execute(request: Request(url: url, - body: .stream(length: 1) { streamWriter in - streamWriter.write(.byteBuffer(ByteBuffer(string: tooLong))) + body: .stream2(length: 1) { writer in + writer.write(writer.allocator.buffer(string: tooLong), promise: nil) + writer.end() })).wait()) { error in XCTAssertEqual(error as! HTTPClientError, HTTPClientError.bodyLengthMismatch) } @@ -2549,14 +2568,14 @@ class HTTPClientTests: XCTestCase { func testBodyUploadAfterEndFails() { let url = self.defaultHTTPBinURLPrefix + "post" - func uploader(_ streamWriter: HTTPClient.Body.StreamWriter) -> EventLoopFuture { - let done = streamWriter.write(.byteBuffer(ByteBuffer(string: "X"))) + func uploader(_ writer: HTTPClient.Body.StreamWriter2) { + let done = writer.write(.byteBuffer(ByteBuffer(string: "X"))) done.recover { error -> Void in XCTFail("unexpected error \(error)") }.whenSuccess { // This is executed when we have already sent the end of the request. done.eventLoop.execute { - streamWriter.write(.byteBuffer(ByteBuffer(string: "BAD BAD BAD"))).whenComplete { result in + writer.write(writer.allocator.buffer(string: "BAD BAD BAD")).whenComplete { result in switch result { case .success: XCTFail("we succeeded writing bytes after the end!?") @@ -2566,13 +2585,15 @@ class HTTPClientTests: XCTestCase { } } } - return done + done.whenComplete { _ in + writer.end() + } } XCTAssertThrowsError( try self.defaultClient.execute(request: Request(url: url, - body: .stream(length: 1, uploader))).wait()) { error in + body: .stream2(length: 1, uploader))).wait()) { error in XCTAssertEqual(HTTPClientError.writeAfterRequestSent, error as? HTTPClientError) } @@ -2590,8 +2611,9 @@ class HTTPClientTests: XCTestCase { let tooLong = "XBAD BAD BAD NOT HTTP/1.1\r\n\r\n" let future = self.defaultClient.execute( request: try Request(url: "http://localhost:\(server.serverPort)", - body: .stream(length: 1) { streamWriter in - streamWriter.write(.byteBuffer(ByteBuffer(string: tooLong))) + body: .stream2(length: 1) { writer in + writer.write(writer.allocator.buffer(string: tooLong), promise: nil) + writer.end() })) XCTAssertNoThrow(try server.readInbound()) // .head diff --git a/Tests/AsyncHTTPClientTests/RequestValidationTests.swift b/Tests/AsyncHTTPClientTests/RequestValidationTests.swift index 3e34f905c..8129979b2 100644 --- a/Tests/AsyncHTTPClientTests/RequestValidationTests.swift +++ b/Tests/AsyncHTTPClientTests/RequestValidationTests.swift @@ -125,8 +125,9 @@ class RequestValidationTests: XCTestCase { // Body length is _not_ known for method: HTTPMethod in [.GET, .HEAD, .DELETE, .CONNECT] { var headers: HTTPHeaders = .init() - let body: HTTPClient.Body = .stream { writer in - writer.write(.byteBuffer(ByteBuffer(bytes: [0]))) + let body: HTTPClient.Body = .stream2 { writer in + writer.write(writer.allocator.buffer(bytes: [0]), promise: nil) + writer.end() } XCTAssertNoThrow(try headers.validate(method: method, body: body)) XCTAssertTrue(headers["content-length"].isEmpty) @@ -144,8 +145,9 @@ class RequestValidationTests: XCTestCase { // Body length is _not_ known for method: HTTPMethod in [.POST, .PUT] { var headers: HTTPHeaders = .init() - let body: HTTPClient.Body = .stream { writer in - writer.write(.byteBuffer(ByteBuffer(bytes: [0]))) + let body: HTTPClient.Body = .stream2 { writer in + writer.write(writer.allocator.buffer(bytes: [0]), promise: nil) + writer.end() } XCTAssertNoThrow(try headers.validate(method: method, body: body)) XCTAssertTrue(headers["content-length"].isEmpty) From 30a7286fe419bccd99220ac927e8e90a4e22d87b Mon Sep 17 00:00:00 2001 From: Artem Redkin Date: Thu, 6 Aug 2020 17:06:12 +0100 Subject: [PATCH 2/2] fix warning --- Sources/AsyncHTTPClient/HTTPHandler.swift | 8 ++++---- .../HTTPClientTests+XCTest.swift | 1 - .../AsyncHTTPClientTests/HTTPClientTests.swift | 17 ----------------- 3 files changed, 4 insertions(+), 22 deletions(-) diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index d3fd1066b..6b8ee0517 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -46,7 +46,7 @@ extension HTTPClient { /// /// - parameters: /// - data: `IOData` to write. - @available(*, deprecated, message: "") + @available(*, deprecated, message: "StreamWriter is deprecated, please use StreamWriter2") public func write(_ data: IOData) -> EventLoopFuture { return self.closure(data) } @@ -64,11 +64,11 @@ extension HTTPClient { } public func write(_ buffer: ByteBuffer) -> EventLoopFuture { - self.onChunk(.byteBuffer(buffer)) + return self.onChunk(.byteBuffer(buffer)) } public func write(_ data: IOData) -> EventLoopFuture { - self.onChunk(data) + return self.onChunk(data) } public func write(_ buffer: ByteBuffer, promise: EventLoopPromise?) { @@ -95,7 +95,7 @@ extension HTTPClient { public var stream: (StreamWriter) -> EventLoopFuture var stream2: ((StreamWriter2) -> Void)? - @available(*, deprecated, message: "") + @available(*, deprecated, message: "StreamWriter is deprecated, please use StreamWriter2") init(length: Int?, stream: @escaping (StreamWriter) -> EventLoopFuture) { self.length = length self.stream = stream diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift index 6b002d88c..eb4dd7cb5 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift @@ -56,7 +56,6 @@ extension HTTPClientTests { ("testProxyPlaintextWithCorrectlyAuthorization", testProxyPlaintextWithCorrectlyAuthorization), ("testProxyPlaintextWithIncorrectlyAuthorization", testProxyPlaintextWithIncorrectlyAuthorization), ("testUploadStreaming", testUploadStreaming), - ("testUploadStreaming2", testUploadStreaming2), ("testNoContentLengthForSSLUncleanShutdown", testNoContentLengthForSSLUncleanShutdown), ("testNoContentLengthWithIgnoreErrorForSSLUncleanShutdown", testNoContentLengthWithIgnoreErrorForSSLUncleanShutdown), ("testCorrectContentLengthForSSLUncleanShutdown", testCorrectContentLengthForSSLUncleanShutdown), diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 0fdb37796..6d114f392 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -643,23 +643,6 @@ class HTTPClientTests: XCTestCase { } func testUploadStreaming() throws { - let body: HTTPClient.Body = .stream(length: 8) { writer in - let buffer = ByteBuffer(string: "1234") - return writer.write(.byteBuffer(buffer)).flatMap { - let buffer = ByteBuffer(string: "4321") - return writer.write(.byteBuffer(buffer)) - } - } - - let response = try self.defaultClient.post(url: self.defaultHTTPBinURLPrefix + "post", body: body).wait() - let bytes = response.body.flatMap { $0.getData(at: 0, length: $0.readableBytes) } - let data = try JSONDecoder().decode(RequestInfo.self, from: bytes!) - - XCTAssertEqual(.ok, response.status) - XCTAssertEqual("12344321", data.data) - } - - func testUploadStreaming2() throws { let body: HTTPClient.Body = .stream2(length: 8) { writer in var buffer = ByteBuffer(string: "1234") writer.write(.byteBuffer(buffer), promise: nil)