Skip to content

Commit 7b90b87

Browse files
committed
enqueueMany() method for batch message operations
1 parent ccf6783 commit 7b90b87

File tree

5 files changed

+139
-57
lines changed

5 files changed

+139
-57
lines changed

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ Changelog
6868

6969
To be released.
7070

71+
- Added `PostgresMessageQueue.enqueueMany()` method for efficiently enqueuing
72+
multiple messages at once.
73+
7174
- Added some logging using [LogTape] for the sake of debugging. The following
7275
categories are used:
7376

deno.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
},
1010
"imports": {
1111
"@deno/dnt": "jsr:@deno/dnt@^0.41.3",
12-
"@fedify/fedify": "jsr:@fedify/fedify@^1.0.0",
13-
"@logtape/logtape": "jsr:@logtape/logtape@^0.8.0",
12+
"@fedify/fedify": "jsr:@fedify/fedify@1.5.0-dev.732",
13+
"@logtape/logtape": "jsr:@logtape/logtape@^0.9.0",
1414
"@std/assert": "jsr:@std/assert@^0.226.0",
1515
"@std/async": "jsr:@std/async@^1.0.5",
1616
"postgres": "npm:postgres@^3.4.5"

deno.lock

+70-55
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/mq.test.ts

+31
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,37 @@ Deno.test("PostgresMessageQueue", async (t) => {
5656
assertGreater(Date.now() - started, 3_000);
5757
});
5858

59+
await t.step("enqueueMany()", async () => {
60+
while (messages.length > 0) messages.pop();
61+
const batchMessages = [
62+
"First batch message",
63+
{ text: "Second batch message" },
64+
{ text: "Third batch message", priority: "high" },
65+
];
66+
await mq.enqueueMany(batchMessages);
67+
await waitFor(() => messages.length === batchMessages.length, 15_000);
68+
assertEquals(messages, batchMessages);
69+
});
70+
71+
await t.step("enqueueMany() with delay", async () => {
72+
while (messages.length > 0) messages.pop();
73+
started = Date.now();
74+
const delayedBatchMessages = [
75+
"Delayed batch 1",
76+
"Delayed batch 2",
77+
];
78+
await mq.enqueueMany(
79+
delayedBatchMessages,
80+
{ delay: Temporal.Duration.from({ seconds: 2 }) },
81+
);
82+
await waitFor(
83+
() => messages.length === delayedBatchMessages.length,
84+
15_000,
85+
);
86+
assertEquals(messages, delayedBatchMessages);
87+
assertGreater(Date.now() - started, 2_000);
88+
});
89+
5990
controller.abort();
6091
await listening;
6192
await listening2;

src/mq.ts

+33
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,39 @@ export class PostgresMessageQueue implements MessageQueue {
111111
});
112112
}
113113

114+
async enqueueMany(
115+
// deno-lint-ignore no-explicit-any
116+
messages: any[],
117+
options?: MessageQueueEnqueueOptions,
118+
): Promise<void> {
119+
if (messages.length === 0) return;
120+
await this.initialize();
121+
const delay = options?.delay ?? Temporal.Duration.from({ seconds: 0 });
122+
if (options?.delay) {
123+
logger.debug("Enqueuing messages with a delay of {delay}...", {
124+
delay,
125+
messages,
126+
});
127+
} else {
128+
logger.debug("Enqueuing messages...", { messages });
129+
}
130+
for (const message of messages) {
131+
await this.#sql`
132+
INSERT INTO ${this.#sql(this.#tableName)} (message, delay)
133+
VALUES (
134+
${this.#json(message)},
135+
${delay.toString()}
136+
);
137+
`;
138+
}
139+
logger.debug("Enqueued messages.", { messages });
140+
await this.#sql.notify(this.#channelName, delay.toString());
141+
logger.debug("Notified the message queue channel {channelName}.", {
142+
channelName: this.#channelName,
143+
messages,
144+
});
145+
}
146+
114147
async listen(
115148
// deno-lint-ignore no-explicit-any
116149
handler: (message: any) => void | Promise<void>,

0 commit comments

Comments
 (0)