Skip to content

Commit 6ce5643

Browse files
authored
Feat Sharding Replication (#484)
* split soft and hard tags * double sharding * simple handling of errors on write * add dead letter queue * refactor and lint fix * changeset * review fix * rename options * change do name * fix replica options name * review fix * new class for DOId * fix app router * refactored class * review fix * change comment
1 parent 02f2c7a commit 6ce5643

File tree

8 files changed

+406
-90
lines changed

8 files changed

+406
-90
lines changed

.changeset/modern-laws-happen.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@opennextjs/cloudflare": patch
3+
---
4+
5+
Add sharding replication for the Durable Object Tag Cache

examples/e2e/app-router/open-next.config.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,14 @@ import doQueue from "@opennextjs/cloudflare/durable-queue";
55

66
export default defineCloudflareConfig({
77
incrementalCache: kvIncrementalCache,
8-
tagCache: shardedTagCache({ numberOfShards: 12 }),
8+
// With such a configuration, we could have up to 12 * (8 + 2) = 120 Durable Objects instances
9+
tagCache: shardedTagCache({
10+
numberOfShards: 12,
11+
enableShardReplication: true,
12+
shardReplicationOptions: {
13+
numberOfSoftReplicas: 8,
14+
numberOfHardReplicas: 2,
15+
},
16+
}),
917
queue: doQueue,
1018
});

examples/e2e/app-router/package.json

+1-2
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@
99
"start": "next start --port 3001",
1010
"lint": "next lint",
1111
"clean": "rm -rf .turbo node_modules .next .open-next",
12-
"d1:clean": "wrangler d1 execute NEXT_CACHE_D1 --command \"DROP TABLE IF EXISTS tags; DROP TABLE IF EXISTS revalidations\"",
13-
"build:worker": "pnpm d1:clean && pnpm opennextjs-cloudflare build",
12+
"build:worker": "pnpm opennextjs-cloudflare build",
1413
"preview:worker": "pnpm opennextjs-cloudflare preview",
1514
"preview": "pnpm build:worker && pnpm preview:worker",
1615
"e2e": "playwright test -c e2e/playwright.config.ts"

examples/e2e/app-router/wrangler.jsonc

+1-8
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"class_name": "DurableObjectQueueHandler"
1616
},
1717
{
18-
"name": "NEXT_CACHE_D1_SHARDED",
18+
"name": "NEXT_CACHE_DO_SHARDED",
1919
"class_name": "DOShardedTagCache"
2020
}
2121
]
@@ -32,13 +32,6 @@
3232
"id": "<BINDING_ID>"
3333
}
3434
],
35-
"d1_databases": [
36-
{
37-
"binding": "NEXT_CACHE_D1",
38-
"database_id": "NEXT_CACHE_D1",
39-
"database_name": "NEXT_CACHE_D1"
40-
}
41-
],
4235
"services": [
4336
{
4437
"binding": "NEXT_CACHE_REVALIDATION_WORKER",

packages/cloudflare/src/api/cloudflare-context.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ declare global {
2222
// Durable Object namespace to use for the durable object queue handler
2323
NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace<DurableObjectQueueHandler>;
2424
// Durables object namespace to use for the sharded tag cache
25-
NEXT_CACHE_D1_SHARDED?: DurableObjectNamespace<DOShardedTagCache>;
25+
NEXT_CACHE_DO_SHARDED?: DurableObjectNamespace<DOShardedTagCache>;
26+
// Queue of failed tag write
27+
// It could be used for monitoring or to reprocess failed writes
28+
// Entirely optional
29+
NEXT_CACHE_DO_SHARDED_DLQ?: Queue;
2630

2731
// Asset binding
2832
ASSETS?: Fetcher;

packages/cloudflare/src/api/do-sharded-tag-cache.spec.ts

+183-21
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
22

3-
import doShardedTagCache from "./do-sharded-tag-cache";
3+
import doShardedTagCache, {
4+
DEFAULT_HARD_REPLICAS,
5+
DEFAULT_SOFT_REPLICAS,
6+
TagCacheDOId,
7+
} from "./do-sharded-tag-cache";
48

59
const hasBeenRevalidatedMock = vi.fn();
610
const writeTagsMock = vi.fn();
@@ -9,9 +13,15 @@ const getMock = vi
913
.fn()
1014
.mockReturnValue({ hasBeenRevalidated: hasBeenRevalidatedMock, writeTags: writeTagsMock });
1115
const waitUntilMock = vi.fn().mockImplementation(async (fn) => fn());
16+
const sendDLQMock = vi.fn();
1217
vi.mock("./cloudflare-context", () => ({
1318
getCloudflareContext: () => ({
14-
env: { NEXT_CACHE_D1_SHARDED: { idFromName: idFromNameMock, get: getMock } },
19+
env: {
20+
NEXT_CACHE_DO_SHARDED: { idFromName: idFromNameMock, get: getMock },
21+
NEXT_CACHE_DO_SHARDED_DLQ: {
22+
send: sendDLQMock,
23+
},
24+
},
1525
ctx: { waitUntil: waitUntilMock },
1626
}),
1727
}));
@@ -22,24 +32,81 @@ describe("DOShardedTagCache", () => {
2232
describe("generateShardId", () => {
2333
it("should generate a shardId", () => {
2434
const cache = doShardedTagCache();
25-
const expectedResult = new Map();
26-
expectedResult.set("shard-1", ["tag1"]);
27-
expectedResult.set("shard-2", ["tag2"]);
28-
expect(cache.generateShards(["tag1", "tag2"])).toEqual(expectedResult);
35+
const expectedResult = [
36+
{ doId: expect.objectContaining({ shardId: "tag-hard;shard-1" }), tags: ["tag1"] },
37+
{ doId: expect.objectContaining({ shardId: "tag-hard;shard-2" }), tags: ["tag2"] },
38+
];
39+
const result = cache.groupTagsByDO({ tags: ["tag1", "tag2"] });
40+
expect(result).toEqual(expectedResult);
41+
expect(result[0]?.doId.key).toBe("tag-hard;shard-1;replica-1");
42+
expect(result[1]?.doId.key).toBe("tag-hard;shard-2;replica-1");
2943
});
3044

3145
it("should group tags by shard", () => {
3246
const cache = doShardedTagCache();
33-
const expectedResult = new Map();
34-
expectedResult.set("shard-1", ["tag1", "tag6"]);
35-
expect(cache.generateShards(["tag1", "tag6"])).toEqual(expectedResult);
47+
const expectedResult = [
48+
{ doId: expect.objectContaining({ shardId: "tag-hard;shard-1" }), tags: ["tag1", "tag6"] },
49+
];
50+
const result = cache.groupTagsByDO({ tags: ["tag1", "tag6"] });
51+
expect(result).toEqual(expectedResult);
52+
expect(result[0]?.doId.key).toBe("tag-hard;shard-1;replica-1");
3653
});
3754

3855
it("should generate the same shardId for the same tag", () => {
3956
const cache = doShardedTagCache();
40-
const firstResult = cache.generateShards(["tag1"]);
41-
const secondResult = cache.generateShards(["tag1", "tag3", "tag4"]);
42-
expect(firstResult.get("shard-1")).toEqual(secondResult.get("shard-1"));
57+
const firstResult = cache.groupTagsByDO({ tags: ["tag1"] });
58+
const secondResult = cache.groupTagsByDO({ tags: ["tag1", "tag3", "tag4"] });
59+
expect(firstResult[0]).toEqual(secondResult[0]);
60+
});
61+
62+
it("should split hard and soft tags", () => {
63+
const cache = doShardedTagCache();
64+
const expectedResult = [
65+
{ doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
66+
{ doId: expect.objectContaining({ shardId: "tag-hard;shard-1", replicaId: 1 }), tags: ["tag1"] },
67+
];
68+
const result = cache.groupTagsByDO({ tags: ["tag1", "_N_T_/tag1"] });
69+
expect(result).toEqual(expectedResult);
70+
expect(result[1]?.doId.key).toBe("tag-hard;shard-1;replica-1");
71+
expect(result[0]?.doId.key).toBe("tag-soft;shard-3;replica-1");
72+
});
73+
74+
describe("with shard replication", () => {
75+
it("should generate all doIds if generateAllReplicas is true", () => {
76+
const cache = doShardedTagCache({ baseShardSize: 4, enableShardReplication: true });
77+
const expectedResult = [
78+
{ doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
79+
{ doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
80+
{ doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
81+
{ doId: expect.objectContaining({ shardId: "tag-soft;shard-3" }), tags: ["_N_T_/tag1"] },
82+
{ doId: expect.objectContaining({ shardId: "tag-hard;shard-1" }), tags: ["tag1"] },
83+
{ doId: expect.objectContaining({ shardId: "tag-hard;shard-1" }), tags: ["tag1"] },
84+
];
85+
const result = cache.groupTagsByDO({ tags: ["tag1", "_N_T_/tag1"], generateAllReplicas: true });
86+
console.log(result);
87+
expect(result).toEqual(expectedResult);
88+
});
89+
90+
it("should generate only one doIds by tag type if generateAllReplicas is false", () => {
91+
const cache = doShardedTagCache({ baseShardSize: 4, enableShardReplication: true });
92+
const shardedTagCollection = cache.groupTagsByDO({
93+
tags: ["tag1", "_N_T_/tag1"],
94+
generateAllReplicas: false,
95+
});
96+
expect(shardedTagCollection.length).toBe(2);
97+
const firstDOId = shardedTagCollection[0]?.doId;
98+
const secondDOId = shardedTagCollection[1]?.doId;
99+
100+
expect(firstDOId?.shardId).toBe("tag-soft;shard-3");
101+
expect(secondDOId?.shardId).toBe("tag-hard;shard-1");
102+
103+
// We still need to check if the last part is between the correct boundaries
104+
expect(firstDOId?.replicaId).toBeGreaterThanOrEqual(1);
105+
expect(firstDOId?.replicaId).toBeLessThanOrEqual(DEFAULT_SOFT_REPLICAS);
106+
107+
expect(secondDOId?.replicaId).toBeGreaterThanOrEqual(1);
108+
expect(secondDOId?.replicaId).toBeLessThanOrEqual(DEFAULT_HARD_REPLICAS);
109+
});
43110
});
44111
});
45112

@@ -115,7 +182,7 @@ describe("DOShardedTagCache", () => {
115182
expect(cache.putToRegionalCache).toHaveBeenCalled();
116183
});
117184

118-
it("should call all the shards", async () => {
185+
it("should call all the durable object instance", async () => {
119186
const cache = doShardedTagCache();
120187
cache.getFromRegionalCache = vi.fn();
121188
const result = await cache.hasBeenRevalidated(["tag1", "tag2"], 123456);
@@ -130,6 +197,11 @@ describe("DOShardedTagCache", () => {
130197
globalThis.openNextConfig = {
131198
dangerous: { disableTagCache: false },
132199
};
200+
vi.useFakeTimers();
201+
vi.setSystemTime(1000);
202+
});
203+
afterEach(() => {
204+
vi.useRealTimers();
133205
});
134206
it("should return early if the cache is disabled", async () => {
135207
globalThis.openNextConfig = {
@@ -146,24 +218,37 @@ describe("DOShardedTagCache", () => {
146218
await cache.writeTags(["tag1"]);
147219
expect(idFromNameMock).toHaveBeenCalled();
148220
expect(writeTagsMock).toHaveBeenCalled();
149-
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"]);
221+
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
150222
});
151223

152224
it("should write the tags to the cache for multiple shards", async () => {
153225
const cache = doShardedTagCache();
154226
await cache.writeTags(["tag1", "tag2"]);
155227
expect(idFromNameMock).toHaveBeenCalledTimes(2);
156228
expect(writeTagsMock).toHaveBeenCalledTimes(2);
157-
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"]);
158-
expect(writeTagsMock).toHaveBeenCalledWith(["tag2"]);
229+
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
230+
expect(writeTagsMock).toHaveBeenCalledWith(["tag2"], 1000);
231+
});
232+
233+
it('should write to all the replicated shards if "generateAllReplicas" is true', async () => {
234+
const cache = doShardedTagCache({ baseShardSize: 4, enableShardReplication: true });
235+
await cache.writeTags(["tag1", "_N_T_/tag1"]);
236+
expect(idFromNameMock).toHaveBeenCalledTimes(6);
237+
expect(writeTagsMock).toHaveBeenCalledTimes(6);
238+
expect(writeTagsMock).toHaveBeenCalledWith(["tag1"], 1000);
239+
expect(writeTagsMock).toHaveBeenCalledWith(["_N_T_/tag1"], 1000);
159240
});
160241

161242
it("should call deleteRegionalCache", async () => {
162243
const cache = doShardedTagCache();
163244
cache.deleteRegionalCache = vi.fn();
164245
await cache.writeTags(["tag1"]);
165246
expect(cache.deleteRegionalCache).toHaveBeenCalled();
166-
expect(cache.deleteRegionalCache).toHaveBeenCalledWith("shard-1", ["tag1"]);
247+
expect(cache.deleteRegionalCache).toHaveBeenCalledWith(
248+
expect.objectContaining({ key: "tag-hard;shard-1;replica-1" }),
249+
["tag1"]
250+
);
251+
// expect(cache.deleteRegionalCache).toHaveBeenCalledWith("tag-hard;shard-1;replica-1", ["tag1"]);
167252
});
168253
});
169254

@@ -178,7 +263,7 @@ describe("DOShardedTagCache", () => {
178263
globalThis.caches = {
179264
open: vi.fn().mockResolvedValue("cache"),
180265
};
181-
const cache = doShardedTagCache({ numberOfShards: 4, regionalCache: true });
266+
const cache = doShardedTagCache({ baseShardSize: 4, regionalCache: true });
182267
expect(cache.localCache).toBeUndefined();
183268
expect(await cache.getCacheInstance()).toBe("cache");
184269
expect(cache.localCache).toBe("cache");
@@ -190,7 +275,12 @@ describe("DOShardedTagCache", () => {
190275
describe("getFromRegionalCache", () => {
191276
it("should return undefined if regional cache is disabled", async () => {
192277
const cache = doShardedTagCache();
193-
expect(await cache.getFromRegionalCache("shard-1", ["tag1"])).toBeUndefined();
278+
const doId = new TagCacheDOId({
279+
baseShardId: "shard-1",
280+
numberOfReplicas: 1,
281+
shardType: "hard",
282+
});
283+
expect(await cache.getFromRegionalCache(doId, ["tag1"])).toBeUndefined();
194284
});
195285

196286
it("should call .match on the cache", async () => {
@@ -200,10 +290,82 @@ describe("DOShardedTagCache", () => {
200290
match: vi.fn().mockResolvedValue("response"),
201291
}),
202292
};
203-
const cache = doShardedTagCache({ numberOfShards: 4, regionalCache: true });
204-
expect(await cache.getFromRegionalCache("shard-1", ["tag1"])).toBe("response");
293+
const cache = doShardedTagCache({ baseShardSize: 4, regionalCache: true });
294+
const doId = new TagCacheDOId({
295+
baseShardId: "shard-1",
296+
numberOfReplicas: 1,
297+
shardType: "hard",
298+
});
299+
expect(await cache.getFromRegionalCache(doId, ["tag1"])).toBe("response");
205300
// @ts-expect-error - Defined on cloudfare context
206301
globalThis.caches = undefined;
207302
});
208303
});
304+
305+
describe("getCacheKey", () => {
306+
it("should return the cache key without the random part", async () => {
307+
const cache = doShardedTagCache();
308+
const doId1 = new TagCacheDOId({ baseShardId: "shard-0", numberOfReplicas: 1, shardType: "hard" });
309+
const reqKey = await cache.getCacheKey(doId1, ["_N_T_/tag1"]);
310+
expect(reqKey.url).toBe("http://local.cache/shard/tag-hard;shard-0?tags=_N_T_%2Ftag1");
311+
312+
const doId2 = new TagCacheDOId({
313+
baseShardId: "shard-1",
314+
numberOfReplicas: 1,
315+
shardType: "hard",
316+
});
317+
const reqKey2 = await cache.getCacheKey(doId2, ["tag1"]);
318+
expect(reqKey2.url).toBe("http://local.cache/shard/tag-hard;shard-1?tags=tag1");
319+
});
320+
});
321+
322+
describe("performWriteTagsWithRetry", () => {
323+
it("should retry if it fails", async () => {
324+
vi.useFakeTimers();
325+
vi.setSystemTime(1000);
326+
const cache = doShardedTagCache();
327+
writeTagsMock.mockImplementationOnce(() => {
328+
throw new Error("error");
329+
});
330+
const spiedFn = vi.spyOn(cache, "performWriteTagsWithRetry");
331+
const doId = new TagCacheDOId({
332+
baseShardId: "shard-1",
333+
numberOfReplicas: 1,
334+
shardType: "hard",
335+
});
336+
await cache.performWriteTagsWithRetry(doId, ["tag1"], Date.now());
337+
expect(writeTagsMock).toHaveBeenCalledTimes(2);
338+
expect(spiedFn).toHaveBeenCalledTimes(2);
339+
expect(spiedFn).toHaveBeenCalledWith(doId, ["tag1"], 1000, 1);
340+
expect(sendDLQMock).not.toHaveBeenCalled();
341+
342+
vi.useRealTimers();
343+
});
344+
345+
it("should stop retrying after 3 times", async () => {
346+
vi.useFakeTimers();
347+
vi.setSystemTime(1000);
348+
const cache = doShardedTagCache();
349+
writeTagsMock.mockImplementationOnce(() => {
350+
throw new Error("error");
351+
});
352+
const spiedFn = vi.spyOn(cache, "performWriteTagsWithRetry");
353+
await cache.performWriteTagsWithRetry(
354+
new TagCacheDOId({ baseShardId: "shard-1", numberOfReplicas: 1, shardType: "hard" }),
355+
["tag1"],
356+
Date.now(),
357+
3
358+
);
359+
expect(writeTagsMock).toHaveBeenCalledTimes(1);
360+
expect(spiedFn).toHaveBeenCalledTimes(1);
361+
362+
expect(sendDLQMock).toHaveBeenCalledWith({
363+
failingShardId: "tag-hard;shard-1;replica-1",
364+
failingTags: ["tag1"],
365+
lastModified: 1000,
366+
});
367+
368+
vi.useRealTimers();
369+
});
370+
});
209371
});

0 commit comments

Comments
 (0)