Skip to content

Commit

Permalink
Named queues
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Aug 23, 2024
1 parent 27591c1 commit c2a3da6
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import type { Queue } from "./queue";

export async function doWithQueue<T>(
ctx: Context,
queue: string,
priority: number,
operation: () => Promise<T>,
): Promise<T> {
const awakeable = ctx.awakeable();
ctx.objectSendClient<Queue>({ name: "queue" }, "").tick({
ctx.objectSendClient<Queue>({ name: "queue" }, queue).tick({
type: "push",
item: {
awakeable: awakeable.id,
Expand All @@ -31,14 +32,14 @@ export async function doWithQueue<T>(
try {
const result = await operation();

ctx.objectSendClient<Queue>({ name: "queue" }, "").tick({
ctx.objectSendClient<Queue>({ name: "queue" }, queue).tick({
type: "done",
});

return result;
} catch (e) {
if (e instanceof TerminalError) {
ctx.objectSendClient<Queue>({ name: "queue" }, "").tick({
ctx.objectSendClient<Queue>({ name: "queue" }, queue).tick({
type: "done",
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@
import { Context, service } from "@restatedev/restate-sdk";
import { doWithQueue } from "./queue_client";

const QUEUE_NAME = "myService/expensiveMethod";

export const myService = service({
name: "myService",
handlers: {
expensiveMethod: async (
ctx: Context,
params: { left: number; right: number },
): Promise<number> => {
return doWithQueue(ctx, 1, () =>
return doWithQueue(ctx, QUEUE_NAME, 1, () =>
ctx.run(() => expensiveOperation(params.left, params.right)),
);
},
Expand Down

0 comments on commit c2a3da6

Please sign in to comment.