Skip to content

Commit 708ffb4

Browse files
DX-1619: Workflow Logs API and failure headers fix (#65)
* feat: add client.logs * fix: add failure headers for logs * fix: add test routes * fix: add docstrings and count param * fix: rm example route changes * fix: tests * fix: add tests for logs and add more filters * fix: update docstring * fix: use eventual test logic with periodic check instead of fixed sleeping --------- Co-authored-by: Mehmet Tokgöz <[email protected]>
1 parent 4987834 commit 708ffb4

File tree

9 files changed

+538
-19
lines changed

9 files changed

+538
-19
lines changed

src/client/index.test.ts

Lines changed: 156 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import { describe, test, expect } from "bun:test";
2-
import { MOCK_QSTASH_SERVER_URL, mockQStashServer, WORKFLOW_ENDPOINT } from "../test-utils";
2+
import { MOCK_QSTASH_SERVER_URL, mockQStashServer, WORKFLOW_ENDPOINT, eventually } from "../test-utils";
33
import { Client } from ".";
4+
import { Client as QStashClient } from "@upstash/qstash"
45
import { getWorkflowRunId, nanoid } from "../utils";
6+
import { triggerFirstInvocation } from "../workflow-requests";
7+
import { WorkflowContext } from "../context";
58

69
describe("workflow client", () => {
710
const token = nanoid();
@@ -224,4 +227,156 @@ describe("workflow client", () => {
224227
},
225228
});
226229
});
230+
describe("logs", () => {
231+
232+
test("should send logs request", async () => {
233+
const count = 10;
234+
const cursor = "cursor";
235+
const state = "RUN_FAILED";
236+
const workflowCreatedAt = 123;
237+
const workflowRunId = "wfr-123";
238+
const workflowUrl = "https://workflow-url.com";
239+
240+
await mockQStashServer({
241+
execute: async () => {
242+
await client.logs({
243+
count,
244+
cursor,
245+
state,
246+
workflowCreatedAt,
247+
workflowRunId,
248+
workflowUrl,
249+
});
250+
},
251+
responseFields: {
252+
status: 200,
253+
body: "msgId",
254+
},
255+
receivesRequest: {
256+
method: "GET",
257+
url: `${MOCK_QSTASH_SERVER_URL}/v2/workflows/events?groupBy=workflowRunId` +
258+
`&workflowRunId=${workflowRunId}` +
259+
`&cursor=${cursor}` +
260+
`&count=${count}` +
261+
`&state=${state}` +
262+
`&workflowUrl=${encodeURIComponent(workflowUrl)}` +
263+
`&workflowCreatedAt=${workflowCreatedAt}`,
264+
token,
265+
body: "",
266+
},
267+
});
268+
})
269+
270+
// skipping the live test because it takes too long and is still flaky
271+
test("should get logs - live", async () => {
272+
const qstashClient = new QStashClient({
273+
baseUrl: process.env.QSTASH_URL,
274+
token: process.env.QSTASH_TOKEN!,
275+
});
276+
const liveClient = new Client({
277+
baseUrl: process.env.QSTASH_URL,
278+
token: process.env.QSTASH_TOKEN!,
279+
})
280+
281+
const body = "some-body"
282+
const workflowRunId = "wfr_some-workflow-run-id-" + nanoid()
283+
284+
const result = await triggerFirstInvocation({
285+
workflowContext: new WorkflowContext({
286+
qstashClient,
287+
headers: new Headers({}) as Headers,
288+
initialPayload: body,
289+
workflowRunId,
290+
steps: [],
291+
url: "https://httpstat.us/200",
292+
})
293+
})
294+
expect(result.isOk()).toBe(true)
295+
296+
await eventually(
297+
async () => {
298+
const logs = await liveClient.logs({
299+
workflowRunId
300+
})
301+
302+
expect(logs.cursor).toBe("")
303+
expect(logs.runs.length).toBe(1)
304+
expect(logs.runs[0]).toEqual({
305+
workflowRunId,
306+
workflowUrl: "https://httpstat.us/200",
307+
workflowState: "RUN_STARTED",
308+
workflowRunCreatedAt: expect.any(Number),
309+
steps: [
310+
{
311+
steps: [
312+
{
313+
callType: "step",
314+
concurrent: 1,
315+
createdAt: expect.any(Number),
316+
headers: {
317+
"Upstash-Workflow-Sdk-Version": [
318+
"1"
319+
],
320+
},
321+
messageId: expect.any(String),
322+
out: "some-body",
323+
state: "STEP_SUCCESS",
324+
stepName: "init",
325+
stepType: "Initial",
326+
}
327+
],
328+
type: "sequential",
329+
}
330+
],
331+
})
332+
},
333+
{ timeout: 30_000, interval: 100 }
334+
);
335+
336+
await liveClient.cancel({ ids: workflowRunId })
337+
338+
await eventually(
339+
async () => {
340+
const postCancelLogs = await liveClient.logs({
341+
workflowRunId
342+
})
343+
344+
expect(postCancelLogs.cursor).toBe("")
345+
expect(postCancelLogs.runs.length).toBe(1)
346+
expect(postCancelLogs.runs[0]).toEqual({
347+
workflowRunId,
348+
workflowUrl: "https://httpstat.us/200",
349+
workflowState: "RUN_CANCELED",
350+
workflowRunCreatedAt: expect.any(Number),
351+
workflowRunCompletedAt: expect.any(Number),
352+
steps: [
353+
{
354+
steps: [
355+
{
356+
callType: "step",
357+
concurrent: 1,
358+
createdAt: expect.any(Number),
359+
headers: {
360+
"Upstash-Workflow-Sdk-Version": [
361+
"1"
362+
],
363+
},
364+
messageId: expect.any(String),
365+
out: "some-body",
366+
state: "STEP_SUCCESS",
367+
stepName: "init",
368+
stepType: "Initial",
369+
}
370+
],
371+
type: "sequential",
372+
}
373+
],
374+
})
375+
},
376+
{ timeout: 30_000, interval: 100 }
377+
);
378+
}, {
379+
timeout: 60000
380+
})
381+
})
227382
});

