Skip to content

Commit 4ab0373

Browse files
authored
DX-1614: Add flow control parameters (#66)
* feat: add flow control settings * fix: add flowControl to getHeaders * fix: update docstring * fix: bump qstash dependency to include flow control * fix: add test for failureFunction in logs
1 parent d3e5594 commit 4ab0373

17 files changed

+362
-86
lines changed

bun.lockb

0 Bytes
Binary file not shown.

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@
102102
},
103103
"dependencies": {
104104
"@ai-sdk/openai": "^1.0.15",
105-
"@upstash/qstash": "^2.7.20",
105+
"@upstash/qstash": "^2.7.22",
106106
"ai": "^4.0.30",
107107
"zod": "^3.24.1"
108108
},

src/client/index.test.ts

+91-1
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,7 @@ describe("workflow client", () => {
340340
const postCancelLogs = await liveClient.logs({
341341
workflowRunId
342342
})
343-
343+
344344
expect(postCancelLogs.cursor).toBe("")
345345
expect(postCancelLogs.runs.length).toBe(1)
346346
expect(postCancelLogs.runs[0]).toEqual({
@@ -378,5 +378,95 @@ describe("workflow client", () => {
378378
}, {
379379
timeout: 60000
380380
})
381+
382+
test("should include failure logs in case of failure", async () => {
383+
const qstashClient = new QStashClient({
384+
baseUrl: process.env.QSTASH_URL,
385+
token: process.env.QSTASH_TOKEN!,
386+
});
387+
const liveClient = new Client({
388+
baseUrl: process.env.QSTASH_URL,
389+
token: process.env.QSTASH_TOKEN!,
390+
})
391+
392+
const body = "some-body"
393+
const workflowRunId = "wfr_some-workflow-run-id-" + nanoid()
394+
395+
const result = await triggerFirstInvocation({
396+
workflowContext: new WorkflowContext({
397+
qstashClient,
398+
headers: new Headers({}) as Headers,
399+
initialPayload: body,
400+
workflowRunId,
401+
steps: [],
402+
url: "https://httpstat.us/400",
403+
failureUrl: "https://httpstat.us/200",
404+
retries: 0
405+
})
406+
})
407+
expect(result.isOk()).toBe(true)
408+
409+
await eventually(
410+
async () => {
411+
const logs = await liveClient.logs({
412+
workflowRunId
413+
})
414+
415+
expect(logs.cursor).toBe("")
416+
expect(logs.runs.length).toBe(1)
417+
expect(logs.runs[0]).toEqual({
418+
workflowRunId,
419+
workflowUrl: "https://httpstat.us/400",
420+
workflowState: "RUN_FAILED",
421+
workflowRunCreatedAt: expect.any(Number),
422+
workflowRunCompletedAt: expect.any(Number),
423+
failureFunction: {
424+
messageId: expect.any(String),
425+
dlqId: expect.any(String),
426+
failResponse: "400 Bad Request",
427+
failStatus: 400,
428+
url: "https://httpstat.us/400",
429+
state: "DELIVERED",
430+
failHeaders: expect.any(Object),
431+
},
432+
steps: [
433+
{
434+
steps: [
435+
{
436+
callType: "step",
437+
concurrent: 1,
438+
createdAt: expect.any(Number),
439+
headers: {
440+
"Upstash-Workflow-Sdk-Version": [
441+
"1"
442+
],
443+
},
444+
messageId: expect.any(String),
445+
out: "some-body",
446+
state: "STEP_SUCCESS",
447+
stepName: "init",
448+
stepType: "Initial",
449+
},
450+
],
451+
type: "sequential",
452+
},
453+
{
454+
steps: [
455+
{
456+
state: "STEP_FAILED",
457+
messageId: expect.any(String),
458+
},
459+
],
460+
type: "next",
461+
},
462+
],
463+
})
464+
},
465+
{ timeout: 30_000, interval: 100 }
466+
);
467+
468+
}, {
469+
timeout: 60000
470+
})
381471
})
382472
});

