Skip to content

Commit 3cd9ae9

Browse files
wip
1 parent f79333c commit 3cd9ae9

File tree

5 files changed

+46
-25
lines changed

5 files changed

+46
-25
lines changed

packages/core/orchestration/src/transaction/transaction-orchestrator.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,8 @@ export class TransactionOrchestrator extends EventEmitter {
676676
let continueExecution = true
677677

678678
while (continueExecution) {
679+
console.log("FLOW", transaction.modelId)
680+
679681
if (transaction.hasFinished()) {
680682
return
681683
}
@@ -694,7 +696,7 @@ export class TransactionOrchestrator extends EventEmitter {
694696
continue
695697
}
696698

697-
console.log("nextSteps", JSON.stringify(nextSteps, null, 2))
699+
console.log("Remaining STEPS", nextSteps.remaining)
698700

699701
if (nextSteps.remaining === 0) {
700702
if (transaction.hasTimeout()) {
@@ -709,7 +711,6 @@ export class TransactionOrchestrator extends EventEmitter {
709711

710712
let hasSyncSteps = false
711713
for (const step of nextSteps.next) {
712-
console.log("step", step.id)
713714
const curState = step.getStates()
714715
const type = step.isCompensating()
715716
? TransactionHandlerType.COMPENSATE
@@ -718,8 +719,6 @@ export class TransactionOrchestrator extends EventEmitter {
718719
step.lastAttempt = Date.now()
719720
step.attempts++
720721

721-
console.log("step current state", curState)
722-
723722
if (curState.state === TransactionStepState.NOT_STARTED) {
724723
if (!step.startedAt) {
725724
step.startedAt = Date.now()
@@ -874,6 +873,7 @@ export class TransactionOrchestrator extends EventEmitter {
874873
)
875874
})
876875
.catch(async (error) => {
876+
console.log("ON Failure SYNC", error)
877877
if (SkipExecutionError.isSkipExecutionError(error)) {
878878
continueExecution = false
879879
return
@@ -960,6 +960,8 @@ export class TransactionOrchestrator extends EventEmitter {
960960
)
961961
})
962962
.catch(async (error) => {
963+
console.log("ON Failure", error)
964+
963965
if (SkipExecutionError.isSkipExecutionError(error)) {
964966
continueExecution = false
965967
return
@@ -978,6 +980,8 @@ export class TransactionOrchestrator extends EventEmitter {
978980
}
979981

980982
await setStepFailure(error, { response })
983+
984+
console.log("ON Failure")
981985
})
982986
})
983987
)

packages/core/workflows-sdk/src/utils/composer/create-step.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -334,11 +334,11 @@ function wrapConditionalStep(
334334
* createStep,
335335
* StepResponse
336336
* } from "@medusajs/framework/workflows-sdk"
337-
*
337+
*
338338
* interface CreateProductInput {
339339
* title: string
340340
* }
341-
*
341+
*
342342
* export const createProductStep = createStep(
343343
* "createProductStep",
344344
* async function (

packages/core/workflows-sdk/src/utils/composer/create-workflow.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,22 +47,22 @@ global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null
4747
* createProductStep,
4848
* getProductStep,
4949
* } from "./steps"
50-
*
50+
*
5151
* interface WorkflowInput {
5252
* title: string
5353
* }
54-
*
54+
*
5555
* const myWorkflow = createWorkflow(
5656
* "my-workflow",
5757
* (input: WorkflowInput) => {
5858
* // Everything here will be executed and resolved later
5959
* // during the execution. Including the data access.
60-
*
60+
*
6161
* const product = createProductStep(input)
6262
* return new WorkflowResponse(getProductStep(product.id))
6363
* }
6464
* )
65-
*
65+
*
6666
* export async function GET(
6767
* req: MedusaRequest,
6868
* res: MedusaResponse
@@ -73,7 +73,7 @@ global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null
7373
* title: "Shirt"
7474
* }
7575
* })
76-
*
76+
*
7777
* res.json({
7878
* product
7979
* })
@@ -207,6 +207,11 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
207207
)
208208
},
209209
async (transaction, stepContext) => {
210+
console.log(
211+
"BEFORE SUB WORKFLOW COMPENSATION",
212+
stepContext.idempotencyKey
213+
)
214+
210215
if (!transaction) {
211216
return
212217
}
@@ -222,6 +227,10 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
222227
parentStepIdempotencyKey: stepContext.idempotencyKey,
223228
},
224229
})
230+
231+
console.log("RESPONSE FROM SUB WORKFLOW COMPENSATION", {
232+
transaction,
233+
})
225234
}
226235
)(input) as ReturnType<StepFunction<TData, TResult>>
227236

packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,17 @@ import {
1212
Modules,
1313
TransactionHandlerType,
1414
} from "@medusajs/framework/utils"
15+
import {
16+
createStep,
17+
createWorkflow,
18+
StepResponse,
19+
transform,
20+
WorkflowResponse,
21+
} from "@medusajs/framework/workflows-sdk"
1522
import { moduleIntegrationTestRunner } from "@medusajs/test-utils"
1623
import { WorkflowsModuleService } from "@services"
1724
import { asFunction } from "awilix"
18-
import { setTimeout as setTimeoutPromise } from "timers/promises"
25+
import { setTimeout, setTimeout as setTimeoutPromise } from "timers/promises"
1926
import "../__fixtures__"
2027
import {
2128
conditionalStep2Invoke,
@@ -29,14 +36,6 @@ import {
2936
workflowEventGroupIdStep2Mock,
3037
} from "../__fixtures__/workflow_event_group_id"
3138
import { createScheduled } from "../__fixtures__/workflow_scheduled"
32-
import {
33-
createStep,
34-
createWorkflow,
35-
StepResponse,
36-
transform,
37-
WorkflowResponse,
38-
} from "@medusajs/framework/workflows-sdk"
39-
import { setTimeout } from "timers/promises"
4039

4140
jest.setTimeout(3000000)
4241

@@ -126,8 +125,9 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
126125
const transformMock = jest.fn()
127126

128127
const step0 = createStep(
129-
"step0",
128+
"RACE_step0",
130129
async (_) => {
130+
console.log("step0")
131131
step0InvokeMock()
132132
return new StepResponse("result from step 0")
133133
},
@@ -137,8 +137,9 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
137137
)
138138

139139
const step1 = createStep(
140-
"step1",
140+
"RACE_step1",
141141
async (_) => {
142+
console.log("step1")
142143
step1InvokeMock()
143144
await setTimeout(2000)
144145
throw new Error("error from step 1")
@@ -148,17 +149,18 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
148149
}
149150
)
150151

151-
const step2 = createStep("step2", async (input: any) => {
152+
const step2 = createStep("RACE_step2", async (input: any) => {
153+
console.log("step2")
152154
step2InvokeMock()
153155
return new StepResponse({ result: input })
154156
})
155157

156-
const subWorkflow = createWorkflow("sub-workflow-1", function () {
158+
const subWorkflow = createWorkflow("RACE_sub-workflow-1", function () {
157159
const status = step1()
158160
return new WorkflowResponse(status)
159161
})
160162

161-
createWorkflow("workflow-1", function () {
163+
createWorkflow("RACE_workflow-1", function () {
162164
const build = step0()
163165

164166
const status = subWorkflow.runAsStep({} as any).config({

packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,8 @@ export class WorkflowOrchestratorService {
189189
options?: WorkflowOrchestratorRunOptions<T>,
190190
@MedusaContext() sharedContext: Context = {}
191191
) {
192+
console.log("RUN", workflowIdOrWorkflow)
193+
192194
const {
193195
input,
194196
transactionId,
@@ -226,6 +228,7 @@ export class WorkflowOrchestratorService {
226228
const originalOnFinishHandler = events.onFinish!
227229
delete events.onFinish
228230

231+
console.log(" Before original RUN")
229232
const ret = await exportedWorkflow.run({
230233
input,
231234
throwOnError: false,
@@ -239,6 +242,7 @@ export class WorkflowOrchestratorService {
239242
const hasFinished = ret.transaction.hasFinished()
240243
const metadata = ret.transaction.getFlow().metadata
241244
const { parentStepIdempotencyKey } = metadata ?? {}
245+
242246
const hasFailed = [
243247
TransactionState.REVERTED,
244248
TransactionState.FAILED,
@@ -252,6 +256,8 @@ export class WorkflowOrchestratorService {
252256
hasFailed,
253257
}
254258

259+
console.log({ acknowledgement })
260+
255261
if (hasFinished) {
256262
const { result, errors } = ret
257263

0 commit comments

Comments
 (0)