Skip to content

Commit f81d0fe

Browse files
authoredMay 20, 2020
draft for streaming el fixes (#215)
1 parent ce82178 commit f81d0fe

File tree

3 files changed

+93
-18
lines changed

3 files changed

+93
-18
lines changed
 

‎Sources/AsyncHTTPClient/HTTPHandler.swift

+25-5
Original file line numberDiff line numberDiff line change
@@ -765,12 +765,32 @@ extension TaskHandler: ChannelDuplexHandler {
765765
return context.eventLoop.makeSucceededFuture(())
766766
}
767767

768-
return body.stream(HTTPClient.Body.StreamWriter { part in
769-
context.eventLoop.assertInEventLoop()
770-
return context.writeAndFlush(self.wrapOutboundOut(.body(part))).map {
771-
self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart)
768+
func doIt() -> EventLoopFuture<Void> {
769+
return body.stream(HTTPClient.Body.StreamWriter { part in
770+
let promise = self.task.eventLoop.makePromise(of: Void.self)
771+
// All writes have to be switched to the channel EL if channel and task ELs differ
772+
if context.eventLoop.inEventLoop {
773+
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
774+
} else {
775+
context.eventLoop.execute {
776+
context.writeAndFlush(self.wrapOutboundOut(.body(part)), promise: promise)
777+
}
778+
}
779+
780+
return promise.futureResult.map {
781+
self.callOutToDelegateFireAndForget(value: part, self.delegate.didSendRequestPart)
782+
}
783+
})
784+
}
785+
786+
// Callout to the user to start body streaming should be on task EL
787+
if self.task.eventLoop.inEventLoop {
788+
return doIt()
789+
} else {
790+
return self.task.eventLoop.flatSubmit {
791+
doIt()
772792
}
773-
})
793+
}
774794
}
775795

776796
public func read(context: ChannelHandlerContext) {

‎Tests/AsyncHTTPClientTests/HTTPClientTests+XCTest.swift

+2
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ extension HTTPClientTests {
9898
("testAsyncShutdown", testAsyncShutdown),
9999
("testValidationErrorsAreSurfaced", testValidationErrorsAreSurfaced),
100100
("testUploadsReallyStream", testUploadsReallyStream),
101+
("testUploadStreamingCallinToleratedFromOtsideEL", testUploadStreamingCallinToleratedFromOtsideEL),
102+
("testUploadStreamingIsCalledOnTaskEL", testUploadStreamingIsCalledOnTaskEL),
101103
]
102104
}
103105
}

‎Tests/AsyncHTTPClientTests/HTTPClientTests.swift

+66-13
Original file line numberDiff line numberDiff line change
@@ -1705,6 +1705,7 @@ class HTTPClientTests: XCTestCase {
17051705
private let bodyPromises: [EventLoopPromise<ByteBuffer>]
17061706
private let endPromise: EventLoopPromise<Void>
17071707
private var bodyPartsSeenSoFar = 0
1708+
private var atEnd = false
17081709

17091710
init(headPromise: EventLoopPromise<HTTPRequestHead>,
17101711
bodyPromises: [EventLoopPromise<ByteBuffer>],
@@ -1727,10 +1728,14 @@ class HTTPClientTests: XCTestCase {
17271728
context.write(self.wrapOutboundOut(.head(.init(version: .init(major: 1, minor: 1), status: .ok))),
17281729
promise: nil)
17291730
context.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: self.endPromise)
1731+
self.atEnd = true
17301732
}
17311733
}
17321734

