Skip to content

Commit cb28b8e

Browse files
authored
DX-1691: Add workflow parameter to context.call (#75)
* feat: add waitForResult parameter to invoke * fix: return types * fix: pass workflow to context.call * fix: fmt * fix: flaky invoke test test was checking the result of context.notify in a loop, which resulted in different number of calls. fixed it by waiting for a long while and notifying once. * fix: timeout issues in test * ci: fix wrangler deploy * fix: review
1 parent 02802a6 commit cb28b8e

File tree

5 files changed

+223
-35
lines changed

5 files changed

+223
-35
lines changed

.github/workflows/test.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ jobs:
652652
working-directory: examples/cloudflare-workers-hono
653653

654654
- name: Deploy
655-
run: wrangler publish
655+
run: wrangler deploy
656656
working-directory: examples/cloudflare-workers-hono
657657
env:
658658
CLOUDFLARE_API_TOKEN: ${{secrets.CLOUDFLARE_API_TOKEN}}

examples/ci/app/test-routes/invoke/workflows/[...]/route.ts

+30-14
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { WorkflowContext } from "@upstash/workflow";
22
import { createWorkflow, serveMany } from "@upstash/workflow/nextjs";
33
import { BASE_URL, CI_RANDOM_ID_HEADER, CI_ROUTE_HEADER, TEST_ROUTE_PREFIX } from "app/ci/constants";
4-
import { saveResult } from "app/ci/upstash/redis";
4+
import { fail, saveResult } from "app/ci/upstash/redis";
55
import { expect, nanoid, testServe } from "app/ci/utils";
66
import { z } from "zod";
77

@@ -43,6 +43,17 @@ const workflowOne = createWorkflow(async (context: WorkflowContext<number>) => {
4343
expect(isCanceled, false)
4444
expect(isFailed, false)
4545

46+
const result = await context.call("call workflow", {
47+
workflow: workflowFour,
48+
body: invokePayload,
49+
headers: {
50+
[CI_ROUTE_HEADER]: context.headers.get(CI_ROUTE_HEADER) as string,
51+
[CI_RANDOM_ID_HEADER]: context.headers.get(CI_RANDOM_ID_HEADER) as string,
52+
},
53+
})
54+
55+
expect(typeof result.body.workflowRunId, "string")
56+
4657
const { body: failingBody, isCanceled: failingIsCanceled, isFailed: failingIsFailed } = await context.invoke("invoke failing", {
4758
workflow: workflowThree,
4859
body: invokePayload,
@@ -67,7 +78,7 @@ const workflowOne = createWorkflow(async (context: WorkflowContext<number>) => {
6778
"done invoke"
6879
)
6980
}, {
70-
schema: z.number(),
81+
// schema: z.number(), # TODO add back after fromCallback is removed
7182
})
7283

7384
const workflowTwo = createWorkflow(async (context: WorkflowContext<string>) => {
@@ -116,12 +127,22 @@ const workflowTwo = createWorkflow(async (context: WorkflowContext<string>) => {
116127
})
117128

118129
const workflowThree = createWorkflow(async (context: WorkflowContext<string>) => {
119-
expect(context.requestPayload, invokePayload)
130+
try {
131+
expect(context.requestPayload, invokePayload)
132+
} catch {
133+
fail(context)
134+
}
120135
throw new Error("what")
121136
}, {
122137
retries: 0
123138
})
124139

140+
const workflowFour = createWorkflow(async (context: WorkflowContext<string>) => {
141+
await context.sleep("mock", 1)
142+
}, {
143+
retries: 0
144+
})
145+
125146
/**
126147
* wait for event workflows
127148
*/
@@ -142,7 +163,7 @@ const branchOne = createWorkflow(async (context: WorkflowContext<number>) => {
142163
const invokeCount = context.executor.invokeCount
143164
expect(invokeCount, 2)
144165

145-
const { timeout: isTimeout } = await context.waitForEvent("notified event", notifiedEventId, { timeout: "10s" })
166+
const { timeout: isTimeout } = await context.waitForEvent("notified event", notifiedEventId, { timeout: "20s" })
146167
expect(isTimeout, false)
147168

148169
await context.sleep("check", 1)
@@ -168,15 +189,9 @@ const branchTwo = createWorkflow(async (context: WorkflowContext<number>) => {
168189
const invokeCount = context.executor.invokeCount
169190
expect(invokeCount, 2)
170191

171-
let counter = 0;
172-
while (counter < 10) {
173-
const { notifyResponse } = await context.notify("notified event", notifiedEventId, "data")
174-
counter += 1
175-
await context.sleep("wait", 1)
176-
if (notifyResponse.length) {
177-
break
178-
}
179-
}
192+
await context.sleep("wait", 5)
193+
const { notifyResponse } = await context.notify("notified event", notifiedEventId, "data")
194+
expect(notifyResponse.length > 0, true)
180195

181196
await context.sleep("check", 1)
182197

@@ -188,13 +203,14 @@ export const { POST, GET } = testServe(
188203
workflowOne,
189204
workflowTwo,
190205
workflowThree,
206+
workflowFour,
191207
branchOne,
192208
branchTwo,
193209
}, {
194210
baseUrl: BASE_URL
195211
}),
196212
{
197-
expectedCallCount: 26,
213+
expectedCallCount: 28,
198214
expectedResult: "done invoke",
199215
payload,
200216
headers: {

src/context/context.ts

+45-15
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import type { Duration } from "../types";
2727
import { WorkflowApi } from "./api";
2828
import { WorkflowAgents } from "../agents";
2929
import { FlowControl } from "@upstash/qstash";
30+
import { getNewUrlFromWorkflowId } from "../serve/serve-many";
3031

3132
/**
3233
* Upstash Workflow context
@@ -315,29 +316,58 @@ export class WorkflowContext<TInitialPayload = unknown> {
315316
public async call<TResult = unknown, TBody = unknown>(
316317
stepName: string,
317318
settings: CallSettings<TBody>
318-
): Promise<CallResponse<TResult>> {
319-
const {
320-
url,
321-
method = "GET",
322-
body: requestBody,
323-
headers = {},
324-
retries = 0,
325-
timeout,
326-
flowControl,
327-
} = settings;
319+
): Promise<CallResponse<TResult>>;
320+
public async call<
321+
TResult extends { workflowRunId: string } = { workflowRunId: string },
322+
TBody = unknown,
323+
>(
324+
stepName: string,
325+
settings: LazyInvokeStepParams<TBody, unknown> & Pick<CallSettings, "timeout">
326+
): Promise<CallResponse<TResult>>;
327+
public async call<TResult = unknown, TBody = unknown>(
328+
stepName: string,
329+
settings:
330+
| CallSettings<TBody>
331+
| (LazyInvokeStepParams<TBody, unknown> & Pick<CallSettings, "timeout">)
332+
): Promise<CallResponse<TResult | { workflowRunId: string }>> {
333+
let callStep: LazyCallStep<TResult | { workflowRunId: string }>;
334+
if ("workflow" in settings) {
335+
const url = getNewUrlFromWorkflowId(this.url, settings.workflow.workflowId);
328336

329-
return await this.addStep(
330-
new LazyCallStep<TResult>(
337+
callStep = new LazyCallStep<{ workflowRunId: string }>(
338+
stepName,
339+
url,
340+
"POST",
341+
settings.body,
342+
settings.headers || {},
343+
settings.retries || 0,
344+
settings.timeout,
345+
settings.flowControl ?? settings.workflow.options.flowControl
346+
);
347+
} else {
348+
const {
349+
url,
350+
method = "GET",
351+
body,
352+
headers = {},
353+
retries = 0,
354+
timeout,
355+
flowControl,
356+
} = settings;
357+
358+
callStep = new LazyCallStep<TResult>(
331359
stepName,
332360
url,
333361
method,
334-
requestBody,
362+
body,
335363
headers,
336364
retries,
337365
timeout,
338366
flowControl
339-
)
340-
);
367+
);
368+
}
369+
370+
return await this.addStep(callStep);
341371
}
342372

343373
/**

src/serve/serve-many.test.ts

+139-4
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import {
1111
import { nanoid } from "../utils";
1212
import { WORKFLOW_INVOKE_COUNT_HEADER } from "../constants";
1313
import { Telemetry } from "../types";
14-
import { invokeWorkflow } from "./serve-many";
14+
import { getNewUrlFromWorkflowId, invokeWorkflow } from "./serve-many";
1515

1616
describe("serveMany", () => {
1717
describe("invokeWorkflow", () => {
@@ -148,14 +148,36 @@ describe("serveMany", () => {
148148

149149
const workflowTwo = createWorkflow(
150150
async (context: WorkflowContext<string>) => {
151-
await context.invoke("invoke step two", {
151+
const result = await context.invoke("invoke step two", {
152152
workflow: workflowOne,
153153
body: 2,
154154
flowControl: {
155155
key: "customFlowControl",
156156
parallelism: 4,
157157
},
158158
});
159+
160+
const _body = result.body;
161+
const _isCanceled = result.isCanceled;
162+
const _isFailed = result.isFailed;
163+
164+
console.log(_body, _isCanceled, _isFailed);
165+
166+
// just checking the type. code won't reach here.
167+
const secondResult = await context.invoke("invoke step two", {
168+
workflow: workflowOne,
169+
body: 2,
170+
flowControl: {
171+
key: "customFlowControl",
172+
parallelism: 4,
173+
},
174+
});
175+
176+
const _secondBody = secondResult.body;
177+
const _secondIsCanceled = secondResult.isCanceled;
178+
const _secondIsFailed = secondResult.isFailed;
179+
180+
console.log(_secondBody, _secondIsCanceled, _secondIsFailed);
159181
},
160182
{
161183
flowControl: {
@@ -166,10 +188,33 @@ describe("serveMany", () => {
166188
}
167189
);
168190

191+
const workflowThree = createWorkflow(
192+
async (context: WorkflowContext<string>) => {
193+
const result = await context.call("call other workflow", {
194+
workflow: workflowOne,
195+
body: 2,
196+
});
197+
198+
const _body = result.body;
199+
const _header = result.header;
200+
const _status = result.status;
201+
202+
console.log(_body, _header, _status);
203+
},
204+
{
205+
flowControl: {
206+
key: "workflowThreeFlowControl",
207+
parallelism: 4,
208+
ratePerSecond: 6,
209+
},
210+
}
211+
);
212+
169213
const { POST: handler } = serveMany(
170214
{
171215
"workflow-one": workflowOne,
172216
"workflow-two": workflowTwo,
217+
"workflow-three": workflowThree,
173218
},
174219
{
175220
qstashClient,
@@ -187,7 +232,8 @@ describe("serveMany", () => {
187232

188233
await mockQStashServer({
189234
execute: async () => {
190-
await handler(request);
235+
const response = await handler(request);
236+
expect(response.status).toBe(200);
191237
},
192238
responseFields: { body: "msgId", status: 200 },
193239
receivesRequest: {
@@ -243,7 +289,8 @@ describe("serveMany", () => {
243289

244290
await mockQStashServer({
245291
execute: async () => {
246-
await handler(request);
292+
const response = await handler(request);
293+
expect(response.status).toBe(200);
247294
},
248295
responseFields: { body: "msgId", status: 200 },
249296
receivesRequest: {
@@ -287,5 +334,93 @@ describe("serveMany", () => {
287334
},
288335
});
289336
});
337+
338+
test("should make context.call request with workflow", async () => {
339+
const request = getRequest(
340+
`${WORKFLOW_ENDPOINT}/workflow-three`,
341+
"wfr_id",
342+
"initial-payload",
343+
[]
344+
);
345+
request.headers.set(WORKFLOW_INVOKE_COUNT_HEADER, "1");
346+
347+
await mockQStashServer({
348+
execute: async () => {
349+
const response = await handler(request);
350+
expect(response.status).toBe(200);
351+
},
352+
responseFields: { body: "msgId", status: 200 },
353+
receivesRequest: {
354+
method: "POST",
355+
url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`,
356+
token,
357+
body: [
358+
{
359+
body: "2",
360+
destination: "https://requestcatcher.com/api/workflow-one",
361+
headers: {
362+
"content-type": "application/json",
363+
"upstash-callback": "https://requestcatcher.com/api/workflow-three",
364+
"upstash-callback-feature-set": "LazyFetch,InitialBody",
365+
"upstash-callback-flow-control-key": "workflowThreeFlowControl",
366+
"upstash-callback-flow-control-value": "parallelism=4, rate=6",
367+
"upstash-flow-control-key": "workflowOneFlowControl",
368+
"upstash-flow-control-value": "parallelism=2, rate=10",
369+
"upstash-callback-forward-upstash-workflow-callback": "true",
370+
"upstash-callback-forward-upstash-workflow-concurrent": "1",
371+
"upstash-callback-forward-upstash-workflow-contenttype": "application/json",
372+
"upstash-callback-forward-upstash-workflow-invoke-count": "1",
373+
"upstash-callback-forward-upstash-workflow-stepid": "1",
374+
"upstash-callback-forward-upstash-workflow-stepname": "call other workflow",
375+
"upstash-callback-forward-upstash-workflow-steptype": "Call",
376+
"upstash-callback-retries": "3",
377+
"upstash-callback-workflow-calltype": "fromCallback",
378+
"upstash-callback-workflow-init": "false",
379+
"upstash-callback-workflow-runid": "wfr_id",
380+
"upstash-callback-workflow-url": "https://requestcatcher.com/api/workflow-three",
381+
"upstash-failure-callback-retries": "3",
382+
"upstash-feature-set": "WF_NoDelete,InitialBody",
383+
"upstash-method": "POST",
384+
"upstash-retries": "0",
385+
"upstash-telemetry-framework": "nextjs",
386+
"upstash-telemetry-runtime": "[email protected]",
387+
"upstash-telemetry-sdk": "@upstash/[email protected]",
388+
"upstash-workflow-calltype": "toCallback",
389+
"upstash-workflow-init": "false",
390+
"upstash-workflow-runid": "wfr_id",
391+
"upstash-workflow-sdk-version": "1",
392+
"upstash-workflow-url": "https://requestcatcher.com/api/workflow-three",
393+
},
394+
},
395+
],
396+
},
397+
});
398+
});
399+
});
400+
401+
describe("getNewUrlFromWorkflowId", () => {
402+
test("should return new url", () => {
403+
const url = "https://requestcatcher.com/api/original_workflow";
404+
const workflowId = "workflowId";
405+
const newUrl = getNewUrlFromWorkflowId(url, workflowId);
406+
407+
expect(newUrl).toBe("https://requestcatcher.com/api/workflowId");
408+
});
409+
410+
test("should ignore query parameters", () => {
411+
const url = "https://requestcatcher.com/api/original_workflow?query=param";
412+
const workflowId = "workflowId";
413+
const newUrl = getNewUrlFromWorkflowId(url, workflowId);
414+
415+
expect(newUrl).toBe("https://requestcatcher.com/api/workflowId");
416+
});
417+
418+
test("shuold ignore hash parameters", () => {
419+
const url = "https://requestcatcher.com/api/original_workflow#hash";
420+
const workflowId = "workflowId";
421+
const newUrl = getNewUrlFromWorkflowId(url, workflowId);
422+
423+
expect(newUrl).toBe("https://requestcatcher.com/api/workflowId");
424+
});
290425
});
291426
});

0 commit comments

Comments
 (0)