Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 618a81a

Browse files
committedDec 2, 2021
async/await
fix typo always overwrite `Transport-Encoding` and `Content-Length` fix documentation add `Hashable` conformance to `RequestBodyLength` remove duplicate `be` from documentation async/await compile some more stuff Upload streaming works in tests more tests In progress Remove modifying state for code that is only compiled for >=5.5 Work on async/await continues Make it compile for David save progress fix deprecation warnings for `Task.sleep(_:)` rename AsyncRequest to HTTPClientRequest rename AsyncResponse to HTTPClientResponse fix merge conflicts adopt new validation method fix compilation refactor actor into class with a lock save debug Revert "save debug" This reverts commit 60172e2. add missing locks add continuation to AsyncRequestBag.init tests still need to migrate to the new init introduce new `PreparedRequest` to make `AsyncRequestBag.init` non-throwing fix todo run swift format generate linux tests Revert "generate linux tests" This reverts commit 615c9d0. remove umrella imports use XCTAsyncTest from gRPC add #if compiler(>=5.5) && canImport(_Concurrency) remvoe duplicate XCTAsyncTest introduce new Endpoint type use Endpoint to destructure URL refactoring fix merge conflicts save progress swift cancelation handling perpare PR for HTTPClientRequest swift-format move into single file
1 parent a956e7b commit 618a81a

15 files changed

+1887
-47
lines changed
 

‎Sources/AsyncHTTPClient/AsyncAwait/AsyncRequestBag+StateMachine.swift

