-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
graceful close #336
Comments
By and large, this library attempts to be a thin wrapper around the underlying amqplib, and it would do the same thing. I'm not sure if graceful close is something I want to include in this library. Are there other libraries out there that will cover that use case? There are all sorts of interesting corner cases which might make a generic approach to this challenging (e.g. RPC use cases where we want to wait until all replies have been collected to shut things down). That said, we do already keep track of all the messages that are queued-but-not-yet-sent and sent-but-not-acked, so I'd happily accept a PR that exposes these counts outside of the ChannelWrapper. On several occasions I've also tried to write some kind of generic interface for a "message store" so that instead of storing unsent messages in RAM you could store them in Postgres or Redis or something similar. This is far more complicated than it first seems (if we have a queued message, and the system restarts, we could reload all these messages from Redis, but then whomever sent these messages originally would no longer have a |
At first I thought about using a
The "thickest" part of this wrapper lib is that it maintains its own message buffers, though. I don't see how another library could cover this, particularly because Otherwise, I can hack it in with something like this. It's important to note, however, that import { ChannelWrapper } from 'amqp-connection-manager'
export const withGracefulClose = (channel: ChannelWrapper): ChannelWrapper => {
const pending = new Set<Promise<boolean>>()
const wrapMethod = <T extends (...args: any[]) => Promise<boolean>>(method: T) => {
const original = method.bind(channel)
return (...args: Parameters<T>) => {
const promise = original(...args)
pending.add(promise)
promise.finally(() => pending.delete(promise))
return promise
}
}
channel.publish = wrapMethod(channel.publish)
channel.sendToQueue = wrapMethod(channel.sendToQueue)
const originalClose = channel.close.bind(channel)
channel.close = async () => {
await Promise.allSettled([...pending])
return originalClose()
}
return channel
} As far as the message store, one of our apps actually does exactly that because we needed the ability to schedule messages for the future. We have an API endpoint where senders post the message with a Multi-tenancy is another big can of worms. We only have one process polling the db and sending messages to Rabbit. In our particular use-case, that's good enough, but if you needed more exact timing, you would need multiple processes (nodes) doing this in parallel, so the store would need a concept of synchronization. E.g. a postgres transaction that both retrieves the record and marks it "checked out", and some other mechanism to handle a worker dying while a record is checked out... It could easily end up with too many architectural decisions to handle in a library like this, as you would need to implement such synchronization for any type of store you wanted to support. |
One other quick note...
|
@ehaynes99 Have you come up with some solution regarding this? What did you end up going with? |
Currently, when a channel is closed, the promises for all pending messages are immediately rejected. When spinning down nodes in a cluster, I don't really want to just pull the rug out from under it, but rather let it finish what it's doing. As far as I can tell, there's no exposed mechanism to wait for sends to complete at the
ChannelWrapper
level. CheckingqueueLength
is insufficient, because the in-flight messages don't count towards that total.I have various things pushing messages, so to track them all externally, I'll have to make a
ChannelWrapperWrapper
that keeps track of all of the pending promises in a single place, which seems a bit clumsy. Is there a simpler way to do this?The text was updated successfully, but these errors were encountered: