Skip to content

Commit d952776

Browse files
authored
Respect deadline on new HTTPClient.execute for async/await (#529)
* Schedule deadline timeout * Add state machine tests and enable skipped test for http1
1 parent 239d6d2 commit d952776

7 files changed

+233
-24
lines changed

Sources/AsyncHTTPClient/AsyncAwait/HTTPClient+execute.swift

+36-13
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,20 @@ extension HTTPClient {
9494
let cancelHandler = TransactionCancelHandler()
9595

9696
return try await withTaskCancellationHandler(operation: { () async throws -> HTTPClientResponse in
97-
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<HTTPClientResponse, Swift.Error>) -> Void in
97+
let eventLoop = self.eventLoopGroup.any()
98+
let deadlineTask = eventLoop.scheduleTask(deadline: deadline) {
99+
cancelHandler.cancel(reason: .deadlineExceeded)
100+
}
101+
defer {
102+
deadlineTask.cancel()
103+
}
104+
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<HTTPClientResponse, Swift.Error>) -> Void in
98105
let transaction = Transaction(
99106
request: request,
100107
requestOptions: .init(idleReadTimeout: nil),
101108
logger: logger,
102109
connectionDeadline: deadline,
103-
preferredEventLoop: self.eventLoopGroup.next(),
110+
preferredEventLoop: eventLoop,
104111
responseContinuation: continuation
105112
)
106113

@@ -109,7 +116,7 @@ extension HTTPClient {
109116
self.poolManager.executeRequest(transaction)
110117
}
111118
}, onCancel: {
112-
cancelHandler.cancel()
119+
cancelHandler.cancel(reason: .taskCanceled)
113120
})
114121
}
115122
}
@@ -119,22 +126,38 @@ extension HTTPClient {
119126
/// in the `body` closure and cancelation from the `onCancel` closure of `withTaskCancellationHandler`.
120127
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)
121128
private actor TransactionCancelHandler {
129+
enum CancelReason {
130+
/// swift concurrency task was canceled
131+
case taskCanceled
132+
/// deadline timeout
133+
case deadlineExceeded
134+
}
135+
122136
private enum State {
123137
case initialised
124138
case register(Transaction)
125-
case cancelled
139+
case cancelled(CancelReason)
126140
}
127141

128142
private var state: State = .initialised
129143

130144
init() {}
131145

146+
private func cancelTransaction(_ transaction: Transaction, for reason: CancelReason) {
147+
switch reason {
148+
case .taskCanceled:
149+
transaction.cancel()
150+
case .deadlineExceeded:
151+
transaction.deadlineExceeded()
152+
}
153+
}
154+
132155
private func _registerTransaction(_ transaction: Transaction) {
133156
switch self.state {
134157
case .initialised:
135158
self.state = .register(transaction)
136-
case .cancelled:
137-
transaction.cancel()
159+
case .cancelled(let reason):
160+
self.cancelTransaction(transaction, for: reason)
138161
case .register:
139162
preconditionFailure("transaction already set")
140163
}
@@ -146,21 +169,21 @@ private actor TransactionCancelHandler {
146169
}
147170
}
148171

149-
private func _cancel() {
172+
private func _cancel(reason: CancelReason) {
150173
switch self.state {
151-
case .register(let bag):
152-
self.state = .cancelled
153-
bag.cancel()
174+
case .register(let transaction):
175+
self.state = .cancelled(reason)
176+
self.cancelTransaction(transaction, for: reason)
154177
case .cancelled:
155178
break
156179
case .initialised:
157-
self.state = .cancelled
180+
self.state = .cancelled(reason)
158181
}
159182
}
160183

161-
nonisolated func cancel() {
184+
nonisolated func cancel(reason: CancelReason) {
162185
Task {
163-
await self._cancel()
186+
await self._cancel(reason: reason)
164187
}
165188
}
166189
}

Sources/AsyncHTTPClient/AsyncAwait/Transaction+StateMachine.swift

+61
Original file line numberDiff line numberDiff line change
@@ -674,6 +674,67 @@ extension Transaction {
674674
preconditionFailure("Already received an eof or error before. Must not receive further events. Invalid state: \(self.state)")
675675
}
676676
}
677+
678+
enum DeadlineExceededAction {
679+
case none
680+
/// fail response before head received. scheduler and executor are exclusive here.
681+
case cancel(
682+
requestContinuation: CheckedContinuation<HTTPClientResponse, Error>,
683+
scheduler: HTTPRequestScheduler?,
684+
executor: HTTPRequestExecutor?,
685+
bodyStreamContinuation: CheckedContinuation<Void, Error>?
686+
)
687+
}
688+
689+
mutating func deadlineExceeded() -> DeadlineExceededAction {
690+
let error = HTTPClientError.deadlineExceeded
691+
switch self.state {
692+
case .initialized(let continuation):
693+
self.state = .finished(error: error, nil)
694+
return .cancel(
695+
requestContinuation: continuation,
696+
scheduler: nil,
697+
executor: nil,
698+
bodyStreamContinuation: nil
699+
)
700+
701+
case .queued(let continuation, let scheduler):
702+
self.state = .finished(error: error, nil)
703+
return .cancel(
704+
requestContinuation: continuation,
705+
scheduler: scheduler,
706+
executor: nil,
707+
bodyStreamContinuation: nil
708+
)
709+
710+
case .executing(let context, let requestStreamState, .waitingForResponseHead):
711+
switch requestStreamState {
712+
case .paused(continuation: .some(let continuation)):
713+
self.state = .finished(error: error, nil)
714+
return .cancel(
715+
requestContinuation: context.continuation,
716+
scheduler: nil,
717+
executor: context.executor,
718+
bodyStreamContinuation: continuation
719+
)
720+
case .requestHeadSent, .finished, .producing, .paused(continuation: .none):
721+
self.state = .finished(error: error, nil)
722+
return .cancel(
723+
requestContinuation: context.continuation,
724+
scheduler: nil,
725+
executor: context.executor,
726+
bodyStreamContinuation: nil
727+
)
728+
}
729+
730+
case .executing, .finished:
731+
// The user specified deadline is only used until we received the response head.
732+
// If we already received the head, we have also resumed the continuation and
733+
// therefore return the HTTPClientResponse to the user. We do not want to cancel
734+
// the request body streaming nor the response body streaming afterwards.
735+
return .none
736+
}
737+
}
677738
}
678739
}
679740