src/client/index.ts

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { makeGetWaitersRequest, makeNotifyRequest } from "./utils";
44
import { getWorkflowRunId } from "../utils";
55
import { triggerFirstInvocation } from "../workflow-requests";
66
import { WorkflowContext } from "../context";
7+
import { WorkflowRunLog, WorkflowRunLogs } from "./types";
78

89
type ClientConfig = ConstructorParameters<typeof QStashClient>[0];
910

@@ -226,4 +227,69 @@ export class Client {
226227
throw result.error;
227228
}
228229
}
230+
231+
/**
232+
* Fetches logs for workflow runs.
233+
*
234+
* @param workflowRunId - The ID of the workflow run to fetch logs for.
235+
* @param cursor - The cursor for pagination.
236+
* @param count - Number of runs to fetch. Default value is 10.
237+
* @param state - The state of the workflow run.
238+
* @param workflowUrl - The URL of the workflow. Should be an exact match.
239+
* @param workflowCreatedAt - The creation time of the workflow. If you have two workflow runs with the same URL, you can use this to filter them.
240+
* @returns A promise that resolves to either a `WorkflowRunLog` or a `WorkflowRunResponse`.
241+
*
242+
* @example
243+
* Fetch logs for a specific workflow run:
244+
* ```typescript
245+
* const { runs } = await client.logs({ workflowRunId: '12345' });
246+
* const steps = runs[0].steps; // access steps
247+
* ```
248+
*
249+
* @example
250+
* Fetch logs with pagination:
251+
* ```typescript
252+
* const { runs, cursor } = await client.logs();
253+
* const steps = runs[0].steps // access steps
254+
*
255+
* const { runs: nextRuns, cursor: nextCursor } = await client.logs({ cursor, count: 2 });
256+
* ```
257+
*/
258+
public async logs(params?: {
259+
workflowRunId?: WorkflowRunLog["workflowRunId"];
260+
cursor?: string;
261+
count?: number;
262+
state?: WorkflowRunLog["workflowState"];
263+
workflowUrl?: WorkflowRunLog["workflowUrl"];
264+
workflowCreatedAt?: WorkflowRunLog["workflowRunCreatedAt"];
265+
}): Promise<WorkflowRunLogs> {
266+
267+
const { workflowRunId, cursor, count, state, workflowUrl, workflowCreatedAt } = params ?? {};
268+
269+
const urlParams = new URLSearchParams({ "groupBy": "workflowRunId" });
270+
if (workflowRunId) {
271+
urlParams.append("workflowRunId", workflowRunId);
272+
}
273+
if (cursor) {
274+
urlParams.append("cursor", cursor);
275+
}
276+
if (count) {
277+
urlParams.append("count", count.toString());
278+
}
279+
if (state) {
280+
urlParams.append("state", state);
281+
}
282+
if (workflowUrl) {
283+
urlParams.append("workflowUrl", workflowUrl);
284+
}
285+
if (workflowCreatedAt) {
286+
urlParams.append("workflowCreatedAt", workflowCreatedAt.toString());
287+
}
288+
289+
const result = await this.client.http.request<WorkflowRunLogs>({
290+
path: ["v2", "workflows", `events?${urlParams.toString()}`],
291+
})
292+
293+
return result
294+
}
229295
}

0 commit comments

Comments
 (0)