Skip to content

Commit 84f9911

Browse files
authored
feat(workflows-sdk): Allow when then in parallelize (medusajs#11756)
**What** Update typings to allow using when then inside parallelize
1 parent 3b470f4 commit 84f9911

File tree

5 files changed

+92
-16
lines changed

5 files changed

+92
-16
lines changed

.changeset/rich-drinks-punch.md

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@medusajs/core-flows": patch
3+
"@medusajs/workflows-sdk": patch
4+
---
5+
6+
feat(workflows-sdk): Allow when then in parallelize

packages/core/core-flows/src/product/workflows/batch-products.ts

+10-9
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,20 @@ import { updateProductsWorkflow } from "./update-products"
1919
/**
2020
* The products to manage.
2121
*/
22-
export interface BatchProductWorkflowInput extends BatchWorkflowInput<
23-
CreateProductWorkflowInputDTO,
24-
UpdateProductWorkflowInputDTO
25-
> {}
22+
export interface BatchProductWorkflowInput
23+
extends BatchWorkflowInput<
24+
CreateProductWorkflowInputDTO,
25+
UpdateProductWorkflowInputDTO
26+
> {}
2627

2728
export const batchProductsWorkflowId = "batch-products"
2829
/**
2930
* This workflow creates, updates, or deletes products. It's used by the
3031
* [Manage Products Admin API Route](https://docs.medusajs.com/api/admin#products_postproductsbatch).
31-
*
32+
*
3233
* You can use this workflow within your own customizations or custom workflows to manage products in bulk. This is
3334
* also useful when writing a [seed script](https://docs.medusajs.com/learn/fundamentals/custom-cli-scripts/seed-data) or a custom import script.
34-
*
35+
*
3536
* @example
3637
* const { result } = await batchProductsWorkflow(container)
3738
* .run({
@@ -68,11 +69,11 @@ export const batchProductsWorkflowId = "batch-products"
6869
* }
6970
* ],
7071
* delete: ["prod_321"]
71-
* }
72+
* }
7273
* })
73-
*
74+
*
7475
* @summary
75-
*
76+
*
7677
* Manage products in bulk.
7778
*/
7879
export const batchProductsWorkflow = createWorkflow(

packages/core/workflows-sdk/src/utils/composer/__tests__/compose.ts

+67
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@ import {
1919
StepFunction,
2020
StepResponse,
2121
transform,
22+
when,
2223
WorkflowResponse,
2324
} from ".."
2425
import { MedusaWorkflow } from "../../../medusa-workflow"
2526
import { createHook } from "../create-hook"
27+
import { setTimeout } from "timers/promises"
2628

2729
jest.setTimeout(30000)
2830

@@ -742,6 +744,71 @@ describe("Workflow composer", function () {
742744
})
743745
})
744746

747+
it("should compose a new workflow with conditional parallelized steps", async () => {
748+
const stepResults: string[] = []
749+
750+
const mockStep1Fn = jest.fn().mockImplementation(async () => {
751+
await setTimeout(100)
752+
stepResults.push("step1")
753+
return new StepResponse(true)
754+
}) as any
755+
const mockStep2Fn = jest.fn().mockImplementation(() => {
756+
stepResults.push("step2")
757+
return new StepResponse(true)
758+
}) as any
759+
const mockStep3Fn = jest.fn().mockImplementation(() => {
760+
stepResults.push("step3")
761+
return new StepResponse(true)
762+
}) as any
763+
const mockStep4Fn = jest.fn().mockImplementation(() => {
764+
stepResults.push("step4")
765+
return new StepResponse(true)
766+
}) as any
767+
768+
const step1 = createStep("step1", mockStep1Fn)
769+
const step2 = createStep("step2", mockStep2Fn)
770+
const step3 = createStep("step3", mockStep3Fn)
771+
const step4 = createStep("step4", mockStep4Fn)
772+
773+
const callStep2IfNeeded = () => {
774+
return when({}, () => false).then(() => {
775+
return step2()
776+
})
777+
}
778+
779+
const callStep3IfNeeded = () => {
780+
return when({}, () => false).then(() => {
781+
return step4()
782+
})
783+
}
784+
785+
const workflow = createWorkflow("workflow1", function (input) {
786+
const [ret1, ret2, ret3, ret4] = parallelize(
787+
step1(),
788+
callStep2IfNeeded(),
789+
step3(),
790+
callStep3IfNeeded()
791+
)
792+
return new WorkflowResponse({ ret1, ret2, ret3, ret4 })
793+
})
794+
795+
const { result: workflowResult } = await workflow().run()
796+
797+
expect(mockStep1Fn).toHaveBeenCalledTimes(1)
798+
expect(mockStep2Fn).toHaveBeenCalledTimes(0)
799+
expect(mockStep3Fn).toHaveBeenCalledTimes(1)
800+
expect(mockStep4Fn).toHaveBeenCalledTimes(0)
801+
802+
expect(workflowResult).toEqual({
803+
ret1: true,
804+
ret2: undefined,
805+
ret3: true,
806+
ret4: undefined,
807+
})
808+
809+
expect(stepResults).toEqual(["step3", "step1"])
810+
})
811+
745812
it("should compose a new workflow with parallelize steps and rollback them all in case of error", async () => {
746813
const step1CompensationFn = jest.fn().mockImplementation(() => {
747814
return "step1 compensation"

packages/core/workflows-sdk/src/utils/composer/parallelize.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -19,29 +19,29 @@ import { OrchestrationUtils } from "@medusajs/utils"
1919
* createPricesStep,
2020
* attachProductToSalesChannelStep
2121
* } from "./steps"
22-
*
22+
*
2323
* interface WorkflowInput {
2424
* title: string
2525
* }
26-
*
26+
*
2727
* const myWorkflow = createWorkflow(
2828
* "my-workflow",
2929
* (input: WorkflowInput) => {
3030
* const product = createProductStep(input)
31-
*
31+
*
3232
* const [prices, productSalesChannel] = parallelize(
3333
* createPricesStep(product),
3434
* attachProductToSalesChannelStep(product)
3535
* )
36-
*
36+
*
3737
* return new WorkflowResponse({
3838
* prices,
3939
* productSalesChannel
4040
* })
4141
* }
4242
* )
4343
*/
44-
export function parallelize<TResult extends WorkflowData[]>(
44+
export function parallelize<TResult extends (WorkflowData | undefined)[]>(
4545
...steps: TResult
4646
): TResult {
4747
if (!global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext]) {
@@ -64,7 +64,7 @@ export function parallelize<TResult extends WorkflowData[]>(
6464
const stepOntoMerge = steps.shift()!
6565
this.flow.mergeActions(
6666
stepOntoMerge.__step__,
67-
...steps.map((step) => step.__step__)
67+
...steps.map((step) => step!.__step__)
6868
)
6969

7070
return resultSteps as unknown as TResult

packages/core/workflows-sdk/src/utils/composer/type.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,9 @@ export type CreateWorkflowComposerContext = {
104104
fn: StepFunctionResult
105105
) => WorkflowData<TOutput>
106106
hookBinder: (name: string, fn: () => HookHandler) => void
107-
parallelizeBinder: <TOutput extends WorkflowData[] = WorkflowData[]>(
107+
parallelizeBinder: <
108+
TOutput extends (WorkflowData | undefined)[] = WorkflowData[]
109+
>(
108110
fn: (this: CreateWorkflowComposerContext) => TOutput
109111
) => TOutput
110112
}

0 commit comments

Comments
 (0)