Skip to content

Commit 27591c1

Browse files
committed
Add priority-queue example for TS
1 parent d36fec5 commit 27591c1

File tree

7 files changed

+231
-0
lines changed

7 files changed

+231
-0
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Priority queue
2+
3+
An example of implementing your own priority queue using Restate state and
4+
awakeables.
5+
6+
Run the example with `npm run app-dev`.
7+
8+
You can simulate adding work to the queue like this:
9+
```shell
10+
# add a single entry
11+
curl localhost:8080/myService/expensiveMethod/send --json '{"left": 1, "right": 2}'
12+
# add lots
13+
for i in $(seq 1 30); do curl localhost:8080/myService/expensiveMethod/send --json '{"left": 1, "right": 2}'; done
14+
```
15+
16+
As you do so, you can observe the logs; in flight requests will increase up to 10, beyond which items will be enqueued.
17+
18+
You can write your own queue item selection logic in `selectAndPopItem`; doing so is outside the scope of this example.
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
{
2+
"name": "@restatedev/example-pattern-priority-queue",
3+
"version": "0.1.0",
4+
"description": "A Restate example showing the implementation of a distributed priority queue",
5+
"type": "commonjs",
6+
"scripts": {
7+
"build": "tsc --noEmitOnError",
8+
"app-dev": "tsx --watch ./src/app.ts",
9+
"app": "tsx ./src/app.ts"
10+
},
11+
"dependencies": {
12+
"@restatedev/restate-sdk": "^1.2.1"
13+
},
14+
"devDependencies": {
15+
"@types/node": "^20.12.7",
16+
"tsx": "^4.17.0",
17+
"typescript": "^5.0.2"
18+
}
19+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
3+
*
4+
* This file is part of the Restate Examples for the Node.js/TypeScript SDK,
5+
* which is released under the MIT license.
6+
*
7+
* You can find a copy of the license in the file LICENSE
8+
* in the root directory of this repository or package or at
9+
* https://github.com/restatedev/examples/blob/main/LICENSE
10+
*/
11+
12+
import { endpoint } from "@restatedev/restate-sdk";
13+
14+
import { queue } from "./queue";
15+
import { myService } from "./service";
16+
17+
endpoint().bind(queue).bind(myService).listen();
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
3+
*
4+
* This file is part of the Restate Examples for the Node.js/TypeScript SDK,
5+
* which is released under the MIT license.
6+
*
7+
* You can find a copy of the license in the file LICENSE
8+
* in the root directory of this repository or package or at
9+
* https://github.com/restatedev/examples/blob/main/LICENSE
10+
*/
11+
12+
import { object, ObjectContext, TerminalError } from "@restatedev/restate-sdk";
13+
14+
type QueueState = {
15+
items: QueueItem[];
16+
inFlight: number;
17+
};
18+
19+
type QueueItem = {
20+
awakeable: string;
21+
priority: number;
22+
};
23+
24+
type TickCause =
25+
| {
26+
type: "done";
27+
}
28+
| { type: "push"; item: QueueItem };
29+
30+
// Put your super clever queue fairness algorithm here
31+
function selectAndPopItem<T>(items: QueueItem[]): QueueItem {
32+
return items.pop()!;
33+
}
34+
35+
const MAX_IN_FLIGHT = 10;
36+
37+
export const queue = object({
38+
name: "queue",
39+
handlers: {
40+
tick: async (
41+
ctx: ObjectContext<QueueState>,
42+
cause: TickCause,
43+
): Promise<void> => {
44+
let items = (await ctx.get("items")) ?? [];
45+
let inFlight = (await ctx.get("inFlight")) ?? 0;
46+
switch (cause?.type) {
47+
case "done":
48+
inFlight--;
49+
break;
50+
case "push":
51+
items.push(cause.item);
52+
break;
53+
default:
54+
throw new TerminalError(`unexpected queue tick cause ${cause}`);
55+
}
56+
57+
while (inFlight < MAX_IN_FLIGHT && items.length > 0) {
58+
let item = selectAndPopItem(items);
59+
inFlight++;
60+
ctx.resolveAwakeable(item.awakeable);
61+
}
62+
63+
ctx.console.log(
64+
`Tick end. Queue length: ${items.length}, In Flight: ${inFlight}`,
65+
);
66+
67+
ctx.set("items", items);
68+
ctx.set("inFlight", inFlight);
69+
},
70+
},
71+
});
72+
73+
export type Queue = typeof queue;
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
3+
*
4+
* This file is part of the Restate Examples for the Node.js/TypeScript SDK,
5+
* which is released under the MIT license.
6+
*
7+
* You can find a copy of the license in the file LICENSE
8+
* in the root directory of this repository or package or at
9+
* https://github.com/restatedev/examples/blob/main/LICENSE
10+
*/
11+
12+
import { Context, TerminalError } from "@restatedev/restate-sdk";
13+
import type { Queue } from "./queue";
14+
15+
export async function doWithQueue<T>(
16+
ctx: Context,
17+
priority: number,
18+
operation: () => Promise<T>,
19+
): Promise<T> {
20+
const awakeable = ctx.awakeable();
21+
ctx.objectSendClient<Queue>({ name: "queue" }, "").tick({
22+
type: "push",
23+
item: {
24+
awakeable: awakeable.id,
25+
priority,
26+
},
27+
});
28+
29+
await awakeable.promise;
30+
31+
try {
32+
const result = await operation();
33+
34+
ctx.objectSendClient<Queue>({ name: "queue" }, "").tick({
35+
type: "done",
36+
});
37+
38+
return result;
39+
} catch (e) {
40+
if (e instanceof TerminalError) {
41+
ctx.objectSendClient<Queue>({ name: "queue" }, "").tick({
42+
type: "done",
43+
});
44+
}
45+
throw e;
46+
}
47+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
3+
*
4+
* This file is part of the Restate Examples for the Node.js/TypeScript SDK,
5+
* which is released under the MIT license.
6+
*
7+
* You can find a copy of the license in the file LICENSE
8+
* in the root directory of this repository or package or at
9+
* https://github.com/restatedev/examples/blob/main/LICENSE
10+
*/
11+
12+
import { Context, service } from "@restatedev/restate-sdk";
13+
import { doWithQueue } from "./queue_client";
14+
15+
export const myService = service({
16+
name: "myService",
17+
handlers: {
18+
expensiveMethod: async (
19+
ctx: Context,
20+
params: { left: number; right: number },
21+
): Promise<number> => {
22+
return doWithQueue(ctx, 1, () =>
23+
ctx.run(() => expensiveOperation(params.left, params.right)),
24+
);
25+
},
26+
},
27+
});
28+
29+
async function expensiveOperation(
30+
left: number,
31+
right: number,
32+
): Promise<number> {
33+
await new Promise((resolve) => setTimeout(resolve, 5_000));
34+
35+
// very cpu heavy - important that the queue protects this
36+
return left + right;
37+
}
38+
39+
export type MyService = typeof myService;
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"compilerOptions": {
3+
"target": "esnext",
4+
"lib": ["esnext"],
5+
"module": "nodenext",
6+
"allowJs": true,
7+
"declaration": true,
8+
"declarationMap": true,
9+
"sourceMap": true,
10+
"outDir": "./dist",
11+
"allowSyntheticDefaultImports": true,
12+
"esModuleInterop": true,
13+
"forceConsistentCasingInFileNames": true,
14+
"strict": true,
15+
"skipDefaultLibCheck": true,
16+
"skipLibCheck": true
17+
}
18+
}

0 commit comments

Comments
 (0)