Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conform to PooledConnection #42

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@ let package = Package(
.package(url: "https://github.com/apple/swift-nio.git", from: "2.56.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
.package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.0.0"),
.package(url: "https://github.com/vapor/postgres-nio.git", from: "1.19.0"),
],
targets: [
.target(
@@ -43,6 +44,7 @@ let package = Package(
.product(name: "NIOEmbedded", package: "swift-nio"),
.product(name: "Logging", package: "swift-log"),
.product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
.product(name: "_ConnectionPoolModule", package: "postgres-nio"),
]
),
.testTarget(
135 changes: 93 additions & 42 deletions Sources/Memcache/MemcacheConnection.swift
Original file line number Diff line number Diff line change
@@ -11,16 +11,25 @@
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
@_spi(AsyncChannel)

import _ConnectionPoolModule
import NIOCore
import NIOPosix
import ServiceLifecycle

/// An actor to create a connection to a Memcache server.
///
/// This actor can be used to send commands to the server.
public actor MemcacheConnection: Service {
public actor MemcacheConnection: Service, PooledConnection {
public typealias ID = Int
public let id: ID

private let closePromise: EventLoopPromise<Void>

public var closeFuture: EventLoopFuture<Void> {
return self.closePromise.futureResult
}

private typealias StreamElement = (MemcacheRequest, CheckedContinuation<MemcacheResponse, Error>)
private let host: String
private let port: Int
@@ -56,23 +65,63 @@ public actor MemcacheConnection: Service {

private var state: State

/// Initialize a new MemcacheConnection.
/// Initialize a new MemcacheConnection, with an option to specify an ID.
/// If no ID is provided, a default value is used.
///
/// - Parameters:
/// - host: The host address of the Memcache server.
/// - port: The port number of the Memcache server.
/// - eventLoopGroup: The event loop group to use for this connection.
public init(host: String, port: Int, eventLoopGroup: EventLoopGroup) {
/// - id: The unique identifier for the connection (optional).
public init(host: String, port: Int, id: ID = 1, eventLoopGroup: EventLoopGroup) {
self.host = host
self.port = port
self.id = id
let (stream, continuation) = AsyncStream<StreamElement>.makeStream()
let bufferAllocator = ByteBufferAllocator()
self.state = .initial(
eventLoopGroup: eventLoopGroup,
bufferAllocator: bufferAllocator,
requestStream: stream,
requestContinuation: continuation
)
self.closePromise = eventLoopGroup.next().makePromise(of: Void.self)
self.state = .initial(eventLoopGroup: eventLoopGroup, bufferAllocator: bufferAllocator, requestStream: stream, requestContinuation: continuation)
}

deinit {
// Fulfill the promise if it has not been fulfilled yet
closePromise.fail(MemcacheError(code: .connectionShutdown,
message: "MemcacheConnection deinitialized without closing",
cause: nil,
location: .here()))
}

/// Closes the connection. This method is responsible for properly shutting down
/// and cleaning up resources associated with the connection.
public nonisolated func close() {
Task {
await self.closeConnection()
}
}

private func closeConnection() async {
switch self.state {
case .running(_, let channel, _, _):
channel.channel.close().cascade(to: self.closePromise)
default:
self.closePromise.succeed(())
}
self.state = .finished
}

/// Registers a closure to be called when the connection is closed.
/// This is useful for performing cleanup or notification tasks.
public nonisolated func onClose(_ closure: @escaping ((any Error)?) -> Void) {
Task {
await self.closeFuture.whenComplete { result in
switch result {
case .success:
closure(nil)
case .failure(let error):
closure(error)
}
}
}
}

/// Runs the Memcache connection.
@@ -95,7 +144,7 @@ public actor MemcacheConnection: Service {
return channel.eventLoop.makeCompletedFuture {
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(MemcacheRequestEncoder()))
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(MemcacheResponseDecoder()))
return try NIOAsyncChannel<MemcacheResponse, MemcacheRequest>(synchronouslyWrapping: channel)
return try NIOAsyncChannel<MemcacheResponse, MemcacheRequest>(wrappingChannelSynchronously: channel)
}
}.get()

@@ -106,39 +155,41 @@ public actor MemcacheConnection: Service {
requestContinuation: continuation
)

var iterator = channel.inboundStream.makeAsyncIterator()
switch self.state {
case .running(_, let channel, let requestStream, let requestContinuation):
for await (request, continuation) in requestStream {
do {
try await channel.outboundWriter.write(request)
let responseBuffer = try await iterator.next()

if let response = responseBuffer {
continuation.resume(returning: response)
} else {
self.state = .finished
requestContinuation.finish()
continuation.resume(throwing: MemcacheError(
code: .connectionShutdown,
message: "The connection to the Memcache server was unexpectedly closed.",
cause: nil,
location: .here()
))
}
} catch {
switch self.state {
case .running:
self.state = .finished
requestContinuation.finish()
continuation.resume(throwing: MemcacheError(
code: .connectionShutdown,
message: "The connection to the Memcache server has shut down while processing a request.",
cause: error,
location: .here()
))
case .initial, .finished:
break
try await channel.executeThenClose { inbound, outbound in
var inboundIterator = inbound.makeAsyncIterator()
for await (request, continuation) in requestStream {
do {
try await outbound.write(request)
let responseBuffer = try await inboundIterator.next()

if let response = responseBuffer {
continuation.resume(returning: response)
} else {
self.state = .finished
requestContinuation.finish()
continuation.resume(throwing: MemcacheError(
code: .connectionShutdown,
message: "The connection to the Memcache server was unexpectedly closed.",
cause: nil,
location: .here()
))
}
} catch {
switch self.state {
case .running:
self.state = .finished
requestContinuation.finish()
continuation.resume(throwing: MemcacheError(
code: .connectionShutdown,
message: "The connection to the Memcache server has shut down while processing a request.",
cause: error,
location: .here()
))
case .initial, .finished:
break
}
}
}
}