From 9c27ae91c10eb1b47a024511ae443a3f29f5f19f Mon Sep 17 00:00:00 2001 From: dkz2 Date: Thu, 7 Dec 2023 23:02:53 -0600 Subject: [PATCH 1/3] init conform to pooled connection --- Package.swift | 2 + Sources/Memcache/MemcacheConnection.swift | 129 +++++++++++++++------- Sources/MemcacheExample/Program.swift | 4 +- 3 files changed, 92 insertions(+), 43 deletions(-) diff --git a/Package.swift b/Package.swift index 309bba4..478eca2 100644 --- a/Package.swift +++ b/Package.swift @@ -33,6 +33,7 @@ let package = Package( .package(url: "https://github.com/apple/swift-nio.git", from: "2.56.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"), .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.0.0"), + .package(url: "https://github.com/vapor/postgres-nio.git", from: "1.19.0"), ], targets: [ .target( @@ -43,6 +44,7 @@ let package = Package( .product(name: "NIOEmbedded", package: "swift-nio"), .product(name: "Logging", package: "swift-log"), .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"), + .product(name: "_ConnectionPoolModule", package: "postgres-nio"), ] ), .testTarget( diff --git a/Sources/Memcache/MemcacheConnection.swift b/Sources/Memcache/MemcacheConnection.swift index 0ef63bd..4fe982f 100644 --- a/Sources/Memcache/MemcacheConnection.swift +++ b/Sources/Memcache/MemcacheConnection.swift @@ -11,8 +11,8 @@ // SPDX-License-Identifier: Apache-2.0 // //===----------------------------------------------------------------------===// -@_spi(AsyncChannel) +import _ConnectionPoolModule import NIOCore import NIOPosix import ServiceLifecycle @@ -20,7 +20,18 @@ import ServiceLifecycle /// An actor to create a connection to a Memcache server. /// /// This actor can be used to send commands to the server. -public actor MemcacheConnection: Service { +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +@_spi(ConnectionPool) +public actor MemcacheConnection: Service, PooledConnection { + public typealias ID = Int + public let id: ID + + private let closePromise: EventLoopPromise + + public var closeFuture: EventLoopFuture { + return self.closePromise.futureResult + } + private typealias StreamElement = (MemcacheRequest, CheckedContinuation) private let host: String private let port: Int @@ -56,23 +67,55 @@ public actor MemcacheConnection: Service { private var state: State - /// Initialize a new MemcacheConnection. + /// Initialize a new MemcacheConnection, with an option to specify an ID. + /// If no ID is provided, a default value is used. /// /// - Parameters: /// - host: The host address of the Memcache server. /// - port: The port number of the Memcache server. /// - eventLoopGroup: The event loop group to use for this connection. - public init(host: String, port: Int, eventLoopGroup: EventLoopGroup) { + /// - id: The unique identifier for the connection (optional). + public init(host: String, port: Int, id: ID = 1, eventLoopGroup: EventLoopGroup) { self.host = host self.port = port + self.id = id let (stream, continuation) = AsyncStream.makeStream() let bufferAllocator = ByteBufferAllocator() - self.state = .initial( - eventLoopGroup: eventLoopGroup, - bufferAllocator: bufferAllocator, - requestStream: stream, - requestContinuation: continuation - ) + self.closePromise = eventLoopGroup.next().makePromise(of: Void.self) + self.state = .initial(eventLoopGroup: eventLoopGroup, bufferAllocator: bufferAllocator, requestStream: stream, requestContinuation: continuation) + } + + deinit { + // Fulfill the promise if it has not been fulfilled yet + closePromise.fail(MemcacheError(code: .connectionShutdown, + message: "MemcacheConnection deinitialized without closing", + cause: nil, + location: .here())) + } + + /// Closes the connection. This method is responsible for properly shutting down + /// and cleaning up resources associated with the connection. + public func close() { + switch self.state { + case .running(_, let asyncChannel, _, _): + asyncChannel.channel.close().cascade(to: self.closePromise) + default: + self.closePromise.succeed(()) + } + self.state = .finished + } + + /// Registers a closure to be called when the connection is closed. + /// This is useful for performing cleanup or notification tasks. + public func onClose(_ closure: @escaping ((any Error)?) -> Void) { + self.closeFuture.whenComplete { result in + switch result { + case .success: + closure(nil) + case .failure(let error): + closure(error) + } + } } /// Runs the Memcache connection. @@ -95,7 +138,7 @@ public actor MemcacheConnection: Service { return channel.eventLoop.makeCompletedFuture { try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(MemcacheRequestEncoder())) try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(MemcacheResponseDecoder())) - return try NIOAsyncChannel(synchronouslyWrapping: channel) + return try NIOAsyncChannel(wrappingChannelSynchronously: channel) } }.get() @@ -106,39 +149,41 @@ public actor MemcacheConnection: Service { requestContinuation: continuation ) - var iterator = channel.inboundStream.makeAsyncIterator() switch self.state { case .running(_, let channel, let requestStream, let requestContinuation): - for await (request, continuation) in requestStream { - do { - try await channel.outboundWriter.write(request) - let responseBuffer = try await iterator.next() - - if let response = responseBuffer { - continuation.resume(returning: response) - } else { - self.state = .finished - requestContinuation.finish() - continuation.resume(throwing: MemcacheError( - code: .connectionShutdown, - message: "The connection to the Memcache server was unexpectedly closed.", - cause: nil, - location: .here() - )) - } - } catch { - switch self.state { - case .running: - self.state = .finished - requestContinuation.finish() - continuation.resume(throwing: MemcacheError( - code: .connectionShutdown, - message: "The connection to the Memcache server has shut down while processing a request.", - cause: error, - location: .here() - )) - case .initial, .finished: - break + try await channel.executeThenClose { inbound, outbound in + var inboundIterator = inbound.makeAsyncIterator() + for await (request, continuation) in requestStream { + do { + try await outbound.write(request) + let responseBuffer = try await inboundIterator.next() + + if let response = responseBuffer { + continuation.resume(returning: response) + } else { + self.state = .finished + requestContinuation.finish() + continuation.resume(throwing: MemcacheError( + code: .connectionShutdown, + message: "The connection to the Memcache server was unexpectedly closed.", + cause: nil, + location: .here() + )) + } + } catch { + switch self.state { + case .running: + self.state = .finished + requestContinuation.finish() + continuation.resume(throwing: MemcacheError( + code: .connectionShutdown, + message: "The connection to the Memcache server has shut down while processing a request.", + cause: error, + location: .here() + )) + case .initial, .finished: + break + } } } } diff --git a/Sources/MemcacheExample/Program.swift b/Sources/MemcacheExample/Program.swift index f74c5e4..21c448f 100644 --- a/Sources/MemcacheExample/Program.swift +++ b/Sources/MemcacheExample/Program.swift @@ -19,7 +19,9 @@ import NIOPosix import ServiceLifecycle @main -struct Program { +@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) +@_spi(ConnectionPool) +public struct Program { // Use the shared singleton instance of MultiThreadedEventLoopGroup static let eventLoopGroup = MultiThreadedEventLoopGroup.singleton // Initialize the logger From 0a15156317d4f59e31035b9d4dfcf4246f08db02 Mon Sep 17 00:00:00 2001 From: dkz2 Date: Tue, 12 Dec 2023 21:23:11 -0600 Subject: [PATCH 2/3] conform to pooled connection --- Sources/Memcache/MemcacheConnection.swift | 30 ++++++++++++++--------- Sources/MemcacheExample/Program.swift | 4 +-- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/Sources/Memcache/MemcacheConnection.swift b/Sources/Memcache/MemcacheConnection.swift index 4fe982f..288be32 100644 --- a/Sources/Memcache/MemcacheConnection.swift +++ b/Sources/Memcache/MemcacheConnection.swift @@ -20,8 +20,6 @@ import ServiceLifecycle /// An actor to create a connection to a Memcache server. /// /// This actor can be used to send commands to the server. -@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) -@_spi(ConnectionPool) public actor MemcacheConnection: Service, PooledConnection { public typealias ID = Int public let id: ID @@ -95,10 +93,16 @@ public actor MemcacheConnection: Service, PooledConnection { /// Closes the connection. This method is responsible for properly shutting down /// and cleaning up resources associated with the connection. - public func close() { + public nonisolated func close() { + Task { + await self.closeConnection() + } + } + + private func closeConnection() async { switch self.state { - case .running(_, let asyncChannel, _, _): - asyncChannel.channel.close().cascade(to: self.closePromise) + case .running(_, let channel, _, _): + channel.channel.close().cascade(to: self.closePromise) default: self.closePromise.succeed(()) } @@ -107,13 +111,15 @@ public actor MemcacheConnection: Service, PooledConnection { /// Registers a closure to be called when the connection is closed. /// This is useful for performing cleanup or notification tasks. - public func onClose(_ closure: @escaping ((any Error)?) -> Void) { - self.closeFuture.whenComplete { result in - switch result { - case .success: - closure(nil) - case .failure(let error): - closure(error) + public nonisolated func onClose(_ closure: @escaping ((any Error)?) -> Void) { + Task { + await self.closeFuture.whenComplete { result in + switch result { + case .success: + closure(nil) + case .failure(let error): + closure(error) + } } } } diff --git a/Sources/MemcacheExample/Program.swift b/Sources/MemcacheExample/Program.swift index 21c448f..f74c5e4 100644 --- a/Sources/MemcacheExample/Program.swift +++ b/Sources/MemcacheExample/Program.swift @@ -19,9 +19,7 @@ import NIOPosix import ServiceLifecycle @main -@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *) -@_spi(ConnectionPool) -public struct Program { +struct Program { // Use the shared singleton instance of MultiThreadedEventLoopGroup static let eventLoopGroup = MultiThreadedEventLoopGroup.singleton // Initialize the logger From 68ac98df1cfe7d46a47a325ca77ec79f0bd5d065 Mon Sep 17 00:00:00 2001 From: dkz2 Date: Tue, 12 Dec 2023 22:11:33 -0600 Subject: [PATCH 3/3] atomic counter when id is not passed --- Sources/Memcache/MemcacheConnection.swift | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Sources/Memcache/MemcacheConnection.swift b/Sources/Memcache/MemcacheConnection.swift index 288be32..ddc6677 100644 --- a/Sources/Memcache/MemcacheConnection.swift +++ b/Sources/Memcache/MemcacheConnection.swift @@ -13,6 +13,7 @@ //===----------------------------------------------------------------------===// import _ConnectionPoolModule +import Atomics import NIOCore import NIOPosix import ServiceLifecycle @@ -23,6 +24,7 @@ import ServiceLifecycle public actor MemcacheConnection: Service, PooledConnection { public typealias ID = Int public let id: ID + private static var nextID: ManagedAtomic = ManagedAtomic(0) private let closePromise: EventLoopPromise @@ -73,10 +75,10 @@ public actor MemcacheConnection: Service, PooledConnection { /// - port: The port number of the Memcache server. /// - eventLoopGroup: The event loop group to use for this connection. /// - id: The unique identifier for the connection (optional). - public init(host: String, port: Int, id: ID = 1, eventLoopGroup: EventLoopGroup) { + public init(host: String, port: Int, id: ID? = nil, eventLoopGroup: EventLoopGroup) { self.host = host self.port = port - self.id = id + self.id = id ?? MemcacheConnection.nextID.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) let (stream, continuation) = AsyncStream.makeStream() let bufferAllocator = ByteBufferAllocator() self.closePromise = eventLoopGroup.next().makePromise(of: Void.self)