diff --git a/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift b/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift index 2477e1154..92fdc2b8a 100644 --- a/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift +++ b/Sources/AsyncHTTPClient/ConnectionPool/HTTPExecutableRequest.swift @@ -201,7 +201,13 @@ protocol HTTPRequestExecutor { func cancelRequest(_ task: HTTPExecutableRequest) } -protocol HTTPExecutableRequest: AnyObject { +#if swift(>=5.6) +typealias _HTTPExecutableRequestSendable = Sendable +#else +typealias _HTTPExecutableRequestSendable = Any +#endif + +protocol HTTPExecutableRequest: AnyObject, _HTTPExecutableRequestSendable { /// The request's logger var logger: Logger { get } diff --git a/Sources/AsyncHTTPClient/HTTPClient.swift b/Sources/AsyncHTTPClient/HTTPClient.swift index 9301094ef..7bfa3a5ef 100644 --- a/Sources/AsyncHTTPClient/HTTPClient.swift +++ b/Sources/AsyncHTTPClient/HTTPClient.swift @@ -1025,3 +1025,8 @@ public struct HTTPClientError: Error, Equatable, CustomStringConvertible { @available(*, deprecated, message: "AsyncHTTPClient now correctly supports informational headers. For this reason `httpEndReceivedAfterHeadWith1xx` will not be thrown anymore.") public static let httpEndReceivedAfterHeadWith1xx = HTTPClientError(code: .httpEndReceivedAfterHeadWith1xx) } + +#if swift(>=5.6) +/// HTTPClient is Sendable, since shared state is protected by the internal ``stateLock``. +extension HTTPClient: @unchecked Sendable {} +#endif diff --git a/Sources/AsyncHTTPClient/HTTPHandler.swift b/Sources/AsyncHTTPClient/HTTPHandler.swift index c1ce39632..c47d422d2 100644 --- a/Sources/AsyncHTTPClient/HTTPHandler.swift +++ b/Sources/AsyncHTTPClient/HTTPHandler.swift @@ -14,8 +14,12 @@ import Foundation import Logging -import NIOConcurrencyHelpers +#if swift(>=5.6) +@preconcurrency import NIOCore +#else import NIOCore +#endif +import NIOConcurrencyHelpers import NIOHTTP1 import NIOSSL @@ -385,6 +389,12 @@ public class ResponseAccumulator: HTTPClientResponseDelegate { } } +#if swift(>=5.6) +@preconcurrency public protocol _HTTPClientResponseDelegate: Sendable {} +#else +public protocol _HTTPClientResponseDelegate {} +#endif + /// `HTTPClientResponseDelegate` allows an implementation to receive notifications about request processing and to control how response parts are processed. /// You can implement this protocol if you need fine-grained control over an HTTP request/response, for example, if you want to inspect the response /// headers before deciding whether to accept a response body, or if you want to stream your request body. Pass an instance of your conforming @@ -414,7 +424,7 @@ public class ResponseAccumulator: HTTPClientResponseDelegate { /// released together with the `HTTPTaskHandler` when channel is closed. /// Users of the library are not required to keep a reference to the /// object that implements this protocol, but may do so if needed. -public protocol HTTPClientResponseDelegate: AnyObject { +public protocol HTTPClientResponseDelegate: AnyObject, _HTTPClientResponseDelegate { associatedtype Response /// Called when the request head is sent. Will be called once. @@ -635,6 +645,11 @@ extension HTTPClient { } } +#if swift(>=5.6) +// HTTPClient.Task is Sendable thanks to the internal lock. +extension HTTPClient.Task: @unchecked Sendable {} +#endif + internal struct TaskCancelEvent {} // MARK: - RedirectHandler diff --git a/Sources/AsyncHTTPClient/RequestBag.swift b/Sources/AsyncHTTPClient/RequestBag.swift index 9a40e9ff5..faae095cf 100644 --- a/Sources/AsyncHTTPClient/RequestBag.swift +++ b/Sources/AsyncHTTPClient/RequestBag.swift @@ -446,3 +446,8 @@ extension RequestBag: HTTPClientTaskDelegate { } } } + +#if swift(>=5.6) +// RequestBag is Sendable because everything is dispatched onto the EL. +extension RequestBag: @unchecked Sendable {} +#endif diff --git a/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift b/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift index 2cd056225..610b428bc 100644 --- a/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift +++ b/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift @@ -13,7 +13,11 @@ //===----------------------------------------------------------------------===// @testable import AsyncHTTPClient +#if swift(>=5.6) +@preconcurrency import Logging +#else import Logging +#endif import NIOCore import NIOPosix import XCTest @@ -327,13 +331,14 @@ final class AsyncAwaitEndToEndTests: XCTestCase { let client = makeDefaultHTTPClient() defer { XCTAssertNoThrow(try client.syncShutdown()) } let logger = Logger(label: "HTTPClient", factory: StreamLogHandler.standardOutput(label:)) - var request = HTTPClientRequest(url: "http://localhost:\(bin.port)/offline") - request.method = .POST - let streamWriter = AsyncSequenceWriter() - request.body = .stream(streamWriter, length: .unknown) - let task = Task { [request] in - try await client.execute(request, deadline: .now() + .seconds(2), logger: logger) + let task = Task { + var request = HTTPClientRequest(url: "http://localhost:\(bin.port)/offline") + request.method = .POST + let streamWriter = AsyncSequenceWriter() + request.body = .stream(streamWriter, length: .unknown) + + _ = try await client.execute(request, deadline: .now() + .seconds(2), logger: logger) } task.cancel() await XCTAssertThrowsError(try await task.value) { error in @@ -352,10 +357,10 @@ final class AsyncAwaitEndToEndTests: XCTestCase { let client = makeDefaultHTTPClient() defer { XCTAssertNoThrow(try client.syncShutdown()) } let logger = Logger(label: "HTTPClient", factory: StreamLogHandler.standardOutput(label:)) - let request = HTTPClientRequest(url: "https://localhost:\(bin.port)/wait") - let task = Task { [request] in - try await client.execute(request, deadline: .now() + .milliseconds(100), logger: logger) + let task = Task { + let request = HTTPClientRequest(url: "https://localhost:\(bin.port)/wait") + _ = try await client.execute(request, deadline: .now() + .milliseconds(100), logger: logger) } await XCTAssertThrowsError(try await task.value) { error in guard let error = error as? HTTPClientError else { @@ -377,10 +382,10 @@ final class AsyncAwaitEndToEndTests: XCTestCase { let client = makeDefaultHTTPClient() defer { XCTAssertNoThrow(try client.syncShutdown()) } let logger = Logger(label: "HTTPClient", factory: StreamLogHandler.standardOutput(label:)) - let request = HTTPClientRequest(url: "http://localhost:\(bin.port)/wait") - let task = Task { [request] in - try await client.execute(request, deadline: .now(), logger: logger) + let task = Task { + let request = HTTPClientRequest(url: "http://localhost:\(bin.port)/wait") + _ = try await client.execute(request, deadline: .now(), logger: logger) } await XCTAssertThrowsError(try await task.value) { error in guard let error = error as? HTTPClientError else { diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift index 230c91a2b..2216da4c2 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTestUtils.swift @@ -591,6 +591,10 @@ extension HTTPBin where RequestHandler == HTTPBinHandler { } } +#if swift(>=5.6) +extension HTTPBin: @unchecked Sendable {} +#endif + enum HTTPBinError: Error { case refusedConnection case invalidProxyRequest diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift index f8d6044cd..26e2dffd5 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift @@ -137,3 +137,7 @@ private class MockScheduledRequest: HTTPSchedulableRequest { preconditionFailure("Unimplemented") } } + +#if swift(>=5.6) +extension MockScheduledRequest: @unchecked Sendable {} +#endif diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift index eedc499ad..923050d91 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockConnectionPool.swift @@ -14,7 +14,11 @@ @testable import AsyncHTTPClient import Logging +#if swift(>=5.6) +@preconcurrency import NIOCore +#else import NIOCore +#endif import NIOHTTP1 import NIOSSL @@ -747,3 +751,7 @@ class MockHTTPRequest: HTTPSchedulableRequest { preconditionFailure("Unimplemented") } } + +#if swift(>=5.6) +extension MockHTTPRequest: @unchecked Sendable {} +#endif diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockRequestExecutor.swift b/Tests/AsyncHTTPClientTests/Mocks/MockRequestExecutor.swift index b5b67c809..1a4fc5742 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockRequestExecutor.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockRequestExecutor.swift @@ -14,7 +14,11 @@ @testable import AsyncHTTPClient import NIOConcurrencyHelpers +#if swift(>=5.6) +@preconcurrency import NIOCore +#else import NIOCore +#endif // This is a MockRequestExecutor, that is synchronized on its EventLoop. final class MockRequestExecutor { @@ -273,3 +277,9 @@ extension MockRequestExecutor { } } } + +#if swift(>=5.6) +extension MockRequestExecutor: @unchecked Sendable {} +extension MockRequestExecutor.RequestParts: Sendable {} +extension MockRequestExecutor.BlockingQueue: @unchecked Sendable {} +#endif diff --git a/Tests/AsyncHTTPClientTests/RequestBagTests.swift b/Tests/AsyncHTTPClientTests/RequestBagTests.swift index ed50ae02d..ede47f545 100644 --- a/Tests/AsyncHTTPClientTests/RequestBagTests.swift +++ b/Tests/AsyncHTTPClientTests/RequestBagTests.swift @@ -15,6 +15,7 @@ @testable import AsyncHTTPClient import Logging import NIOCore +import NIOConcurrencyHelpers import NIOEmbedded import NIOHTTP1 import XCTest @@ -561,16 +562,24 @@ class UploadCountingDelegate: HTTPClientResponseDelegate { } } -class MockTaskQueuer: HTTPRequestScheduler { - private(set) var hitCancelCount = 0 +final class MockTaskQueuer: HTTPRequestScheduler { + private let hitCancelCounter = NIOAtomic.makeAtomic(value: 0) + + var hitCancelCount: Int { + self.hitCancelCounter.load() + } init() {} func cancelRequest(_: HTTPSchedulableRequest) { - self.hitCancelCount += 1 + self.hitCancelCounter.add(1) } } +#if swift(>=5.6) +extension MockTaskQueuer: @unchecked Sendable {} +#endif + extension RequestOptions { static func forTests(idleReadTimeout: TimeAmount? = nil) -> Self { RequestOptions( diff --git a/Tests/AsyncHTTPClientTests/TransactionTests.swift b/Tests/AsyncHTTPClientTests/TransactionTests.swift index 7e2c62a0d..d4ad76f1b 100644 --- a/Tests/AsyncHTTPClientTests/TransactionTests.swift +++ b/Tests/AsyncHTTPClientTests/TransactionTests.swift @@ -15,7 +15,11 @@ @testable import AsyncHTTPClient import Logging import NIOConcurrencyHelpers +#if swift(>=5.6) +@preconcurrency import NIOCore +#else import NIOCore +#endif import NIOEmbedded import NIOHTTP1 import NIOPosix @@ -41,23 +45,23 @@ final class TransactionTests: XCTestCase { guard let preparedRequest = maybePreparedRequest else { return XCTFail("Expected to have a request here.") } - let (transaction, responseTask) = Transaction.makeWithResultTask( + + let queuer = MockTaskQueuer() + + await XCTAssertThrowsError(try await Transaction.awaitResponseWithTransaction( request: preparedRequest, preferredEventLoop: embeddedEventLoop - ) + ) { transaction in + transaction.requestWasQueued(queuer) - let queuer = MockTaskQueuer() - transaction.requestWasQueued(queuer) + Task.detached { + try await Task.sleep(nanoseconds: 5 * 1000 * 1000) + transaction.cancel() + } - Task.detached { - try await Task.sleep(nanoseconds: 5 * 1000 * 1000) - transaction.cancel() - } + XCTAssertEqual(queuer.hitCancelCount, 0) + }) - XCTAssertEqual(queuer.hitCancelCount, 0) - await XCTAssertThrowsError(try await responseTask.value) { - XCTAssertEqual($0 as? HTTPClientError, .cancelled) - } XCTAssertEqual(queuer.hitCancelCount, 1) } #endif @@ -78,50 +82,54 @@ final class TransactionTests: XCTestCase { guard let preparedRequest = maybePreparedRequest else { return } - let (transaction, responseTask) = Transaction.makeWithResultTask( - request: preparedRequest, - preferredEventLoop: embeddedEventLoop - ) let executor = MockRequestExecutor( pauseRequestBodyPartStreamAfterASingleWrite: true, eventLoop: embeddedEventLoop ) - transaction.willExecuteRequest(executor) - transaction.requestHeadSent() + let response = try await Transaction.awaitResponseWithTransaction( + request: preparedRequest, + preferredEventLoop: embeddedEventLoop + ) { transaction in - let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: ["foo": "bar"]) - XCTAssertFalse(executor.signalledDemandForResponseBody) - transaction.receiveResponseHead(responseHead) + transaction.willExecuteRequest(executor) + transaction.requestHeadSent() - let response = try await responseTask.value - XCTAssertEqual(response.status, responseHead.status) - XCTAssertEqual(response.headers, responseHead.headers) - XCTAssertEqual(response.version, responseHead.version) + let responseHead = HTTPResponseHead(version: .http1_1, status: .ok, headers: ["foo": "bar"]) + XCTAssertFalse(executor.signalledDemandForResponseBody) + transaction.receiveResponseHead(responseHead) - let iterator = SharedIterator(response.body.filter { $0.readableBytes > 0 }.makeAsyncIterator()) + for i in 0..<100 { + XCTAssertFalse(executor.signalledDemandForResponseBody, "Demand was not signalled yet.") - for i in 0..<100 { - XCTAssertFalse(executor.signalledDemandForResponseBody, "Demand was not signalled yet.") - - async let part = iterator.next() + XCTAssertNoThrow(try executor.receiveResponseDemand()) + executor.resetResponseStreamDemandSignal() + transaction.receiveResponseBodyParts([ByteBuffer(integer: i)]) + } + XCTAssertFalse(executor.signalledDemandForResponseBody, "Demand was not signalled yet.") XCTAssertNoThrow(try executor.receiveResponseDemand()) executor.resetResponseStreamDemandSignal() - transaction.receiveResponseBodyParts([ByteBuffer(integer: i)]) + transaction.succeedRequest([]) + } - let result = try await part + let expectedResponse = HTTPResponseHead(version: .http1_1, status: .ok, headers: ["foo": "bar"]) + + XCTAssertEqual(response.status, expectedResponse.status) + XCTAssertEqual(response.headers, expectedResponse.headers) + XCTAssertEqual(response.version, expectedResponse.version) + + var iterator = response.body.filter { $0.readableBytes > 0 }.makeAsyncIterator() + + for i in 0..<100 { + let result = try await iterator.next() XCTAssertEqual(result, ByteBuffer(integer: i)) } - XCTAssertFalse(executor.signalledDemandForResponseBody, "Demand was not signalled yet.") - async let part = iterator.next() - XCTAssertNoThrow(try executor.receiveResponseDemand()) - executor.resetResponseStreamDemandSignal() - transaction.succeedRequest([]) - let result = try await part - XCTAssertNil(result) + let final = try await iterator.next() + XCTAssertNil(final) + } #endif } @@ -581,5 +589,34 @@ extension Transaction { return (transaction, result) } + + fileprivate static func awaitResponseWithTransaction( + request: PreparedRequest, + requestOptions: RequestOptions = .forTests(), + logger: Logger = Logger(label: "test"), + connectionDeadline: NIODeadline = .distantFuture, + preferredEventLoop: EventLoop, + _ closure: @Sendable @escaping (Transaction) async throws -> () + ) async throws -> HTTPClientResponse { + + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let transaction = Transaction( + request: request, + requestOptions: requestOptions, + logger: logger, + connectionDeadline: connectionDeadline, + preferredEventLoop: preferredEventLoop, + responseContinuation: continuation + ) + + Task { + do { + try await closure(transaction) + } catch { + XCTFail() + } + } + } + } } #endif