Skip to content

Commit 578a674

Browse files
committed
Use CurrentEventLoop proposal
and fix other `Sendable` related warnings on the way
1 parent 62c06d4 commit 578a674

17 files changed

+62
-97
lines changed

Package.swift

+2-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ let package = Package(
4848
.product(name: "NIOTransportServices", package: "swift-nio-transport-services"),
4949
.product(name: "Logging", package: "swift-log"),
5050
.product(name: "Atomics", package: "swift-atomics"),
51-
]
51+
],
52+
swiftSettings: [.unsafeFlags(["-Xfrontend", "-strict-concurrency=complete"])]
5253
),
5354
.testTarget(
5455
name: "AsyncHTTPClientTests",

Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift

-39
Original file line numberDiff line numberDiff line change
@@ -102,13 +102,6 @@ extension HTTPClientRequest.Body {
102102
@preconcurrency
103103
public static func bytes<Bytes: RandomAccessCollection & Sendable>(
104104
_ bytes: Bytes
105-
) -> Self where Bytes.Element == UInt8 {
106-
Self._bytes(bytes)
107-
}
108-
109-
@inlinable
110-
static func _bytes<Bytes: RandomAccessCollection>(
111-
_ bytes: Bytes
112105
) -> Self where Bytes.Element == UInt8 {
113106
self.init(.sequence(
114107
length: .known(bytes.count),
@@ -145,14 +138,6 @@ extension HTTPClientRequest.Body {
145138
public static func bytes<Bytes: Sequence & Sendable>(
146139
_ bytes: Bytes,
147140
length: Length
148-
) -> Self where Bytes.Element == UInt8 {
149-
Self._bytes(bytes, length: length)
150-
}
151-
152-
@inlinable
153-
static func _bytes<Bytes: Sequence>(
154-
_ bytes: Bytes,
155-
length: Length
156141
) -> Self where Bytes.Element == UInt8 {
157142
self.init(.sequence(
158143
length: length.storage,
@@ -185,14 +170,6 @@ extension HTTPClientRequest.Body {
185170
public static func bytes<Bytes: Collection & Sendable>(
186171
_ bytes: Bytes,
187172
length: Length
188-
) -> Self where Bytes.Element == UInt8 {
189-
Self._bytes(bytes, length: length)
190-
}
191-
192-
@inlinable
193-
static func _bytes<Bytes: Collection>(
194-
_ bytes: Bytes,
195-
length: Length
196173
) -> Self where Bytes.Element == UInt8 {
197174
self.init(.sequence(
198175
length: length.storage,
@@ -223,14 +200,6 @@ extension HTTPClientRequest.Body {
223200
public static func stream<SequenceOfBytes: AsyncSequence & Sendable>(
224201
_ sequenceOfBytes: SequenceOfBytes,
225202
length: Length
226-
) -> Self where SequenceOfBytes.Element == ByteBuffer {
227-
Self._stream(sequenceOfBytes, length: length)
228-
}
229-
230-
@inlinable
231-
static func _stream<SequenceOfBytes: AsyncSequence>(
232-
_ sequenceOfBytes: SequenceOfBytes,
233-
length: Length
234203
) -> Self where SequenceOfBytes.Element == ByteBuffer {
235204
let body = self.init(.asyncSequence(length: length.storage) {
236205
var iterator = sequenceOfBytes.makeAsyncIterator()
@@ -259,14 +228,6 @@ extension HTTPClientRequest.Body {
259228
public static func stream<Bytes: AsyncSequence & Sendable>(
260229
_ bytes: Bytes,
261230
length: Length
262-
) -> Self where Bytes.Element == UInt8 {
263-
Self._stream(bytes, length: length)
264-
}
265-
266-
@inlinable
267-
static func _stream<Bytes: AsyncSequence>(
268-
_ bytes: Bytes,
269-
length: Length
270231
) -> Self where Bytes.Element == UInt8 {
271232
let body = self.init(.asyncSequence(length: length.storage) {
272233
var iterator = bytes.makeAsyncIterator()

Sources/AsyncHTTPClient/ConnectionPool/ChannelHandler/HTTP1ProxyConnectHandler.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ final class HTTP1ProxyConnectHandler: ChannelDuplexHandler, RemovableChannelHand
2424
// transitions to `.connectSent` or `.failed`
2525
case initialized
2626
// transitions to `.headReceived` or `.failed`
27-
case connectSent(Scheduled<Void>)
27+
case connectSent(ScheduledOnCurrentEventLoop<Void>)
2828
// transitions to `.completed` or `.failed`
29-
case headReceived(Scheduled<Void>)
29+
case headReceived(ScheduledOnCurrentEventLoop<Void>)
3030
// final error state
3131
case failed(Error)
3232
// final success state
@@ -135,7 +135,7 @@ final class HTTP1ProxyConnectHandler: ChannelDuplexHandler, RemovableChannelHand
135135
return
136136
}
137137

138-
let timeout = context.eventLoop.scheduleTask(deadline: self.deadline) {
138+
let timeout = context.currentEventLoop.scheduleTask(deadline: self.deadline) {
139139
switch self.state {
140140
case .initialized:
141141
preconditionFailure("How can we have a scheduled timeout, if the connection is not even up?")

Sources/AsyncHTTPClient/ConnectionPool/ChannelHandler/SOCKSEventsHandler.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ final class SOCKSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
2222
// transitions to channelActive or failed
2323
case initialized
2424
// transitions to socksEstablished or failed
25-
case channelActive(Scheduled<Void>)
25+
case channelActive(ScheduledOnCurrentEventLoop<Void>)
2626
// final success state
2727
case socksEstablished
2828
// final success state
@@ -99,7 +99,7 @@ final class SOCKSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
9999
return
100100
}
101101

102-
let scheduled = context.eventLoop.scheduleTask(deadline: self.deadline) {
102+
let scheduled = context.currentEventLoop.scheduleTask(deadline: self.deadline) {
103103
switch self.state {
104104
case .initialized, .channelActive:
105105
// close the connection, if the handshake timed out

Sources/AsyncHTTPClient/ConnectionPool/ChannelHandler/TLSEventsHandler.swift

+6-3
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ final class TLSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
2222
// transitions to channelActive or failed
2323
case initialized
2424
// transitions to tlsEstablished or failed
25-
case channelActive(Scheduled<Void>?)
25+
case channelActive(ScheduledOnCurrentEventLoop<Void>?)
2626
// final success state
2727
case tlsEstablished
2828
// final success state
@@ -102,9 +102,9 @@ final class TLSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
102102
return
103103
}
104104

105-
var scheduled: Scheduled<Void>?
105+
var scheduled: ScheduledOnCurrentEventLoop<Void>?
106106
if let deadline = deadline {
107-
scheduled = context.eventLoop.scheduleTask(deadline: deadline) {
107+
scheduled = context.currentEventLoop.scheduleTask(deadline: deadline) {
108108
switch self.state {
109109
case .initialized, .channelActive:
110110
// close the connection, if the handshake timed out
@@ -121,3 +121,6 @@ final class TLSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
121121
self.state = .channelActive(scheduled)
122122
}
123123
}
124+
125+
@available(*, unavailable)
126+
extension TLSEventsHandler: Sendable {}

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1ClientChannelHandler.swift

+9-9
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
2323

2424
private var state: HTTP1ConnectionStateMachine = .init() {
2525
didSet {
26-
self.eventLoop.assertInEventLoop()
26+
self.eventLoop.wrapped.assertInEventLoop()
2727
}
2828
}
2929

@@ -50,7 +50,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
5050
}
5151

5252
private var idleReadTimeoutStateMachine: IdleReadStateMachine?
53-
private var idleReadTimeoutTimer: Scheduled<Void>?
53+
private var idleReadTimeoutTimer: ScheduledOnCurrentEventLoop<Void>?
5454

5555
/// Cancelling a task in NIO does *not* guarantee that the task will not execute under certain race conditions.
5656
/// We therefore give each timer an ID and increase the ID every time we reset or cancel it.
@@ -59,11 +59,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
5959

6060
private let backgroundLogger: Logger
6161
private var logger: Logger
62-
private let eventLoop: EventLoop
62+
private let eventLoop: CurrentEventLoop
6363
private let connectionIdLoggerMetadata: Logger.MetadataValue
6464

6565
var onConnectionIdle: () -> Void = {}
66-
init(eventLoop: EventLoop, backgroundLogger: Logger, connectionIdLoggerMetadata: Logger.MetadataValue) {
66+
init(eventLoop: CurrentEventLoop, backgroundLogger: Logger, connectionIdLoggerMetadata: Logger.MetadataValue) {
6767
self.eventLoop = eventLoop
6868
self.backgroundLogger = backgroundLogger
6969
self.logger = backgroundLogger
@@ -279,7 +279,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
279279
case .sendRequestEnd(let writePromise, let shouldClose):
280280
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
281281
// We need to defer succeeding the old request to avoid ordering issues
282-
writePromise.futureResult.hop(to: context.eventLoop).whenComplete { result in
282+
writePromise.futureResult.hop(to: context.currentEventLoop).whenComplete { result in
283283
switch result {
284284
case .success:
285285
// If our final action was `sendRequestEnd`, that means we've already received
@@ -438,7 +438,7 @@ extension HTTP1ClientChannelHandler: Sendable {}
438438

439439
extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
440440
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
441-
if self.eventLoop.inEventLoop {
441+
if self.eventLoop.wrapped.inEventLoop {
442442
self.writeRequestBodyPart0(data, request: request, promise: promise)
443443
} else {
444444
self.eventLoop.execute {
@@ -448,7 +448,7 @@ extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
448448
}
449449

450450
func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
451-
if self.eventLoop.inEventLoop {
451+
if self.eventLoop.wrapped.inEventLoop {
452452
self.finishRequestBodyStream0(request, promise: promise)
453453
} else {
454454
self.eventLoop.execute {
@@ -458,7 +458,7 @@ extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
458458
}
459459

460460
func demandResponseBodyStream(_ request: HTTPExecutableRequest) {
461-
if self.eventLoop.inEventLoop {
461+
if self.eventLoop.wrapped.inEventLoop {
462462
self.demandResponseBodyStream0(request)
463463
} else {
464464
self.eventLoop.execute {
@@ -468,7 +468,7 @@ extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
468468
}
469469

470470
func cancelRequest(_ request: HTTPExecutableRequest) {
471-
if self.eventLoop.inEventLoop {
471+
if self.eventLoop.wrapped.inEventLoop {
472472
self.cancelRequest0(request)
473473
} else {
474474
self.eventLoop.execute {

Sources/AsyncHTTPClient/ConnectionPool/HTTP1/HTTP1Connection.swift

+3-3
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import NIOCore
1717
import NIOHTTP1
1818
import NIOHTTPCompression
1919

20-
protocol HTTP1ConnectionDelegate {
20+
protocol HTTP1ConnectionDelegate: Sendable {
2121
func http1ConnectionReleased(_: HTTP1Connection)
2222
func http1ConnectionClosed(_: HTTP1Connection)
2323
}
@@ -109,7 +109,7 @@ final class HTTP1Connection {
109109
}
110110

111111
self.state = .active
112-
self.channel.closeFuture.whenComplete { _ in
112+
self.channel.closeFuture.iKnowIAmOnTheEventLoopOfThisFuture().whenComplete { _ in
113113
self.state = .closed
114114
self.delegate.http1ConnectionClosed(self)
115115
}
@@ -133,7 +133,7 @@ final class HTTP1Connection {
133133
}
134134

135135
let channelHandler = HTTP1ClientChannelHandler(
136-
eventLoop: channel.eventLoop,
136+
eventLoop: channel.eventLoop.iKnowIAmOnThisEventLoop(),
137137
backgroundLogger: logger,
138138
connectionIdLoggerMetadata: "\(self.id)"
139139
)

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2ClientRequestHandler.swift

+9-9
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,11 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
2222
typealias OutboundOut = HTTPClientRequestPart
2323
typealias InboundIn = HTTPClientResponsePart
2424

25-
private let eventLoop: EventLoop
25+
private let eventLoop: CurrentEventLoop
2626

2727
private var state: HTTPRequestStateMachine = .init(isChannelWritable: false) {
2828
willSet {
29-
self.eventLoop.assertInEventLoop()
29+
self.eventLoop.wrapped.assertInEventLoop()
3030
}
3131
}
3232

@@ -44,14 +44,14 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
4444
}
4545

4646
private var idleReadTimeoutStateMachine: IdleReadStateMachine?
47-
private var idleReadTimeoutTimer: Scheduled<Void>?
47+
private var idleReadTimeoutTimer: ScheduledOnCurrentEventLoop<Void>?
4848

49-
init(eventLoop: EventLoop) {
49+
init(eventLoop: CurrentEventLoop) {
5050
self.eventLoop = eventLoop
5151
}
5252

5353
func handlerAdded(context: ChannelHandlerContext) {
54-
assert(context.eventLoop === self.eventLoop,
54+
assert(context.eventLoop === self.eventLoop.wrapped,
5555
"The handler must be added to a channel that runs on the eventLoop it was initialized with.")
5656
self.channelContext = context
5757

@@ -345,7 +345,7 @@ final class HTTP2ClientRequestHandler: ChannelDuplexHandler {
345345

346346
extension HTTP2ClientRequestHandler: HTTPRequestExecutor {
347347
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
348-
if self.eventLoop.inEventLoop {
348+
if self.eventLoop.wrapped.inEventLoop {
349349
self.writeRequestBodyPart0(data, request: request, promise: promise)
350350
} else {
351351
self.eventLoop.execute {
@@ -355,7 +355,7 @@ extension HTTP2ClientRequestHandler: HTTPRequestExecutor {
355355
}
356356

357357
func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
358-
if self.eventLoop.inEventLoop {
358+
if self.eventLoop.wrapped.inEventLoop {
359359
self.finishRequestBodyStream0(request, promise: promise)
360360
} else {
361361
self.eventLoop.execute {
@@ -365,7 +365,7 @@ extension HTTP2ClientRequestHandler: HTTPRequestExecutor {
365365
}
366366

367367
func demandResponseBodyStream(_ request: HTTPExecutableRequest) {
368-
if self.eventLoop.inEventLoop {
368+
if self.eventLoop.wrapped.inEventLoop {
369369
self.demandResponseBodyStream0(request)
370370
} else {
371371
self.eventLoop.execute {
@@ -375,7 +375,7 @@ extension HTTP2ClientRequestHandler: HTTPRequestExecutor {
375375
}
376376

377377
func cancelRequest(_ request: HTTPExecutableRequest) {
378-
if self.eventLoop.inEventLoop {
378+
if self.eventLoop.wrapped.inEventLoop {
379379
self.cancelRequest0(request)
380380
} else {
381381
self.eventLoop.execute {

Sources/AsyncHTTPClient/ConnectionPool/HTTP2/HTTP2Connection.swift

+10-10
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import NIOCore
1717
import NIOHTTP2
1818
import NIOHTTPCompression
1919

20-
protocol HTTP2ConnectionDelegate {
20+
protocol HTTP2ConnectionDelegate: Sendable {
2121
func http2Connection(_: HTTP2Connection, newMaxStreamSetting: Int)
2222
func http2ConnectionStreamClosed(_: HTTP2Connection, availableStreams: Int)
2323
func http2ConnectionGoAwayReceived(_: HTTP2Connection)
@@ -38,7 +38,7 @@ final class HTTP2Connection {
3838

3939
enum State {
4040
case initialized
41-
case starting(EventLoopPromise<Int>)
41+
case starting(CurrentEventLoopPromise<Int>)
4242
case active(maxStreams: Int)
4343
case closing
4444
case closed
@@ -134,7 +134,7 @@ final class HTTP2Connection {
134134
delegate: delegate,
135135
logger: logger
136136
)
137-
return connection._start0().map { maxStreams in (connection, maxStreams) }
137+
return connection._start0().map { maxStreams in (connection, maxStreams) }.wrapped
138138
}
139139

140140
func executeRequest(_ request: HTTPExecutableRequest) {
@@ -169,13 +169,13 @@ final class HTTP2Connection {
169169
return promise.futureResult
170170
}
171171

172-
func _start0() -> EventLoopFuture<Int> {
172+
func _start0() -> CurrentEventLoopFuture<Int> {
173173
self.channel.eventLoop.assertInEventLoop()
174174

175-
let readyToAcceptConnectionsPromise = self.channel.eventLoop.makePromise(of: Int.self)
175+
let readyToAcceptConnectionsPromise = self.channel.eventLoop.iKnowIAmOnThisEventLoop().makePromise(of: Int.self)
176176

177177
self.state = .starting(readyToAcceptConnectionsPromise)
178-
self.channel.closeFuture.whenComplete { _ in
178+
self.channel.closeFuture.iKnowIAmOnTheEventLoopOfThisFuture().whenComplete { _ in
179179
switch self.state {
180180
case .initialized, .closed:
181181
preconditionFailure("invalid state \(self.state)")
@@ -218,8 +218,8 @@ final class HTTP2Connection {
218218
preconditionFailure("Invalid state: \(self.state). Sending requests is not allowed before we are started.")
219219

220220
case .active:
221-
let createStreamChannelPromise = self.channel.eventLoop.makePromise(of: Channel.self)
222-
self.multiplexer.createStreamChannel(promise: createStreamChannelPromise) { channel -> EventLoopFuture<Void> in
221+
let createStreamChannelPromise = self.channel.eventLoop.iKnowIAmOnThisEventLoop().makePromise(of: Channel.self)
222+
self.multiplexer.createStreamChannel(promise: createStreamChannelPromise.wrapped) { channel -> EventLoopFuture<Void> in
223223
do {
224224
// the connection may have been asked to shutdown while we created the child. in
225225
// this
@@ -238,15 +238,15 @@ final class HTTP2Connection {
238238
try channel.pipeline.syncOperations.addHandler(decompressHandler)
239239
}
240240

241-
let handler = HTTP2ClientRequestHandler(eventLoop: channel.eventLoop)
241+
let handler = HTTP2ClientRequestHandler(eventLoop: channel.eventLoop.iKnowIAmOnThisEventLoop())
242242
try channel.pipeline.syncOperations.addHandler(handler)
243243

244244
// We must add the new channel to the list of open channels BEFORE we write the
245245
// request to it. In case of an error, we are sure that the channel was added
246246
// before.
247247
let box = ChannelBox(channel)
248248
self.openStreams.insert(box)
249-
channel.closeFuture.whenComplete { _ in
249+
channel.closeFuture.iKnowIAmOnTheEventLoopOfThisFuture().whenComplete { _ in
250250
self.openStreams.remove(box)
251251
}
252252

0 commit comments

Comments
 (0)