Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Draft] Use CurrentEventLoop proposal #704

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ let package = Package(
.product(name: "Logging", package: "swift-log"),
.product(name: "Atomics", package: "swift-atomics"),
]
, swiftSettings: [.unsafeFlags(["-Xfrontend", "-strict-concurrency=complete"])]
),
.testTarget(
name: "AsyncHTTPClientTests",
Expand Down
39 changes: 0 additions & 39 deletions Sources/AsyncHTTPClient/AsyncAwait/HTTPClientRequest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,6 @@ extension HTTPClientRequest.Body {
@preconcurrency
public static func bytes<Bytes: RandomAccessCollection & Sendable>(
_ bytes: Bytes
) -> Self where Bytes.Element == UInt8 {
Self._bytes(bytes)
}

@inlinable
static func _bytes<Bytes: RandomAccessCollection>(
_ bytes: Bytes
) -> Self where Bytes.Element == UInt8 {
self.init(.sequence(
length: .known(bytes.count),
Expand Down Expand Up @@ -145,14 +138,6 @@ extension HTTPClientRequest.Body {
public static func bytes<Bytes: Sequence & Sendable>(
_ bytes: Bytes,
length: Length
) -> Self where Bytes.Element == UInt8 {
Self._bytes(bytes, length: length)
}

@inlinable
static func _bytes<Bytes: Sequence>(
_ bytes: Bytes,
length: Length
) -> Self where Bytes.Element == UInt8 {
self.init(.sequence(
length: length.storage,
Expand Down Expand Up @@ -185,14 +170,6 @@ extension HTTPClientRequest.Body {
public static func bytes<Bytes: Collection & Sendable>(
_ bytes: Bytes,
length: Length
) -> Self where Bytes.Element == UInt8 {
Self._bytes(bytes, length: length)
}

@inlinable
static func _bytes<Bytes: Collection>(
_ bytes: Bytes,
length: Length
) -> Self where Bytes.Element == UInt8 {
self.init(.sequence(
length: length.storage,
Expand Down Expand Up @@ -223,14 +200,6 @@ extension HTTPClientRequest.Body {
public static func stream<SequenceOfBytes: AsyncSequence & Sendable>(
_ sequenceOfBytes: SequenceOfBytes,
length: Length
) -> Self where SequenceOfBytes.Element == ByteBuffer {
Self._stream(sequenceOfBytes, length: length)
}

@inlinable
static func _stream<SequenceOfBytes: AsyncSequence>(
_ sequenceOfBytes: SequenceOfBytes,
length: Length
) -> Self where SequenceOfBytes.Element == ByteBuffer {
let body = self.init(.asyncSequence(length: length.storage) {
var iterator = sequenceOfBytes.makeAsyncIterator()
Expand Down Expand Up @@ -259,14 +228,6 @@ extension HTTPClientRequest.Body {
public static func stream<Bytes: AsyncSequence & Sendable>(
_ bytes: Bytes,
length: Length
) -> Self where Bytes.Element == UInt8 {
Self._stream(bytes, length: length)
}

@inlinable
static func _stream<Bytes: AsyncSequence>(
_ bytes: Bytes,
length: Length
) -> Self where Bytes.Element == UInt8 {
let body = self.init(.asyncSequence(length: length.storage) {
var iterator = bytes.makeAsyncIterator()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ final class HTTP1ProxyConnectHandler: ChannelDuplexHandler, RemovableChannelHand
// transitions to `.connectSent` or `.failed`
case initialized
// transitions to `.headReceived` or `.failed`
case connectSent(Scheduled<Void>)
case connectSent(ScheduledOnCurrentEventLoop<Void>)
// transitions to `.completed` or `.failed`
case headReceived(Scheduled<Void>)
case headReceived(ScheduledOnCurrentEventLoop<Void>)
// final error state
case failed(Error)
// final success state
Expand All @@ -40,8 +40,8 @@ final class HTTP1ProxyConnectHandler: ChannelDuplexHandler, RemovableChannelHand
private let proxyAuthorization: HTTPClient.Authorization?
private let deadline: NIODeadline

private var proxyEstablishedPromise: EventLoopPromise<Void>?
var proxyEstablishedFuture: EventLoopFuture<Void>? {
private var proxyEstablishedPromise: CurrentEventLoopPromise<Void>?
var proxyEstablishedFuture: CurrentEventLoopFuture<Void>? {
return self.proxyEstablishedPromise?.futureResult
}

Expand Down Expand Up @@ -81,7 +81,7 @@ final class HTTP1ProxyConnectHandler: ChannelDuplexHandler, RemovableChannelHand
}

func handlerAdded(context: ChannelHandlerContext) {
self.proxyEstablishedPromise = context.eventLoop.makePromise(of: Void.self)
self.proxyEstablishedPromise = context.currentEventLoop.makePromise(of: Void.self)

self.sendConnect(context: context)
}
Expand Down Expand Up @@ -135,7 +135,7 @@ final class HTTP1ProxyConnectHandler: ChannelDuplexHandler, RemovableChannelHand
return
}

let timeout = context.eventLoop.scheduleTask(deadline: self.deadline) {
let timeout = context.currentEventLoop.scheduleTask(deadline: self.deadline) {
switch self.state {
case .initialized:
preconditionFailure("How can we have a scheduled timeout, if the connection is not even up?")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ final class SOCKSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
// transitions to channelActive or failed
case initialized
// transitions to socksEstablished or failed
case channelActive(Scheduled<Void>)
case channelActive(ScheduledOnCurrentEventLoop<Void>)
// final success state
case socksEstablished
// final success state
case failed(Error)
}

private var socksEstablishedPromise: EventLoopPromise<Void>?
var socksEstablishedFuture: EventLoopFuture<Void>? {
private var socksEstablishedPromise: CurrentEventLoopPromise<Void>?
var socksEstablishedFuture: CurrentEventLoopFuture<Void>? {
return self.socksEstablishedPromise?.futureResult
}

Expand All @@ -42,7 +42,7 @@ final class SOCKSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
}

func handlerAdded(context: ChannelHandlerContext) {
self.socksEstablishedPromise = context.eventLoop.makePromise(of: Void.self)
self.socksEstablishedPromise = context.currentEventLoop.makePromise(of: Void.self)

if context.channel.isActive {
self.connectionStarted(context: context)
Expand Down Expand Up @@ -99,7 +99,7 @@ final class SOCKSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
return
}

let scheduled = context.eventLoop.scheduleTask(deadline: self.deadline) {
let scheduled = context.currentEventLoop.scheduleTask(deadline: self.deadline) {
switch self.state {
case .initialized, .channelActive:
// close the connection, if the handshake timed out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ final class TLSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
// transitions to channelActive or failed
case initialized
// transitions to tlsEstablished or failed
case channelActive(Scheduled<Void>?)
case channelActive(ScheduledOnCurrentEventLoop<Void>?)
// final success state
case tlsEstablished
// final success state
case failed(Error)
}

private var tlsEstablishedPromise: EventLoopPromise<String?>?
var tlsEstablishedFuture: EventLoopFuture<String?>? {
private var tlsEstablishedPromise: CurrentEventLoopPromise<String?>?
var tlsEstablishedFuture: CurrentEventLoopFuture<String?>? {
return self.tlsEstablishedPromise?.futureResult
}

Expand All @@ -42,7 +42,7 @@ final class TLSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
}

func handlerAdded(context: ChannelHandlerContext) {
self.tlsEstablishedPromise = context.eventLoop.makePromise(of: String?.self)
self.tlsEstablishedPromise = context.currentEventLoop.makePromise(of: String?.self)

if context.channel.isActive {
self.connectionStarted(context: context)
Expand Down Expand Up @@ -102,9 +102,9 @@ final class TLSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
return
}

var scheduled: Scheduled<Void>?
var scheduled: ScheduledOnCurrentEventLoop<Void>?
if let deadline = deadline {
scheduled = context.eventLoop.scheduleTask(deadline: deadline) {
scheduled = context.currentEventLoop.scheduleTask(deadline: deadline) {
switch self.state {
case .initialized, .channelActive:
// close the connection, if the handshake timed out
Expand All @@ -121,3 +121,6 @@ final class TLSEventsHandler: ChannelInboundHandler, RemovableChannelHandler {
self.state = .channelActive(scheduled)
}
}

@available(*, unavailable)
extension TLSEventsHandler: Sendable {}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {

private var state: HTTP1ConnectionStateMachine = .init() {
didSet {
self.eventLoop.assertInEventLoop()
self.eventLoop.wrapped.assertInEventLoop()
}
}

Expand All @@ -50,7 +50,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
}

private var idleReadTimeoutStateMachine: IdleReadStateMachine?
private var idleReadTimeoutTimer: Scheduled<Void>?
private var idleReadTimeoutTimer: ScheduledOnCurrentEventLoop<Void>?

/// Cancelling a task in NIO does *not* guarantee that the task will not execute under certain race conditions.
/// We therefore give each timer an ID and increase the ID every time we reset or cancel it.
Expand All @@ -59,11 +59,11 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {

private let backgroundLogger: Logger
private var logger: Logger
private let eventLoop: EventLoop
private let eventLoop: CurrentEventLoop
private let connectionIdLoggerMetadata: Logger.MetadataValue

var onConnectionIdle: () -> Void = {}
init(eventLoop: EventLoop, backgroundLogger: Logger, connectionIdLoggerMetadata: Logger.MetadataValue) {
init(eventLoop: CurrentEventLoop, backgroundLogger: Logger, connectionIdLoggerMetadata: Logger.MetadataValue) {
self.eventLoop = eventLoop
self.backgroundLogger = backgroundLogger
self.logger = backgroundLogger
Expand Down Expand Up @@ -150,7 +150,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
self.request = req

self.logger.debug("Request was scheduled on connection")
req.willExecuteRequest(self)
req.willExecuteRequest(HTTP1ClientChannelHandler.Executor(self))

let action = self.state.runNewRequest(
head: req.requestHead,
Expand Down Expand Up @@ -279,7 +279,7 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
case .sendRequestEnd(let writePromise, let shouldClose):
let writePromise = writePromise ?? context.eventLoop.makePromise(of: Void.self)
// We need to defer succeeding the old request to avoid ordering issues
writePromise.futureResult.hop(to: context.eventLoop).whenComplete { result in
writePromise.futureResult.hop(to: context.currentEventLoop).whenComplete { result in
switch result {
case .success:
// If our final action was `sendRequestEnd`, that means we've already received
Expand Down Expand Up @@ -436,43 +436,53 @@ final class HTTP1ClientChannelHandler: ChannelDuplexHandler {
@available(*, unavailable)
extension HTTP1ClientChannelHandler: Sendable {}

extension HTTP1ClientChannelHandler: HTTPRequestExecutor {
func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
if self.eventLoop.inEventLoop {
self.writeRequestBodyPart0(data, request: request, promise: promise)
} else {
self.eventLoop.execute {
self.writeRequestBodyPart0(data, request: request, promise: promise)
extension HTTP1ClientChannelHandler {
struct Executor: HTTPRequestExecutor, @unchecked Sendable {
private let handler: HTTP1ClientChannelHandler
private let eventLoop: EventLoop

init(_ handler: HTTP1ClientChannelHandler) {
self.eventLoop = handler.eventLoop.wrapped
self.handler = handler
}

func writeRequestBodyPart(_ data: IOData, request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
if self.eventLoop.inEventLoop {
self.handler.writeRequestBodyPart0(data, request: request, promise: promise)
} else {
self.eventLoop.execute {
self.handler.writeRequestBodyPart0(data, request: request, promise: promise)
}
}
}
}

func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
if self.eventLoop.inEventLoop {
self.finishRequestBodyStream0(request, promise: promise)
} else {
self.eventLoop.execute {
self.finishRequestBodyStream0(request, promise: promise)
func finishRequestBodyStream(_ request: HTTPExecutableRequest, promise: EventLoopPromise<Void>?) {
if self.eventLoop.inEventLoop {
self.handler.finishRequestBodyStream0(request, promise: promise)
} else {
self.eventLoop.execute {
self.handler.finishRequestBodyStream0(request, promise: promise)
}
}
}
}

func demandResponseBodyStream(_ request: HTTPExecutableRequest) {
if self.eventLoop.inEventLoop {
self.demandResponseBodyStream0(request)
} else {
self.eventLoop.execute {
self.demandResponseBodyStream0(request)
func demandResponseBodyStream(_ request: HTTPExecutableRequest) {
if self.eventLoop.inEventLoop {
self.handler.demandResponseBodyStream0(request)
} else {
self.eventLoop.execute {
self.handler.demandResponseBodyStream0(request)
}
}
}
}

func cancelRequest(_ request: HTTPExecutableRequest) {
if self.eventLoop.inEventLoop {
self.cancelRequest0(request)
} else {
self.eventLoop.execute {
self.cancelRequest0(request)
func cancelRequest(_ request: HTTPExecutableRequest) {
if self.eventLoop.inEventLoop {
self.handler.cancelRequest0(request)
} else {
self.eventLoop.execute {
self.handler.cancelRequest0(request)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import NIOCore
import NIOHTTP1
import NIOHTTPCompression

protocol HTTP1ConnectionDelegate {
protocol HTTP1ConnectionDelegate: Sendable {
func http1ConnectionReleased(_: HTTP1Connection)
func http1ConnectionClosed(_: HTTP1Connection)
}
Expand Down Expand Up @@ -69,8 +69,9 @@ final class HTTP1Connection {
if self.channel.eventLoop.inEventLoop {
self.execute0(request: request)
} else {
let sendableSelf = UnsafeTransfer(self)
self.channel.eventLoop.execute {
self.execute0(request: request)
sendableSelf.wrappedValue.execute0(request: request)
}
}
}
Expand Down Expand Up @@ -109,7 +110,7 @@ final class HTTP1Connection {
}

self.state = .active
self.channel.closeFuture.whenComplete { _ in
self.channel.closeFuture.iKnowIAmOnTheEventLoopOfThisFuture().whenComplete { _ in
self.state = .closed
self.delegate.http1ConnectionClosed(self)
}
Expand All @@ -133,7 +134,7 @@ final class HTTP1Connection {
}

let channelHandler = HTTP1ClientChannelHandler(
eventLoop: channel.eventLoop,
eventLoop: channel.eventLoop.iKnowIAmOnThisEventLoop(),
backgroundLogger: logger,
connectionIdLoggerMetadata: "\(self.id)"
)
Expand Down
Loading