src/client/index.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { NotifyResponse, Waiter } from "../types";
2-
import { Client as QStashClient } from "@upstash/qstash";
2+
import { FlowControl, Client as QStashClient } from "@upstash/qstash";
33
import { makeGetWaitersRequest, makeNotifyRequest } from "./utils";
44
import { getWorkflowRunId } from "../utils";
55
import { triggerFirstInvocation } from "../workflow-requests";
@@ -198,12 +198,14 @@ export class Client {
198198
headers,
199199
workflowRunId,
200200
retries,
201+
flowControl,
201202
}: {
202203
url: string;
203204
body?: unknown;
204205
headers?: Record<string, string>;
205206
workflowRunId?: string;
206207
retries?: number;
208+
flowControl?: FlowControl
207209
}): Promise<{ workflowRunId: string }> {
208210
const finalWorkflowRunId = getWorkflowRunId(workflowRunId);
209211
const context = new WorkflowContext({
@@ -216,6 +218,7 @@ export class Client {
216218
workflowRunId: finalWorkflowRunId,
217219
retries,
218220
telemetry: undefined, // can't know workflow telemetry here
221+
flowControl,
219222
});
220223
const result = await triggerFirstInvocation({
221224
workflowContext: context,

src/context/auto-executor.ts

+34-31
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ export class AutoExecutor {
5858
if (this.executingStep) {
5959
throw new WorkflowError(
6060
"A step can not be run inside another step." +
61-
` Tried to run '${stepInfo.stepName}' inside '${this.executingStep}'`
61+
` Tried to run '${stepInfo.stepName}' inside '${this.executingStep}'`
6262
);
6363
}
6464

@@ -171,7 +171,7 @@ export class AutoExecutor {
171171
// user has added/removed a parallel step
172172
throw new WorkflowError(
173173
`Incompatible number of parallel steps when call state was '${parallelCallState}'.` +
174-
` Expected ${parallelSteps.length}, got ${plannedParallelStepCount} from the request.`
174+
` Expected ${parallelSteps.length}, got ${plannedParallelStepCount} from the request.`
175175
);
176176
}
177177

@@ -206,7 +206,7 @@ export class AutoExecutor {
206206
if (!planStep || planStep.targetStep === undefined) {
207207
throw new WorkflowError(
208208
`There must be a last step and it should have targetStep larger than 0.` +
209-
`Received: ${JSON.stringify(planStep)}`
209+
`Received: ${JSON.stringify(planStep)}`
210210
);
211211
}
212212
const stepIndex = planStep.targetStep - initialStepCount;
@@ -350,6 +350,7 @@ export class AutoExecutor {
350350
failureUrl: this.context.failureUrl,
351351
retries: this.context.retries,
352352
telemetry: this.telemetry,
353+
flowControl: this.context.flowControl
353354
});
354355

355356
// call wait
@@ -393,37 +394,39 @@ export class AutoExecutor {
393394
callRetries: lazyStep instanceof LazyCallStep ? lazyStep.retries : undefined,
394395
callTimeout: lazyStep instanceof LazyCallStep ? lazyStep.timeout : undefined,
395396
telemetry: this.telemetry,
397+
flowControl: this.context.flowControl,
398+
callFlowControl: lazyStep instanceof LazyCallStep ? lazyStep.flowControl : undefined,
396399
});
397400

398401
// if the step is a single step execution or a plan step, we can add sleep headers
399402
const willWait = singleStep.concurrent === NO_CONCURRENCY || singleStep.stepId === 0;
400403

401404
singleStep.out = JSON.stringify(singleStep.out);
402405

403-
return singleStep.callUrl
406+
return singleStep.callUrl && lazyStep instanceof LazyCallStep
404407
? // if the step is a third party call, we call the third party
405-
// url (singleStep.callUrl) and pass information about the workflow
406-
// in the headers (handled in getHeaders). QStash makes the request
407-
// to callUrl and returns the result to Workflow endpoint.
408-
// handleThirdPartyCallResult method sends the result of the third
409-
// party call to QStash.
410-
{
411-
headers,
412-
method: singleStep.callMethod,
413-
body: singleStep.callBody,
414-
url: singleStep.callUrl,
415-
}
408+
// url (singleStep.callUrl) and pass information about the workflow
409+
// in the headers (handled in getHeaders). QStash makes the request
410+
// to callUrl and returns the result to Workflow endpoint.
411+
// handleThirdPartyCallResult method sends the result of the third
412+
// party call to QStash.
413+
{
414+
headers,
415+
method: singleStep.callMethod,
416+
body: singleStep.callBody,
417+
url: singleStep.callUrl,
418+
}
416419
: // if the step is not a third party call, we use workflow
417-
// endpoint (context.url) as URL when calling QStash. QStash
418-
// calls us back with the updated steps list.
419-
{
420-
headers,
421-
method: "POST",
422-
body: singleStep,
423-
url: this.context.url,
424-
notBefore: willWait ? singleStep.sleepUntil : undefined,
425-
delay: willWait ? singleStep.sleepFor : undefined,
426-
};
420+
// endpoint (context.url) as URL when calling QStash. QStash
421+
// calls us back with the updated steps list.
422+
{
423+
headers,
424+
method: "POST",
425+
body: singleStep,
426+
url: this.context.url,
427+
notBefore: willWait ? singleStep.sleepUntil : undefined,
428+
delay: willWait ? singleStep.sleepFor : undefined,
429+
};
427430
})
428431
);
429432

@@ -496,14 +499,14 @@ const validateStep = (lazyStep: BaseLazyStep, stepFromRequest: Step): void => {
496499
if (lazyStep.stepName !== stepFromRequest.stepName) {
497500
throw new WorkflowError(
498501
`Incompatible step name. Expected '${lazyStep.stepName}',` +
499-
` got '${stepFromRequest.stepName}' from the request`
502+
` got '${stepFromRequest.stepName}' from the request`
500503
);
501504
}
502505
// check type name
503506
if (lazyStep.stepType !== stepFromRequest.stepType) {
504507
throw new WorkflowError(
505508
`Incompatible step type. Expected '${lazyStep.stepType}',` +
506-
` got '${stepFromRequest.stepType}' from the request`
509+
` got '${stepFromRequest.stepType}' from the request`
507510
);
508511
}
509512
};
@@ -531,10 +534,10 @@ const validateParallelSteps = (lazySteps: BaseLazyStep[], stepsFromRequest: Step
531534
const requestStepTypes = stepsFromRequest.map((step) => step.stepType);
532535
throw new WorkflowError(
533536
`Incompatible steps detected in parallel execution: ${error.message}` +
534-
`\n > Step Names from the request: ${JSON.stringify(requestStepNames)}` +
535-
`\n Step Types from the request: ${JSON.stringify(requestStepTypes)}` +
536-
`\n > Step Names expected: ${JSON.stringify(lazyStepNames)}` +
537-
`\n Step Types expected: ${JSON.stringify(lazyStepTypes)}`
537+
`\n > Step Names from the request: ${JSON.stringify(requestStepNames)}` +
538+
`\n Step Types from the request: ${JSON.stringify(requestStepTypes)}` +
539+
`\n > Step Names expected: ${JSON.stringify(lazyStepNames)}` +
540+
`\n Step Types expected: ${JSON.stringify(lazyStepTypes)}`
538541
);
539542
}
540543
throw error;

src/context/context.ts

+12-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import { WorkflowAbort } from "../error";
2424
import type { Duration } from "../types";
2525
import { WorkflowApi } from "./api";
2626
import { WorkflowAgents } from "../agents";
27+
import { FlowControl } from "@upstash/qstash";
2728

2829
/**
2930
* Upstash Workflow context
@@ -152,6 +153,11 @@ export class WorkflowContext<TInitialPayload = unknown> {
152153
* Number of retries
153154
*/
154155
public readonly retries: number;
156+
/**
157+
* Settings for controlling the number of active requests
158+
* and number of requests per second with the same key.
159+
*/
160+
public readonly flowControl?: FlowControl
155161

156162
constructor({
157163
qstashClient,
@@ -165,6 +171,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
165171
env,
166172
retries,
167173
telemetry,
174+
flowControl
168175
}: {
169176
qstashClient: WorkflowClient;
170177
workflowRunId: string;
@@ -177,6 +184,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
177184
env?: Record<string, string | undefined>;
178185
retries?: number;
179186
telemetry?: Telemetry;
187+
flowControl?: FlowControl
180188
}) {
181189
this.qstashClient = qstashClient;
182190
this.workflowRunId = workflowRunId;
@@ -187,6 +195,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
187195
this.requestPayload = initialPayload;
188196
this.env = env ?? {};
189197
this.retries = retries ?? DEFAULT_RETRIES;
198+
this.flowControl = flowControl
190199

191200
this.executor = new AutoExecutor(this, this.steps, telemetry, debug);
192201
}
@@ -303,7 +312,7 @@ export class WorkflowContext<TInitialPayload = unknown> {
303312
stepName: string,
304313
settings: CallSettings<TBody>
305314
): Promise<CallResponse<TResult>> {
306-
const { url, method = "GET", body, headers = {}, retries = 0, timeout } = settings;
315+
const { url, method = "GET", body, headers = {}, retries = 0, timeout, flowControl } = settings;
307316

308317
const result = await this.addStep(
309318
new LazyCallStep<CallResponse<string> | string>(
@@ -313,7 +322,8 @@ export class WorkflowContext<TInitialPayload = unknown> {
313322
body,
314323
headers,
315324
retries,
316-
timeout
325+
timeout,
326+
flowControl
317327
)
318328
);
319329

src/context/steps.test.ts

+10-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
} from "./steps";
1010
import { nanoid } from "../utils";
1111
import type { NotifyResponse, NotifyStepResponse, Step } from "../types";
12-
import { Client } from "@upstash/qstash";
12+
import { Client, FlowControl } from "@upstash/qstash";
1313
import { MOCK_QSTASH_SERVER_URL, mockQStashServer } from "../test-utils";
1414
import { WorkflowError } from "../error";
1515

@@ -146,11 +146,18 @@ describe("test steps", () => {
146146
const callHeaders = {
147147
"my-header": headerValue,
148148
};
149-
const step = new LazyCallStep(stepName, callUrl, callMethod, callBody, callHeaders, 14, 30);
149+
const flowControl: FlowControl = {
150+
key: "my-key",
151+
parallelism: 3
152+
}
153+
const step = new LazyCallStep(stepName, callUrl, callMethod, callBody, callHeaders, 14, 30, flowControl);
150154

151155
test("should set correct fields", () => {
152156
expect(step.stepName).toBe(stepName);
153157
expect(step.stepType).toBe("Call");
158+
expect(step.flowControl).toEqual(flowControl);
159+
expect(step.retries).toBe(14);
160+
expect(step.timeout).toBe(30);
154161
});
155162
test("should create plan step", () => {
156163
expect(step.getPlanStep(concurrent, targetStep)).toEqual({
@@ -299,7 +306,7 @@ describe("test steps", () => {
299306
});
300307

301308
test("should throw when step name is empty string ", () => {
302-
const throws = () => new LazyFunctionStep("", () => {});
309+
const throws = () => new LazyFunctionStep("", () => { });
303310
expect(throws).toThrow(
304311
new WorkflowError(
305312
"A workflow step name cannot be undefined or an empty string. Please provide a name for your workflow step."

0 commit comments

Comments
 (0)