17331735
func handlerRemoved(context: ChannelHandlerContext) {
1736+
guard !self.atEnd else {
1737+
return
1738+
}
17341739
struct NotFulfilledError: Error {}
17351740

17361741
self.headPromise.fail(NotFulfilledError())
@@ -1753,10 +1758,7 @@ class HTTPClientTests: XCTestCase {
17531758
let bodyPromises = (0..<16).map { _ in group.next().makePromise(of: ByteBuffer.self) }
17541759
let endPromise = group.next().makePromise(of: Void.self)
17551760
let sentOffAllBodyPartsPromise = group.next().makePromise(of: Void.self)
1756-
// Because of https://github.com/swift-server/async-http-client/issues/200 we also need to pull off a terrible
1757-
// hack and get the internal EventLoop out :(. Once the bug is fixed, this promise should only get the
1758-
// StreamWriter.
1759-
let streamWriterPromise = group.next().makePromise(of: (EventLoop, HTTPClient.Body.StreamWriter).self)
1761+
let streamWriterPromise = group.next().makePromise(of: HTTPClient.Body.StreamWriter.self)
17601762

17611763
func makeServer() -> Channel? {
17621764
return try? ServerBootstrap(group: group)
@@ -1781,12 +1783,7 @@ class HTTPClientTests: XCTestCase {
17811783
method: .POST,
17821784
headers: ["transfer-encoding": "chunked"],
17831785
body: .stream { streamWriter in
1784-
// Due to https://github.com/swift-server/async-http-client/issues/200
1785-
// we also need to pull off a terrible hack and get the internal
1786-
// EventLoop out :(. Once the bug is fixed, this promise should only get
1787-
// the StreamWriter.
1788-
let currentEL = MultiThreadedEventLoopGroup.currentEventLoop! // HACK!!
1789-
streamWriterPromise.succeed((currentEL, streamWriter))
1786+
streamWriterPromise.succeed(streamWriter)
17901787
return sentOffAllBodyPartsPromise.futureResult
17911788
})
17921789
}
@@ -1811,13 +1808,69 @@ class HTTPClientTests: XCTestCase {
18111808
buffer.clear()
18121809
buffer.writeString(String(bodyChunkNumber, radix: 16))
18131810
XCTAssertEqual(1, buffer.readableBytes)
1814-
XCTAssertNoThrow(try streamWriter.0.flatSubmit {
1815-
streamWriter.1.write(.byteBuffer(buffer))
1816-
}.wait())
1811+
XCTAssertNoThrow(try streamWriter.write(.byteBuffer(buffer)).wait())
18171812
XCTAssertNoThrow(XCTAssertEqual(buffer, try bodyPromises[bodyChunkNumber].futureResult.wait()))
18181813
}
18191814
sentOffAllBodyPartsPromise.succeed(())
18201815
XCTAssertNoThrow(try endPromise.futureResult.wait())
18211816
XCTAssertNoThrow(try runningRequest.wait())
18221817
}
1818+
1819+
func testUploadStreamingCallinToleratedFromOtsideEL() throws {
1820+
let httpBin = HTTPBin()
1821+
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(self.clientGroup))
1822+
defer {
1823+
XCTAssertNoThrow(try httpClient.syncShutdown())
1824+
XCTAssertNoThrow(try httpBin.shutdown())
1825+
}
1826+
1827+
let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/get", method: .POST, body: .stream(length: 4) { writer in
1828+
let promise = httpClient.eventLoopGroup.next().makePromise(of: Void.self)
1829+
// We have to toleare callins from any thread
1830+
DispatchQueue(label: "upload-streaming").async {
1831+
writer.write(.byteBuffer(ByteBuffer.of(string: "1234"))).whenComplete { _ in
1832+
promise.succeed(())
1833+
}
1834+
}
1835+
return promise.futureResult
1836+
})
1837+
XCTAssertNoThrow(try httpClient.execute(request: request).wait())
1838+
}
1839+
1840+
func testUploadStreamingIsCalledOnTaskEL() throws {
1841+
let group = getDefaultEventLoopGroup(numberOfThreads: 4)
1842+
defer {
1843+
XCTAssertNoThrow(try group.syncShutdownGracefully())
1844+
}
1845+
1846+
let httpBin = HTTPBin()
1847+
let httpClient = HTTPClient(eventLoopGroupProvider: .shared(group))
1848+
defer {
1849+
XCTAssertNoThrow(try httpClient.syncShutdown())
1850+
XCTAssertNoThrow(try httpBin.shutdown())
1851+
}
1852+
1853+
let el1 = group.next()
1854+
let el2 = group.next()
1855+
XCTAssertFalse(el1 === el2)
1856+
1857+
do {
1858+
// Pre-populate pool with a connection on a different EL
1859+
let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/get", method: .GET)
1860+
XCTAssertNoThrow(try httpClient.execute(request: request, delegate: ResponseAccumulator(request: request), eventLoop: .delegateAndChannel(on: el2)).wait())
1861+
}
1862+
1863+
let body: HTTPClient.Body = .stream(length: 8) { writer in
1864+
XCTAssert(el1.inEventLoop)
1865+
let buffer = ByteBuffer.of(string: "1234")
1866+
return writer.write(.byteBuffer(buffer)).flatMap {
1867+
XCTAssert(el1.inEventLoop)
1868+
let buffer = ByteBuffer.of(string: "4321")
1869+
return writer.write(.byteBuffer(buffer))
1870+
}
1871+
}
1872+
let request = try HTTPClient.Request(url: "http://localhost:\(httpBin.port)/post", method: .POST, body: body)
1873+
let response = httpClient.execute(request: request, delegate: ResponseAccumulator(request: request), eventLoop: .delegate(on: el1))
1874+
XCTAssertNoThrow(try response.wait())
1875+
}
18231876
}

0 commit comments

Comments
 (0)