+570
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
#if compiler(>=5.5) && canImport(_Concurrency)
16+
import Logging
17+
import NIOConcurrencyHelpers
18+
import NIOCore
19+
import NIOHTTP1
20+
import NIOSSL
21+
22+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
23+
class AsyncRequestBag {
24+
let logger: Logger
25+
// TODO: store `PreparedRequest` as a single property
26+
let request: HTTPClientRequest.Prepared
27+
var requestHead: HTTPRequestHead { self.request.head }
28+
var requestBody: HTTPClientRequest.Body? { self.request.body }
29+
var poolKey: ConnectionPool.Key { self.request.poolKey }
30+
var requestFramingMetadata: RequestFramingMetadata { self.request.requestFramingMetadata }
31+
32+
let connectionDeadline: NIODeadline
33+
let preferredEventLoop: EventLoop
34+
let requestOptions: RequestOptions
35+
36+
private let stateLock = Lock()
37+
private var state: StateMachine = .init()
38+
39+
init(
40+
request: HTTPClientRequest.Prepared,
41+
requestOptions: RequestOptions,
42+
logger: Logger,
43+
connectionDeadline: NIODeadline,
44+
preferredEventLoop: EventLoop,
45+
responseContinuation: UnsafeContinuation<HTTPClientResponse, Error>
46+
) {
47+
self.request = request
48+
self.requestOptions = requestOptions
49+
self.logger = logger
50+
self.connectionDeadline = connectionDeadline
51+
self.preferredEventLoop = preferredEventLoop
52+
53+
self.state.registerContinuation(responseContinuation)
54+
}
55+
56+
// MARK: Scheduled request
57+
58+
func cancel() {
59+
self.fail(HTTPClientError.cancelled)
60+
}
61+
62+
func requestWasQueued(_ scheduler: HTTPRequestScheduler) {
63+
self.stateLock.withLock {
64+
self.state.requestWasQueued(scheduler)
65+
}
66+
}
67+
68+
func fail(_ error: Error) {
69+
let action = self.stateLock.withLock {
70+
self.state.fail(error)
71+
}
72+
73+
switch action {
74+
case .none:
75+
break
76+
77+
case .failResponseHead(let continuation, let error, let scheduler, let executor):
78+
continuation.resume(throwing: error)
79+
scheduler?.cancelRequest(self) // NOTE: scheduler and executor are exclusive here
80+
executor?.cancelRequest(self)
81+
82+
case .failResponseStream(let continuation, let error, let executor):
83+
continuation.resume(throwing: error)
84+
executor.cancelRequest(self)
85+
}
86+
}
87+
88+
// MARK: Scheduled request
89+
90+
func willExecuteRequest(_ executor: HTTPRequestExecutor) {
91+
let action = self.stateLock.withLock {
92+
self.state.willExecuteRequest(executor)
93+
}
94+
95+
switch action {
96+
case .cancel(let executor):
97+
executor.cancelRequest(self)
98+
case .none:
99+
break
100+
}
101+
}
102+
103+
func resumeRequestBodyStream() {
104+
let action = self.stateLock.withLock {
105+
self.state.resumeRequestBodyStream()
106+
}
107+
108+
switch action {
109+
case .none:
110+
break
111+
case .resumeStream(let allocator):
112+
switch self.requestBody?.mode {
113+
case .asyncSequence(_, let next):
114+
// it is safe to call this async here. it dispatches...
115+
self.continueRequestBodyStream(allocator, next: next)
116+
117+
case .byteBuffer(let byteBuffer):
118+
self.writeOnceAndOneTimeOnly(byteBuffer: byteBuffer)
119+
120+
case .none:
121+
break
122+
123+
case .sequence(_, let create):
124+
let byteBuffer = create(allocator)
125+
self.writeOnceAndOneTimeOnly(byteBuffer: byteBuffer)
126+
}
127+
}
128+
}
129+
130+
private func writeOnceAndOneTimeOnly(byteBuffer: ByteBuffer) {
131+
#warning("TODO: @fabianfett")
132+
let writeAction = self.stateLock.withLock {
133+
self.state.producedNextRequestPart(byteBuffer)
134+
}
135+
guard case .write(let part, let executor, true) = writeAction else {
136+
preconditionFailure("")
137+
}
138+
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
139+
140+
let finishAction = self.stateLock.withLock {
141+
self.state.finishRequestBodyStream()
142+
}
143+
144+
guard case .forwardStreamFinished(let executor) = finishAction else {
145+
preconditionFailure("")
146+
}
147+
executor.finishRequestBodyStream(self)
148+
}
149+
150+
enum AfterNextBodyPartAction {
151+
case `continue`
152+
case pause
153+
}
154+
155+
private func requestBodyStreamNextPart(_ part: ByteBuffer) -> AfterNextBodyPartAction {
156+
let writeAction = self.stateLock.withLock {
157+
self.state.producedNextRequestPart(part)
158+
}
159+
160+
switch writeAction {
161+
case .write(let part, let executor, let continueAfter):
162+
executor.writeRequestBodyPart(.byteBuffer(part), request: self)
163+
if continueAfter {
164+
return .continue
165+
} else {
166+
return .pause
167+
}
168+
169+
case .ignore:
170+
// we only ignore reads, if the request has failed anyway. we should leave
171+
// the reader loop
172+
return .pause
173+
}
174+
}
175+
176+
private func requestBodyStreamFinished() {
177+
let finishAction = self.stateLock.withLock {
178+
self.state.finishRequestBodyStream()
179+
}
180+
// no more data to produce
181+
switch finishAction {
182+
case .none:
183+
break
184+
case .forwardStreamFinished(let executor):
185+
executor.finishRequestBodyStream(self)
186+
}
187+
return
188+
}
189+
190+
private func requestBodyStreamFailed(_ error: Error) {
191+
let failAction = self.stateLock.withLock {
192+
self.state.failedToProduceNextRequestPart(error)
193+
}
194+
195+
switch failAction {
196+
case .none:
197+
break
198+
case .informRequestAboutFailure(let error, cancelExecutor: let executor, let continuation):
199+
executor.cancelRequest(self)
200+
self.fail(error)
201+
continuation?.resume(throwing: error)
202+
}
203+
}
204+
205+
func pauseRequestBodyStream() {
206+
self.stateLock.withLock {
207+
self.state.pauseRequestBodyStream()
208+
}
209+
}
210+
211+
func receiveResponseHead(_ head: HTTPResponseHead) {
212+
let action = self.stateLock.withLock {
213+
self.state.receiveResponseHead(head)
214+
}
215+
switch action {
216+
case .none:
217+
break
218+
case .succeedResponseHead(let head, let continuation):
219+
let asyncResponse = HTTPClientResponse(
220+
bag: self,
221+
version: head.version,
222+
status: head.status,
223+
headers: head.headers
224+
)
225+
continuation.resume(returning: asyncResponse)
226+
}
227+
}
228+
229+
func receiveResponseBodyParts(_ buffer: CircularBuffer<ByteBuffer>) {
230+
let action = self.stateLock.withLock {
231+
self.state.receiveResponseBodyParts(buffer)
232+
}
233+
switch action {
234+
case .none:
235+
break
236+
case .succeedContinuation(let continuation, let bytes):
237+
continuation.resume(returning: bytes)
238+
}
239+
}
240+
241+
func succeedRequest(_ buffer: CircularBuffer<ByteBuffer>?) {
242+
let succeedAction = self.stateLock.withLock {
243+
self.state.succeedRequest(buffer)
244+
}
245+
switch succeedAction {
246+
case .succeedRequest(let continuation):
247+
continuation.resume(returning: nil)
248+
case .succeedContinuation(let continuation, let byteBuffer):
249+
continuation.resume(returning: byteBuffer)
250+
case .none:
251+
break
252+
}
253+
}
254+
255+
// MARK: Other methods
256+
257+
private func continueRequestBodyStream(
258+
_ allocator: ByteBufferAllocator,
259+
next: @escaping ((ByteBufferAllocator) async throws -> ByteBuffer?)
260+
) {
261+
Task {
262+
while true {
263+
do {
264+
guard let part = try await next(allocator) else { // <---- dispatch point!
265+
return self.requestBodyStreamFinished()
266+
}
267+
268+
switch self.requestBodyStreamNextPart(part) {
269+
case .pause:
270+
return
271+
case .continue:
272+
continue
273+
}
274+
275+
} catch {
276+
// producing more failed
277+
self.requestBodyStreamFailed(error)
278+
return
279+
}
280+
}
281+
}
282+
}
283+
}
284+
285+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
286+
extension AsyncRequestBag: HTTPSchedulableRequest {
287+
var tlsConfiguration: TLSConfiguration? {
288+
return nil
289+
}
290+
291+
var requiredEventLoop: EventLoop? {
292+
return nil
293+
}
294+
}
295+
296+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
297+
extension AsyncRequestBag: HTTPExecutableRequest {
298+
func requestHeadSent() {}
299+
}
300+
301+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
302+
extension AsyncRequestBag {
303+
func nextResponsePart(streamID: HTTPClientResponse.Body.IteratorStream.ID) async throws -> ByteBuffer? {
304+
try await withUnsafeThrowingContinuation { continuation in
305+
let action = self.stateLock.withLock {
306+
self.state.consumeNextResponsePart(streamID: streamID, continuation: continuation)
307+
}
308+
switch action {
309+
case .succeedContinuation(let continuation, let result):
310+
continuation.resume(returning: result)
311+
case .failContinuation(let continuation, let error):
312+
continuation.resume(throwing: error)
313+
case .askExecutorForMore(let executor):
314+
executor.demandResponseBodyStream(self)
315+
}
316+
}
317+
}
318+
319+
func cancelResponseStream(streamID: HTTPClientResponse.Body.IteratorStream.ID) {
320+
self.cancel()
321+
}
322+
}
323+
324+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
#if compiler(>=5.5) && canImport(_Concurrency)
16+
import Logging
17+
import NIOCore
18+
19+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
20+
extension HTTPClient {
21+
func execute(_ request: HTTPClientRequest, deadline: NIODeadline, logger: Logger) async throws -> HTTPClientResponse {
22+
actor SwiftCancellationHandlingIs🤔 {
23+
enum State {
24+
case initialized
25+
case register(AsyncRequestBag)
26+
case cancelled
27+
}
28+
29+
private var state: State = .initialized
30+
31+
init() {}
32+
33+
func registerRequestBag(_ bag: AsyncRequestBag) {
34+
switch self.state {
35+
case .initialized:
36+
self.state = .register(bag)
37+
case .cancelled:
38+
bag.cancel()
39+
case .register:
40+
preconditionFailure()
41+
}
42+
}
43+
44+
func cancel() {
45+
switch self.state {
46+
case .register(let bag):
47+
self.state = .cancelled
48+
bag.cancel()
49+
case .cancelled:
50+
break
51+
case .initialized:
52+
self.state = .cancelled
53+
}
54+
}
55+
}
56+
let preparedRequest = try request.prepared()
57+
58+
let cancelHandler = SwiftCancellationHandlingIs🤔()
59+
60+
return try await withTaskCancellationHandler(operation: { () async throws -> HTTPClientResponse in
61+
try await withUnsafeThrowingContinuation {
62+
(continuation: UnsafeContinuation<HTTPClientResponse, Swift.Error>) -> Void in
63+
let bag = AsyncRequestBag(
64+
request: preparedRequest,
65+
requestOptions: .init(idleReadTimeout: nil),
66+
logger: logger,
67+
connectionDeadline: .now() + .seconds(10),
68+
preferredEventLoop: self.eventLoopGroup.next(),
69+
responseContinuation: continuation
70+
)
71+
72+
_Concurrency.Task {
73+
await cancelHandler.registerRequestBag(bag)
74+
}
75+
76+
self.poolManager.executeRequest(bag)
77+
}
78+
}, onCancel: {
79+
_Concurrency.Task {
80+
await cancelHandler.cancel()
81+
}
82+
})
83+
}
84+
}
85+
86+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
#if compiler(>=5.5) && canImport(_Concurrency)
16+
import NIOHTTP1
17+
18+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
19+
extension HTTPClientRequest {
20+
struct Prepared {
21+
let poolKey: ConnectionPool.Key
22+
let requestFramingMetadata: RequestFramingMetadata
23+
let head: HTTPRequestHead
24+
let body: Body?
25+
}
26+
}
27+
28+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
29+
extension HTTPClientRequest {
30+
func prepared() throws -> Prepared {
31+
let url = try DeconstructedURL(url: self.url)
32+
33+
var headers = self.headers
34+
headers.addHostIfNeeded(for: url)
35+
let metadata = try headers.validateAndSetTransportFraming(
36+
method: self.method,
37+
bodyLength: .init(self.body)
38+
)
39+
40+
return .init(
41+
poolKey: .init(url: url, tlsConfiguration: nil),
42+
requestFramingMetadata: metadata,
43+
head: .init(
44+
version: .http1_1,
45+
method: self.method,
46+
uri: url.uri,
47+
headers: headers
48+
),
49+
body: self.body
50+
)
51+
}
52+
}
53+
54+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
55+
extension RequestBodyLength {
56+
init(_ body: HTTPClientRequest.Body?) {
57+
switch body?.mode {
58+
case .none:
59+
self = .fixed(length: 0)
60+
case .byteBuffer(let buffer):
61+
self = .fixed(length: buffer.readableBytes)
62+
case .sequence(nil, _), .asyncSequence(nil, _):
63+
self = .dynamic
64+
case .sequence(.some(let length), _), .asyncSequence(.some(let length), _):
65+
self = .fixed(length: length)
66+
}
67+
}
68+
}
69+
70+
#endif
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2021 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
#if compiler(>=5.5) && canImport(_Concurrency)
16+
import NIOCore
17+
import NIOHTTP1
18+
19+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
20+
struct HTTPClientResponse {
21+
var version: HTTPVersion
22+
var status: HTTPResponseStatus
23+
var headers: HTTPHeaders
24+
var body: Body
25+
26+
struct Body {
27+
private let bag: AsyncRequestBag
28+
29+
fileprivate init(_ bag: AsyncRequestBag) {
30+
self.bag = bag
31+
}
32+
}
33+
34+
init(
35+
bag: AsyncRequestBag,
36+
version: HTTPVersion,
37+
status: HTTPResponseStatus,
38+
headers: HTTPHeaders
39+
) {
40+
self.body = .init(bag)
41+
self.version = version
42+
self.status = status
43+
self.headers = headers
44+
}
45+
}
46+
47+
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
48+
extension HTTPClientResponse.Body: AsyncSequence {
49+
typealias Element = ByteBuffer
50+
typealias AsyncIterator = Iterator
51+
52+
struct Iterator: AsyncIteratorProtocol {
53+
typealias Element = ByteBuffer
54+
55+
private let stream: IteratorStream
56+
57+
fileprivate init(stream: IteratorStream) {
58+
self.stream = stream
59+
}
60+
61+
func next() async throws -> ByteBuffer? {
62+
try await self.stream.next()
63+
}
64+
}
65+
66+
func makeAsyncIterator() -> Iterator {
67+
Iterator(stream: IteratorStream(bag: self.bag))
68+
}
69+
70+
internal class IteratorStream {
71+
struct ID: Hashable {
72+
private let objectID: ObjectIdentifier
73+
74+
init(_ object: IteratorStream) {
75+
self.objectID = ObjectIdentifier(object)
76+
}
77+
}
78+
79+
var id: ID { ID(self) }
80+
private let bag: AsyncRequestBag
81+
82+
init(bag: AsyncRequestBag) {
83+
self.bag = bag
84+
}
85+
86+
deinit {
87+
self.bag.cancelResponseStream(streamID: self.id)
88+
}
89+
90+
func next() async throws -> ByteBuffer? {
91+
try await self.bag.nextResponsePart(streamID: self.id)
92+
}
93+
}
94+
}
95+
96+
#endif

