Skip to content
This repository was archived by the owner on Oct 24, 2024. It is now read-only.

Commit 5830ed9

Browse files
committed
✨ Allow resetting ack when calling ping()
At the moment, when calling: ```js queue.ping({resetTries: true}) ``` The tries are reset as if the job hasn't been picked up, but the `ack` is left as-is. This isn't necessarily problematic in the operation of the queue, but it does mean that the pinged job will still show up as [in-flight][1]. This change adds an optional `resetAck` flag, which will also unset the `ack`, and means that the job can be marked as not in-flight, as if it has never been picked up. [1]: https://github.com/reedsy/mongodb-queue/blob/6133fc9367f4fce719e36d8866841d531e956b6b/mongodb-queue.ts#L262
1 parent 6133fc9 commit 5830ed9

File tree

3 files changed

+33
-1
lines changed

3 files changed

+33
-1
lines changed

Diff for: mongodb-queue.ts

+5
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export type GetOptions = {
3939
export type PingOptions = {
4040
visibility?: number;
4141
resetTries?: boolean;
42+
resetAck?: boolean;
4243
};
4344

4445
export type BaseMessage<T = any> = {
@@ -205,6 +206,10 @@ export class MongoDBQueue<T = any> {
205206
};
206207
}
207208

209+
if (opts.resetAck) {
210+
update.$unset = {ack: 1};
211+
}
212+
208213
const msg = await this.col.findOneAndUpdate(query, update, options);
209214
if (!msg.value) {
210215
throw new Error('Queue.ping(): Unidentified ack : ' + ack);

Diff for: package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@reedsy/mongodb-queue",
3-
"version": "8.1.0",
3+
"version": "8.2.0",
44
"description": "Message queues which uses MongoDB.",
55
"main": "mongodb-queue.js",
66
"scripts": {

Diff for: test/ping.js

+27
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,33 @@ setup().then(({client, db}) => {
109109
t.end();
110110
});
111111

112+
test('ping: reset ack', async function(t) {
113+
const queue = new MongoDBQueue(db, 'ping', {visibility: 3});
114+
let msg;
115+
let id;
116+
117+
id = await queue.add('Hello, World!');
118+
t.ok(id, 'There is an id returned when adding a message.');
119+
msg = await queue.get();
120+
const ack = msg.ack;
121+
// message should reset in three seconds
122+
t.ok(msg.id, 'Got a msg.id (sanity check)');
123+
await timeout(2000);
124+
id = await queue.ping(msg.ack, {resetAck: true});
125+
t.ok(id, 'Received an id when acking this message');
126+
// wait until the msg has returned to the queue
127+
await timeout(6000);
128+
msg = await queue.get();
129+
t.notEqual(ack, msg.ack, 'Ack was reset');
130+
await queue.ack(msg.ack);
131+
msg = await queue.get();
132+
// no more messages
133+
t.ok(!msg, 'No msg received');
134+
135+
t.pass('Finished test ok');
136+
t.end();
137+
});
138+
112139
test('client.close()', function(t) {
113140
t.pass('client.close()');
114141
client.close();

0 commit comments

Comments
 (0)