From 4883ff71e7ad0a98d59ceabf9de38d8e7f53a688 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Tue, 29 Apr 2025 09:06:40 +0100 Subject: [PATCH 1/2] Fix sendability issues in tests Motivation: The tests shouldn't be making sendability violations. Modifications: Fix the warnings Result: - No warnings - Strict concurrency is adopted! --- Package.swift | 36 +++++++-- .../AsyncAwaitEndToEndTests.swift | 8 +- .../AsyncTestHelpers.swift | 2 +- .../HTTP1ClientChannelHandlerTests.swift | 70 +++++++++-------- .../HTTP2ClientRequestHandlerTests.swift | 7 +- .../HTTP2ClientTests.swift | 20 +++-- .../HTTPClientInternalTests.swift | 50 ++++++++---- .../HTTPClientRequestTests.swift | 45 +++++++++-- .../HTTPClientTests.swift | 73 +++++++++--------- .../HTTPConnectionPool+FactoryTests.swift | 2 +- ...HTTPConnectionPool+RequestQueueTests.swift | 2 +- .../Mocks/MockHTTPExecutableRequest.swift | 47 +++++++----- .../Mocks/MockRequestExecutor.swift | 49 ++++++++---- .../RequestBagTests.swift | 76 ++++++++++++------- .../SOCKSEventsHandlerTests.swift | 2 +- .../AsyncHTTPClientTests/SOCKSTestUtils.swift | 4 +- .../TLSEventsHandlerTests.swift | 2 +- .../TransactionTests.swift | 43 ++++++----- 18 files changed, 347 insertions(+), 191 deletions(-) diff --git a/Package.swift b/Package.swift index c6b391815..2a58a41a9 100644 --- a/Package.swift +++ b/Package.swift @@ -15,18 +15,36 @@ import PackageDescription +let strictConcurrencyDevelopment = false + +let strictConcurrencySettings: [SwiftSetting] = { + var initialSettings: [SwiftSetting] = [] + initialSettings.append(contentsOf: [ + .enableUpcomingFeature("StrictConcurrency"), + .enableUpcomingFeature("InferSendableFromCaptures"), + ]) + + if strictConcurrencyDevelopment { + // -warnings-as-errors here is a workaround so that IDE-based development can + // get tripped up on -require-explicit-sendable. + initialSettings.append(.unsafeFlags(["-Xfrontend", "-require-explicit-sendable", "-warnings-as-errors"])) + } + + return initialSettings +}() + let package = Package( name: "async-http-client", products: [ .library(name: "AsyncHTTPClient", targets: ["AsyncHTTPClient"]) ], dependencies: [ - .package(url: "https://github.com/apple/swift-nio.git", from: "2.78.0"), - .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.27.1"), - .package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.19.0"), - .package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.13.0"), - .package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.19.0"), - .package(url: "https://github.com/apple/swift-log.git", from: "1.4.4"), + .package(url: "https://github.com/apple/swift-nio.git", from: "2.81.0"), + .package(url: "https://github.com/apple/swift-nio-ssl.git", from: "2.30.0"), + .package(url: "https://github.com/apple/swift-nio-http2.git", from: "1.36.0"), + .package(url: "https://github.com/apple/swift-nio-extras.git", from: "1.26.0"), + .package(url: "https://github.com/apple/swift-nio-transport-services.git", from: "1.24.0"), + .package(url: "https://github.com/apple/swift-log.git", from: "1.6.0"), .package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"), .package(url: "https://github.com/apple/swift-algorithms.git", from: "1.0.0"), ], @@ -55,7 +73,8 @@ let package = Package( .product(name: "Logging", package: "swift-log"), .product(name: "Atomics", package: "swift-atomics"), .product(name: "Algorithms", package: "swift-algorithms"), - ] + ], + swiftSettings: strictConcurrencySettings ), .testTarget( name: "AsyncHTTPClientTests", @@ -79,7 +98,8 @@ let package = Package( .copy("Resources/self_signed_key.pem"), .copy("Resources/example.com.cert.pem"), .copy("Resources/example.com.private-key.pem"), - ] + ], + swiftSettings: strictConcurrencySettings ), ] ) diff --git a/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift b/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift index 2084e18ba..56a08b852 100644 --- a/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift +++ b/Tests/AsyncHTTPClientTests/AsyncAwaitEndToEndTests.swift @@ -526,6 +526,8 @@ final class AsyncAwaitEndToEndTests: XCTestCase { } func testConnectTimeout() { + let serverGroup = self.serverGroup! + let clientGroup = self.clientGroup! XCTAsyncTest(timeout: 60) { #if os(Linux) // 198.51.100.254 is reserved for documentation only and therefore should not accept any TCP connection @@ -542,7 +544,7 @@ final class AsyncAwaitEndToEndTests: XCTestCase { XCTAssertNoThrow(try group.syncShutdownGracefully()) } - let serverChannel = try await ServerBootstrap(group: self.serverGroup) + let serverChannel = try await ServerBootstrap(group: serverGroup) .serverChannelOption(ChannelOptions.backlog, value: 1) .serverChannelOption(ChannelOptions.autoRead, value: false) .bind(host: "127.0.0.1", port: 0) @@ -551,7 +553,7 @@ final class AsyncAwaitEndToEndTests: XCTestCase { XCTAssertNoThrow(try serverChannel.close().wait()) } let port = serverChannel.localAddress!.port! - let firstClientChannel = try await ClientBootstrap(group: self.serverGroup) + let firstClientChannel = try await ClientBootstrap(group: serverGroup) .connect(host: "127.0.0.1", port: port) .get() defer { @@ -561,7 +563,7 @@ final class AsyncAwaitEndToEndTests: XCTestCase { #endif let httpClient = HTTPClient( - eventLoopGroupProvider: .shared(self.clientGroup), + eventLoopGroupProvider: .shared(clientGroup), configuration: .init(timeout: .init(connect: .milliseconds(100), read: .milliseconds(150))) ) diff --git a/Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift b/Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift index cbab922a4..4a5c8d486 100644 --- a/Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift +++ b/Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift @@ -17,7 +17,7 @@ import NIOCore /// ``AsyncSequenceWriter`` is `Sendable` because its state is protected by a Lock @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -final class AsyncSequenceWriter: AsyncSequence, @unchecked Sendable { +final class AsyncSequenceWriter: AsyncSequence, @unchecked Sendable { typealias AsyncIterator = Iterator struct Iterator: AsyncIteratorProtocol { diff --git a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift index df1a2926a..0d871b7dc 100644 --- a/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP1ClientChannelHandlerTests.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Logging +import NIOConcurrencyHelpers import NIOCore import NIOEmbedded import NIOHTTP1 @@ -833,10 +834,11 @@ class HTTP1ClientChannelHandlerTests: XCTestCase { ) try channel.connect(to: .init(ipAddress: "127.0.0.1", port: 80)).wait() - let request = MockHTTPExecutableRequest() // non empty body is important to trigger this bug as we otherwise finish the request in a single flush - request.requestFramingMetadata.body = .fixedSize(1) - request.raiseErrorIfUnimplementedMethodIsCalled = false + let request = MockHTTPExecutableRequest( + framingMetadata: RequestFramingMetadata(connectionClose: false, body: .fixedSize(1)), + raiseErrorIfUnimplementedMethodIsCalled: false + ) channel.writeAndFlush(request, promise: nil) XCTAssertEqual(request.events.map(\.kind), [.willExecuteRequest, .requestHeadSent]) } @@ -897,34 +899,43 @@ class HTTP1ClientChannelHandlerTests: XCTestCase { } } -class TestBackpressureWriter { +final class TestBackpressureWriter: Sendable { let eventLoop: EventLoop let parts: Int var finishFuture: EventLoopFuture { self.finishPromise.futureResult } private let finishPromise: EventLoopPromise - private(set) var written: Int = 0 - private var channelIsWritable: Bool = false + private struct State { + var written = 0 + var channelIsWritable = false + } + + var written: Int { + self.state.value.written + } + + private let state: NIOLoopBoundBox init(eventLoop: EventLoop, parts: Int) { self.eventLoop = eventLoop self.parts = parts - + self.state = .makeBoxSendingValue(State(), eventLoop: eventLoop) self.finishPromise = eventLoop.makePromise(of: Void.self) } func start(writer: HTTPClient.Body.StreamWriter, expectedErrors: [HTTPClientError] = []) -> EventLoopFuture { + @Sendable func recursive() { XCTAssert(self.eventLoop.inEventLoop) - XCTAssert(self.channelIsWritable) - if self.written == self.parts { + XCTAssert(self.state.value.channelIsWritable) + if self.state.value.written == self.parts { self.finishPromise.succeed(()) } else { self.eventLoop.execute { let future = writer.write(.byteBuffer(.init(bytes: [0, 1]))) - self.written += 1 + self.state.value.written += 1 future.whenComplete { result in switch result { case .success: @@ -951,14 +962,14 @@ class TestBackpressureWriter { } func writabilityChanged(_ newValue: Bool) { - self.channelIsWritable = newValue + self.state.value.channelIsWritable = newValue } } -class ResponseBackpressureDelegate: HTTPClientResponseDelegate { +final class ResponseBackpressureDelegate: HTTPClientResponseDelegate { typealias Response = Void - enum State { + enum State: Sendable { case consuming(EventLoopPromise) case waitingForRemote(CircularBuffer>) case buffering((ByteBuffer?, EventLoopPromise)?) @@ -966,21 +977,20 @@ class ResponseBackpressureDelegate: HTTPClientResponseDelegate { } let eventLoop: EventLoop - private var state: State = .buffering(nil) + private let state: NIOLoopBoundBox init(eventLoop: EventLoop) { self.eventLoop = eventLoop - - self.state = .consuming(self.eventLoop.makePromise(of: Void.self)) + self.state = .makeBoxSendingValue(.consuming(eventLoop.makePromise(of: Void.self)), eventLoop: eventLoop) } func next() -> EventLoopFuture { - switch self.state { + switch self.state.value { case .consuming(let backpressurePromise): var promiseBuffer = CircularBuffer>() let newPromise = self.eventLoop.makePromise(of: ByteBuffer?.self) promiseBuffer.append(newPromise) - self.state = .waitingForRemote(promiseBuffer) + self.state.value = .waitingForRemote(promiseBuffer) backpressurePromise.succeed(()) return newPromise.futureResult @@ -991,18 +1001,18 @@ class ResponseBackpressureDelegate: HTTPClientResponseDelegate { ) let promise = self.eventLoop.makePromise(of: ByteBuffer?.self) promiseBuffer.append(promise) - self.state = .waitingForRemote(promiseBuffer) + self.state.value = .waitingForRemote(promiseBuffer) return promise.futureResult case .buffering(.none): var promiseBuffer = CircularBuffer>() let promise = self.eventLoop.makePromise(of: ByteBuffer?.self) promiseBuffer.append(promise) - self.state = .waitingForRemote(promiseBuffer) + self.state.value = .waitingForRemote(promiseBuffer) return promise.futureResult case .buffering(.some((let buffer, let promise))): - self.state = .buffering(nil) + self.state.value = .buffering(nil) promise.succeed(()) return self.eventLoop.makeSucceededFuture(buffer) @@ -1012,7 +1022,7 @@ class ResponseBackpressureDelegate: HTTPClientResponseDelegate { } func didReceiveHead(task: HTTPClient.Task, _ head: HTTPResponseHead) -> EventLoopFuture { - switch self.state { + switch self.state.value { case .consuming(let backpressurePromise): return backpressurePromise.futureResult @@ -1025,7 +1035,7 @@ class ResponseBackpressureDelegate: HTTPClientResponseDelegate { } func didReceiveBodyPart(task: HTTPClient.Task, _ buffer: ByteBuffer) -> EventLoopFuture { - switch self.state { + switch self.state.value { case .waitingForRemote(var promiseBuffer): assert( !promiseBuffer.isEmpty, @@ -1034,18 +1044,18 @@ class ResponseBackpressureDelegate: HTTPClientResponseDelegate { let promise = promiseBuffer.removeFirst() if promiseBuffer.isEmpty { let newBackpressurePromise = self.eventLoop.makePromise(of: Void.self) - self.state = .consuming(newBackpressurePromise) + self.state.value = .consuming(newBackpressurePromise) promise.succeed(buffer) return newBackpressurePromise.futureResult } else { - self.state = .waitingForRemote(promiseBuffer) + self.state.value = .waitingForRemote(promiseBuffer) promise.succeed(buffer) return self.eventLoop.makeSucceededVoidFuture() } case .buffering(.none): let promise = self.eventLoop.makePromise(of: Void.self) - self.state = .buffering((buffer, promise)) + self.state.value = .buffering((buffer, promise)) return promise.futureResult case .buffering(.some): @@ -1059,15 +1069,15 @@ class ResponseBackpressureDelegate: HTTPClientResponseDelegate { } func didFinishRequest(task: HTTPClient.Task) throws { - switch self.state { + switch self.state.value { case .waitingForRemote(let promiseBuffer): for promise in promiseBuffer { promise.succeed(.none) } - self.state = .done + self.state.value = .done case .buffering(.none): - self.state = .done + self.state.value = .done case .done, .consuming: preconditionFailure("Invalid state: \(self.state)") @@ -1093,7 +1103,7 @@ class ReadEventHitHandler: ChannelOutboundHandler { } } -final class FailEndHandler: ChannelOutboundHandler { +final class FailEndHandler: ChannelOutboundHandler, Sendable { typealias OutboundIn = HTTPClientRequestPart typealias OutboundOut = HTTPClientRequestPart diff --git a/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift b/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift index 1f5f1b4c0..71f7f3d1a 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ClientRequestHandlerTests.swift @@ -568,10 +568,11 @@ class HTTP2ClientRequestHandlerTests: XCTestCase { ) try channel.connect(to: .init(ipAddress: "127.0.0.1", port: 80)).wait() - let request = MockHTTPExecutableRequest() // non empty body is important to trigger this bug as we otherwise finish the request in a single flush - request.requestFramingMetadata.body = .fixedSize(1) - request.raiseErrorIfUnimplementedMethodIsCalled = false + let request = MockHTTPExecutableRequest( + framingMetadata: RequestFramingMetadata(connectionClose: false, body: .fixedSize(1)), + raiseErrorIfUnimplementedMethodIsCalled: false + ) channel.writeAndFlush(request, promise: nil) XCTAssertEqual(request.events.map(\.kind), [.willExecuteRequest, .requestHeadSent]) } diff --git a/Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift b/Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift index d6bc2de14..183a227bd 100644 --- a/Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTP2ClientTests.swift @@ -14,6 +14,7 @@ import AsyncHTTPClient // NOT @testable - tests that really need @testable go into HTTP2ClientInternalTests.swift import Logging +import NIOConcurrencyHelpers import NIOCore import NIOFoundationCompat import NIOHTTP1 @@ -283,15 +284,16 @@ class HTTP2ClientTests: XCTestCase { XCTAssertNoThrow(maybeRequest = try HTTPClient.Request(url: "https://localhost:\(bin.port)")) guard let request = maybeRequest else { return } - var task: HTTPClient.Task! + let taskBox = NIOLockedValueBox?>(nil) let delegate = HeadReceivedCallback { _ in // request is definitely running because we just received a head from the server - task.cancel() + taskBox.withLockedValue { $0 }!.cancel() } - task = client.execute( + let task = client.execute( request: request, delegate: delegate ) + taskBox.withLockedValue { $0 = task } XCTAssertThrowsError(try task.futureResult.timeout(after: .seconds(2)).wait()) { XCTAssertEqualTypeAndValue($0, HTTPClientError.cancelled) @@ -360,18 +362,20 @@ class HTTP2ClientTests: XCTestCase { guard let request = maybeRequest else { return } let tasks = (0..<100).map { _ -> HTTPClient.Task in - var task: HTTPClient.Task! + let taskBox = NIOLockedValueBox?>(nil) + let delegate = HeadReceivedCallback { _ in // request is definitely running because we just received a head from the server cancelPool.next().execute { // canceling from a different thread - task.cancel() + taskBox.withLockedValue { $0 }!.cancel() } } - task = client.execute( + let task = client.execute( request: request, delegate: delegate ) + taskBox.withLockedValue { $0 = task } return task } @@ -547,8 +551,8 @@ class HTTP2ClientTests: XCTestCase { private final class HeadReceivedCallback: HTTPClientResponseDelegate { typealias Response = Void - private let didReceiveHeadCallback: (HTTPResponseHead) -> Void - init(didReceiveHead: @escaping (HTTPResponseHead) -> Void) { + private let didReceiveHeadCallback: @Sendable (HTTPResponseHead) -> Void + init(didReceiveHead: @escaping @Sendable (HTTPResponseHead) -> Void) { self.didReceiveHeadCallback = didReceiveHead } diff --git a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift index 11c0af534..634efc14c 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientInternalTests.swift @@ -164,10 +164,10 @@ class HTTPClientInternalTests: XCTestCase { } func testChannelAndDelegateOnDifferentEventLoops() throws { - class Delegate: HTTPClientResponseDelegate { + final class Delegate: HTTPClientResponseDelegate { typealias Response = ([Message], [Message]) - enum Message { + enum Message: Sendable { case head(HTTPResponseHead) case bodyPart(ByteBuffer) case sentRequestHead(HTTPRequestHead) @@ -176,33 +176,51 @@ class HTTPClientInternalTests: XCTestCase { case error(Error) } - var receivedMessages: [Message] = [] - var sentMessages: [Message] = [] + private struct Messages: Sendable { + var received: [Message] = [] + var sent: [Message] = [] + } + + private let messages: NIOLoopBoundBox + + var receivedMessages: [Message] { + get { + self.messages.value.received + } + set { + self.messages.value.received = newValue + } + } + var sentMessages: [Message] { + get { + self.messages.value.sent + } + set { + self.messages.value.sent = newValue + } + } private let eventLoop: EventLoop private let randoEL: EventLoop init(expectedEventLoop: EventLoop, randomOtherEventLoop: EventLoop) { self.eventLoop = expectedEventLoop self.randoEL = randomOtherEventLoop + self.messages = .makeBoxSendingValue(Messages(), eventLoop: expectedEventLoop) } func didSendRequestHead(task: HTTPClient.Task, _ head: HTTPRequestHead) { - self.eventLoop.assertInEventLoop() self.sentMessages.append(.sentRequestHead(head)) } func didSendRequestPart(task: HTTPClient.Task, _ part: IOData) { - self.eventLoop.assertInEventLoop() self.sentMessages.append(.sentRequestPart(part)) } func didSendRequest(task: HTTPClient.Task) { - self.eventLoop.assertInEventLoop() self.sentMessages.append(.sentRequest) } func didReceiveError(task: HTTPClient.Task, _ error: Error) { - self.eventLoop.assertInEventLoop() self.receivedMessages.append(.error(error)) } @@ -210,7 +228,6 @@ class HTTPClientInternalTests: XCTestCase { task: HTTPClient.Task, _ head: HTTPResponseHead ) -> EventLoopFuture { - self.eventLoop.assertInEventLoop() self.receivedMessages.append(.head(head)) return self.randoEL.makeSucceededFuture(()) } @@ -219,14 +236,12 @@ class HTTPClientInternalTests: XCTestCase { task: HTTPClient.Task, _ buffer: ByteBuffer ) -> EventLoopFuture { - self.eventLoop.assertInEventLoop() self.receivedMessages.append(.bodyPart(buffer)) return self.randoEL.makeSucceededFuture(()) } func didFinishRequest(task: HTTPClient.Task) throws -> Response { - self.eventLoop.assertInEventLoop() - return (self.receivedMessages, self.sentMessages) + (self.receivedMessages, self.sentMessages) } } @@ -460,11 +475,15 @@ class HTTPClientInternalTests: XCTestCase { } func testConnectErrorCalloutOnCorrectEL() throws { - class TestDelegate: HTTPClientResponseDelegate { + final class TestDelegate: HTTPClientResponseDelegate { typealias Response = Void let expectedEL: EventLoop - var receivedError: Bool = false + let _receivedError = NIOLockedValueBox(false) + + var receivedError: Bool { + self._receivedError.withLockedValue { $0 } + } init(expectedEL: EventLoop) { self.expectedEL = expectedEL @@ -473,7 +492,7 @@ class HTTPClientInternalTests: XCTestCase { func didFinishRequest(task: HTTPClient.Task) throws {} func didReceiveError(task: HTTPClient.Task, _ error: Error) { - self.receivedError = true + self._receivedError.withLockedValue { $0 = true } XCTAssertTrue(self.expectedEL.inEventLoop) } } @@ -658,6 +677,7 @@ class HTTPClientInternalTests: XCTestCase { ).futureResult } _ = try EventLoopFuture.whenAllSucceed(resultFutures, on: self.clientGroup.next()).wait() + let threadPools = delegates.map { $0._fileIOThreadPool } let firstThreadPool = threadPools.first ?? nil XCTAssert(threadPools.dropFirst().allSatisfy { $0 === firstThreadPool }) diff --git a/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift index c8c2e3b87..54467aab7 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientRequestTests.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Algorithms +import NIOConcurrencyHelpers import NIOCore import NIOHTTP1 import XCTest @@ -493,7 +494,7 @@ class HTTPClientRequestTests: XCTestCase { request.method = .POST let asyncSequence = ByteBuffer(string: "post body") .readableBytesView - .chunks(ofCount: 2) + .uncheckedSendableChunks(ofCount: 2) .async .map { ByteBuffer($0) } @@ -541,7 +542,7 @@ class HTTPClientRequestTests: XCTestCase { request.method = .POST let asyncSequence = ByteBuffer(string: "post body") .readableBytesView - .chunks(ofCount: 2) + .uncheckedSendableChunks(ofCount: 2) .async .map { ByteBuffer($0) } @@ -619,7 +620,7 @@ class HTTPClientRequestTests: XCTestCase { func testChunkingSequenceThatDoesNotImplementWithContiguousStorageIfAvailable() async throws { let bagOfBytesToByteBufferConversionChunkSize = 8 let body = try await HTTPClientRequest.Body._bytes( - AnySequence( + AnySendableSequence( Array(repeating: 0, count: bagOfBytesToByteBufferConversionChunkSize) + Array(repeating: 1, count: bagOfBytesToByteBufferConversionChunkSize) ), @@ -729,17 +730,17 @@ extension HTTPClient.Body { func collect() -> EventLoopFuture<[ByteBuffer]> { let eelg = EmbeddedEventLoopGroup(loops: 1) let el = eelg.next() - var body = [ByteBuffer]() + let body = NIOLockedValueBox<[ByteBuffer]>([]) let writer = StreamWriter { switch $0 { case .byteBuffer(let byteBuffer): - body.append(byteBuffer) + body.withLockedValue { $0.append(byteBuffer) } case .fileRegion: fatalError("file region not supported") } return el.makeSucceededVoidFuture() } - return self.stream(writer).map { _ in body } + return self.stream(writer).map { _ in body.withLockedValue { $0 } } } } @@ -784,3 +785,35 @@ extension Optional where Wrapped == HTTPClientRequest.Prepared.Body { } } } + +// swift-algorithms hasn't adopted Sendable yet. By inspection ChunksOfCountCollection should be +// Sendable assuming the underlying collection is. This wrapper allows us to avoid a blanket +// preconcurrency import of the Algorithms module. +struct UncheckedSendableChunksOfCountCollection: Collection, @unchecked Sendable +where Base: Sendable { + typealias Element = Base.SubSequence + typealias Index = ChunksOfCountCollection.Index + + private let underlying: ChunksOfCountCollection + + init(_ underlying: ChunksOfCountCollection) { + self.underlying = underlying + } + + var startIndex: Index { self.underlying.startIndex } + var endIndex: Index { self.underlying.endIndex } + + subscript(position: Index) -> Base.SubSequence { + self.underlying[position] + } + + func index(after i: Index) -> Index { + self.underlying.index(after: i) + } +} + +extension Collection where Self: Sendable { + func uncheckedSendableChunks(ofCount count: Int) -> UncheckedSendableChunksOfCountCollection { + UncheckedSendableChunksOfCountCollection(self.chunks(ofCount: count)) + } +} diff --git a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift index 307e56fd3..50c3ecb9d 100644 --- a/Tests/AsyncHTTPClientTests/HTTPClientTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPClientTests.swift @@ -314,10 +314,9 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } func testPostWithGenericBody() throws { - let bodyData = Array("hello, world!").lazy.map { $0.uppercased().first!.asciiValue! } - let erasedData = AnyRandomAccessCollection(bodyData) + let bodyData = Array(Array("hello, world!").lazy.map { $0.uppercased().first!.asciiValue! }) - let response = try self.defaultClient.post(url: self.defaultHTTPBinURLPrefix + "post", body: .bytes(erasedData)) + let response = try self.defaultClient.post(url: self.defaultHTTPBinURLPrefix + "post", body: .bytes(bodyData)) .wait() let bytes = response.body.flatMap { $0.getData(at: 0, length: $0.readableBytes) } let data = try JSONDecoder().decode(RequestInfo.self, from: bytes!) @@ -907,8 +906,8 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { body: .stream { streamWriter in _ = streamWriter.write(.byteBuffer(.init())) - let promise = self.clientGroup.next().makePromise(of: Void.self) - self.clientGroup.next().scheduleTask(in: .milliseconds(3)) { + let promise = localClient.eventLoopGroup.next().makePromise(of: Void.self) + localClient.eventLoopGroup.next().scheduleTask(in: .milliseconds(3)) { streamWriter.write(.byteBuffer(.init())).cascade(to: promise) } @@ -1124,23 +1123,23 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { XCTAssertNoThrow(try localClient.syncShutdown()) } - class EventLoopValidatingDelegate: HTTPClientResponseDelegate { + final class EventLoopValidatingDelegate: HTTPClientResponseDelegate { typealias Response = Bool let eventLoop: EventLoop - var result = false + let result = NIOLockedValueBox(false) init(eventLoop: EventLoop) { self.eventLoop = eventLoop } func didReceiveHead(task: HTTPClient.Task, _ head: HTTPResponseHead) -> EventLoopFuture { - self.result = task.eventLoop === self.eventLoop + self.result.withLockedValue { $0 = task.eventLoop === self.eventLoop } return task.eventLoop.makeSucceededFuture(()) } func didFinishRequest(task: HTTPClient.Task) throws -> Bool { - self.result + self.result.withLockedValue { $0 } } } @@ -1348,7 +1347,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { let numberOfRequestsPerThread = 1000 let numberOfParallelWorkers = 5 - final class HTTPServer: ChannelInboundHandler { + final class HTTPServer: ChannelInboundHandler, Sendable { typealias InboundIn = HTTPServerRequestPart typealias OutboundOut = HTTPServerResponsePart @@ -1394,10 +1393,11 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { let url = "http://127.0.0.1:\(server?.localAddress?.port ?? -1)/hello" let g = DispatchGroup() + let defaultClient = self.defaultClient! for workerID in 0.. Channel? { try? ServerBootstrap(group: group) .childChannelInitializer { channel in - channel.pipeline.configureHTTPServerPipeline().flatMap { - channel.pipeline.addHandler( + channel.pipeline.configureHTTPServerPipeline().flatMapThrowing { + try channel.pipeline.syncOperations.addHandler( HTTPServer( headPromise: headPromise, bodyPromises: bodyPromises, @@ -2574,11 +2576,12 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } func testUploadStreamingCallinToleratedFromOtsideEL() throws { + let defaultClient = self.defaultClient! let request = try HTTPClient.Request( url: self.defaultHTTPBinURLPrefix + "get", method: .POST, body: .stream(contentLength: 4) { writer in - let promise = self.defaultClient.eventLoopGroup.next().makePromise(of: Void.self) + let promise = defaultClient.eventLoopGroup.next().makePromise(of: Void.self) // We have to toleare callins from any thread DispatchQueue(label: "upload-streaming").async { writer.write(.byteBuffer(ByteBuffer(string: "1234"))).whenComplete { _ in @@ -3282,12 +3285,12 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } func testConnectErrorPropagatedToDelegate() throws { - class TestDelegate: HTTPClientResponseDelegate { + final class TestDelegate: HTTPClientResponseDelegate { typealias Response = Void - var error: Error? + let error = NIOLockedValueBox(nil) func didFinishRequest(task: HTTPClient.Task) throws {} func didReceiveError(task: HTTPClient.Task, _ error: Error) { - self.error = error + self.error.withLockedValue { $0 = error } } } @@ -3306,12 +3309,12 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { XCTAssertThrowsError(try httpClient.execute(request: request, delegate: delegate).wait()) { XCTAssertEqualTypeAndValue($0, HTTPClientError.connectTimeout) - XCTAssertEqualTypeAndValue(delegate.error, HTTPClientError.connectTimeout) + XCTAssertEqualTypeAndValue(delegate.error.withLockedValue { $0 }, HTTPClientError.connectTimeout) } } func testDelegateCallinsTolerateRandomEL() throws { - class TestDelegate: HTTPClientResponseDelegate { + final class TestDelegate: HTTPClientResponseDelegate { typealias Response = Void let eventLoop: EventLoop @@ -3393,13 +3396,14 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { func testContentLengthTooLongFails() throws { let url = self.defaultHTTPBinURLPrefix + "post" + let defaultClient = self.defaultClient! XCTAssertThrowsError( try self.defaultClient.execute( request: Request( url: url, body: .stream(contentLength: 10) { streamWriter in - let promise = self.defaultClient.eventLoopGroup.next().makePromise(of: Void.self) + let promise = defaultClient.eventLoopGroup.next().makePromise(of: Void.self) DispatchQueue(label: "content-length-test").async { streamWriter.write(.byteBuffer(ByteBuffer(string: "1"))).cascade(to: promise) } @@ -3495,6 +3499,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { // second connection. _ = self.defaultClient.get(url: "http://localhost:\(self.defaultHTTPBin.port)/events/10/1") + let clientGroup = self.clientGroup! var request = try HTTPClient.Request(url: "http://localhost:\(self.defaultHTTPBin.port)/wait", method: .POST) request.body = .stream { writer in // Start writing chunks so tha we will try to write after read timeout is thrown @@ -3502,8 +3507,8 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { _ = writer.write(.byteBuffer(ByteBuffer(string: "1234"))) } - let promise = self.clientGroup.next().makePromise(of: Void.self) - self.clientGroup.next().scheduleTask(in: .milliseconds(3)) { + let promise = clientGroup.next().makePromise(of: Void.self) + clientGroup.next().scheduleTask(in: .milliseconds(3)) { writer.write(.byteBuffer(ByteBuffer(string: "1234"))).cascade(to: promise) } @@ -3518,7 +3523,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { } func testSSLHandshakeErrorPropagation() throws { - class CloseHandler: ChannelInboundHandler { + final class CloseHandler: ChannelInboundHandler, Sendable { typealias InboundIn = Any func channelRead(context: ChannelHandlerContext, data: NIOAny) { @@ -3575,11 +3580,11 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { func testSSLHandshakeErrorPropagationDelayedClose() throws { // This is as the test above, but the close handler delays its close action by a few hundred ms. // This will tend to catch the pipeline at different weird stages, and flush out different bugs. - class CloseHandler: ChannelInboundHandler { + final class CloseHandler: ChannelInboundHandler, Sendable { typealias InboundIn = Any func channelRead(context: ChannelHandlerContext, data: NIOAny) { - context.eventLoop.scheduleTask(in: .milliseconds(100)) { + context.eventLoop.assumeIsolated().scheduleTask(in: .milliseconds(100)) { context.close(promise: nil) } } @@ -3636,8 +3641,8 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { let server = try ServerBootstrap(group: self.serverGroup) .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) .childChannelInitializer { channel in - channel.pipeline.configureHTTPServerPipeline().flatMap { - channel.pipeline.addHandler(CloseWithoutClosingServerHandler(group.leave)) + channel.pipeline.configureHTTPServerPipeline().flatMapThrowing { + try channel.pipeline.syncOperations.addHandler(CloseWithoutClosingServerHandler(group.leave)) } } .bind(host: "localhost", port: 0) @@ -4230,7 +4235,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { body: nil ) - class CancelAfterRedirect: HTTPClientResponseDelegate { + final class CancelAfterRedirect: HTTPClientResponseDelegate, Sendable { init() {} func didFinishRequest(task: AsyncHTTPClient.HTTPClient.Task) throws {} } @@ -4261,7 +4266,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { body: nil ) - class FailAfterRedirect: HTTPClientResponseDelegate { + final class FailAfterRedirect: HTTPClientResponseDelegate, Sendable { init() {} func didFinishRequest(task: AsyncHTTPClient.HTTPClient.Task) throws {} } @@ -4291,7 +4296,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { // non-empty body is important request.body = .byteBuffer(ByteBuffer([1])) - class CancelAfterHeadSend: HTTPClientResponseDelegate { + final class CancelAfterHeadSend: HTTPClientResponseDelegate, Sendable { init() {} func didFinishRequest(task: AsyncHTTPClient.HTTPClient.Task) throws {} func didSendRequestHead(task: HTTPClient.Task, _ head: HTTPRequestHead) { @@ -4308,7 +4313,7 @@ final class HTTPClientTests: XCTestCaseHTTPClientTestsBaseClass { // non-empty body is important request.body = .byteBuffer(ByteBuffer([1])) - class CancelAfterHeadSend: HTTPClientResponseDelegate { + final class CancelAfterHeadSend: HTTPClientResponseDelegate, Sendable { init() {} func didFinishRequest(task: AsyncHTTPClient.HTTPClient.Task) throws {} func didSendRequestHead(task: HTTPClient.Task, _ head: HTTPRequestHead) { diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+FactoryTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+FactoryTests.swift index a87299da1..15cc9e7e9 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+FactoryTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+FactoryTests.swift @@ -184,7 +184,7 @@ class HTTPConnectionPool_FactoryTests: XCTestCase { } } -class NeverrespondServerHandler: ChannelInboundHandler { +final class NeverrespondServerHandler: ChannelInboundHandler, Sendable { typealias InboundIn = NIOAny func channelRead(context: ChannelHandlerContext, data: NIOAny) { diff --git a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift index d792895d3..4f4bbd785 100644 --- a/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift +++ b/Tests/AsyncHTTPClientTests/HTTPConnectionPool+RequestQueueTests.swift @@ -83,7 +83,7 @@ class HTTPConnectionPool_RequestQueueTests: XCTestCase { } } -private class MockScheduledRequest: HTTPSchedulableRequest { +final private class MockScheduledRequest: HTTPSchedulableRequest { let requiredEventLoop: EventLoop? init(requiredEventLoop: EventLoop?) { diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockHTTPExecutableRequest.swift b/Tests/AsyncHTTPClientTests/Mocks/MockHTTPExecutableRequest.swift index 021c69731..67f18cbb8 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockHTTPExecutableRequest.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockHTTPExecutableRequest.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import Logging +import NIOConcurrencyHelpers import NIOCore import NIOHTTP1 import XCTest @@ -20,7 +21,7 @@ import XCTest @testable import AsyncHTTPClient final class MockHTTPExecutableRequest: HTTPExecutableRequest { - enum Event { + enum Event: Sendable { /// ``Event`` without associated values enum Kind: Hashable { case willExecuteRequest @@ -56,39 +57,49 @@ final class MockHTTPExecutableRequest: HTTPExecutableRequest { } } - var logger: Logging.Logger = Logger(label: "request") - var requestHead: NIOHTTP1.HTTPRequestHead - var requestFramingMetadata: RequestFramingMetadata - var requestOptions: RequestOptions = .forTests() + let logger: Logging.Logger = Logger(label: "request") + let requestHead: NIOHTTP1.HTTPRequestHead + let requestFramingMetadata: RequestFramingMetadata + let requestOptions: RequestOptions = .forTests() /// if true and ``HTTPExecutableRequest`` method is called without setting a corresponding callback on `self` e.g. /// If ``HTTPExecutableRequest\.willExecuteRequest(_:)`` is called but ``willExecuteRequestCallback`` is not set, /// ``XCTestFail(_:)`` will be called to fail the current test. - var raiseErrorIfUnimplementedMethodIsCalled: Bool = true - private var file: StaticString - private var line: UInt - - var willExecuteRequestCallback: ((HTTPRequestExecutor) -> Void)? - var requestHeadSentCallback: (() -> Void)? - var resumeRequestBodyStreamCallback: (() -> Void)? - var pauseRequestBodyStreamCallback: (() -> Void)? - var receiveResponseHeadCallback: ((HTTPResponseHead) -> Void)? - var receiveResponseBodyPartsCallback: ((CircularBuffer) -> Void)? - var succeedRequestCallback: ((CircularBuffer?) -> Void)? - var failCallback: ((Error) -> Void)? + let raiseErrorIfUnimplementedMethodIsCalled: Bool + private let file: StaticString + private let line: UInt + + let willExecuteRequestCallback: (@Sendable (HTTPRequestExecutor) -> Void)? = nil + let requestHeadSentCallback: (@Sendable () -> Void)? = nil + let resumeRequestBodyStreamCallback: (@Sendable () -> Void)? = nil + let pauseRequestBodyStreamCallback: (@Sendable () -> Void)? = nil + let receiveResponseHeadCallback: (@Sendable (HTTPResponseHead) -> Void)? = nil + let receiveResponseBodyPartsCallback: (@Sendable (CircularBuffer) -> Void)? = nil + let succeedRequestCallback: (@Sendable (CircularBuffer?) -> Void)? = nil + let failCallback: (@Sendable (Error) -> Void)? = nil /// captures all ``HTTPExecutableRequest`` method calls in the order of occurrence, including arguments. /// If you are not interested in the arguments you can use `events.map(\.kind)` to get all events without arguments. - private(set) var events: [Event] = [] + private let _events = NIOLockedValueBox<[Event]>([]) + private(set) var events: [Event] { + get { + self._events.withLockedValue { $0 } + } + set { + self._events.withLockedValue { $0 = newValue } + } + } init( head: NIOHTTP1.HTTPRequestHead = .init(version: .http1_1, method: .GET, uri: "http://localhost/"), framingMetadata: RequestFramingMetadata = .init(connectionClose: false, body: .fixedSize(0)), + raiseErrorIfUnimplementedMethodIsCalled: Bool = true, file: StaticString = #file, line: UInt = #line ) { self.requestHead = head self.requestFramingMetadata = framingMetadata + self.raiseErrorIfUnimplementedMethodIsCalled = raiseErrorIfUnimplementedMethodIsCalled self.file = file self.line = line } diff --git a/Tests/AsyncHTTPClientTests/Mocks/MockRequestExecutor.swift b/Tests/AsyncHTTPClientTests/Mocks/MockRequestExecutor.swift index f85c75ce5..e5d9caa8e 100644 --- a/Tests/AsyncHTTPClientTests/Mocks/MockRequestExecutor.swift +++ b/Tests/AsyncHTTPClientTests/Mocks/MockRequestExecutor.swift @@ -25,7 +25,7 @@ final class MockRequestExecutor { case unexpectedByteBuffer } - enum RequestParts: Equatable { + enum RequestParts: Equatable, Sendable { case body(IOData) case endOfStream @@ -58,10 +58,15 @@ final class MockRequestExecutor { private let responseBodyDemandLock = ConditionLock(value: false) private let cancellationLock = ConditionLock(value: false) - private var request: HTTPExecutableRequest? - private var _signaledDemandForRequestBody: Bool = false + private struct State: Sendable { + var request: HTTPExecutableRequest? + var _signaledDemandForRequestBody: Bool = false + } + + private let state: NIOLockedValueBox init(pauseRequestBodyPartStreamAfterASingleWrite: Bool = false, eventLoop: EventLoop) { + self.state = NIOLockedValueBox(State()) self.pauseRequestBodyPartStreamAfterASingleWrite = pauseRequestBodyPartStreamAfterASingleWrite self.eventLoop = eventLoop } @@ -77,8 +82,10 @@ final class MockRequestExecutor { } private func runRequest0(_ request: HTTPExecutableRequest) { - precondition(self.request == nil) - self.request = request + self.state.withLockedValue { + precondition($0.request == nil) + $0.request = request + } request.willExecuteRequest(self) request.requestHeadSent() } @@ -127,10 +134,16 @@ final class MockRequestExecutor { } private func pauseRequestBodyStream0() { - if self._signaledDemandForRequestBody == true { - self._signaledDemandForRequestBody = false - self.request!.pauseRequestBodyStream() + let request = self.state.withLockedValue { + if $0._signaledDemandForRequestBody == true { + $0._signaledDemandForRequestBody = false + return $0.request + } else { + return nil + } } + + request?.pauseRequestBodyStream() } func resumeRequestBodyStream() { @@ -144,10 +157,16 @@ final class MockRequestExecutor { } private func resumeRequestBodyStream0() { - if self._signaledDemandForRequestBody == false { - self._signaledDemandForRequestBody = true - self.request!.resumeRequestBodyStream() + let request = self.state.withLockedValue { + if $0._signaledDemandForRequestBody == false { + $0._signaledDemandForRequestBody = true + return $0.request + } else { + return nil + } } + + request?.resumeRequestBodyStream() } func resetResponseStreamDemandSignal() { @@ -204,11 +223,13 @@ extension MockRequestExecutor: HTTPRequestExecutor { case none } - let stateChange = { () -> WriteAction in + let stateChange = { @Sendable () -> WriteAction in var pause = false if self.blockingQueue.isEmpty && self.pauseRequestBodyPartStreamAfterASingleWrite && part.isBody { pause = true - self._signaledDemandForRequestBody = false + self.state.withLockedValue { + $0._signaledDemandForRequestBody = false + } } self.blockingQueue.append(.success(part)) @@ -283,3 +304,5 @@ extension MockRequestExecutor { } } } + +extension MockRequestExecutor.BlockingQueue: @unchecked Sendable where Element: Sendable {} diff --git a/Tests/AsyncHTTPClientTests/RequestBagTests.swift b/Tests/AsyncHTTPClientTests/RequestBagTests.swift index fa92b84bd..2b0c2f6e4 100644 --- a/Tests/AsyncHTTPClientTests/RequestBagTests.swift +++ b/Tests/AsyncHTTPClientTests/RequestBagTests.swift @@ -939,7 +939,7 @@ final class RequestBagTests: XCTestCase { } func testWeDontLeakTheRequestIfTheRequestWriterWasCapturedByAPromise() { - final class LeakDetector {} + final class LeakDetector: Sendable {} let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) defer { XCTAssertNoThrow(try group.syncShutdownGracefully()) } @@ -1000,70 +1000,90 @@ extension HTTPClient.Task { } } -class UploadCountingDelegate: HTTPClientResponseDelegate { +final class UploadCountingDelegate: HTTPClientResponseDelegate { typealias Response = Void let eventLoop: EventLoop - private(set) var hitDidSendRequestHead = 0 - private(set) var hitDidSendRequestPart = 0 - private(set) var hitDidSendRequest = 0 - private(set) var hitDidReceiveResponse = 0 - private(set) var hitDidReceiveBodyPart = 0 - private(set) var hitDidReceiveError = 0 + struct State: Sendable { + var hitDidSendRequestHead = 0 + var hitDidSendRequestPart = 0 + var hitDidSendRequest = 0 + var hitDidReceiveResponse = 0 + var hitDidReceiveBodyPart = 0 + var hitDidReceiveError = 0 + + var history: [(request: HTTPClient.Request, response: HTTPResponseHead)] = [] + var receivedHead: HTTPResponseHead? + var lastBodyPart: ByteBuffer? + var backpressurePromise: EventLoopPromise? + var lastError: Error? + } + + private let state: NIOLoopBoundBox - private(set) var history: [(request: HTTPClient.Request, response: HTTPResponseHead)] = [] - private(set) var receivedHead: HTTPResponseHead? - private(set) var lastBodyPart: ByteBuffer? - private(set) var backpressurePromise: EventLoopPromise? - private(set) var lastError: Error? + var hitDidSendRequestHead: Int { self.state.value.hitDidSendRequestHead } + var hitDidSendRequestPart: Int { self.state.value.hitDidSendRequestPart } + var hitDidSendRequest: Int { self.state.value.hitDidSendRequest } + var hitDidReceiveResponse: Int { self.state.value.hitDidReceiveResponse } + var hitDidReceiveBodyPart: Int { self.state.value.hitDidReceiveBodyPart } + var hitDidReceiveError: Int { self.state.value.hitDidReceiveError } + + var history: [(request: HTTPClient.Request, response: HTTPResponseHead)] { + self.state.value.history + } + var receivedHead: HTTPResponseHead? { self.state.value.receivedHead } + var lastBodyPart: ByteBuffer? { self.state.value.lastBodyPart } + var backpressurePromise: EventLoopPromise? { self.state.value.backpressurePromise } + var lastError: Error? { self.state.value.lastError } init(eventLoop: EventLoop) { self.eventLoop = eventLoop + self.state = .makeBoxSendingValue(State(), eventLoop: eventLoop) } func didSendRequestHead(task: HTTPClient.Task, _ head: HTTPRequestHead) { - self.hitDidSendRequestHead += 1 + self.state.value.hitDidSendRequestHead += 1 } func didSendRequestPart(task: HTTPClient.Task, _ part: IOData) { - self.hitDidSendRequestPart += 1 + self.state.value.hitDidSendRequestPart += 1 } func didSendRequest(task: HTTPClient.Task) { - self.hitDidSendRequest += 1 + self.state.value.hitDidSendRequest += 1 } func didVisitURL(task: HTTPClient.Task, _ request: HTTPClient.Request, _ head: HTTPResponseHead) { - self.history.append((request, head)) + self.state.value.history.append((request, head)) } func didReceiveHead(task: HTTPClient.Task, _ head: HTTPResponseHead) -> EventLoopFuture { - self.receivedHead = head + self.state.value.receivedHead = head return self.createBackpressurePromise() } func didReceiveBodyPart(task: HTTPClient.Task, _ buffer: ByteBuffer) -> EventLoopFuture { - assert(self.backpressurePromise == nil) - self.hitDidReceiveBodyPart += 1 - self.lastBodyPart = buffer + assert(self.state.value.backpressurePromise == nil) + self.state.value.hitDidReceiveBodyPart += 1 + self.state.value.lastBodyPart = buffer return self.createBackpressurePromise() } func didFinishRequest(task: HTTPClient.Task) throws { - self.hitDidReceiveResponse += 1 + self.state.value.hitDidReceiveResponse += 1 } func didReceiveError(task: HTTPClient.Task, _ error: Error) { - self.hitDidReceiveError += 1 - self.lastError = error + self.state.value.hitDidReceiveError += 1 + self.state.value.lastError = error } private func createBackpressurePromise() -> EventLoopFuture { - assert(self.backpressurePromise == nil) - self.backpressurePromise = self.eventLoop.makePromise(of: Void.self) - return self.backpressurePromise!.futureResult.always { _ in - self.backpressurePromise = nil + assert(self.state.value.backpressurePromise == nil) + self.state.value.backpressurePromise = self.eventLoop.makePromise(of: Void.self) + return self.state.value.backpressurePromise!.futureResult.always { _ in + self.state.value.backpressurePromise = nil } } } diff --git a/Tests/AsyncHTTPClientTests/SOCKSEventsHandlerTests.swift b/Tests/AsyncHTTPClientTests/SOCKSEventsHandlerTests.swift index 1170aa444..2352c6c1c 100644 --- a/Tests/AsyncHTTPClientTests/SOCKSEventsHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/SOCKSEventsHandlerTests.swift @@ -38,7 +38,7 @@ class SOCKSEventsHandlerTests: XCTestCase { let embedded = EmbeddedChannel(handlers: [socksEventsHandler]) XCTAssertNotNil(socksEventsHandler.socksEstablishedFuture) - XCTAssertNoThrow(try embedded.pipeline.removeHandler(socksEventsHandler).wait()) + XCTAssertNoThrow(try embedded.pipeline.syncOperations.removeHandler(socksEventsHandler).wait()) XCTAssertThrowsError(try XCTUnwrap(socksEventsHandler.socksEstablishedFuture).wait()) } diff --git a/Tests/AsyncHTTPClientTests/SOCKSTestUtils.swift b/Tests/AsyncHTTPClientTests/SOCKSTestUtils.swift index ebff55a6d..50d26b278 100644 --- a/Tests/AsyncHTTPClientTests/SOCKSTestUtils.swift +++ b/Tests/AsyncHTTPClientTests/SOCKSTestUtils.swift @@ -53,7 +53,9 @@ class MockSOCKSServer { bootstrap = ServerBootstrap(group: elg) .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1) .childChannelInitializer { channel in - channel.pipeline.addHandler(TestSOCKSBadServerHandler()) + channel.eventLoop.makeCompletedFuture { + try channel.pipeline.syncOperations.addHandler(TestSOCKSBadServerHandler()) + } } } else { bootstrap = ServerBootstrap(group: elg) diff --git a/Tests/AsyncHTTPClientTests/TLSEventsHandlerTests.swift b/Tests/AsyncHTTPClientTests/TLSEventsHandlerTests.swift index 96cdf68f6..988ba6e3f 100644 --- a/Tests/AsyncHTTPClientTests/TLSEventsHandlerTests.swift +++ b/Tests/AsyncHTTPClientTests/TLSEventsHandlerTests.swift @@ -39,7 +39,7 @@ class TLSEventsHandlerTests: XCTestCase { let embedded = EmbeddedChannel(handlers: [tlsEventsHandler]) XCTAssertNotNil(tlsEventsHandler.tlsEstablishedFuture) - XCTAssertNoThrow(try embedded.pipeline.removeHandler(tlsEventsHandler).wait()) + XCTAssertNoThrow(try embedded.pipeline.syncOperations.removeHandler(tlsEventsHandler).wait()) XCTAssertThrowsError(try XCTUnwrap(tlsEventsHandler.tlsEstablishedFuture).wait()) } diff --git a/Tests/AsyncHTTPClientTests/TransactionTests.swift b/Tests/AsyncHTTPClientTests/TransactionTests.swift index a2fa97418..8e6464a5b 100644 --- a/Tests/AsyncHTTPClientTests/TransactionTests.swift +++ b/Tests/AsyncHTTPClientTests/TransactionTests.swift @@ -29,11 +29,9 @@ typealias PreparedRequest = HTTPClientRequest.Prepared @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) final class TransactionTests: XCTestCase { func testCancelAsyncRequest() { - // creating the `XCTestExpectation` off the main thread crashes on Linux with Swift 5.6 - // therefore we create it here as a workaround which works fine - let scheduledRequestCanceled = self.expectation(description: "scheduled request canceled") XCTAsyncTest { let loop = NIOAsyncTestingEventLoop() + let scheduledRequestCanceled = loop.makePromise(of: Void.self) defer { XCTAssertNoThrow(try loop.syncShutdownGracefully()) } var request = HTTPClientRequest(url: "https://localhost/") @@ -49,7 +47,7 @@ final class TransactionTests: XCTestCase { ) let queuer = MockTaskQueuer { _ in - scheduledRequestCanceled.fulfill() + scheduledRequestCanceled.succeed() } transaction.requestWasQueued(queuer) @@ -64,9 +62,7 @@ final class TransactionTests: XCTestCase { } // self.fulfillment(of:) is not available on Linux - _ = { - self.wait(for: [scheduledRequestCanceled], timeout: 1) - }() + try await scheduledRequestCanceled.futureResult.timeout(after: .seconds(1)).get() } } @@ -590,22 +586,31 @@ final class TransactionTests: XCTestCase { // tasks. Since we want to wait for things to happen in tests, we need to `async let`, which creates // implicit tasks. Therefore we need to wrap our iterator struct. @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -actor SharedIterator where Wrapped.Element: Sendable { - private var wrappedIterator: Wrapped.AsyncIterator - private var nextCallInProgress: Bool = false +final class SharedIterator: Sendable where Wrapped.Element: Sendable { + private struct State: @unchecked Sendable { + var wrappedIterator: Wrapped.AsyncIterator + var nextCallInProgress: Bool = false + } + + private let state: NIOLockedValueBox init(_ sequence: Wrapped) { - self.wrappedIterator = sequence.makeAsyncIterator() + self.state = NIOLockedValueBox(State(wrappedIterator: sequence.makeAsyncIterator())) } func next() async throws -> Wrapped.Element? { - precondition(self.nextCallInProgress == false) - self.nextCallInProgress = true - var iter = self.wrappedIterator + var iter = self.state.withLockedValue { + precondition($0.nextCallInProgress == false) + $0.nextCallInProgress = true + return $0.wrappedIterator + } + defer { - precondition(self.nextCallInProgress == true) - self.nextCallInProgress = false - self.wrappedIterator = iter + self.state.withLockedValue { + precondition($0.nextCallInProgress == true) + $0.nextCallInProgress = false + $0.wrappedIterator = iter + } } return try await iter.next() } @@ -613,7 +618,7 @@ actor SharedIterator where Wrapped.Element: Sendable { /// non fail-able promise that only supports one observer @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) -private actor Promise { +private actor Promise { private enum State { case initialised case fulfilled(Value) @@ -653,7 +658,7 @@ private actor Promise { @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension Transaction { fileprivate static func makeWithResultTask( - request: PreparedRequest, + request: sending PreparedRequest, requestOptions: RequestOptions = .forTests(), logger: Logger = Logger(label: "test"), connectionDeadline: NIODeadline = .distantFuture, From 96c39894a6a6fb75c40cfe4d0e2525d2d2b51699 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Thu, 1 May 2025 10:32:37 +0100 Subject: [PATCH 2/2] workaround 5.10 --- .../TransactionTests.swift | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/Tests/AsyncHTTPClientTests/TransactionTests.swift b/Tests/AsyncHTTPClientTests/TransactionTests.swift index 8e6464a5b..3316de370 100644 --- a/Tests/AsyncHTTPClientTests/TransactionTests.swift +++ b/Tests/AsyncHTTPClientTests/TransactionTests.swift @@ -657,6 +657,7 @@ private actor Promise { @available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) extension Transaction { + #if compiler(>=6.0) fileprivate static func makeWithResultTask( request: sending PreparedRequest, requestOptions: RequestOptions = .forTests(), @@ -684,4 +685,40 @@ extension Transaction { return (await transactionPromise.value, task) } + #else + fileprivate static func makeWithResultTask( + request: PreparedRequest, + requestOptions: RequestOptions = .forTests(), + logger: Logger = Logger(label: "test"), + connectionDeadline: NIODeadline = .distantFuture, + preferredEventLoop: EventLoop + ) async -> (Transaction, _Concurrency.Task) { + // It isn't sendable ... but on 6.0 and later we use 'sending'. + struct UnsafePrepareRequest: @unchecked Sendable { + var value: PreparedRequest + } + + let transactionPromise = Promise() + let unsafe = UnsafePrepareRequest(value: request) + let task = Task { + try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation) in + let request = unsafe.value + let transaction = Transaction( + request: request, + requestOptions: requestOptions, + logger: logger, + connectionDeadline: connectionDeadline, + preferredEventLoop: preferredEventLoop, + responseContinuation: continuation + ) + Task { + await transactionPromise.fulfil(transaction) + } + } + } + + return (await transactionPromise.value, task) + } + #endif }