Skip to content

Commit 366d593

Browse files
committed
add local in memory cache
1 parent f5303c1 commit 366d593

File tree

2 files changed

+120
-1
lines changed

2 files changed

+120
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import type { Queue } from "@opennextjs/aws/types/overrides";
2+
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
3+
4+
import queueCache from "./queue-cache";
5+
6+
const mockedQueue = {
7+
name: "mocked-queue",
8+
send: vi.fn(),
9+
} satisfies Queue;
10+
11+
const generateMessage = () => ({
12+
MessageGroupId: "test",
13+
MessageBody: {
14+
eTag: "test",
15+
url: "test",
16+
host: "test",
17+
lastModified: Date.now(),
18+
},
19+
MessageDeduplicationId: "test",
20+
});
21+
22+
const mockedPut = vi.fn();
23+
const mockedMatch = vi.fn().mockReturnValue(null);
24+
25+
describe("queue-cache", () => {
26+
beforeEach(() => {
27+
// @ts-ignore
28+
globalThis.caches = {
29+
open: vi.fn().mockReturnValue({
30+
put: mockedPut,
31+
match: mockedMatch,
32+
}),
33+
};
34+
});
35+
36+
afterEach(() => {
37+
vi.resetAllMocks();
38+
});
39+
test("should send the message to the original queue", async () => {
40+
const msg = generateMessage();
41+
const queue = queueCache(mockedQueue, {});
42+
expect(queue.name).toBe("cached-mocked-queue");
43+
await queue.send(msg);
44+
expect(mockedQueue.send).toHaveBeenCalledWith(msg);
45+
});
46+
47+
test("should use the local cache", async () => {
48+
const msg = generateMessage();
49+
const queue = queueCache(mockedQueue, {});
50+
await queue.send(msg);
51+
52+
expect(queue.localCache.size).toBe(1);
53+
expect(queue.localCache.has(`queue/test/test`)).toBe(true);
54+
expect(mockedPut).toHaveBeenCalled();
55+
56+
const spiedHas = vi.spyOn(queue.localCache, "has");
57+
await queue.send(msg);
58+
expect(spiedHas).toHaveBeenCalled();
59+
60+
expect(mockedQueue.send).toHaveBeenCalledTimes(1);
61+
62+
expect(mockedMatch).toHaveBeenCalledTimes(1);
63+
});
64+
65+
test("should clear the local cache after 5s", async () => {
66+
vi.useFakeTimers();
67+
const msg = generateMessage();
68+
const queue = queueCache(mockedQueue, {});
69+
await queue.send(msg);
70+
expect(queue.localCache.size).toBe(1);
71+
expect(queue.localCache.has(`queue/test/test`)).toBe(true);
72+
73+
vi.advanceTimersByTime(5001);
74+
const alteredMsg = generateMessage();
75+
alteredMsg.MessageGroupId = "test2";
76+
await queue.send(alteredMsg);
77+
expect(queue.localCache.size).toBe(1);
78+
console.log(queue.localCache);
79+
expect(queue.localCache.has(`queue/test2/test`)).toBe(true);
80+
expect(queue.localCache.has(`queue/test/test`)).toBe(false);
81+
vi.useRealTimers();
82+
});
83+
84+
test("should use the regional cache if not in local cache", async () => {
85+
const msg = generateMessage();
86+
const queue = queueCache(mockedQueue, {});
87+
await queue.send(msg);
88+
89+
expect(mockedMatch).toHaveBeenCalledTimes(1);
90+
expect(mockedPut).toHaveBeenCalledTimes(1);
91+
expect(queue.localCache.size).toBe(1);
92+
expect(queue.localCache.has(`queue/test/test`)).toBe(true);
93+
// We need to delete the local cache to test the regional cache
94+
queue.localCache.delete(`queue/test/test`);
95+
96+
const spiedHas = vi.spyOn(queue.localCache, "has");
97+
await queue.send(msg);
98+
expect(spiedHas).toHaveBeenCalled();
99+
expect(mockedMatch).toHaveBeenCalledTimes(2);
100+
});
101+
102+
test("should return early if the message is in the regional cache", async () => {
103+
const msg = generateMessage();
104+
const queue = queueCache(mockedQueue, {});
105+
106+
mockedMatch.mockReturnValueOnce(new Response(null, { status: 200 }));
107+
108+
const spiedSend = mockedQueue.send;
109+
await queue.send(msg);
110+
expect(spiedSend).not.toHaveBeenCalled();
111+
});
112+
});

packages/cloudflare/src/api/queue-cache.ts

+8-1
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ class QueueCache implements Queue {
5151
}
5252
} catch (e) {
5353
error("Error sending message to queue", e);
54+
} finally {
55+
this.clearLocalCache();
5456
}
5557
}
5658

@@ -81,7 +83,12 @@ class QueueCache implements Queue {
8183

8284
private async isInCache(msg: QueueMessage) {
8385
if (this.localCache.has(this.getCacheUrlString(msg))) {
84-
return true;
86+
const insertedAt = this.localCache.get(this.getCacheUrlString(msg))!;
87+
if (Date.now() - insertedAt < this.regionalCacheTtlSec * 1000) {
88+
return true;
89+
}
90+
this.localCache.delete(this.getCacheUrlString(msg));
91+
return false;
8592
}
8693
const cacheKey = this.getCacheKey(msg);
8794
const cache = await this.getCache();

0 commit comments

Comments
 (0)