diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift index 34e97d0f8d..fefda6f624 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift @@ -35,6 +35,43 @@ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public final class NIOAsyncChannel: Sendable { + @_spi(AsyncChannel) + public struct Configuration { + /// The backpressure strategy of the ``NIOAsyncChannel/inboundStream``. + public var backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark + + /// If outbound half closure should be enabled. Outbound half closure is triggered once + /// the ``NIOAsyncChannelWriter`` is either finished or deinitialized. + public var isOutboundHalfClosureEnabled: Bool + + /// The ``NIOAsyncChannel/inboundStream`` message's type. + public var inboundType: Inbound.Type + + /// The ``NIOAsyncChannel/outboundWriter`` message's type. + public var outboundType: Outbound.Type + + /// Initializes a new ``NIOAsyncChannel/Configuration``. + /// + /// - Parameters: + /// - backpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel/inboundStream``. Defaults + /// to a watermarked strategy (lowWatermark: 2, highWatermark: 10). + /// - isOutboundHalfClosureEnabled: If outbound half closure should be enabled. Outbound half closure is triggered once + /// the ``NIOAsyncChannelWriter`` is either finished or deinitialized. Defaults to `false`. + /// - inboundType: The ``NIOAsyncChannel/inboundStream`` message's type. + /// - outboundType: The ``NIOAsyncChannel/outboundWriter`` message's type. + public init( + backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark = .init(lowWatermark: 2, highWatermark: 10), + isOutboundHalfClosureEnabled: Bool = false, + inboundType: Inbound.Type = Inbound.self, + outboundType: Outbound.Type = Outbound.self + ) { + self.backpressureStrategy = backpressureStrategy + self.isOutboundHalfClosureEnabled = isOutboundHalfClosureEnabled + self.inboundType = inboundType + self.outboundType = outboundType + } + } + /// The underlying channel being wrapped by this ``NIOAsyncChannel``. @_spi(AsyncChannel) public let channel: Channel @@ -52,25 +89,18 @@ public final class NIOAsyncChannel: Senda /// /// - Parameters: /// - channel: The ``Channel`` to wrap. - /// - backpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel/inboundStream``. - /// - isOutboundHalfClosureEnabled: If outbound half closure should be enabled. Outbound half closure is triggered once - /// the ``NIOAsyncChannelWriter`` is either finished or deinitialized. - /// - inboundType: The ``NIOAsyncChannel/inboundStream`` message's type. - /// - outboundType: The ``NIOAsyncChannel/outboundWriter`` message's type. + /// - configuration: The ``NIOAsyncChannel``s configuration. @inlinable @_spi(AsyncChannel) public init( synchronouslyWrapping channel: Channel, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + configuration: Configuration = .init() ) throws { channel.eventLoop.preconditionInEventLoop() self.channel = channel (self.inboundStream, self.outboundWriter) = try channel._syncAddAsyncHandlers( - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled + backpressureStrategy: configuration.backpressureStrategy, + isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled ) } @@ -83,23 +113,18 @@ public final class NIOAsyncChannel: Senda /// /// - Parameters: /// - channel: The ``Channel`` to wrap. - /// - backpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel/inboundStream``. - /// - isOutboundHalfClosureEnabled: If outbound half closure should be enabled. Outbound half closure is triggered once - /// the ``NIOAsyncChannelWriter`` is either finished or deinitialized. - /// - inboundType: The ``NIOAsyncChannel/inboundStream`` message's type. + /// - configuration: The ``NIOAsyncChannel``s configuration. @inlinable @_spi(AsyncChannel) public init( synchronouslyWrapping channel: Channel, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self + configuration: Configuration ) throws where Outbound == Never { channel.eventLoop.preconditionInEventLoop() self.channel = channel (self.inboundStream, self.outboundWriter) = try channel._syncAddAsyncHandlers( - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled + backpressureStrategy: configuration.backpressureStrategy, + isOutboundHalfClosureEnabled: configuration.isOutboundHalfClosureEnabled ) self.outboundWriter.finish() @@ -125,7 +150,7 @@ public final class NIOAsyncChannel: Senda backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, isOutboundHalfClosureEnabled: Bool = false, channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture, - postFireChannelReadTransformation: @Sendable @escaping (ChannelReadResult) -> EventLoopFuture + postFireChannelReadTransformation: @Sendable @escaping (ChannelReadResult) -> EventLoopFuture ) throws -> NIOAsyncChannel where Outbound == Never { channel.eventLoop.preconditionInEventLoop() let (inboundStream, outboundWriter): (NIOAsyncChannelInboundStream, NIOAsyncChannelOutboundWriter) = try channel._syncAddAsyncHandlersWithTransformations( diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStreamChannelHandler.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStreamChannelHandler.swift index 970acf2b81..f2fa77f8a0 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStreamChannelHandler.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStreamChannelHandler.swift @@ -111,7 +111,7 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler EventLoopFuture, + channelReadTransformation: @Sendable @escaping (InboundIn) -> EventLoopFuture, postFireChannelReadTransformation: @Sendable @escaping (ReadTransformationResult) -> EventLoopFuture ) -> NIOAsyncChannelInboundStreamChannelHandler where InboundIn == Channel { return .init( diff --git a/Sources/NIOPosix/Bootstrap.swift b/Sources/NIOPosix/Bootstrap.swift index b7f5f9542b..f0a9f3e428 100644 --- a/Sources/NIOPosix/Bootstrap.swift +++ b/Sources/NIOPosix/Bootstrap.swift @@ -491,11 +491,7 @@ extension ServerBootstrap { /// - host: The host to bind on. /// - port: The port to bind on. /// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelBackpressureStrategy: The back pressure strategy used by the child channels. - /// - isChildChannelOutboundHalfClosureEnabled: Indicates if half closure is enabled on the child channels. If half closure is enabled - /// then finishing the ``NIOAsyncChannelWriter`` will lead to half closure. - /// - childChannelInboundType: The child channel's inbound type. - /// - childChannelOutboundType: The child channel's outbound type. + /// - childChannelConfiguration: The child channel's async channel configuration. /// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) @@ -503,10 +499,7 @@ extension ServerBootstrap { host: String, port: Int, serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - childChannelBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isChildChannelOutboundHalfClosureEnabled: Bool = false, - childChannelInboundType: ChildChannelInbound.Type = ChildChannelInbound.self, - childChannelOutboundType: ChildChannelOutbound.Type = ChildChannelOutbound.self + childChannelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel, Never> { return try await self.bind( host: host, @@ -516,10 +509,7 @@ extension ServerBootstrap { channel.eventLoop.makeCompletedFuture { try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: childChannelBackpressureStrategy, - isOutboundHalfClosureEnabled: isChildChannelOutboundHalfClosureEnabled, - inboundType: childChannelInboundType, - outboundType: childChannelOutboundType + configuration: childChannelConfiguration ) } } @@ -530,21 +520,14 @@ extension ServerBootstrap { /// - Parameters: /// - address: The `SocketAddress` to bind on. /// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelBackpressureStrategy: The back pressure strategy used by the child channels. - /// - isChildChannelOutboundHalfClosureEnabled: Indicates if half closure is enabled on the child channels. If half closure is enabled - /// then finishing the ``NIOAsyncChannelWriter`` will lead to half closure. - /// - childChannelInboundType: The child channel's inbound type. - /// - childChannelOutboundType: The child channel's outbound type. + /// - childChannelConfiguration: The child channel's async channel configuration. /// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func bind( to address: SocketAddress, serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - childChannelBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isChildChannelOutboundHalfClosureEnabled: Bool = false, - childChannelInboundType: ChildChannelInbound.Type = ChildChannelInbound.self, - childChannelOutboundType: ChildChannelOutbound.Type = ChildChannelOutbound.self + childChannelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel, Never> { return try await self.bind( to: address, @@ -553,10 +536,7 @@ extension ServerBootstrap { channel.eventLoop.makeCompletedFuture { try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: childChannelBackpressureStrategy, - isOutboundHalfClosureEnabled: isChildChannelOutboundHalfClosureEnabled, - inboundType: childChannelInboundType, - outboundType: childChannelOutboundType + configuration: childChannelConfiguration ) } } @@ -569,11 +549,7 @@ extension ServerBootstrap { /// unless `cleanupExistingSocketFile`is set to `true`. /// - cleanupExistingSocketFile: Whether to cleanup an existing socket file at `unixDomainSocketPath`. /// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelBackpressureStrategy: The back pressure strategy used by the child channels. - /// - isChildChannelOutboundHalfClosureEnabled: Indicates if half closure is enabled on the child channels. If half closure is enabled - /// then finishing the ``NIOAsyncChannelWriter`` will lead to half closure. - /// - childChannelInboundType: The child channel's inbound type. - /// - childChannelOutboundType: The child channel's outbound type. + /// - childChannelConfiguration: The child channel's async channel configuration. /// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) @@ -581,10 +557,7 @@ extension ServerBootstrap { unixDomainSocketPath: String, cleanupExistingSocketFile: Bool = false, serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - childChannelBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isChildChannelOutboundHalfClosureEnabled: Bool = false, - childChannelInboundType: ChildChannelInbound.Type = ChildChannelInbound.self, - childChannelOutboundType: ChildChannelOutbound.Type = ChildChannelOutbound.self + childChannelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel, Never> { return try await self.bind( unixDomainSocketPath: unixDomainSocketPath, @@ -594,10 +567,7 @@ extension ServerBootstrap { channel.eventLoop.makeCompletedFuture { try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: childChannelBackpressureStrategy, - isOutboundHalfClosureEnabled: isChildChannelOutboundHalfClosureEnabled, - inboundType: childChannelInboundType, - outboundType: childChannelOutboundType + configuration: childChannelConfiguration ) } } @@ -608,21 +578,14 @@ extension ServerBootstrap { /// - Parameters: /// - socket: The _Unix file descriptor_ representing the bound stream socket. /// - serverBackpressureStrategy: The back pressure strategy used by the server socket channel. - /// - childChannelBackpressureStrategy: The back pressure strategy used by the child channels. - /// - isChildChannelOutboundHalfClosureEnabled: Indicates if half closure is enabled on the child channels. If half closure is enabled - /// then finishing the ``NIOAsyncChannelWriter`` will lead to half closure. - /// - childChannelInboundType: The child channel's inbound type. - /// - childChannelOutboundType: The child channel's outbound type. + /// - childChannelConfiguration: The child channel's async channel configuration. /// - Returns: A ``NIOAsyncChannel`` of connection ``NIOAsyncChannel``s. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func withBoundSocket( _ socket: NIOBSDSocket.Handle, serverBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - childChannelBackpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isChildChannelOutboundHalfClosureEnabled: Bool = false, - childChannelInboundType: ChildChannelInbound.Type = ChildChannelInbound.self, - childChannelOutboundType: ChildChannelOutbound.Type = ChildChannelOutbound.self + childChannelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel, Never> { return try await self.bind( socket, @@ -631,10 +594,7 @@ extension ServerBootstrap { channel.eventLoop.makeCompletedFuture { try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: childChannelBackpressureStrategy, - isOutboundHalfClosureEnabled: isChildChannelOutboundHalfClosureEnabled, - inboundType: childChannelInboundType, - outboundType: childChannelOutboundType + configuration: childChannelConfiguration ) } } @@ -1413,21 +1373,14 @@ extension ClientBootstrap { /// - Parameters: /// - host: The host to connect to. /// - port: The port to connect to. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the established connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func connect( host: String, port: Int, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { return try await self.connect( host: host, @@ -1436,10 +1389,7 @@ extension ClientBootstrap { channel.eventLoop.makeCompletedFuture { return try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } } @@ -1449,20 +1399,13 @@ extension ClientBootstrap { /// /// - Parameters: /// - address: The address to connect to. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the established connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func connect( to address: SocketAddress, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { return try await self.connect( to: address @@ -1470,10 +1413,7 @@ extension ClientBootstrap { channel.eventLoop.makeCompletedFuture { return try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } } @@ -1483,20 +1423,13 @@ extension ClientBootstrap { /// /// - Parameters: /// - unixDomainSocketPath: The _Unix domain socket_ path to connect to. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the established connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func connect( unixDomainSocketPath: String, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { return try await self.connect( unixDomainSocketPath: unixDomainSocketPath @@ -1504,10 +1437,7 @@ extension ClientBootstrap { channel.eventLoop.makeCompletedFuture { return try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } } @@ -1517,20 +1447,13 @@ extension ClientBootstrap { /// /// - Parameters: /// - descriptor: The _Unix file descriptor_ representing the connected stream socket. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the established connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func connect( _ socket: NIOBSDSocket.Handle, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { return try await self.withConnectedSocket( socket @@ -1538,10 +1461,7 @@ extension ClientBootstrap { channel.eventLoop.makeCompletedFuture { return try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } } @@ -2493,20 +2413,13 @@ extension DatagramBootstrap { /// /// - Parameters: /// - socket: The _Unix file descriptor_ representing the bound stream socket. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the bound connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func withBoundSocket( _ socket: NIOBSDSocket.Handle, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { func makeChannel(_ eventLoop: SelectableEventLoop) throws -> DatagramChannel { return try DatagramChannel(eventLoop: eventLoop, socket: socket) @@ -2517,10 +2430,7 @@ extension DatagramBootstrap { channel.eventLoop.makeCompletedFuture { try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } }, @@ -2540,21 +2450,14 @@ extension DatagramBootstrap { /// - Parameters: /// - host: The host to bind on. /// - port: The port to bind on. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the bound connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func bind( host: String, port: Int, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { return try await self.bind( host: host, @@ -2563,10 +2466,7 @@ extension DatagramBootstrap { channel.eventLoop.makeCompletedFuture { try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } } @@ -2577,20 +2477,13 @@ extension DatagramBootstrap { /// /// - Parameters: /// - address: The `SocketAddress` to bind on. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the bound connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func bind( to address: SocketAddress, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { return try await self.bind( to: address, @@ -2598,10 +2491,7 @@ extension DatagramBootstrap { channel.eventLoop.makeCompletedFuture { try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } } @@ -2614,21 +2504,14 @@ extension DatagramBootstrap { /// - unixDomainSocketPath: The path of the UNIX Domain Socket to bind on. The`unixDomainSocketPath` must not exist, /// unless `cleanupExistingSocketFile`is set to `true`. /// - cleanupExistingSocketFile: Whether to cleanup an existing socket file at `unixDomainSocketPath`. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the bound connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func bind( unixDomainSocketPath: String, cleanupExistingSocketFile: Bool = false, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { return try await self.bind( unixDomainSocketPath: unixDomainSocketPath, @@ -2637,10 +2520,7 @@ extension DatagramBootstrap { channel.eventLoop.makeCompletedFuture { try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } } @@ -2652,21 +2532,14 @@ extension DatagramBootstrap { /// - Parameters: /// - host: The host to connect to. /// - port: The port to connect to. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the established connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func connect( host: String, port: Int, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { return try await self.connect( host: host, @@ -2675,10 +2548,7 @@ extension DatagramBootstrap { channel.eventLoop.makeCompletedFuture { return try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } } @@ -2688,20 +2558,13 @@ extension DatagramBootstrap { /// /// - Parameters: /// - address: The address to connect to. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the established connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func connect( to address: SocketAddress, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { return try await self.connect( to: address @@ -2709,10 +2572,7 @@ extension DatagramBootstrap { channel.eventLoop.makeCompletedFuture { return try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } } @@ -2722,20 +2582,13 @@ extension DatagramBootstrap { /// /// - Parameters: /// - unixDomainSocketPath: The _Unix domain socket_ path to connect to. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the established connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func connect( unixDomainSocketPath: String, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { return try await self.connect( unixDomainSocketPath: unixDomainSocketPath @@ -2743,10 +2596,7 @@ extension DatagramBootstrap { channel.eventLoop.makeCompletedFuture { return try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } } @@ -3193,20 +3043,13 @@ extension NIOPipeBootstrap { /// /// - Parameters: /// - inputOutput: The _Unix file descriptor_ for the input & output. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the bound connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func takingOwnershipOfDescriptor( inputOutput: CInt, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { try await self.takingOwnershipOfDescriptor( inputOutput: inputOutput @@ -3214,10 +3057,7 @@ extension NIOPipeBootstrap { channel.eventLoop.makeCompletedFuture { try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } } @@ -3238,21 +3078,14 @@ extension NIOPipeBootstrap { /// - Parameters: /// - input: The _Unix file descriptor_ for the input (ie. the read side). /// - output: The _Unix file descriptor_ for the output (ie. the write side). - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the bound connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func takingOwnershipOfDescriptors( input: CInt, output: CInt, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { try await self.takingOwnershipOfDescriptors( input: input, @@ -3261,10 +3094,7 @@ extension NIOPipeBootstrap { channel.eventLoop.makeCompletedFuture { try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } } diff --git a/Sources/NIOPosix/RawSocketBootstrap.swift b/Sources/NIOPosix/RawSocketBootstrap.swift index df775b532d..d9d4fc9e8d 100644 --- a/Sources/NIOPosix/RawSocketBootstrap.swift +++ b/Sources/NIOPosix/RawSocketBootstrap.swift @@ -200,21 +200,14 @@ extension NIORawSocketBootstrap { /// - Parameters: /// - host: The host to bind on. /// - ipProtocol: The IP protocol used in the IP protocol/nextHeader field. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the bound connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func bind( host: String, ipProtocol: NIOIPProtocol, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { try await self.bind0( host: host, @@ -223,10 +216,7 @@ extension NIORawSocketBootstrap { channel.eventLoop.makeCompletedFuture { try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } }, @@ -239,21 +229,14 @@ extension NIORawSocketBootstrap { /// - Parameters: /// - host: The host to connect to. /// - ipProtocol: The IP protocol used in the IP protocol/nextHeader field. - /// - backpressureStrategy: The back pressure strategy used by the channel. - /// - isOutboundHalfClosureEnabled: Indicates if half closure is enabled on the channel. If half closure is enabled - /// then finishing the `NIOAsyncChannelWriter` will lead to half closure. - /// - inboundType: The channel's inbound type. - /// - outboundType: The channel's outbound type. + /// - channelConfiguration: The channel's async channel configuration. /// - Returns: A `NIOAsyncChannel` for the bound connection. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @_spi(AsyncChannel) public func connect( host: String, ipProtocol: NIOIPProtocol, - backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil, - isOutboundHalfClosureEnabled: Bool = false, - inboundType: Inbound.Type = Inbound.self, - outboundType: Outbound.Type = Outbound.self + channelConfiguration: NIOAsyncChannel.Configuration = .init() ) async throws -> NIOAsyncChannel { try await self.connect0( host: host, @@ -262,10 +245,7 @@ extension NIORawSocketBootstrap { channel.eventLoop.makeCompletedFuture { try NIOAsyncChannel( synchronouslyWrapping: channel, - backpressureStrategy: backpressureStrategy, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, - inboundType: inboundType, - outboundType: outboundType + configuration: channelConfiguration ) } }, diff --git a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift index 0a632f98ba..c6c0b6a661 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift @@ -22,7 +22,7 @@ final class AsyncChannelTests: XCTestCase { guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return } let channel = NIOAsyncTestingChannel() let wrapped = try await channel.testingEventLoop.executeInContext { - try NIOAsyncChannel(synchronouslyWrapping: channel, inboundType: String.self, outboundType: Never.self) + try NIOAsyncChannel(synchronouslyWrapping: channel) } var iterator = wrapped.inboundStream.makeAsyncIterator() @@ -48,7 +48,7 @@ final class AsyncChannelTests: XCTestCase { guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return } let channel = NIOAsyncTestingChannel() let wrapped = try await channel.testingEventLoop.executeInContext { - try NIOAsyncChannel(synchronouslyWrapping: channel, inboundType: Never.self, outboundType: String.self) + try NIOAsyncChannel(synchronouslyWrapping: channel) } try await wrapped.outboundWriter.write("hello") @@ -75,9 +75,11 @@ final class AsyncChannelTests: XCTestCase { let wrapped = try await channel.testingEventLoop.executeInContext { try NIOAsyncChannel( synchronouslyWrapping: channel, - isOutboundHalfClosureEnabled: true, - inboundType: Never.self, - outboundType: Never.self + configuration: .init( + isOutboundHalfClosureEnabled: true, + inboundType: Never.self, + outboundType: Never.self + ) ) } inboundReader = wrapped.inboundStream @@ -87,6 +89,8 @@ final class AsyncChannelTests: XCTestCase { } } + await channel.testingEventLoop.run() + try await channel.testingEventLoop.executeInContext { XCTAssertEqual(1, closeRecorder.outboundCloses) } @@ -106,7 +110,14 @@ final class AsyncChannelTests: XCTestCase { do { let wrapped = try await channel.testingEventLoop.executeInContext { - try NIOAsyncChannel(synchronouslyWrapping: channel, isOutboundHalfClosureEnabled: false, inboundType: Never.self, outboundType: Never.self) + try NIOAsyncChannel( + synchronouslyWrapping: channel, + configuration: .init( + isOutboundHalfClosureEnabled: false, + inboundType: Never.self, + outboundType: Never.self + ) + ) } inboundReader = wrapped.inboundStream @@ -138,9 +149,11 @@ final class AsyncChannelTests: XCTestCase { let wrapped = try await channel.testingEventLoop.executeInContext { try NIOAsyncChannel( synchronouslyWrapping: channel, - isOutboundHalfClosureEnabled: true, - inboundType: Never.self, - outboundType: Never.self + configuration: .init( + isOutboundHalfClosureEnabled: true, + inboundType: Never.self, + outboundType: Never.self + ) ) } inboundReader = wrapped.inboundStream @@ -150,6 +163,8 @@ final class AsyncChannelTests: XCTestCase { } } + await channel.testingEventLoop.run() + // First we see half-closure. try await channel.testingEventLoop.executeInContext { XCTAssertEqual(1, closeRecorder.allCloses) @@ -176,7 +191,14 @@ final class AsyncChannelTests: XCTestCase { do { let wrapped = try await channel.testingEventLoop.executeInContext { - try NIOAsyncChannel(synchronouslyWrapping: channel, isOutboundHalfClosureEnabled: false, inboundType: Never.self, outboundType: Never.self) + try NIOAsyncChannel( + synchronouslyWrapping: channel, + configuration: .init( + isOutboundHalfClosureEnabled: false, + inboundType: Never.self, + outboundType: Never.self + ) + ) } try await channel.testingEventLoop.executeInContext { @@ -199,7 +221,7 @@ final class AsyncChannelTests: XCTestCase { guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return } let channel = NIOAsyncTestingChannel() let wrapped = try await channel.testingEventLoop.executeInContext { - try NIOAsyncChannel(synchronouslyWrapping: channel, inboundType: String.self, outboundType: Never.self) + try NIOAsyncChannel(synchronouslyWrapping: channel) } try await channel.writeInbound("hello") @@ -216,7 +238,7 @@ final class AsyncChannelTests: XCTestCase { guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return } let channel = NIOAsyncTestingChannel() let wrapped = try await channel.testingEventLoop.executeInContext { - try NIOAsyncChannel(synchronouslyWrapping: channel, inboundType: String.self, outboundType: Never.self) + try NIOAsyncChannel(synchronouslyWrapping: channel) } try await channel.writeInbound("hello") @@ -237,7 +259,7 @@ final class AsyncChannelTests: XCTestCase { guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return } let channel = NIOAsyncTestingChannel() let wrapped = try await channel.testingEventLoop.executeInContext { - try NIOAsyncChannel(synchronouslyWrapping: channel, inboundType: Never.self, outboundType: String.self) + try NIOAsyncChannel(synchronouslyWrapping: channel) } try await channel.testingEventLoop.executeInContext { @@ -277,7 +299,7 @@ final class AsyncChannelTests: XCTestCase { do { // Create the NIOAsyncChannel, then drop it. The handler will still be in the pipeline. _ = try await channel.testingEventLoop.executeInContext { - _ = try NIOAsyncChannel(synchronouslyWrapping: channel, inboundType: Sentinel.self, outboundType: Never.self) + _ = try NIOAsyncChannel(synchronouslyWrapping: channel) } } @@ -301,7 +323,14 @@ final class AsyncChannelTests: XCTestCase { let readCounter = ReadCounter() try await channel.pipeline.addHandler(readCounter) let wrapped = try await channel.testingEventLoop.executeInContext { - try NIOAsyncChannel(synchronouslyWrapping: channel, backpressureStrategy: .init(lowWatermark: 2, highWatermark: 4), inboundType: Void.self, outboundType: Never.self) + try NIOAsyncChannel( + synchronouslyWrapping: channel, + configuration: .init( + backpressureStrategy: .init(lowWatermark: 2, highWatermark: 4), + inboundType: Void.self, + outboundType: Never.self + ) + ) } // Attempt to read. This should succeed an arbitrary number of times. @@ -405,7 +434,7 @@ final class AsyncChannelTests: XCTestCase { guard #available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *) else { return } let channel = NIOAsyncTestingChannel() let wrapped = try await channel.testingEventLoop.executeInContext { - try NIOAsyncChannel(synchronouslyWrapping: channel, inboundType: String.self, outboundType: String.self) + try NIOAsyncChannel(synchronouslyWrapping: channel) } var iterator = wrapped.inboundStream.makeAsyncIterator() diff --git a/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift b/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift index b365ae50a3..9df96526d0 100644 --- a/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift +++ b/Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift @@ -225,8 +225,10 @@ final class AsyncChannelBootstrapTests: XCTestCase { .bind( host: "127.0.0.1", port: 0, - childChannelInboundType: String.self, - childChannelOutboundType: String.self + childChannelConfiguration: .init( + inboundType: String.self, + outboundType: String.self + ) ) try await withThrowingTaskGroup(of: Void.self) { group in @@ -565,7 +567,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { let channel = try await DatagramBootstrap(group: eventLoopGroup) .bind( to: .init(ipAddress: "127.0.0.1", port: 0), - channelInitializer: { channel -> EventLoopFuture in channel.eventLoop.makeSucceededFuture(channel) } + channelInitializer: { channel -> EventLoopFuture in channel.eventLoop.makeSucceededFuture(channel) } ) let port = channel.localAddress!.port! @@ -581,7 +583,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } // We need to sleep here since we can only connect the client after the server started. - try await Task.sleep(nanoseconds: 100000000) + try await Task.sleep(nanoseconds: 100_000_000) group.addTask { // We have to use a fixed port here since we only get the channel once protocol negotiation is done @@ -611,16 +613,16 @@ final class AsyncChannelBootstrapTests: XCTestCase { } } } - + // MARK: - Pipe Bootstrap - + func testPipeBootstrap() async throws { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) let (pipe1ReadFH, pipe1WriteFH, pipe2ReadFH, pipe2WriteFH) = self.makePipeFileDescriptors() let toChannel = FileHandle(fileDescriptor: pipe1WriteFH, closeOnDealloc: false) let fromChannel = FileHandle(fileDescriptor: pipe2ReadFH, closeOnDealloc: false) let channel: NIOAsyncChannel - + do { channel = try await NIOPipeBootstrap(group: eventLoopGroup) .takingOwnershipOfDescriptors( @@ -631,13 +633,13 @@ final class AsyncChannelBootstrapTests: XCTestCase { [pipe1ReadFH, pipe1WriteFH, pipe2ReadFH, pipe2WriteFH].forEach { try? SystemCalls.close(descriptor: $0) } throw error } - + var inboundIterator = channel.inboundStream.makeAsyncIterator() - + do { try toChannel.writeBytes(.init(string: "Request")) try await XCTAsyncAssertEqual(try await inboundIterator.next(), ByteBuffer(string: "Request")) - + let response = ByteBuffer(string: "Response") try await channel.outboundWriter.write(response) XCTAssertEqual(try fromChannel.readBytes(ofExactLength: response.readableBytes), Array(buffer: response)) @@ -647,13 +649,13 @@ final class AsyncChannelBootstrapTests: XCTestCase { throw error } } - + func testPipeBootstrap_withProtocolNegotiation() async throws { let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1) let (pipe1ReadFH, pipe1WriteFH, pipe2ReadFH, pipe2WriteFH) = self.makePipeFileDescriptors() let toChannel = FileHandle(fileDescriptor: pipe1WriteFH, closeOnDealloc: false) let fromChannel = FileHandle(fileDescriptor: pipe2ReadFH, closeOnDealloc: false) - + try await withThrowingTaskGroup(of: NegotiationResult.self) { group in group.addTask { do { @@ -671,8 +673,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { throw error } } - - + try toChannel.writeBytes(.init(string: "alpn:string\nHello\n")) let negotiationResult = try await group.next() switch negotiationResult { @@ -680,7 +681,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { var inboundIterator = channel.inboundStream.makeAsyncIterator() do { try await XCTAsyncAssertEqual(try await inboundIterator.next(), "Hello") - + let response = ByteBuffer(string: "Response") try await channel.outboundWriter.write("Response") XCTAssertEqual(try fromChannel.readBytes(ofExactLength: response.readableBytes), Array(buffer: response)) @@ -689,7 +690,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { [pipe1WriteFH, pipe2ReadFH].forEach { try? SystemCalls.close(descriptor: $0) } throw error } - + case .byte, nil: fatalError() } @@ -734,7 +735,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } // We need to sleep here since we can only connect the client after the server started. - try await Task.sleep(nanoseconds: 100000000) + try await Task.sleep(nanoseconds: 100_000_000) group.addTask { try await self.makeRawSocketClientChannelWithProtocolNegotiation( @@ -764,7 +765,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } // MARK: - Test Helpers - + private func makePipeFileDescriptors() -> (pipe1ReadFH: Int32, pipe1WriteFH: Int32, pipe2ReadFH: Int32, pipe2WriteFH: Int32) { var pipe1FDs: [Int32] = [-1, -1] pipe1FDs.withUnsafeMutableBufferPointer { ptr in @@ -855,9 +856,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } } .connect( - to: .init(ipAddress: "127.0.0.1", port: port), - inboundType: String.self, - outboundType: String.self + to: .init(ipAddress: "127.0.0.1", port: port) ) } @@ -908,9 +907,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } .bind( host: "127.0.0.1", - port: 0, - inboundType: String.self, - outboundType: String.self + port: 0 ) } @@ -943,9 +940,7 @@ final class AsyncChannelBootstrapTests: XCTestCase { } .connect( host: "127.0.0.1", - port: port, - inboundType: String.self, - outboundType: String.self + port: port ) } @@ -1026,11 +1021,8 @@ final class AsyncChannelBootstrapTests: XCTestCase { case "string": return channel.eventLoop.makeCompletedFuture { try channel.pipeline.syncOperations.addHandler(ByteBufferToStringHandler()) - let asyncChannel = try NIOAsyncChannel( - synchronouslyWrapping: channel, - isOutboundHalfClosureEnabled: true, - inboundType: String.self, - outboundType: String.self + let asyncChannel = try NIOAsyncChannel( + synchronouslyWrapping: channel ) return NIOProtocolNegotiationResult.finished(NegotiationResult.string(asyncChannel)) @@ -1039,11 +1031,8 @@ final class AsyncChannelBootstrapTests: XCTestCase { return channel.eventLoop.makeCompletedFuture { try channel.pipeline.syncOperations.addHandler(ByteBufferToByteHandler()) - let asyncChannel = try NIOAsyncChannel( - synchronouslyWrapping: channel, - isOutboundHalfClosureEnabled: true, - inboundType: UInt8.self, - outboundType: UInt8.self + let asyncChannel = try NIOAsyncChannel( + synchronouslyWrapping: channel ) return NIOProtocolNegotiationResult.finished(NegotiationResult.byte(asyncChannel))