Sources/AsyncHTTPClient/AsyncAwait/Transaction.swift

+20
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,26 @@ extension Transaction: HTTPExecutableRequest {
294294
bodyStreamContinuation.resume(throwing: error)
295295
}
296296
}
297+
298+
func deadlineExceeded() {
299+
let action = self.stateLock.withLock {
300+
self.state.deadlineExceeded()
301+
}
302+
self.performDeadlineExceededAction(action)
303+
}
304+
305+
private func performDeadlineExceededAction(_ action: StateMachine.DeadlineExceededAction) {
306+
switch action {
307+
case .cancel(let requestContinuation, let scheduler, let executor, let bodyStreamContinuation):
308+
requestContinuation.resume(throwing: HTTPClientError.deadlineExceeded)
309+
scheduler?.cancelRequest(self)
310+
executor?.cancelRequest(self)
311+
bodyStreamContinuation?.resume(throwing: HTTPClientError.deadlineExceeded)
312+
313+
case .none:
314+
break
315+
}
316+
}
297317
}
298318

299319
@available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *)

Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests+XCTest.swift

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ extension AsyncAwaitEndToEndTests {
3737
("testPostWithFragmentedAsyncSequenceOfLargeByteBuffers", testPostWithFragmentedAsyncSequenceOfLargeByteBuffers),
3838
("testCanceling", testCanceling),
3939
("testDeadline", testDeadline),
40+
("testImmediateDeadline", testImmediateDeadline),
4041
("testInvalidURL", testInvalidURL),
4142
]
4243
}

Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift

+30-11
Original file line numberDiff line numberDiff line change
@@ -318,18 +318,16 @@ final class AsyncAwaitEndToEndTests: XCTestCase {
318318
#endif
319319
}
320320