‎Sources/AsyncHTTPClient/ConnectionPool.swift

+29-6
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15+
import NIOSSL
16+
1517
enum ConnectionPool {
1618
/// Used by the `ConnectionPool` to index its `HTTP1ConnectionProvider`s
1719
///
@@ -23,12 +25,14 @@ enum ConnectionPool {
2325
var connectionTarget: ConnectionTarget
2426
private var tlsConfiguration: BestEffortHashableTLSConfiguration?
2527

26-
init(_ request: HTTPClient.Request) {
27-
self.scheme = request.deconstructedURL.scheme
28-
self.connectionTarget = request.deconstructedURL.connectionTarget
29-
if let tls = request.tlsConfiguration {
30-
self.tlsConfiguration = BestEffortHashableTLSConfiguration(wrapping: tls)
31-
}
28+
init(
29+
scheme: Scheme,
30+
connectionTarget: ConnectionTarget,
31+
tlsConfiguration: BestEffortHashableTLSConfiguration? = nil
32+
) {
33+
self.scheme = scheme
34+
self.connectionTarget = connectionTarget
35+
self.tlsConfiguration = tlsConfiguration
3236
}
3337

3438
var description: String {
@@ -48,3 +52,22 @@ enum ConnectionPool {
4852
}
4953
}
5054
}
55+
56+
extension ConnectionPool.Key {
57+
init(url: DeconstructedURL, tlsConfiguration: TLSConfiguration?) {
58+
self.init(
59+
scheme: url.scheme,
60+
connectionTarget: url.connectionTarget,
61+
tlsConfiguration: tlsConfiguration.map {
62+
BestEffortHashableTLSConfiguration(wrapping: $0)
63+
}
64+
)
65+
}
66+
67+
init(_ request: HTTPClient.Request) {
68+
self.init(
69+
url: request.deconstructedURL,
70+
tlsConfiguration: request.tlsConfiguration
71+
)
72+
}
73+
}

‎Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

+13-3
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,24 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
148148
case .sendRequestHead(let head, let startBody):
149149
if startBody {
150150
context.writeAndFlush(self.wrapOutboundOut(.head(head)), promise: nil)
151-
self.request!.requestHeadSent()
152-
self.request!.resumeRequestBodyStream()
151+
152+
// Writing the header might lead to errors. For this reason, we need to check, if
153+
// the request is still present. It might have been removed, because the request was
154+
// already failed.
155+
if let request = self.request {
156+
request.requestHeadSent()
157+
request.resumeRequestBodyStream()
158+
}
159+
153160
} else {
154161
context.write(self.wrapOutboundOut(.head(head)), promise: nil)
155162
context.write(self.wrapOutboundOut(.end(nil)), promise: nil)
156163
context.flush()
157164

158-
self.request!.requestHeadSent()
165+
// Writing the header might lead to errors. For this reason, we need to check, if
166+
// the request is still present. It might have been removed, because the request was
167+
// already failed.
168+
self.request?.requestHeadSent()
159169

160170
if let timeoutAction = self.idleReadTimeoutStateMachine?.requestEndSent() {
161171
self.runTimeoutAction(timeoutAction, context: context)

‎Sources/AsyncHTTPClient/ConnectionTarget.swift

+22
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,25 @@ enum ConnectionTarget: Equatable, Hashable {
4040
}
4141
}
4242
}
43+
44+
extension ConnectionTarget {
45+
/// The host name which will be send as an HTTP `Host` header.
46+
/// Only returns nil if the `self` is a `unixSocket`.
47+
var host: String? {
48+
switch self {
49+
case .ipAddress(let serialization, _): return serialization
50+
case .domain(let name, _): return name
51+
case .unixSocket: return nil
52+
}
53+
}
54+
55+
/// The host name which will be send as an HTTP host header.
56+
/// Only returns nil if the `self` is a `unixSocket`.
57+
var port: Int? {
58+
switch self {
59+
case .ipAddress(_, let address): return address.port!
60+
case .domain(_, let port): return port
61+
case .unixSocket: return nil
62+
}
63+
}
64+
}

‎Sources/AsyncHTTPClient/DeconstructedURL.swift

+7
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,13 @@ struct DeconstructedURL {
3131
}
3232

3333
extension DeconstructedURL {
34+
init(url: String) throws {
35+
guard let url = URL(string: url) else {
36+
throw HTTPClientError.invalidURL
37+
}
38+
try self.init(url: url)
39+
}
40+
3441
init(url: URL) throws {
3542
guard let schemeString = url.scheme else {
3643
throw HTTPClientError.emptyScheme

‎Sources/AsyncHTTPClient/HTTPHandler.swift

+3-18
Original file line numberDiff line numberDiff line change
@@ -200,20 +200,12 @@ extension HTTPClient {
200200

201201
/// Remote host, resolved from `URL`.
202202
public var host: String {
203-
switch self.deconstructedURL.connectionTarget {
204-
case .ipAddress(let serialization, _): return serialization
205-
case .domain(let name, _): return name
206-
case .unixSocket: return ""
207-
}
203+
self.deconstructedURL.connectionTarget.host ?? ""
208204
}
209205

210206
/// Resolved port.
211207
public var port: Int {
212-
switch self.deconstructedURL.connectionTarget {
213-
case .ipAddress(_, let address): return address.port!
214-
case .domain(_, let port): return port
215-
case .unixSocket: return self.deconstructedURL.scheme.defaultPort
216-
}
208+
self.deconstructedURL.connectionTarget.port ?? self.deconstructedURL.scheme.defaultPort
217209
}
218210

219211
/// Whether request will be executed using secure socket.
@@ -227,14 +219,7 @@ extension HTTPClient {
227219
headers: self.headers
228220
)
229221

230-
if !head.headers.contains(name: "host") {
231-
let port = self.port
232-
var host = self.host
233-
if !(port == 80 && self.deconstructedURL.scheme == .http), !(port == 443 && self.deconstructedURL.scheme == .https) {
234-
host += ":\(port)"
235-
}
236-
head.headers.add(name: "host", value: host)
237-
}
222+
head.headers.addHostIfNeeded(for: self.deconstructedURL)
238223

239224
let metadata = try head.headers.validateAndSetTransportFraming(method: self.method, bodyLength: .init(self.body))
240225

‎Sources/AsyncHTTPClient/RequestValidation.swift

+17
Original file line numberDiff line numberDiff line change
@@ -110,3 +110,20 @@ extension HTTPHeaders {
110110
}
111111
}
112112
}
113+
114+
extension HTTPHeaders {
115+
mutating func addHostIfNeeded(for url: DeconstructedURL) {
116+
// if no host header was set, let's use the url host
117+
guard !self.contains(name: "host"),
118+
var host = url.connectionTarget.host
119+
else {
120+
return
121+
}
122+
// if the request uses a non-default port, we need to add it after the host
123+
if let port = url.connectionTarget.port,
124+
port != url.scheme.defaultPort {
125+
host += ":\(port)"
126+
}
127+
self.add(name: "host", value: host)
128+
}
129+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the AsyncHTTPClient open source project
4+
//
5+
// Copyright (c) 2018-2019 Apple Inc. and the AsyncHTTPClient project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of AsyncHTTPClient project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
//
15+
// AsyncRequestTests+XCTest.swift
16+
//
17+
import XCTest
18+
19+
///
20+
/// NOTE: This file was generated by generate_linux_tests.rb
21+
///
22+
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
23+
///
24+
25+
extension AsyncRequestTests {
26+
static var allTests: [(String, (AsyncRequestTests) -> () throws -> Void)] {
27+
return [
28+
("testCancelAsyncRequest", testCancelAsyncRequest),
29+
("testResponseStreamingWorks", testResponseStreamingWorks),
30+
("testWriteBackpressureWorks", testWriteBackpressureWorks),
31+
("testSimpleGetRequest", testSimpleGetRequest),
32+
("testBiDirectionalStreamingHTTP2", testBiDirectionalStreamingHTTP2),
33+
]
34+
}
35+
}

‎Tests/AsyncHTTPClientTests/AsyncRequestTests.swift

+536
Large diffs are not rendered by default.

‎Tests/AsyncHTTPClientTests/RequestBagTests.swift

+78-20
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
@testable import AsyncHTTPClient
1616
import Logging
17+
import NIOConcurrencyHelpers
1718
import NIOCore
1819
import NIOEmbedded
1920
import NIOHTTP1
@@ -70,7 +71,10 @@ final class RequestBagTests: XCTestCase {
7071

7172
XCTAssert(bag.task.eventLoop === embeddedEventLoop)
7273

73-
let executor = MockRequestExecutor(pauseRequestBodyPartStreamAfterASingleWrite: true)
74+
let executor = MockRequestExecutor(
75+
pauseRequestBodyPartStreamAfterASingleWrite: true,
76+
eventLoop: embeddedEventLoop
77+
)
7478

7579
bag.willExecuteRequest(executor)
7680

@@ -99,7 +103,7 @@ final class RequestBagTests: XCTestCase {
99103
streamIsAllowedToWrite = true
100104
bag.resumeRequestBodyStream()
101105
streamIsAllowedToWrite = false
102-
XCTAssertLessThanOrEqual(executor.requestBodyParts.count, 2)
106+
XCTAssertLessThanOrEqual(executor.requestBodyPartsCount, 2)
103107
XCTAssertEqual(delegate.hitDidSendRequestPart, writes)
104108
}
105109
}
@@ -110,6 +114,7 @@ final class RequestBagTests: XCTestCase {
110114
let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: .init([
111115
("Transfer-Encoding", "chunked"),
112116
]))
117+
XCTAssertFalse(executor.signalledDemandForResponseBody)
113118
bag.receiveResponseHead(responseHead)
114119
XCTAssertEqual(responseHead, delegate.receivedHead)
115120
XCTAssertNoThrow(try XCTUnwrap(delegate.backpressurePromise).succeed(()))
@@ -178,7 +183,7 @@ final class RequestBagTests: XCTestCase {
178183
guard let bag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag.") }
179184
XCTAssert(bag.task.eventLoop === embeddedEventLoop)
180185

181-
let executor = MockRequestExecutor()
186+
let executor = MockRequestExecutor(eventLoop: embeddedEventLoop)
182187

183188
bag.willExecuteRequest(executor)
184189

@@ -221,7 +226,7 @@ final class RequestBagTests: XCTestCase {
221226
guard let bag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag.") }
222227
XCTAssert(bag.eventLoop === embeddedEventLoop)
223228

224-
let executor = MockRequestExecutor()
229+
let executor = MockRequestExecutor(eventLoop: embeddedEventLoop)
225230
bag.cancel()
226231

227232
bag.willExecuteRequest(executor)
@@ -254,7 +259,7 @@ final class RequestBagTests: XCTestCase {
254259
guard let bag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag.") }
255260
XCTAssert(bag.eventLoop === embeddedEventLoop)
256261

257-
let executor = MockRequestExecutor()
262+
let executor = MockRequestExecutor(eventLoop: embeddedEventLoop)
258263

259264
bag.willExecuteRequest(executor)
260265
XCTAssertFalse(executor.isCancelled)
@@ -329,7 +334,7 @@ final class RequestBagTests: XCTestCase {
329334
))
330335
guard let bag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag.") }
331336

332-
let executor = MockRequestExecutor()
337+
let executor = MockRequestExecutor(eventLoop: embeddedEventLoop)
333338
bag.willExecuteRequest(executor)
334339
bag.requestHeadSent()
335340
bag.receiveResponseHead(.init(version: .http1_1, status: .ok))
@@ -386,7 +391,7 @@ final class RequestBagTests: XCTestCase {
386391
))
387392
guard let bag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag.") }
388393

389-
let executor = MockRequestExecutor()
394+
let executor = MockRequestExecutor(eventLoop: embeddedEventLoop)
390395
bag.willExecuteRequest(executor)
391396

392397
XCTAssertEqual(delegate.hitDidSendRequestHead, 0)
@@ -431,7 +436,7 @@ final class RequestBagTests: XCTestCase {
431436
))
432437
guard let bag = maybeRequestBag else { return XCTFail("Expected to be able to create a request bag.") }
433438

434-
let executor = MockRequestExecutor()
439+
let executor = MockRequestExecutor(eventLoop: embeddedEventLoop)
435440
bag.willExecuteRequest(executor)
436441
bag.requestHeadSent()
437442
bag.receiveResponseHead(.init(version: .http1_1, status: .ok))
@@ -472,45 +477,98 @@ class MockRequestExecutor: HTTPRequestExecutor {
472477
case endOfStream
473478
}
474479

480+
let eventLoop: EventLoop
481+
let lock = Lock()
475482
let pauseRequestBodyPartStreamAfterASingleWrite: Bool
476483

477-
private(set) var requestBodyParts = CircularBuffer<RequestParts>()
478-
private(set) var isCancelled: Bool = false
479-
private(set) var signalledDemandForResponseBody: Bool = false
484+
var isCancelled: Bool {
485+
self.lock.withLock { self._isCancelled }
486+
}
487+
488+
var signalledDemandForResponseBody: Bool {
489+
self.lock.withLock { self._signaledDemandForResponseBody }
490+
}
491+
492+
var requestBodyPartsCount: Int {
493+
self.lock.withLock { self._requestBodyParts.count }
494+
}
495+
496+
private(set) var _requestBodyParts = CircularBuffer<RequestParts>()
497+
private(set) var _isCancelled: Bool = false
498+
private(set) var _signaledDemandForResponseBody: Bool = false
499+
private(set) var _readable: EventLoopPromise<Void>?
480500

481-
init(pauseRequestBodyPartStreamAfterASingleWrite: Bool = false) {
501+
init(pauseRequestBodyPartStreamAfterASingleWrite: Bool = false, eventLoop: EventLoop) {
482502
self.pauseRequestBodyPartStreamAfterASingleWrite = pauseRequestBodyPartStreamAfterASingleWrite
503+
self.eventLoop = eventLoop
483504
}
484505

485506
func nextBodyPart() -> RequestParts? {
486-
guard !self.requestBodyParts.isEmpty else { return nil }
487-
return self.requestBodyParts.removeFirst()
507+
self.lock.withLock { () -> RequestParts? in
508+
guard !self._requestBodyParts.isEmpty else { return nil }
509+
return self._requestBodyParts.removeFirst()
510+
}
488511
}
489512

490513
func resetDemandSignal() {
491-
self.signalledDemandForResponseBody = false
514+
self.lock.withLockVoid {
515+
self._signaledDemandForResponseBody = false
516+
}
517+
}
518+
519+
func readable() -> EventLoopFuture<Void> {
520+
self.lock.withLock { () -> EventLoopFuture<Void> in
521+
if !self._requestBodyParts.isEmpty {
522+
return self.eventLoop.makeSucceededVoidFuture()
523+
}
524+
525+
let promise = self.eventLoop.makePromise(of: Void.self)
526+
self._readable = promise
527+
return promise.futureResult
528+
}
492529
}
493530

494531
// this should always be called twice. When we receive the first call, the next call to produce
495532
// data is already scheduled. If we call pause here, once, after the second call new subsequent
496533
// calls should not be scheduled.
497534
func writeRequestBodyPart(_ part: IOData, request: HTTPExecutableRequest) {
498-
if self.requestBodyParts.isEmpty, self.pauseRequestBodyPartStreamAfterASingleWrite {
535+
let (pause, promise) = self.lock.withLock { () -> (Bool, EventLoopPromise<Void>?) in
536+
var pause = false
537+
if self._requestBodyParts.isEmpty, self.pauseRequestBodyPartStreamAfterASingleWrite {
538+
pause = true
539+
}
540+
self._requestBodyParts.append(.body(part))
541+
let promise = self._readable
542+
self._readable = nil
543+
return (pause, promise)
544+
}
545+
546+
if pause {
499547
request.pauseRequestBodyStream()
500548
}
501-
self.requestBodyParts.append(.body(part))
549+
550+
promise?.succeed(())
502551
}
503552

504553
func finishRequestBodyStream(_: HTTPExecutableRequest) {
505-
self.requestBodyParts.append(.endOfStream)
554+
let promise = self.lock.withLock { () -> EventLoopPromise<Void>? in
555+
self._requestBodyParts.append(.endOfStream)
556+
let promise = self._readable
557+
self._readable = nil
558+
return promise
559+
}
560+
561+
promise?.succeed(())
506562
}
507563

508564
func demandResponseBodyStream(_: HTTPExecutableRequest) {
509-
self.signalledDemandForResponseBody = true
565+
self.lock.withLockVoid {
566+
self._signaledDemandForResponseBody = true
567+
}
510568
}
511569

512570
func cancelRequest(_: HTTPExecutableRequest) {
513-
self.isCancelled = true
571+
self._isCancelled = true
514572
}
515573
}
516574

‎Tests/LinuxMain.swift

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import XCTest
2626
@testable import AsyncHTTPClientTests
2727

2828
XCTMain([
29+
testCase(AsyncRequestTests.allTests),
2930
testCase(HTTP1ClientChannelHandlerTests.allTests),
3031
testCase(HTTP1ConnectionStateMachineTests.allTests),
3132
testCase(HTTP1ConnectionTests.allTests),

0 commit comments

Comments
 (0)
Please sign in to comment.