321-
func testCanceling() throws {
322-
#if os(Linux)
323-
#else
324-
try XCTSkipIf(true, "test times out because of a swift concurrency bug on macOS: https://bugs.swift.org/browse/SR-15592")
325-
#endif
321+
func testCanceling() {
326322
#if compiler(>=5.5) && canImport(_Concurrency)
327323
guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return }
328324
XCTAsyncTest(timeout: 5) {
325+
let bin = HTTPBin()
326+
defer { XCTAssertNoThrow(try bin.shutdown()) }
329327
let client = makeDefaultHTTPClient()
330328
defer { XCTAssertNoThrow(try client.syncShutdown()) }
331329
let logger = Logger(label: "HTTPClient", factory: StreamLogHandler.standardOutput(label:))
332-
var request = HTTPClientRequest(url: "https://localhost:45678/offline")
330+
var request = HTTPClientRequest(url: "http://localhost:\(bin.port)/offline")
333331
request.method = .POST
334332
let streamWriter = AsyncSequenceWriter<ByteBuffer>()
335333
request.body = .stream(length: nil, streamWriter)
@@ -345,8 +343,7 @@ final class AsyncAwaitEndToEndTests: XCTestCase {
345343
#endif
346344
}
347345

348-
func testDeadline() throws {
349-
try XCTSkipIf(true, "deadline is currently not correctly implemented. We only use it to timeout connection establishment. will be fixed in a follow up PR")
346+
func testDeadline() {
350347
#if compiler(>=5.5) && canImport(_Concurrency)
351348
guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return }
352349
XCTAsyncTest(timeout: 5) {
@@ -358,11 +355,33 @@ final class AsyncAwaitEndToEndTests: XCTestCase {
358355
let request = HTTPClientRequest(url: "https://localhost:\(bin.port)/wait")
359356

360357
let task = Task<HTTPClientResponse, Error> { [request] in
361-
try await client.execute(request, deadline: .now() + .seconds(1), logger: logger)
358+
try await client.execute(request, deadline: .now() + .milliseconds(100), logger: logger)
359+
}
360+
await XCTAssertThrowsError(try await task.value) {
361+
XCTAssertEqual($0 as? HTTPClientError, HTTPClientError.deadlineExceeded)
362+
}
363+
}
364+
#endif
365+
}
366+
367+
func testImmediateDeadline() {
368+
#if compiler(>=5.5) && canImport(_Concurrency)
369+
guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return }
370+
XCTAsyncTest(timeout: 5) {
371+
let bin = HTTPBin()
372+
defer { XCTAssertNoThrow(try bin.shutdown()) }
373+
let client = makeDefaultHTTPClient()
374+
defer { XCTAssertNoThrow(try client.syncShutdown()) }
375+
let logger = Logger(label: "HTTPClient", factory: StreamLogHandler.standardOutput(label:))
376+
let request = HTTPClientRequest(url: "http://localhost:\(bin.port)/wait")
377+
378+
let task = Task<HTTPClientResponse, Error> { [request] in
379+
try await client.execute(request, deadline: .now(), logger: logger)
362380
}
363381
await XCTAssertThrowsError(try await task.value) {
364-
XCTAssertEqual($0 as? HTTPClientError, HTTPClientError.readTimeout)
382+
XCTAssertEqual($0 as? HTTPClientError, HTTPClientError.deadlineExceeded)
365383
}
384+
print("done")
366385
}
367386
#endif
368387
}
@@ -386,7 +405,7 @@ final class AsyncAwaitEndToEndTests: XCTestCase {
386405

387406
#if compiler(>=5.5) && canImport(_Concurrency)
388407
extension AsyncSequence where Element == ByteBuffer {
389-
func collect() async throws -> ByteBuffer {
408+
func collect() async rethrows -> ByteBuffer {
390409
try await self.reduce(into: ByteBuffer()) { accumulatingBuffer, nextBuffer in
391410
var nextBuffer = nextBuffer
392411
accumulatingBuffer.writeBuffer(&nextBuffer)

Tests/AsyncHTTPClientTests/Transaction+StateMachineTests+XCTest.swift

+3
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ extension Transaction_StateMachineTests {
2727
return [
2828
("testRequestWasQueuedAfterWillExecuteRequestWasCalled", testRequestWasQueuedAfterWillExecuteRequestWasCalled),
2929
("testRequestBodyStreamWasPaused", testRequestBodyStreamWasPaused),
30+
("testQueuedRequestGetsRemovedWhenDeadlineExceeded", testQueuedRequestGetsRemovedWhenDeadlineExceeded),
31+
("testScheduledRequestGetsRemovedWhenDeadlineExceeded", testScheduledRequestGetsRemovedWhenDeadlineExceeded),
32+
("testRequestWithHeadReceivedGetNotCancelledWhenDeadlineExceeded", testRequestWithHeadReceivedGetNotCancelledWhenDeadlineExceeded),
3033
]
3134
}
3235
}

Tests/AsyncHTTPClientTests/Transaction+StateMachineTests.swift

+82
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
@testable import AsyncHTTPClient
1616
import NIOCore
1717
import NIOEmbedded
18+
import NIOHTTP1
1819
import XCTest
1920

2021
final class Transaction_StateMachineTests: XCTestCase {
@@ -70,6 +71,87 @@ final class Transaction_StateMachineTests: XCTestCase {
7071
}
7172
#endif
7273
}
74+
75+
func testQueuedRequestGetsRemovedWhenDeadlineExceeded() {
76+
#if compiler(>=5.5) && canImport(_Concurrency)
77+
guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return }
78+
XCTAsyncTest {
79+
func workaround(_ continuation: CheckedContinuation<HTTPClientResponse, Error>) {
80+
var state = Transaction.StateMachine(continuation)
81+
let queuer = MockTaskQueuer()
82+
83+
state.requestWasQueued(queuer)
84+
85+
let failAction = state.deadlineExceeded()
86+
guard case .cancel(let continuation, let scheduler, nil, nil) = failAction else {
87+
return XCTFail("Unexpected fail action: \(failAction)")
88+
}
89+
XCTAssertIdentical(scheduler as? MockTaskQueuer, queuer)
90+
91+
continuation.resume(throwing: HTTPClientError.deadlineExceeded)
92+
}
93+
94+
await XCTAssertThrowsError(try await withCheckedThrowingContinuation(workaround))
95+
}
96+
#endif
97+
}
98+
99+
func testScheduledRequestGetsRemovedWhenDeadlineExceeded() {
100+
#if compiler(>=5.5) && canImport(_Concurrency)
101+
guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return }
102+
let eventLoop = EmbeddedEventLoop()
103+
XCTAsyncTest {
104+
func workaround(_ continuation: CheckedContinuation<HTTPClientResponse, Error>) {
105+
var state = Transaction.StateMachine(continuation)
106+
let executor = MockRequestExecutor(eventLoop: eventLoop)
107+
let queuer = MockTaskQueuer()
108+
109+
XCTAssertEqual(state.willExecuteRequest(executor), .none)
110+
state.requestWasQueued(queuer)
111+
112+
let failAction = state.deadlineExceeded()
113+
guard case .cancel(let continuation, nil, let rexecutor, nil) = failAction else {
114+
return XCTFail("Unexpected fail action: \(failAction)")
115+
}
116+
XCTAssertIdentical(rexecutor as? MockRequestExecutor, executor)
117+
118+
continuation.resume(throwing: HTTPClientError.deadlineExceeded)
119+
}
120+
121+
await XCTAssertThrowsError(try await withCheckedThrowingContinuation(workaround))
122+
}
123+
#endif
124+
}
125+
126+
func testRequestWithHeadReceivedGetNotCancelledWhenDeadlineExceeded() {
127+
#if compiler(>=5.5) && canImport(_Concurrency)
128+
guard #available(macOS 12.0, iOS 15.0, watchOS 8.0, tvOS 15.0, *) else { return }
129+
let eventLoop = EmbeddedEventLoop()
130+
XCTAsyncTest {
131+
func workaround(_ continuation: CheckedContinuation<HTTPClientResponse, Error>) {
132+
var state = Transaction.StateMachine(continuation)
133+
let executor = MockRequestExecutor(eventLoop: eventLoop)
134+
let queuer = MockTaskQueuer()
135+
136+
XCTAssertEqual(state.willExecuteRequest(executor), .none)
137+
state.requestWasQueued(queuer)
138+
let head = HTTPResponseHead(version: .http1_1, status: .ok)
139+
let receiveResponseHeadAction = state.receiveResponseHead(head)
140+
guard case .succeedResponseHead(head, let continuation) = receiveResponseHeadAction else {
141+
return XCTFail("Unexpected action: \(receiveResponseHeadAction)")
142+
}
143+
144+
let failAction = state.deadlineExceeded()
145+
guard case .none = failAction else {
146+
return XCTFail("Unexpected fail action: \(failAction)")
147+
}
148+
continuation.resume(throwing: HTTPClientError.deadlineExceeded)
149+
}
150+
151+
await XCTAssertThrowsError(try await withCheckedThrowingContinuation(workaround))
152+
}
153+
#endif
154+
}
73155
}
74156

75157
#if compiler(>=5.5) && canImport(_Concurrency)

0 commit comments

Comments
 (0)