From 302ff9338799df5d17622b0afac0e30277ef948c Mon Sep 17 00:00:00 2001 From: adrien2p Date: Fri, 7 Mar 2025 15:02:31 +0100 Subject: [PATCH 1/9] fix(workflow-engines): Race continue of consecutive workflow continuation when retry interval is used --- .../orchestration/src/transaction/errors.ts | 9 ++ .../transaction/transaction-orchestrator.ts | 19 +++ .../integration-tests/__tests__/index.spec.ts | 83 ++++++++++- .../workflow-engine-inmemory/package.json | 2 +- .../utils/workflow-orchestrator-storage.ts | 121 ++++++++++++++-- .../utils/workflow-orchestrator-storage.ts | 132 ++++++++++++++++++ 6 files changed, 354 insertions(+), 12 deletions(-) diff --git a/packages/core/orchestration/src/transaction/errors.ts b/packages/core/orchestration/src/transaction/errors.ts index 8a3ff7bb7f265..081f067db5eae 100644 --- a/packages/core/orchestration/src/transaction/errors.ts +++ b/packages/core/orchestration/src/transaction/errors.ts @@ -84,3 +84,12 @@ export class NonSerializableCheckPointError extends Error { this.name = "NonSerializableCheckPointError" } } + +export class SkipExecutionError extends Error { + static isSkipExecutionError(error: Error): error is SkipExecutionError { + return ( + error instanceof SkipExecutionError || + error?.name === "SkipExecutionError" + ) + } +} diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 7bb4ebd241cfb..2fe44ec3f437b 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -28,6 +28,7 @@ import { import { EventEmitter } from "events" import { PermanentStepFailureError, + SkipExecutionError, SkipStepResponse, TransactionStepTimeoutError, TransactionTimeoutError, @@ -821,6 +822,14 @@ export class TransactionOrchestrator extends EventEmitter { ] as Parameters if (!isAsync) { + try { + await transaction.saveCheckpoint() + } catch (error) { + if (SkipExecutionError.isSkipExecutionError(error)) { + return + } + } + hasSyncSteps = true const stepHandler = async () => { @@ -858,6 +867,11 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { + if (SkipExecutionError.isSkipExecutionError(error)) { + continueExecution = false + return + } + const response = error?.getStepResponse?.() if (this.hasExpired({ transaction, step }, Date.now())) { @@ -939,6 +953,11 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { + if (SkipExecutionError.isSkipExecutionError(error)) { + continueExecution = false + return + } + const response = error?.getStepResponse?.() if ( diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index e52ed640d76d1..b1df4c8e9f0b8 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -29,14 +29,93 @@ import { workflowEventGroupIdStep2Mock, } from "../__fixtures__/workflow_event_group_id" import { createScheduled } from "../__fixtures__/workflow_scheduled" +import { + createStep, + createWorkflow, + StepResponse, + transform, + WorkflowResponse, +} from "@medusajs/framework/workflows-sdk" +import { setTimeout } from "timers/promises" -jest.setTimeout(100000) +jest.setTimeout(3000000) moduleIntegrationTestRunner({ moduleName: Modules.WORKFLOW_ENGINE, resolve: __dirname + "/../..", testSuite: ({ service: workflowOrcModule, medusaApp }) => { - describe("Workflow Orchestrator module", function () { + it.only("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { + const transactionId = "transaction_id" + + const step0InvokeMock = jest.fn() + const step1InvokeMock = jest.fn() + const step2InvokeMock = jest.fn() + const transformMock = jest.fn() + + const step0 = createStep("step0", async (_, context) => { + step0InvokeMock() + return new StepResponse("result from step 0") + }) + + const step1 = createStep("step1", async (_, context) => { + step1InvokeMock() + await setTimeout(2000) + return new StepResponse({ isSuccess: true }) + }) + + const step2 = createStep("step2", async (input: any, context) => { + step2InvokeMock() + return new StepResponse({ result: input }) + }) + + const subWorkflow = createWorkflow("sub-workflow-1", function () { + const status = step1() + return new WorkflowResponse(status) + }) + + createWorkflow("workflow-1", function () { + const build = step0() + + const status = subWorkflow.runAsStep({} as any).config({ + async: true, + compensateAsync: true, + backgroundExecution: true, + retryIntervalAwaiting: 1, + }) + + const transformedResult = transform({ status }, (data) => { + transformMock() + return { + status: data.status, + } + }) + + step2(transformedResult) + return new WorkflowResponse(build) + }) + + void workflowOrcModule.subscribe({ + workflowId: "workflow-1", + transactionId, + subscriber: (event) => { + if (event.eventType === "onFinish") { + expect(step0InvokeMock).toHaveBeenCalledTimes(1) + expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) + expect(step2InvokeMock).toHaveBeenCalledTimes(1) + expect(transformMock).toHaveBeenCalledTimes(1) + done() + } + }, + }) + + workflowOrcModule + .run("workflow-1", { transactionId }) + .then(({ result }) => { + expect(result).toBe("result from step 0") + }) + }) + + describe.skip("Workflow Orchestrator module", function () { let query: RemoteQueryFunction beforeEach(() => { diff --git a/packages/modules/workflow-engine-inmemory/package.json b/packages/modules/workflow-engine-inmemory/package.json index a391f753e10bb..105e5a41a2320 100644 --- a/packages/modules/workflow-engine-inmemory/package.json +++ b/packages/modules/workflow-engine-inmemory/package.json @@ -29,7 +29,7 @@ "resolve:aliases": "tsc --showConfig -p tsconfig.json > tsconfig.resolved.json && tsc-alias -p tsconfig.resolved.json && rimraf tsconfig.resolved.json", "build": "rimraf dist && tsc --build && npm run resolve:aliases", "test": "jest --passWithNoTests --runInBand --bail --forceExit -- src", - "test:integration": "jest --silent --forceExit -- integration-tests/**/__tests__/**/*.ts", + "test:integration": "jest --forceExit -- integration-tests/**/__tests__/**/*.ts", "migration:initial": " MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts medusa-mikro-orm migration:create --initial", "migration:create": " MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts medusa-mikro-orm migration:create", "migration:up": " MIKRO_ORM_CLI_CONFIG=./mikro-orm.config.dev.ts medusa-mikro-orm migration:up", diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 0a3248084e1b7..75e1891b80662 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -3,12 +3,18 @@ import { IDistributedSchedulerStorage, IDistributedTransactionStorage, SchedulerOptions, + SkipExecutionError, TransactionCheckpoint, + TransactionFlow, TransactionOptions, TransactionStep, } from "@medusajs/framework/orchestration" import { Logger, ModulesSdkTypes } from "@medusajs/framework/types" -import { MedusaError, TransactionState } from "@medusajs/framework/utils" +import { + MedusaError, + TransactionState, + TransactionStepState, +} from "@medusajs/framework/utils" import { WorkflowOrchestratorService } from "@services" import { CronExpression, parseExpression } from "cron-parser" @@ -121,8 +127,6 @@ export class InMemoryDistributedTransactionStorage ttl?: number, options?: TransactionOptions ): Promise { - this.storage.set(key, data) - /** * Store the retention time only if the transaction is done, failed or reverted. * From that moment, this tuple can be later on archived or deleted after the retention time. @@ -135,11 +139,15 @@ export class InMemoryDistributedTransactionStorage const { retentionTime, idempotent } = options ?? {} - if (hasFinished) { - Object.assign(data, { - retention_time: retentionTime, - }) - } + await this.#preventRaceConditionExecutionIfNecessary({ + data, + key, + }) + + Object.assign(data, { + retention_time: retentionTime, + }) + this.storage.set(key, data) if (hasFinished && !retentionTime && !idempotent) { await this.deleteFromDb(data) @@ -148,7 +156,102 @@ export class InMemoryDistributedTransactionStorage } if (hasFinished) { - this.storage.delete(key) + const tenMinutes = 10 * 60 * 1000 + setTimeout(() => { + this.storage.delete(key) + }, tenMinutes) + } + } + + async #preventRaceConditionExecutionIfNecessary({ + data, + key, + }: { + data: TransactionCheckpoint + key: string + }) { + /** + * In case many execution can succeed simultaneously, we need to ensure that the latest + * execution does continue if a previous execution is considered finished + */ + const currentFlow = data.flow + const { flow: latestUpdatedFlow } = + (await this.get(key)) ?? ({ flow: {} } as { flow: TransactionFlow }) + + const currentFlowLastInvokingStepIndex = Object.values( + currentFlow.steps + ).findIndex((step) => { + return [ + TransactionStepState.INVOKING, + TransactionStepState.NOT_STARTED, + ].includes(step.invoke?.state) + }) + + const latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps + ? 1 // There is no other execution, so the current execution is the latest + : Object.values( + (latestUpdatedFlow.steps as Record) ?? {} + ).findIndex((step) => { + return [ + TransactionStepState.INVOKING, + TransactionStepState.NOT_STARTED, + ].includes(step.invoke?.state) + }) + + const currentFlowLastCompensatingStepIndex = Object.values( + currentFlow.steps + ) + .reverse() + .findIndex((step) => { + return [ + TransactionStepState.COMPENSATING, + TransactionStepState.NOT_STARTED, + ].includes(step.compensate?.state) + }) + + const latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps + ? 1 // There is no other execution, so the current execution is the latest + : Object.values( + (latestUpdatedFlow.steps as Record) ?? {} + ) + .reverse() + .findIndex((step) => { + return [ + TransactionStepState.COMPENSATING, + TransactionStepState.NOT_STARTED, + ].includes(step.compensate?.state) + }) + + const isLatestExecutionFinishedIndex = -1 + const invokeShouldBeSkipped = + (latestUpdatedFlowLastInvokingStepIndex === + isLatestExecutionFinishedIndex || + currentFlowLastInvokingStepIndex < + latestUpdatedFlowLastInvokingStepIndex) && + currentFlowLastInvokingStepIndex !== isLatestExecutionFinishedIndex + + const compensateShouldBeSkipped = + latestUpdatedFlowLastCompensatingStepIndex === + isLatestExecutionFinishedIndex || + (currentFlowLastCompensatingStepIndex < + latestUpdatedFlowLastCompensatingStepIndex && + latestUpdatedFlowLastCompensatingStepIndex !== + isLatestExecutionFinishedIndex) + + if ( + (data.flow.state !== TransactionState.COMPENSATING && + invokeShouldBeSkipped) || + (data.flow.state === TransactionState.COMPENSATING && + compensateShouldBeSkipped) || + (latestUpdatedFlow.state === TransactionState.COMPENSATING && + currentFlow.state !== latestUpdatedFlow.state) + ) { + /** + * If the latest execution is ahead of the current execution in terms of completion then we + * should skip to prevent multiple completion/execution of the same step. The same goes for + * compensating steps but in the opposite direction. + */ + throw new SkipExecutionError("already finished by another execution") } } diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 8365160509550..01b62e3c27405 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -4,7 +4,9 @@ import { IDistributedSchedulerStorage, IDistributedTransactionStorage, SchedulerOptions, + SkipExecutionError, TransactionCheckpoint, + TransactionFlow, TransactionOptions, TransactionStep, } from "@medusajs/framework/orchestration" @@ -13,6 +15,7 @@ import { MedusaError, promiseAll, TransactionState, + TransactionStepState, } from "@medusajs/framework/utils" import { WorkflowOrchestratorService } from "@services" import { Queue, Worker } from "bullmq" @@ -263,6 +266,11 @@ export class RedisDistributedTransactionStorage const { retentionTime, idempotent } = options ?? {} + await this.#preventRaceConditionExecutionIfNecessary({ + data, + key, + }) + if (hasFinished) { Object.assign(data, { retention_time: retentionTime, @@ -271,6 +279,38 @@ export class RedisDistributedTransactionStorage const stringifiedData = JSON.stringify(data) + /** + * In case many execution can succeed simultaneously, we need to ensure that the latest + * execution does continue if a previous execution is considered finished + */ + const currentFlow = data.flow + const { flow: latestUpdatedFlow } = + (await this.get(key)) ?? ({ flow: {} } as { flow: TransactionFlow }) + + const currentFlowLastInvokingStepIndex = Object.values( + currentFlow.steps + ).findIndex((step) => { + return [ + TransactionStepState.INVOKING, + TransactionStepState.NOT_STARTED, + ].includes(step.invoke?.state) + }) + + const latestUpdatedFlowLastInvokingStepIndex = Object.values( + (latestUpdatedFlow.steps as Record) ?? {} + ).findIndex((step) => { + return [ + TransactionStepState.INVOKING, + TransactionStepState.NOT_STARTED, + ].includes(step.invoke?.state) + }) + + if ( + currentFlowLastInvokingStepIndex < latestUpdatedFlowLastInvokingStepIndex + ) { + throw new SkipExecutionError("already finished by another execution") + } + if (!hasFinished) { if (ttl) { await this.redisClient.set(key, stringifiedData, "EX", ttl) @@ -457,4 +497,96 @@ export class RedisDistributedTransactionStorage repeatableJobs.map((job) => this.jobQueue?.removeRepeatableByKey(job.key)) ) } + + async #preventRaceConditionExecutionIfNecessary({ + data, + key, + }: { + data: TransactionCheckpoint + key: string + }) { + /** + * In case many execution can succeed simultaneously, we need to ensure that the latest + * execution does continue if a previous execution is considered finished + */ + const currentFlow = data.flow + const { flow: latestUpdatedFlow } = + (await this.get(key)) ?? ({ flow: {} } as { flow: TransactionFlow }) + + const currentFlowLastInvokingStepIndex = Object.values( + currentFlow.steps + ).findIndex((step) => { + return [ + TransactionStepState.INVOKING, + TransactionStepState.NOT_STARTED, + ].includes(step.invoke?.state) + }) + + const latestUpdatedFlowLastInvokingStepIndex = !latestUpdatedFlow.steps + ? 1 // There is no other execution, so the current execution is the latest + : Object.values( + (latestUpdatedFlow.steps as Record) ?? {} + ).findIndex((step) => { + return [ + TransactionStepState.INVOKING, + TransactionStepState.NOT_STARTED, + ].includes(step.invoke?.state) + }) + + const currentFlowLastCompensatingStepIndex = Object.values( + currentFlow.steps + ) + .reverse() + .findIndex((step) => { + return [ + TransactionStepState.COMPENSATING, + TransactionStepState.NOT_STARTED, + ].includes(step.compensate?.state) + }) + + const latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps + ? 1 // There is no other execution, so the current execution is the latest + : Object.values( + (latestUpdatedFlow.steps as Record) ?? {} + ) + .reverse() + .findIndex((step) => { + return [ + TransactionStepState.COMPENSATING, + TransactionStepState.NOT_STARTED, + ].includes(step.compensate?.state) + }) + + const isLatestExecutionFinishedIndex = -1 + const invokeShouldBeSkipped = + (latestUpdatedFlowLastInvokingStepIndex === + isLatestExecutionFinishedIndex || + currentFlowLastInvokingStepIndex < + latestUpdatedFlowLastInvokingStepIndex) && + currentFlowLastInvokingStepIndex !== isLatestExecutionFinishedIndex + + const compensateShouldBeSkipped = + latestUpdatedFlowLastCompensatingStepIndex === + isLatestExecutionFinishedIndex || + (currentFlowLastCompensatingStepIndex < + latestUpdatedFlowLastCompensatingStepIndex && + latestUpdatedFlowLastCompensatingStepIndex !== + isLatestExecutionFinishedIndex) + + if ( + (data.flow.state !== TransactionState.COMPENSATING && + invokeShouldBeSkipped) || + (data.flow.state === TransactionState.COMPENSATING && + compensateShouldBeSkipped) || + (latestUpdatedFlow.state === TransactionState.COMPENSATING && + currentFlow.state !== latestUpdatedFlow.state) + ) { + /** + * If the latest execution is ahead of the current execution in terms of completion then we + * should skip to prevent multiple completion/execution of the same step. The same goes for + * compensating steps but in the opposite direction. + */ + throw new SkipExecutionError("already finished by another execution") + } + } } From f79333c753f6f2af407ebe34a908c783139bd5ce Mon Sep 17 00:00:00 2001 From: adrien2p Date: Fri, 7 Mar 2025 16:33:08 +0100 Subject: [PATCH 2/9] fix(workflow-engines): Race continue of consecutive workflow continuation when retry interval is used --- .../transaction/transaction-orchestrator.ts | 9 +- .../integration-tests/__tests__/index.spec.ts | 95 ++++++++++++++++++- 2 files changed, 99 insertions(+), 5 deletions(-) diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 2fe44ec3f437b..9fb739828c8d4 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -694,6 +694,8 @@ export class TransactionOrchestrator extends EventEmitter { continue } + console.log("nextSteps", JSON.stringify(nextSteps, null, 2)) + if (nextSteps.remaining === 0) { if (transaction.hasTimeout()) { void transaction.clearTransactionTimeout() @@ -701,11 +703,13 @@ export class TransactionOrchestrator extends EventEmitter { await transaction.saveCheckpoint() + console.log("FINISH", transaction.getFlow().transactionId) this.emit(DistributedTransactionEvent.FINISH, { transaction }) } let hasSyncSteps = false for (const step of nextSteps.next) { + console.log("step", step.id) const curState = step.getStates() const type = step.isCompensating() ? TransactionHandlerType.COMPENSATE @@ -714,6 +718,8 @@ export class TransactionOrchestrator extends EventEmitter { step.lastAttempt = Date.now() step.attempts++ + console.log("step current state", curState) + if (curState.state === TransactionStepState.NOT_STARTED) { if (!step.startedAt) { step.startedAt = Date.now() @@ -826,7 +832,8 @@ export class TransactionOrchestrator extends EventEmitter { await transaction.saveCheckpoint() } catch (error) { if (SkipExecutionError.isSkipExecutionError(error)) { - return + continueExecution = false + continue } } diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index b1df4c8e9f0b8..932adc103a90d 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -44,7 +44,7 @@ moduleIntegrationTestRunner({ moduleName: Modules.WORKFLOW_ENGINE, resolve: __dirname + "/../..", testSuite: ({ service: workflowOrcModule, medusaApp }) => { - it.only("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { + it("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { const transactionId = "transaction_id" const step0InvokeMock = jest.fn() @@ -52,18 +52,18 @@ moduleIntegrationTestRunner({ const step2InvokeMock = jest.fn() const transformMock = jest.fn() - const step0 = createStep("step0", async (_, context) => { + const step0 = createStep("step0", async (_) => { step0InvokeMock() return new StepResponse("result from step 0") }) - const step1 = createStep("step1", async (_, context) => { + const step1 = createStep("step1", async (_) => { step1InvokeMock() await setTimeout(2000) return new StepResponse({ isSuccess: true }) }) - const step2 = createStep("step2", async (input: any, context) => { + const step2 = createStep("step2", async (input: any) => { step2InvokeMock() return new StepResponse({ result: input }) }) @@ -115,6 +115,93 @@ moduleIntegrationTestRunner({ }) }) + it.only("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { + const transactionId = "transaction_id" + + const step0InvokeMock = jest.fn() + const step0CompensateMock = jest.fn() + const step1InvokeMock = jest.fn() + const step1CompensateMock = jest.fn() + const step2InvokeMock = jest.fn() + const transformMock = jest.fn() + + const step0 = createStep( + "step0", + async (_) => { + step0InvokeMock() + return new StepResponse("result from step 0") + }, + () => { + step0CompensateMock() + } + ) + + const step1 = createStep( + "step1", + async (_) => { + step1InvokeMock() + await setTimeout(2000) + throw new Error("error from step 1") + }, + () => { + step1CompensateMock() + } + ) + + const step2 = createStep("step2", async (input: any) => { + step2InvokeMock() + return new StepResponse({ result: input }) + }) + + const subWorkflow = createWorkflow("sub-workflow-1", function () { + const status = step1() + return new WorkflowResponse(status) + }) + + createWorkflow("workflow-1", function () { + const build = step0() + + const status = subWorkflow.runAsStep({} as any).config({ + async: true, + compensateAsync: true, + backgroundExecution: true, + retryIntervalAwaiting: 1, + }) + + const transformedResult = transform({ status }, (data) => { + transformMock() + return { + status: data.status, + } + }) + + step2(transformedResult) + return new WorkflowResponse(build) + }) + + void workflowOrcModule.subscribe({ + workflowId: "workflow-1", + transactionId, + subscriber: (event) => { + if (event.eventType === "onFinish") { + expect(step0InvokeMock).toHaveBeenCalledTimes(1) + expect(step0CompensateMock).toHaveBeenCalledTimes(1) + expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) + expect(step1CompensateMock.mock.calls.length).toBeGreaterThan(1) + expect(step2InvokeMock).toHaveBeenCalledTimes(0) + expect(transformMock).toHaveBeenCalledTimes(0) + done() + } + }, + }) + + workflowOrcModule + .run("workflow-1", { transactionId }) + .then(({ result }) => { + expect(result).toBe("result from step 0") + }) + }) + describe.skip("Workflow Orchestrator module", function () { let query: RemoteQueryFunction From 3cd9ae9c2933f00b27b468a4b33cd37ea585bd61 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Fri, 7 Mar 2025 13:53:21 -0300 Subject: [PATCH 3/9] wip --- .../transaction/transaction-orchestrator.ts | 12 +++++--- .../src/utils/composer/create-step.ts | 4 +-- .../src/utils/composer/create-workflow.ts | 19 ++++++++---- .../integration-tests/__tests__/index.spec.ts | 30 ++++++++++--------- .../src/services/workflow-orchestrator.ts | 6 ++++ 5 files changed, 46 insertions(+), 25 deletions(-) diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 9fb739828c8d4..187f6794f87ed 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -676,6 +676,8 @@ export class TransactionOrchestrator extends EventEmitter { let continueExecution = true while (continueExecution) { + console.log("FLOW", transaction.modelId) + if (transaction.hasFinished()) { return } @@ -694,7 +696,7 @@ export class TransactionOrchestrator extends EventEmitter { continue } - console.log("nextSteps", JSON.stringify(nextSteps, null, 2)) + console.log("Remaining STEPS", nextSteps.remaining) if (nextSteps.remaining === 0) { if (transaction.hasTimeout()) { @@ -709,7 +711,6 @@ export class TransactionOrchestrator extends EventEmitter { let hasSyncSteps = false for (const step of nextSteps.next) { - console.log("step", step.id) const curState = step.getStates() const type = step.isCompensating() ? TransactionHandlerType.COMPENSATE @@ -718,8 +719,6 @@ export class TransactionOrchestrator extends EventEmitter { step.lastAttempt = Date.now() step.attempts++ - console.log("step current state", curState) - if (curState.state === TransactionStepState.NOT_STARTED) { if (!step.startedAt) { step.startedAt = Date.now() @@ -874,6 +873,7 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { + console.log("ON Failure SYNC", error) if (SkipExecutionError.isSkipExecutionError(error)) { continueExecution = false return @@ -960,6 +960,8 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { + console.log("ON Failure", error) + if (SkipExecutionError.isSkipExecutionError(error)) { continueExecution = false return @@ -978,6 +980,8 @@ export class TransactionOrchestrator extends EventEmitter { } await setStepFailure(error, { response }) + + console.log("ON Failure") }) }) ) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index bdf58d6c5f463..737805a0bfb44 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -334,11 +334,11 @@ function wrapConditionalStep( * createStep, * StepResponse * } from "@medusajs/framework/workflows-sdk" - * + * * interface CreateProductInput { * title: string * } - * + * * export const createProductStep = createStep( * "createProductStep", * async function ( diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index 58f5134ed3b24..494d3abfbba25 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -47,22 +47,22 @@ global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null * createProductStep, * getProductStep, * } from "./steps" - * + * * interface WorkflowInput { * title: string * } - * + * * const myWorkflow = createWorkflow( * "my-workflow", * (input: WorkflowInput) => { * // Everything here will be executed and resolved later * // during the execution. Including the data access. - * + * * const product = createProductStep(input) * return new WorkflowResponse(getProductStep(product.id)) * } * ) - * + * * export async function GET( * req: MedusaRequest, * res: MedusaResponse @@ -73,7 +73,7 @@ global[OrchestrationUtils.SymbolMedusaWorkflowComposerContext] = null * title: "Shirt" * } * }) - * + * * res.json({ * product * }) @@ -207,6 +207,11 @@ export function createWorkflow( ) }, async (transaction, stepContext) => { + console.log( + "BEFORE SUB WORKFLOW COMPENSATION", + stepContext.idempotencyKey + ) + if (!transaction) { return } @@ -222,6 +227,10 @@ export function createWorkflow( parentStepIdempotencyKey: stepContext.idempotencyKey, }, }) + + console.log("RESPONSE FROM SUB WORKFLOW COMPENSATION", { + transaction, + }) } )(input) as ReturnType> diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index 932adc103a90d..9880f4425879a 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -12,10 +12,17 @@ import { Modules, TransactionHandlerType, } from "@medusajs/framework/utils" +import { + createStep, + createWorkflow, + StepResponse, + transform, + WorkflowResponse, +} from "@medusajs/framework/workflows-sdk" import { moduleIntegrationTestRunner } from "@medusajs/test-utils" import { WorkflowsModuleService } from "@services" import { asFunction } from "awilix" -import { setTimeout as setTimeoutPromise } from "timers/promises" +import { setTimeout, setTimeout as setTimeoutPromise } from "timers/promises" import "../__fixtures__" import { conditionalStep2Invoke, @@ -29,14 +36,6 @@ import { workflowEventGroupIdStep2Mock, } from "../__fixtures__/workflow_event_group_id" import { createScheduled } from "../__fixtures__/workflow_scheduled" -import { - createStep, - createWorkflow, - StepResponse, - transform, - WorkflowResponse, -} from "@medusajs/framework/workflows-sdk" -import { setTimeout } from "timers/promises" jest.setTimeout(3000000) @@ -126,8 +125,9 @@ moduleIntegrationTestRunner({ const transformMock = jest.fn() const step0 = createStep( - "step0", + "RACE_step0", async (_) => { + console.log("step0") step0InvokeMock() return new StepResponse("result from step 0") }, @@ -137,8 +137,9 @@ moduleIntegrationTestRunner({ ) const step1 = createStep( - "step1", + "RACE_step1", async (_) => { + console.log("step1") step1InvokeMock() await setTimeout(2000) throw new Error("error from step 1") @@ -148,17 +149,18 @@ moduleIntegrationTestRunner({ } ) - const step2 = createStep("step2", async (input: any) => { + const step2 = createStep("RACE_step2", async (input: any) => { + console.log("step2") step2InvokeMock() return new StepResponse({ result: input }) }) - const subWorkflow = createWorkflow("sub-workflow-1", function () { + const subWorkflow = createWorkflow("RACE_sub-workflow-1", function () { const status = step1() return new WorkflowResponse(status) }) - createWorkflow("workflow-1", function () { + createWorkflow("RACE_workflow-1", function () { const build = step0() const status = subWorkflow.runAsStep({} as any).config({ diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index 43788f11884bd..c1f33deb69ff8 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -189,6 +189,8 @@ export class WorkflowOrchestratorService { options?: WorkflowOrchestratorRunOptions, @MedusaContext() sharedContext: Context = {} ) { + console.log("RUN", workflowIdOrWorkflow) + const { input, transactionId, @@ -226,6 +228,7 @@ export class WorkflowOrchestratorService { const originalOnFinishHandler = events.onFinish! delete events.onFinish + console.log(" Before original RUN") const ret = await exportedWorkflow.run({ input, throwOnError: false, @@ -239,6 +242,7 @@ export class WorkflowOrchestratorService { const hasFinished = ret.transaction.hasFinished() const metadata = ret.transaction.getFlow().metadata const { parentStepIdempotencyKey } = metadata ?? {} + const hasFailed = [ TransactionState.REVERTED, TransactionState.FAILED, @@ -252,6 +256,8 @@ export class WorkflowOrchestratorService { hasFailed, } + console.log({ acknowledgement }) + if (hasFinished) { const { result, errors } = ret From 8cda65a85eb12353ca41254d13b31e8a102d57a2 Mon Sep 17 00:00:00 2001 From: adrien2p Date: Fri, 7 Mar 2025 19:02:24 +0100 Subject: [PATCH 4/9] fix checkpoint --- .../transaction/transaction-orchestrator.ts | 76 +++++++++++++------ .../integration-tests/__tests__/index.spec.ts | 15 ++-- .../utils/workflow-orchestrator-storage.ts | 17 +++-- 3 files changed, 72 insertions(+), 36 deletions(-) diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 187f6794f87ed..4e1787a32f3ba 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -472,9 +472,6 @@ export class TransactionOrchestrator extends EventEmitter { ) } - const flow = transaction.getFlow() - const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId) - if (!hasStepTimedOut) { step.changeStatus(TransactionStepStatus.OK) } @@ -485,8 +482,15 @@ export class TransactionOrchestrator extends EventEmitter { step.changeState(TransactionStepState.DONE) } - if (step.definition.async || options?.storeExecution) { + let shouldEmit = true + try { await transaction.saveCheckpoint() + } catch (error) { + if (SkipExecutionError.isSkipExecutionError(error)) { + shouldEmit = false + } else { + throw error + } } const cleaningUp: Promise[] = [] @@ -499,10 +503,12 @@ export class TransactionOrchestrator extends EventEmitter { await promiseAll(cleaningUp) - const eventName = step.isCompensating() - ? DistributedTransactionEvent.COMPENSATE_STEP_SUCCESS - : DistributedTransactionEvent.STEP_SUCCESS - transaction.emit(eventName, { step, transaction }) + if (shouldEmit) { + const eventName = step.isCompensating() + ? DistributedTransactionEvent.COMPENSATE_STEP_SUCCESS + : DistributedTransactionEvent.STEP_SUCCESS + transaction.emit(eventName, { step, transaction }) + } } private static async skipStep( @@ -605,7 +611,6 @@ export class TransactionOrchestrator extends EventEmitter { } const flow = transaction.getFlow() - const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId) const cleaningUp: Promise[] = [] @@ -654,8 +659,15 @@ export class TransactionOrchestrator extends EventEmitter { } } - if (step.definition.async || options?.storeExecution) { + let shouldEmit = true + try { await transaction.saveCheckpoint() + } catch (error) { + if (SkipExecutionError.isSkipExecutionError(error)) { + shouldEmit = false + } else { + throw error + } } if (step.hasRetryScheduled()) { @@ -664,10 +676,12 @@ export class TransactionOrchestrator extends EventEmitter { await promiseAll(cleaningUp) - const eventName = step.isCompensating() - ? DistributedTransactionEvent.COMPENSATE_STEP_FAILURE - : DistributedTransactionEvent.STEP_FAILURE - transaction.emit(eventName, { step, transaction }) + if (shouldEmit) { + const eventName = step.isCompensating() + ? DistributedTransactionEvent.COMPENSATE_STEP_FAILURE + : DistributedTransactionEvent.STEP_FAILURE + transaction.emit(eventName, { step, transaction }) + } } private async executeNext( @@ -696,17 +710,27 @@ export class TransactionOrchestrator extends EventEmitter { continue } - console.log("Remaining STEPS", nextSteps.remaining) + console.log( + "Remaining STEPS", + transaction.getFlow().modelId, + nextSteps.remaining, + nextSteps.next.map((step) => step.id), + nextSteps.current ?? "nothing" + ) if (nextSteps.remaining === 0) { if (transaction.hasTimeout()) { void transaction.clearTransactionTimeout() } - await transaction.saveCheckpoint() + // let shouldStop = false + // await transaction.saveCheckpoint() console.log("FINISH", transaction.getFlow().transactionId) this.emit(DistributedTransactionEvent.FINISH, { transaction }) + // if (shouldStop) { + // return + // } } let hasSyncSteps = false @@ -827,14 +851,15 @@ export class TransactionOrchestrator extends EventEmitter { ] as Parameters if (!isAsync) { - try { - await transaction.saveCheckpoint() - } catch (error) { - if (SkipExecutionError.isSkipExecutionError(error)) { - continueExecution = false - continue - } - } + // try { + // await transaction.saveCheckpoint() + // } catch (error) { + // if (SkipExecutionError.isSkipExecutionError(error)) { + // await transaction.clearStepTimeout(step) + // continueExecution = false + // continue + // } + // } hasSyncSteps = true @@ -873,8 +898,8 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { - console.log("ON Failure SYNC", error) if (SkipExecutionError.isSkipExecutionError(error)) { + await transaction.clearStepTimeout(step) continueExecution = false return } @@ -963,6 +988,7 @@ export class TransactionOrchestrator extends EventEmitter { console.log("ON Failure", error) if (SkipExecutionError.isSkipExecutionError(error)) { + await transaction.clearStepTimeout(step) continueExecution = false return } diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index 9880f4425879a..cf4719ab0f66a 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -116,6 +116,7 @@ moduleIntegrationTestRunner({ it.only("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { const transactionId = "transaction_id" + const workflowId = "RACE_workflow-1" const step0InvokeMock = jest.fn() const step0CompensateMock = jest.fn() @@ -132,6 +133,7 @@ moduleIntegrationTestRunner({ return new StepResponse("result from step 0") }, () => { + console.log("step0 compensate") step0CompensateMock() } ) @@ -145,6 +147,7 @@ moduleIntegrationTestRunner({ throw new Error("error from step 1") }, () => { + console.log("step1 compensate") step1CompensateMock() } ) @@ -160,14 +163,14 @@ moduleIntegrationTestRunner({ return new WorkflowResponse(status) }) - createWorkflow("RACE_workflow-1", function () { + createWorkflow(workflowId, function () { const build = step0() const status = subWorkflow.runAsStep({} as any).config({ async: true, compensateAsync: true, backgroundExecution: true, - retryIntervalAwaiting: 1, + // retryIntervalAwaiting: 1, }) const transformedResult = transform({ status }, (data) => { @@ -182,14 +185,14 @@ moduleIntegrationTestRunner({ }) void workflowOrcModule.subscribe({ - workflowId: "workflow-1", + workflowId: workflowId, transactionId, subscriber: (event) => { if (event.eventType === "onFinish") { expect(step0InvokeMock).toHaveBeenCalledTimes(1) expect(step0CompensateMock).toHaveBeenCalledTimes(1) - expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) - expect(step1CompensateMock.mock.calls.length).toBeGreaterThan(1) + expect(step1InvokeMock).toHaveBeenCalledTimes(1) + expect(step1CompensateMock).toHaveBeenCalledTimes(1) expect(step2InvokeMock).toHaveBeenCalledTimes(0) expect(transformMock).toHaveBeenCalledTimes(0) done() @@ -198,7 +201,7 @@ moduleIntegrationTestRunner({ }) workflowOrcModule - .run("workflow-1", { transactionId }) + .run(workflowId, { transactionId }) .then(({ result }) => { expect(result).toBe("result from step 0") }) diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 75e1891b80662..cc3edc5da7ac1 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -231,12 +231,11 @@ export class InMemoryDistributedTransactionStorage currentFlowLastInvokingStepIndex !== isLatestExecutionFinishedIndex const compensateShouldBeSkipped = - latestUpdatedFlowLastCompensatingStepIndex === + (latestUpdatedFlowLastCompensatingStepIndex === isLatestExecutionFinishedIndex || - (currentFlowLastCompensatingStepIndex < - latestUpdatedFlowLastCompensatingStepIndex && - latestUpdatedFlowLastCompensatingStepIndex !== - isLatestExecutionFinishedIndex) + currentFlowLastCompensatingStepIndex < + latestUpdatedFlowLastCompensatingStepIndex) && + currentFlowLastCompensatingStepIndex !== isLatestExecutionFinishedIndex if ( (data.flow.state !== TransactionState.COMPENSATING && @@ -246,6 +245,14 @@ export class InMemoryDistributedTransactionStorage (latestUpdatedFlow.state === TransactionState.COMPENSATING && currentFlow.state !== latestUpdatedFlow.state) ) { + console.log("skipping execution", { + currentState: data.flow.state, + latestUpdatedState: latestUpdatedFlow.state, + latestUpdatedFlowLastCompensatingStepIndex, + currentFlowLastCompensatingStepIndex, + currentFlowLastInvokingStepIndex, + latestUpdatedFlowLastInvokingStepIndex, + }) /** * If the latest execution is ahead of the current execution in terms of completion then we * should skip to prevent multiple completion/execution of the same step. The same goes for From cfb27ed14523e39f4a87ae5ae19fdb19cffe27c2 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Fri, 7 Mar 2025 16:41:57 -0300 Subject: [PATCH 5/9] cleanup --- .../transaction/distributed-transaction.ts | 3 +- .../transaction/transaction-orchestrator.ts | 130 +++++++++--------- .../src/utils/composer/create-workflow.ts | 9 -- .../integration-tests/__tests__/index.spec.ts | 10 +- 4 files changed, 67 insertions(+), 85 deletions(-) diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index afa5d280eb2f7..e2256860f1320 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -2,6 +2,7 @@ import { isDefined } from "@medusajs/utils" import { EventEmitter } from "events" import { IDistributedTransactionStorage } from "./datastore/abstract-storage" import { BaseInMemoryDistributedTransactionStorage } from "./datastore/base-in-memory-storage" +import { NonSerializableCheckPointError } from "./errors" import { TransactionOrchestrator } from "./transaction-orchestrator" import { TransactionStep, TransactionStepHandler } from "./transaction-step" import { @@ -9,7 +10,6 @@ import { TransactionHandlerType, TransactionState, } from "./types" -import { NonSerializableCheckPointError } from "./errors" /** * @typedef TransactionMetadata @@ -248,7 +248,6 @@ class DistributedTransaction extends EventEmitter { return } - await this.saveCheckpoint() await DistributedTransaction.keyValueStore.scheduleRetry( this, step, diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 4e1787a32f3ba..9376969c937f0 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -253,8 +253,6 @@ export class TransactionOrchestrator extends EventEmitter { ) } - await transaction.saveCheckpoint() - this.emit(DistributedTransactionEvent.TIMEOUT, { transaction }) hasTimedOut = true @@ -282,8 +280,6 @@ export class TransactionOrchestrator extends EventEmitter { ) hasTimedOut = true - await transaction.saveCheckpoint() - this.emit(DistributedTransactionEvent.TIMEOUT, { transaction }) } return hasTimedOut @@ -458,7 +454,9 @@ export class TransactionOrchestrator extends EventEmitter { transaction: DistributedTransactionType, step: TransactionStep, response: unknown - ): Promise { + ): Promise<{ + stopExecution: boolean + }> { const hasStepTimedOut = step.getStates().state === TransactionStepState.TIMEOUT @@ -509,25 +507,35 @@ export class TransactionOrchestrator extends EventEmitter { : DistributedTransactionEvent.STEP_SUCCESS transaction.emit(eventName, { step, transaction }) } + + return { + stopExecution: !shouldEmit, + } } private static async skipStep( transaction: DistributedTransactionType, step: TransactionStep - ): Promise { + ): Promise<{ + stopExecution: boolean + }> { const hasStepTimedOut = step.getStates().state === TransactionStepState.TIMEOUT - const flow = transaction.getFlow() - const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId) - if (!hasStepTimedOut) { step.changeStatus(TransactionStepStatus.OK) step.changeState(TransactionStepState.SKIPPED) } - if (step.definition.async || options?.storeExecution) { + let shouldEmit = true + try { await transaction.saveCheckpoint() + } catch (error) { + if (SkipExecutionError.isSkipExecutionError(error)) { + shouldEmit = false + } else { + throw error + } } const cleaningUp: Promise[] = [] @@ -540,8 +548,14 @@ export class TransactionOrchestrator extends EventEmitter { await promiseAll(cleaningUp) - const eventName = DistributedTransactionEvent.STEP_SKIPPED - transaction.emit(eventName, { step, transaction }) + if (shouldEmit) { + const eventName = DistributedTransactionEvent.STEP_SKIPPED + transaction.emit(eventName, { step, transaction }) + } + + return { + stopExecution: !shouldEmit, + } } private static async setStepTimeout( @@ -596,7 +610,15 @@ export class TransactionOrchestrator extends EventEmitter { maxRetries: number = TransactionOrchestrator.DEFAULT_RETRIES, isTimeout = false, timeoutError?: TransactionStepTimeoutError | TransactionTimeoutError - ): Promise { + ): Promise<{ + stopExecution: boolean + }> { + if (SkipExecutionError.isSkipExecutionError(error)) { + return { + stopExecution: false, + } + } + step.failures++ if (isErrorLike(error)) { @@ -682,6 +704,10 @@ export class TransactionOrchestrator extends EventEmitter { : DistributedTransactionEvent.STEP_FAILURE transaction.emit(eventName, { step, transaction }) } + + return { + stopExecution: !shouldEmit, + } } private async executeNext( @@ -690,8 +716,6 @@ export class TransactionOrchestrator extends EventEmitter { let continueExecution = true while (continueExecution) { - console.log("FLOW", transaction.modelId) - if (transaction.hasFinished()) { return } @@ -710,27 +734,13 @@ export class TransactionOrchestrator extends EventEmitter { continue } - console.log( - "Remaining STEPS", - transaction.getFlow().modelId, - nextSteps.remaining, - nextSteps.next.map((step) => step.id), - nextSteps.current ?? "nothing" - ) - if (nextSteps.remaining === 0) { if (transaction.hasTimeout()) { void transaction.clearTransactionTimeout() } - // let shouldStop = false - // await transaction.saveCheckpoint() - console.log("FINISH", transaction.getFlow().transactionId) this.emit(DistributedTransactionEvent.FINISH, { transaction }) - // if (shouldStop) { - // return - // } } let hasSyncSteps = false @@ -813,19 +823,18 @@ export class TransactionOrchestrator extends EventEmitter { ) } - await TransactionOrchestrator.setStepFailure( + const ret = await TransactionOrchestrator.setStepFailure( transaction, step, error, endRetry ? 0 : step.definition.maxRetries ) - if (isAsync) { - await transaction.scheduleRetry( - step, - step.definition.retryInterval ?? 0 - ) + if (isAsync && !ret.stopExecution) { + await transaction.scheduleRetry(step, 0) } + + return ret } const traceData = { @@ -851,16 +860,6 @@ export class TransactionOrchestrator extends EventEmitter { ] as Parameters if (!isAsync) { - // try { - // await transaction.saveCheckpoint() - // } catch (error) { - // if (SkipExecutionError.isSkipExecutionError(error)) { - // await transaction.clearStepTimeout(step) - // continueExecution = false - // continue - // } - // } - hasSyncSteps = true const stepHandler = async () => { @@ -898,12 +897,6 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { - if (SkipExecutionError.isSkipExecutionError(error)) { - await transaction.clearStepTimeout(step) - continueExecution = false - return - } - const response = error?.getStepResponse?.() if (this.hasExpired({ transaction, step }, Date.now())) { @@ -921,10 +914,13 @@ export class TransactionOrchestrator extends EventEmitter { endRetry: true, response, }) + return } - await setStepFailure(error, { response }) + await setStepFailure(error, { + response, + }) }) ) } else { @@ -979,20 +975,9 @@ export class TransactionOrchestrator extends EventEmitter { } // check nested flow - await transaction.scheduleRetry( - step, - step.definition.retryInterval ?? 0 - ) + await transaction.scheduleRetry(step, 0) }) .catch(async (error) => { - console.log("ON Failure", error) - - if (SkipExecutionError.isSkipExecutionError(error)) { - await transaction.clearStepTimeout(step) - continueExecution = false - return - } - const response = error?.getStepResponse?.() if ( @@ -1002,12 +987,13 @@ export class TransactionOrchestrator extends EventEmitter { endRetry: true, response, }) + return } - await setStepFailure(error, { response }) - - console.log("ON Failure") + await setStepFailure(error, { + response, + }) }) }) ) @@ -1015,7 +1001,15 @@ export class TransactionOrchestrator extends EventEmitter { } if (hasSyncSteps && options?.storeExecution) { - await transaction.saveCheckpoint() + try { + await transaction.saveCheckpoint() + } catch (error) { + if (SkipExecutionError.isSkipExecutionError(error)) { + break + } else { + throw error + } + } } await promiseAll(execution) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index 494d3abfbba25..18d9eecf8d748 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -207,11 +207,6 @@ export function createWorkflow( ) }, async (transaction, stepContext) => { - console.log( - "BEFORE SUB WORKFLOW COMPENSATION", - stepContext.idempotencyKey - ) - if (!transaction) { return } @@ -227,10 +222,6 @@ export function createWorkflow( parentStepIdempotencyKey: stepContext.idempotencyKey, }, }) - - console.log("RESPONSE FROM SUB WORKFLOW COMPENSATION", { - transaction, - }) } )(input) as ReturnType> diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index cf4719ab0f66a..d3b192982a418 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -114,7 +114,7 @@ moduleIntegrationTestRunner({ }) }) - it.only("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { + it("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { const transactionId = "transaction_id" const workflowId = "RACE_workflow-1" @@ -128,7 +128,6 @@ moduleIntegrationTestRunner({ const step0 = createStep( "RACE_step0", async (_) => { - console.log("step0") step0InvokeMock() return new StepResponse("result from step 0") }, @@ -143,7 +142,7 @@ moduleIntegrationTestRunner({ async (_) => { console.log("step1") step1InvokeMock() - await setTimeout(2000) + await setTimeout(500) throw new Error("error from step 1") }, () => { @@ -153,7 +152,6 @@ moduleIntegrationTestRunner({ ) const step2 = createStep("RACE_step2", async (input: any) => { - console.log("step2") step2InvokeMock() return new StepResponse({ result: input }) }) @@ -170,7 +168,7 @@ moduleIntegrationTestRunner({ async: true, compensateAsync: true, backgroundExecution: true, - // retryIntervalAwaiting: 1, + retryIntervalAwaiting: 0.1, }) const transformedResult = transform({ status }, (data) => { @@ -191,7 +189,7 @@ moduleIntegrationTestRunner({ if (event.eventType === "onFinish") { expect(step0InvokeMock).toHaveBeenCalledTimes(1) expect(step0CompensateMock).toHaveBeenCalledTimes(1) - expect(step1InvokeMock).toHaveBeenCalledTimes(1) + expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(2) expect(step1CompensateMock).toHaveBeenCalledTimes(1) expect(step2InvokeMock).toHaveBeenCalledTimes(0) expect(transformMock).toHaveBeenCalledTimes(0) From 11ab6f33159bfed89eb895069a9c139a743b6d96 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Fri, 7 Mar 2025 16:52:37 -0300 Subject: [PATCH 6/9] redis --- .../utils/workflow-orchestrator-storage.ts | 13 -- .../integration-tests/__tests__/index.spec.ts | 173 ++++++++++++++++++ .../src/services/workflow-orchestrator.ts | 3 - .../utils/workflow-orchestrator-storage.ts | 46 +---- 4 files changed, 177 insertions(+), 58 deletions(-) diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index cc3edc5da7ac1..99d727854af22 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -245,19 +245,6 @@ export class InMemoryDistributedTransactionStorage (latestUpdatedFlow.state === TransactionState.COMPENSATING && currentFlow.state !== latestUpdatedFlow.state) ) { - console.log("skipping execution", { - currentState: data.flow.state, - latestUpdatedState: latestUpdatedFlow.state, - latestUpdatedFlowLastCompensatingStepIndex, - currentFlowLastCompensatingStepIndex, - currentFlowLastInvokingStepIndex, - latestUpdatedFlowLastInvokingStepIndex, - }) - /** - * If the latest execution is ahead of the current execution in terms of completion then we - * should skip to prevent multiple completion/execution of the same step. The same goes for - * compensating steps but in the opposite direction. - */ throw new SkipExecutionError("already finished by another execution") } } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index d536aeccadb44..8f94f77883c8e 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -18,6 +18,13 @@ import { TransactionHandlerType, TransactionStepState, } from "@medusajs/framework/utils" +import { + createStep, + createWorkflow, + StepResponse, + transform, + WorkflowResponse, +} from "@medusajs/framework/workflows-sdk" import { moduleIntegrationTestRunner } from "@medusajs/test-utils" import { asValue } from "awilix" import { setTimeout as setTimeoutSync } from "timers" @@ -50,6 +57,172 @@ moduleIntegrationTestRunner({ }, }, testSuite: ({ service: workflowOrcModule, medusaApp }) => { + it.skip("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { + const transactionId = "transaction_id" + + const step0InvokeMock = jest.fn() + const step1InvokeMock = jest.fn() + const step2InvokeMock = jest.fn() + const transformMock = jest.fn() + + const step0 = createStep("step0", async (_) => { + step0InvokeMock() + return new StepResponse("result from step 0") + }) + + const step1 = createStep("step1", async (_) => { + step1InvokeMock() + await setTimeout(2000) + return new StepResponse({ isSuccess: true }) + }) + + const step2 = createStep("step2", async (input: any) => { + step2InvokeMock() + return new StepResponse({ result: input }) + }) + + const subWorkflow = createWorkflow("sub-workflow-1", function () { + const status = step1() + return new WorkflowResponse(status) + }) + + createWorkflow("workflow-1", function () { + const build = step0() + + const status = subWorkflow.runAsStep({} as any).config({ + async: true, + compensateAsync: true, + backgroundExecution: true, + retryIntervalAwaiting: 1, + }) + + const transformedResult = transform({ status }, (data) => { + transformMock() + return { + status: data.status, + } + }) + + step2(transformedResult) + return new WorkflowResponse(build) + }) + + void workflowOrcModule.subscribe({ + workflowId: "workflow-1", + transactionId, + subscriber: (event) => { + if (event.eventType === "onFinish") { + expect(step0InvokeMock).toHaveBeenCalledTimes(1) + expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) + expect(step2InvokeMock).toHaveBeenCalledTimes(1) + expect(transformMock).toHaveBeenCalledTimes(1) + done() + } + }, + }) + + workflowOrcModule + .run("workflow-1", { transactionId }) + .then(({ result }) => { + expect(result).toBe("result from step 0") + }) + + failTrap(done) + }) + + it.skip("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { + const transactionId = "transaction_id" + const workflowId = "RACE_workflow-1" + + const step0InvokeMock = jest.fn() + const step0CompensateMock = jest.fn() + const step1InvokeMock = jest.fn() + const step1CompensateMock = jest.fn() + const step2InvokeMock = jest.fn() + const transformMock = jest.fn() + + const step0 = createStep( + "RACE_step0", + async (_) => { + step0InvokeMock() + return new StepResponse("result from step 0") + }, + () => { + console.log("step0 compensate") + step0CompensateMock() + } + ) + + const step1 = createStep( + "RACE_step1", + async (_) => { + console.log("step1") + step1InvokeMock() + await setTimeout(500) + throw new Error("error from step 1") + }, + () => { + console.log("step1 compensate") + step1CompensateMock() + } + ) + + const step2 = createStep("RACE_step2", async (input: any) => { + step2InvokeMock() + return new StepResponse({ result: input }) + }) + + const subWorkflow = createWorkflow("RACE_sub-workflow-1", function () { + const status = step1() + return new WorkflowResponse(status) + }) + + createWorkflow(workflowId, function () { + const build = step0() + + const status = subWorkflow.runAsStep({} as any).config({ + async: true, + compensateAsync: true, + backgroundExecution: true, + retryIntervalAwaiting: 0.1, + }) + + const transformedResult = transform({ status }, (data) => { + transformMock() + return { + status: data.status, + } + }) + + step2(transformedResult) + return new WorkflowResponse(build) + }) + + void workflowOrcModule.subscribe({ + workflowId: workflowId, + transactionId, + subscriber: (event) => { + if (event.eventType === "onFinish") { + expect(step0InvokeMock).toHaveBeenCalledTimes(1) + expect(step0CompensateMock).toHaveBeenCalledTimes(1) + expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(2) + expect(step1CompensateMock).toHaveBeenCalledTimes(1) + expect(step2InvokeMock).toHaveBeenCalledTimes(0) + expect(transformMock).toHaveBeenCalledTimes(0) + done() + } + }, + }) + + workflowOrcModule + .run(workflowId, { transactionId }) + .then(({ result }) => { + expect(result).toBe("result from step 0") + }) + + failTrap(done) + }) + describe("Workflow Orchestrator module", function () { beforeEach(async () => { await TestDatabase.clearTables() diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index c1f33deb69ff8..beb9fac9952ca 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -189,8 +189,6 @@ export class WorkflowOrchestratorService { options?: WorkflowOrchestratorRunOptions, @MedusaContext() sharedContext: Context = {} ) { - console.log("RUN", workflowIdOrWorkflow) - const { input, transactionId, @@ -228,7 +226,6 @@ export class WorkflowOrchestratorService { const originalOnFinishHandler = events.onFinish! delete events.onFinish - console.log(" Before original RUN") const ret = await exportedWorkflow.run({ input, throwOnError: false, diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 01b62e3c27405..7886e7dbc5cde 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -279,38 +279,6 @@ export class RedisDistributedTransactionStorage const stringifiedData = JSON.stringify(data) - /** - * In case many execution can succeed simultaneously, we need to ensure that the latest - * execution does continue if a previous execution is considered finished - */ - const currentFlow = data.flow - const { flow: latestUpdatedFlow } = - (await this.get(key)) ?? ({ flow: {} } as { flow: TransactionFlow }) - - const currentFlowLastInvokingStepIndex = Object.values( - currentFlow.steps - ).findIndex((step) => { - return [ - TransactionStepState.INVOKING, - TransactionStepState.NOT_STARTED, - ].includes(step.invoke?.state) - }) - - const latestUpdatedFlowLastInvokingStepIndex = Object.values( - (latestUpdatedFlow.steps as Record) ?? {} - ).findIndex((step) => { - return [ - TransactionStepState.INVOKING, - TransactionStepState.NOT_STARTED, - ].includes(step.invoke?.state) - }) - - if ( - currentFlowLastInvokingStepIndex < latestUpdatedFlowLastInvokingStepIndex - ) { - throw new SkipExecutionError("already finished by another execution") - } - if (!hasFinished) { if (ttl) { await this.redisClient.set(key, stringifiedData, "EX", ttl) @@ -566,12 +534,11 @@ export class RedisDistributedTransactionStorage currentFlowLastInvokingStepIndex !== isLatestExecutionFinishedIndex const compensateShouldBeSkipped = - latestUpdatedFlowLastCompensatingStepIndex === + (latestUpdatedFlowLastCompensatingStepIndex === isLatestExecutionFinishedIndex || - (currentFlowLastCompensatingStepIndex < - latestUpdatedFlowLastCompensatingStepIndex && - latestUpdatedFlowLastCompensatingStepIndex !== - isLatestExecutionFinishedIndex) + currentFlowLastCompensatingStepIndex < + latestUpdatedFlowLastCompensatingStepIndex) && + currentFlowLastCompensatingStepIndex !== isLatestExecutionFinishedIndex if ( (data.flow.state !== TransactionState.COMPENSATING && @@ -581,11 +548,6 @@ export class RedisDistributedTransactionStorage (latestUpdatedFlow.state === TransactionState.COMPENSATING && currentFlow.state !== latestUpdatedFlow.state) ) { - /** - * If the latest execution is ahead of the current execution in terms of completion then we - * should skip to prevent multiple completion/execution of the same step. The same goes for - * compensating steps but in the opposite direction. - */ throw new SkipExecutionError("already finished by another execution") } } From f8b8fc6209c0ff36d9e8314031691bcb2e3c87bc Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Mon, 10 Mar 2025 13:38:46 -0300 Subject: [PATCH 7/9] race condition tests --- .../transaction/distributed-transaction.ts | 1 - .../transaction/transaction-orchestrator.ts | 37 ++-- .../src/utils/composer/create-step.ts | 2 - .../integration-tests/__tests__/index.spec.ts | 173 +-------------- .../integration-tests/__tests__/race.spec.ts | 182 ++++++++++++++++ .../src/services/workflows-module.ts | 4 +- .../utils/workflow-orchestrator-storage.ts | 23 +- .../integration-tests/__tests__/index.spec.ts | 173 --------------- .../integration-tests/__tests__/race.spec.ts | 203 ++++++++++++++++++ .../src/services/workflow-orchestrator.ts | 2 - .../src/services/workflows-module.ts | 4 +- .../utils/workflow-orchestrator-storage.ts | 23 +- 12 files changed, 436 insertions(+), 391 deletions(-) create mode 100644 packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts create mode 100644 packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index e2256860f1320..7859d1284dd01 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -266,7 +266,6 @@ class DistributedTransaction extends EventEmitter { return } - await this.saveCheckpoint() await DistributedTransaction.keyValueStore.scheduleTransactionTimeout( this, Date.now(), diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 9376969c937f0..25c2fe4b36131 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -240,6 +240,7 @@ export class TransactionOrchestrator extends EventEmitter { ) { const flow = transaction.getFlow() let hasTimedOut = false + if (!flow.timedOutAt && this.hasExpired({ transaction }, Date.now())) { flow.timedOutAt = Date.now() @@ -484,11 +485,7 @@ export class TransactionOrchestrator extends EventEmitter { try { await transaction.saveCheckpoint() } catch (error) { - if (SkipExecutionError.isSkipExecutionError(error)) { - shouldEmit = false - } else { - throw error - } + shouldEmit = false } const cleaningUp: Promise[] = [] @@ -721,7 +718,6 @@ export class TransactionOrchestrator extends EventEmitter { } const flow = transaction.getFlow() - const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId) const nextSteps = await this.checkAllSteps(transaction) const execution: Promise[] = [] @@ -739,11 +735,10 @@ export class TransactionOrchestrator extends EventEmitter { void transaction.clearTransactionTimeout() } - console.log("FINISH", transaction.getFlow().transactionId) + await transaction.saveCheckpoint() this.emit(DistributedTransactionEvent.FINISH, { transaction }) } - let hasSyncSteps = false for (const step of nextSteps.next) { const curState = step.getStates() const type = step.isCompensating() @@ -860,8 +855,6 @@ export class TransactionOrchestrator extends EventEmitter { ] as Parameters if (!isAsync) { - hasSyncSteps = true - const stepHandler = async () => { return await transaction.handler(...handlerArgs) } @@ -1000,15 +993,13 @@ export class TransactionOrchestrator extends EventEmitter { } } - if (hasSyncSteps && options?.storeExecution) { - try { - await transaction.saveCheckpoint() - } catch (error) { - if (SkipExecutionError.isSkipExecutionError(error)) { - break - } else { - throw error - } + try { + await transaction.saveCheckpoint() + } catch (error) { + if (SkipExecutionError.isSkipExecutionError(error)) { + break + } else { + throw error } } @@ -1043,11 +1034,9 @@ export class TransactionOrchestrator extends EventEmitter { flow.state = TransactionState.INVOKING flow.startedAt = Date.now() - if (this.getOptions().store) { - await transaction.saveCheckpoint( - flow.hasAsyncSteps ? 0 : TransactionOrchestrator.DEFAULT_TTL - ) - } + await transaction.saveCheckpoint( + flow.hasAsyncSteps ? 0 : TransactionOrchestrator.DEFAULT_TTL + ) if (transaction.hasTimeout()) { await transaction.scheduleTransactionTimeout( diff --git a/packages/core/workflows-sdk/src/utils/composer/create-step.ts b/packages/core/workflows-sdk/src/utils/composer/create-step.ts index 737805a0bfb44..9fa20f23641ba 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-step.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-step.ts @@ -192,8 +192,6 @@ export function applyStep< ret.__step__ = newStepName WorkflowManager.update(this.workflowId, this.flow, this.handlers) - //const confRef = proxify(ret) - if (global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition]) { const flagSteps = global[OrchestrationUtils.SymbolMedusaWorkflowComposerCondition].steps diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index d3b192982a418..bee8c94d41108 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -12,17 +12,10 @@ import { Modules, TransactionHandlerType, } from "@medusajs/framework/utils" -import { - createStep, - createWorkflow, - StepResponse, - transform, - WorkflowResponse, -} from "@medusajs/framework/workflows-sdk" import { moduleIntegrationTestRunner } from "@medusajs/test-utils" import { WorkflowsModuleService } from "@services" import { asFunction } from "awilix" -import { setTimeout, setTimeout as setTimeoutPromise } from "timers/promises" +import { setTimeout as setTimeoutPromise } from "timers/promises" import "../__fixtures__" import { conditionalStep2Invoke, @@ -43,169 +36,7 @@ moduleIntegrationTestRunner({ moduleName: Modules.WORKFLOW_ENGINE, resolve: __dirname + "/../..", testSuite: ({ service: workflowOrcModule, medusaApp }) => { - it("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { - const transactionId = "transaction_id" - - const step0InvokeMock = jest.fn() - const step1InvokeMock = jest.fn() - const step2InvokeMock = jest.fn() - const transformMock = jest.fn() - - const step0 = createStep("step0", async (_) => { - step0InvokeMock() - return new StepResponse("result from step 0") - }) - - const step1 = createStep("step1", async (_) => { - step1InvokeMock() - await setTimeout(2000) - return new StepResponse({ isSuccess: true }) - }) - - const step2 = createStep("step2", async (input: any) => { - step2InvokeMock() - return new StepResponse({ result: input }) - }) - - const subWorkflow = createWorkflow("sub-workflow-1", function () { - const status = step1() - return new WorkflowResponse(status) - }) - - createWorkflow("workflow-1", function () { - const build = step0() - - const status = subWorkflow.runAsStep({} as any).config({ - async: true, - compensateAsync: true, - backgroundExecution: true, - retryIntervalAwaiting: 1, - }) - - const transformedResult = transform({ status }, (data) => { - transformMock() - return { - status: data.status, - } - }) - - step2(transformedResult) - return new WorkflowResponse(build) - }) - - void workflowOrcModule.subscribe({ - workflowId: "workflow-1", - transactionId, - subscriber: (event) => { - if (event.eventType === "onFinish") { - expect(step0InvokeMock).toHaveBeenCalledTimes(1) - expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) - expect(step2InvokeMock).toHaveBeenCalledTimes(1) - expect(transformMock).toHaveBeenCalledTimes(1) - done() - } - }, - }) - - workflowOrcModule - .run("workflow-1", { transactionId }) - .then(({ result }) => { - expect(result).toBe("result from step 0") - }) - }) - - it("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { - const transactionId = "transaction_id" - const workflowId = "RACE_workflow-1" - - const step0InvokeMock = jest.fn() - const step0CompensateMock = jest.fn() - const step1InvokeMock = jest.fn() - const step1CompensateMock = jest.fn() - const step2InvokeMock = jest.fn() - const transformMock = jest.fn() - - const step0 = createStep( - "RACE_step0", - async (_) => { - step0InvokeMock() - return new StepResponse("result from step 0") - }, - () => { - console.log("step0 compensate") - step0CompensateMock() - } - ) - - const step1 = createStep( - "RACE_step1", - async (_) => { - console.log("step1") - step1InvokeMock() - await setTimeout(500) - throw new Error("error from step 1") - }, - () => { - console.log("step1 compensate") - step1CompensateMock() - } - ) - - const step2 = createStep("RACE_step2", async (input: any) => { - step2InvokeMock() - return new StepResponse({ result: input }) - }) - - const subWorkflow = createWorkflow("RACE_sub-workflow-1", function () { - const status = step1() - return new WorkflowResponse(status) - }) - - createWorkflow(workflowId, function () { - const build = step0() - - const status = subWorkflow.runAsStep({} as any).config({ - async: true, - compensateAsync: true, - backgroundExecution: true, - retryIntervalAwaiting: 0.1, - }) - - const transformedResult = transform({ status }, (data) => { - transformMock() - return { - status: data.status, - } - }) - - step2(transformedResult) - return new WorkflowResponse(build) - }) - - void workflowOrcModule.subscribe({ - workflowId: workflowId, - transactionId, - subscriber: (event) => { - if (event.eventType === "onFinish") { - expect(step0InvokeMock).toHaveBeenCalledTimes(1) - expect(step0CompensateMock).toHaveBeenCalledTimes(1) - expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(2) - expect(step1CompensateMock).toHaveBeenCalledTimes(1) - expect(step2InvokeMock).toHaveBeenCalledTimes(0) - expect(transformMock).toHaveBeenCalledTimes(0) - done() - } - }, - }) - - workflowOrcModule - .run(workflowId, { transactionId }) - .then(({ result }) => { - expect(result).toBe("result from step 0") - }) - }) - - describe.skip("Workflow Orchestrator module", function () { + describe("Workflow Orchestrator module", function () { let query: RemoteQueryFunction beforeEach(() => { diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts new file mode 100644 index 0000000000000..8da509ad8a62c --- /dev/null +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/race.spec.ts @@ -0,0 +1,182 @@ +import { IWorkflowEngineService } from "@medusajs/framework/types" +import { Modules } from "@medusajs/framework/utils" +import { + createStep, + createWorkflow, + StepResponse, + transform, + WorkflowResponse, +} from "@medusajs/framework/workflows-sdk" +import { moduleIntegrationTestRunner } from "@medusajs/test-utils" +import { setTimeout as setTimeoutSync } from "timers" +import { setTimeout } from "timers/promises" +import "../__fixtures__" + +jest.setTimeout(3000000) + +moduleIntegrationTestRunner({ + moduleName: Modules.WORKFLOW_ENGINE, + resolve: __dirname + "/../..", + testSuite: ({ service: workflowOrcModule, medusaApp }) => { + describe("Testing race condition of the workflow during retry", () => { + it("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { + const step0InvokeMock = jest.fn() + const step1InvokeMock = jest.fn() + const step2InvokeMock = jest.fn() + const transformMock = jest.fn() + + const step0 = createStep("step0", async (_) => { + step0InvokeMock() + return new StepResponse("result from step 0") + }) + + const step1 = createStep("step1", async (_) => { + step1InvokeMock() + await setTimeout(2000) + return new StepResponse({ isSuccess: true }) + }) + + const step2 = createStep("step2", async (input: any) => { + step2InvokeMock() + return new StepResponse({ result: input }) + }) + + const subWorkflow = createWorkflow("sub-workflow-1", function () { + const status = step1() + return new WorkflowResponse(status) + }) + + createWorkflow("workflow-1", function () { + const build = step0() + + const status = subWorkflow.runAsStep({} as any).config({ + async: true, + compensateAsync: true, + backgroundExecution: true, + retryIntervalAwaiting: 1, + }) + + const transformedResult = transform({ status }, (data) => { + transformMock() + return { + status: data.status, + } + }) + + step2(transformedResult) + return new WorkflowResponse(build) + }) + + void workflowOrcModule.subscribe({ + workflowId: "workflow-1", + + subscriber: (event) => { + if (event.eventType === "onFinish") { + expect(step0InvokeMock).toHaveBeenCalledTimes(1) + expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) + expect(step2InvokeMock).toHaveBeenCalledTimes(1) + expect(transformMock).toHaveBeenCalledTimes(1) + setTimeoutSync(done, 500) + } + }, + }) + + workflowOrcModule + .run("workflow-1", { throwOnError: false }) + .then(({ result }) => { + expect(result).toBe("result from step 0") + }) + .catch((e) => e) + }) + + it("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { + const workflowId = "RACE_workflow-1" + + const step0InvokeMock = jest.fn() + const step0CompensateMock = jest.fn() + const step1InvokeMock = jest.fn() + const step1CompensateMock = jest.fn() + const step2InvokeMock = jest.fn() + const transformMock = jest.fn() + + const step0 = createStep( + "RACE_step0", + async (_) => { + step0InvokeMock() + return new StepResponse("result from step 0") + }, + () => { + step0CompensateMock() + } + ) + + const step1 = createStep( + "RACE_step1", + async (_) => { + step1InvokeMock() + await setTimeout(300) + throw new Error("error from step 1") + }, + () => { + step1CompensateMock() + } + ) + + const step2 = createStep("RACE_step2", async (input: any) => { + step2InvokeMock() + return new StepResponse({ result: input }) + }) + + const subWorkflow = createWorkflow("RACE_sub-workflow-1", function () { + const status = step1() + return new WorkflowResponse(status) + }) + + createWorkflow(workflowId, function () { + const build = step0() + + const status = subWorkflow.runAsStep({} as any).config({ + async: true, + compensateAsync: true, + backgroundExecution: true, + retryIntervalAwaiting: 0.1, + }) + + const transformedResult = transform({ status }, (data) => { + transformMock() + return { + status: data.status, + } + }) + + step2(transformedResult) + return new WorkflowResponse(build) + }) + + void workflowOrcModule.subscribe({ + workflowId: workflowId, + subscriber: async (event) => { + if (event.eventType === "onFinish") { + expect(step0InvokeMock).toHaveBeenCalledTimes(1) + expect(step0CompensateMock).toHaveBeenCalledTimes(1) + expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(2) + expect(step1CompensateMock).toHaveBeenCalledTimes(1) + expect(step2InvokeMock).toHaveBeenCalledTimes(0) + expect(transformMock).toHaveBeenCalledTimes(0) + setTimeoutSync(done, 500) + } + }, + }) + + workflowOrcModule + .run(workflowId, { + throwOnError: false, + }) + .then(({ result }) => { + expect(result).toBe("result from step 0") + }) + .catch((e) => e) + }) + }) + }, +}) diff --git a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts index 5d89c45767f1b..c4c910068caaa 100644 --- a/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-inmemory/src/services/workflows-module.ts @@ -62,7 +62,9 @@ export class WorkflowsModuleService< await this.clearExpiredExecutions() this.clearTimeout_ = setInterval(async () => { - await this.clearExpiredExecutions() + try { + await this.clearExpiredExecutions() + } catch {} }, 1000 * 60 * 60) }, onApplicationShutdown: async () => { diff --git a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts index 99d727854af22..91462c84227f3 100644 --- a/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-inmemory/src/utils/workflow-orchestrator-storage.ts @@ -210,7 +210,7 @@ export class InMemoryDistributedTransactionStorage }) const latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps - ? 1 // There is no other execution, so the current execution is the latest + ? -1 // There is no other execution, so the current execution is the latest : Object.values( (latestUpdatedFlow.steps as Record) ?? {} ) @@ -231,11 +231,11 @@ export class InMemoryDistributedTransactionStorage currentFlowLastInvokingStepIndex !== isLatestExecutionFinishedIndex const compensateShouldBeSkipped = - (latestUpdatedFlowLastCompensatingStepIndex === - isLatestExecutionFinishedIndex || - currentFlowLastCompensatingStepIndex < - latestUpdatedFlowLastCompensatingStepIndex) && - currentFlowLastCompensatingStepIndex !== isLatestExecutionFinishedIndex + currentFlowLastCompensatingStepIndex < + latestUpdatedFlowLastCompensatingStepIndex && + currentFlowLastCompensatingStepIndex !== isLatestExecutionFinishedIndex && + latestUpdatedFlowLastCompensatingStepIndex !== + isLatestExecutionFinishedIndex if ( (data.flow.state !== TransactionState.COMPENSATING && @@ -243,9 +243,16 @@ export class InMemoryDistributedTransactionStorage (data.flow.state === TransactionState.COMPENSATING && compensateShouldBeSkipped) || (latestUpdatedFlow.state === TransactionState.COMPENSATING && - currentFlow.state !== latestUpdatedFlow.state) + ![TransactionState.REVERTED, TransactionState.FAILED].includes( + currentFlow.state + ) && + currentFlow.state !== latestUpdatedFlow.state) || + (latestUpdatedFlow.state === TransactionState.REVERTED && + currentFlow.state !== TransactionState.REVERTED) || + (latestUpdatedFlow.state === TransactionState.FAILED && + currentFlow.state !== TransactionState.FAILED) ) { - throw new SkipExecutionError("already finished by another execution") + throw new SkipExecutionError("Already finished by another execution") } } diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts index 8f94f77883c8e..d536aeccadb44 100644 --- a/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/index.spec.ts @@ -18,13 +18,6 @@ import { TransactionHandlerType, TransactionStepState, } from "@medusajs/framework/utils" -import { - createStep, - createWorkflow, - StepResponse, - transform, - WorkflowResponse, -} from "@medusajs/framework/workflows-sdk" import { moduleIntegrationTestRunner } from "@medusajs/test-utils" import { asValue } from "awilix" import { setTimeout as setTimeoutSync } from "timers" @@ -57,172 +50,6 @@ moduleIntegrationTestRunner({ }, }, testSuite: ({ service: workflowOrcModule, medusaApp }) => { - it.skip("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { - const transactionId = "transaction_id" - - const step0InvokeMock = jest.fn() - const step1InvokeMock = jest.fn() - const step2InvokeMock = jest.fn() - const transformMock = jest.fn() - - const step0 = createStep("step0", async (_) => { - step0InvokeMock() - return new StepResponse("result from step 0") - }) - - const step1 = createStep("step1", async (_) => { - step1InvokeMock() - await setTimeout(2000) - return new StepResponse({ isSuccess: true }) - }) - - const step2 = createStep("step2", async (input: any) => { - step2InvokeMock() - return new StepResponse({ result: input }) - }) - - const subWorkflow = createWorkflow("sub-workflow-1", function () { - const status = step1() - return new WorkflowResponse(status) - }) - - createWorkflow("workflow-1", function () { - const build = step0() - - const status = subWorkflow.runAsStep({} as any).config({ - async: true, - compensateAsync: true, - backgroundExecution: true, - retryIntervalAwaiting: 1, - }) - - const transformedResult = transform({ status }, (data) => { - transformMock() - return { - status: data.status, - } - }) - - step2(transformedResult) - return new WorkflowResponse(build) - }) - - void workflowOrcModule.subscribe({ - workflowId: "workflow-1", - transactionId, - subscriber: (event) => { - if (event.eventType === "onFinish") { - expect(step0InvokeMock).toHaveBeenCalledTimes(1) - expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) - expect(step2InvokeMock).toHaveBeenCalledTimes(1) - expect(transformMock).toHaveBeenCalledTimes(1) - done() - } - }, - }) - - workflowOrcModule - .run("workflow-1", { transactionId }) - .then(({ result }) => { - expect(result).toBe("result from step 0") - }) - - failTrap(done) - }) - - it.skip("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { - const transactionId = "transaction_id" - const workflowId = "RACE_workflow-1" - - const step0InvokeMock = jest.fn() - const step0CompensateMock = jest.fn() - const step1InvokeMock = jest.fn() - const step1CompensateMock = jest.fn() - const step2InvokeMock = jest.fn() - const transformMock = jest.fn() - - const step0 = createStep( - "RACE_step0", - async (_) => { - step0InvokeMock() - return new StepResponse("result from step 0") - }, - () => { - console.log("step0 compensate") - step0CompensateMock() - } - ) - - const step1 = createStep( - "RACE_step1", - async (_) => { - console.log("step1") - step1InvokeMock() - await setTimeout(500) - throw new Error("error from step 1") - }, - () => { - console.log("step1 compensate") - step1CompensateMock() - } - ) - - const step2 = createStep("RACE_step2", async (input: any) => { - step2InvokeMock() - return new StepResponse({ result: input }) - }) - - const subWorkflow = createWorkflow("RACE_sub-workflow-1", function () { - const status = step1() - return new WorkflowResponse(status) - }) - - createWorkflow(workflowId, function () { - const build = step0() - - const status = subWorkflow.runAsStep({} as any).config({ - async: true, - compensateAsync: true, - backgroundExecution: true, - retryIntervalAwaiting: 0.1, - }) - - const transformedResult = transform({ status }, (data) => { - transformMock() - return { - status: data.status, - } - }) - - step2(transformedResult) - return new WorkflowResponse(build) - }) - - void workflowOrcModule.subscribe({ - workflowId: workflowId, - transactionId, - subscriber: (event) => { - if (event.eventType === "onFinish") { - expect(step0InvokeMock).toHaveBeenCalledTimes(1) - expect(step0CompensateMock).toHaveBeenCalledTimes(1) - expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(2) - expect(step1CompensateMock).toHaveBeenCalledTimes(1) - expect(step2InvokeMock).toHaveBeenCalledTimes(0) - expect(transformMock).toHaveBeenCalledTimes(0) - done() - } - }, - }) - - workflowOrcModule - .run(workflowId, { transactionId }) - .then(({ result }) => { - expect(result).toBe("result from step 0") - }) - - failTrap(done) - }) - describe("Workflow Orchestrator module", function () { beforeEach(async () => { await TestDatabase.clearTables() diff --git a/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts new file mode 100644 index 0000000000000..1304b6f8fc794 --- /dev/null +++ b/packages/modules/workflow-engine-redis/integration-tests/__tests__/race.spec.ts @@ -0,0 +1,203 @@ +import { IWorkflowEngineService } from "@medusajs/framework/types" +import { Modules } from "@medusajs/framework/utils" +import { + createStep, + createWorkflow, + StepResponse, + transform, + WorkflowResponse, +} from "@medusajs/framework/workflows-sdk" +import { moduleIntegrationTestRunner } from "@medusajs/test-utils" +import { setTimeout as setTimeoutSync } from "timers" +import { setTimeout } from "timers/promises" +import "../__fixtures__" + +jest.setTimeout(999900000) + +const failTrap = (done) => { + setTimeoutSync(() => { + // REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending + console.warn( + "Jest is breaking the event emit with its debouncer. This allows to continue the test by managing the timeout of the test manually." + ) + done() + }, 5000) +} + +// REF:https://stackoverflow.com/questions/78028715/jest-async-test-with-event-emitter-isnt-ending + +moduleIntegrationTestRunner({ + moduleName: Modules.WORKFLOW_ENGINE, + resolve: __dirname + "/../..", + moduleOptions: { + redis: { + url: "localhost:6379", + }, + }, + testSuite: ({ service: workflowOrcModule, medusaApp }) => { + describe("Testing race condition of the workflow during retry", () => { + it("should prevent race continuation of the workflow during retryIntervalAwaiting in background execution", (done) => { + const transactionId = "transaction_id" + + const step0InvokeMock = jest.fn() + const step1InvokeMock = jest.fn() + const step2InvokeMock = jest.fn() + const transformMock = jest.fn() + + const step0 = createStep("step0", async (_) => { + step0InvokeMock() + return new StepResponse("result from step 0") + }) + + const step1 = createStep("step1", async (_) => { + step1InvokeMock() + await setTimeout(2000) + return new StepResponse({ isSuccess: true }) + }) + + const step2 = createStep("step2", async (input: any) => { + step2InvokeMock() + return new StepResponse({ result: input }) + }) + + const subWorkflow = createWorkflow("sub-workflow-1", function () { + const status = step1() + return new WorkflowResponse(status) + }) + + createWorkflow("workflow-1", function () { + const build = step0() + + const status = subWorkflow.runAsStep({} as any).config({ + async: true, + compensateAsync: true, + backgroundExecution: true, + retryIntervalAwaiting: 1, + }) + + const transformedResult = transform({ status }, (data) => { + transformMock() + return { + status: data.status, + } + }) + + step2(transformedResult) + return new WorkflowResponse(build) + }) + + void workflowOrcModule.subscribe({ + workflowId: "workflow-1", + transactionId, + subscriber: (event) => { + if (event.eventType === "onFinish") { + expect(step0InvokeMock).toHaveBeenCalledTimes(1) + expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1) + expect(step2InvokeMock).toHaveBeenCalledTimes(1) + expect(transformMock).toHaveBeenCalledTimes(1) + setTimeoutSync(done, 500) + } + }, + }) + + workflowOrcModule + .run("workflow-1", { transactionId }) + .then(({ result }) => { + expect(result).toBe("result from step 0") + }) + + failTrap(done) + }) + + it("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { + const transactionId = "transaction_id" + const workflowId = "RACE_workflow-1" + + const step0InvokeMock = jest.fn() + const step0CompensateMock = jest.fn() + const step1InvokeMock = jest.fn() + const step1CompensateMock = jest.fn() + const step2InvokeMock = jest.fn() + const transformMock = jest.fn() + + const step0 = createStep( + "RACE_step0", + async (_) => { + step0InvokeMock() + return new StepResponse("result from step 0") + }, + () => { + step0CompensateMock() + } + ) + + const step1 = createStep( + "RACE_step1", + async (_) => { + step1InvokeMock() + await setTimeout(500) + throw new Error("error from step 1") + }, + () => { + step1CompensateMock() + } + ) + + const step2 = createStep("RACE_step2", async (input: any) => { + step2InvokeMock() + return new StepResponse({ result: input }) + }) + + const subWorkflow = createWorkflow("RACE_sub-workflow-1", function () { + const status = step1() + return new WorkflowResponse(status) + }) + + createWorkflow(workflowId, function () { + const build = step0() + + const status = subWorkflow.runAsStep({} as any).config({ + async: true, + compensateAsync: true, + backgroundExecution: true, + retryIntervalAwaiting: 0.1, + }) + + const transformedResult = transform({ status }, (data) => { + transformMock() + return { + status: data.status, + } + }) + + step2(transformedResult) + return new WorkflowResponse(build) + }) + + void workflowOrcModule.subscribe({ + workflowId: workflowId, + transactionId, + subscriber: (event) => { + if (event.eventType === "onFinish") { + expect(step0InvokeMock).toHaveBeenCalledTimes(1) + expect(step0CompensateMock).toHaveBeenCalledTimes(1) + expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(2) + expect(step1CompensateMock).toHaveBeenCalledTimes(1) + expect(step2InvokeMock).toHaveBeenCalledTimes(0) + expect(transformMock).toHaveBeenCalledTimes(0) + done() + } + }, + }) + + workflowOrcModule + .run(workflowId, { transactionId }) + .then(({ result }) => { + expect(result).toBe("result from step 0") + }) + + failTrap(done) + }) + }) + }, +}) diff --git a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts index beb9fac9952ca..e56384b6fadb4 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflow-orchestrator.ts @@ -253,8 +253,6 @@ export class WorkflowOrchestratorService { hasFailed, } - console.log({ acknowledgement }) - if (hasFinished) { const { result, errors } = ret diff --git a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts index 4421f659e20f1..c4578270d6a7e 100644 --- a/packages/modules/workflow-engine-redis/src/services/workflows-module.ts +++ b/packages/modules/workflow-engine-redis/src/services/workflows-module.ts @@ -75,7 +75,9 @@ export class WorkflowsModuleService< await this.clearExpiredExecutions() this.clearTimeout_ = setInterval(async () => { - await this.clearExpiredExecutions() + try { + await this.clearExpiredExecutions() + } catch {} }, 1000 * 60 * 60) }, } diff --git a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts index 7886e7dbc5cde..3e1c3725d484f 100644 --- a/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts +++ b/packages/modules/workflow-engine-redis/src/utils/workflow-orchestrator-storage.ts @@ -513,7 +513,7 @@ export class RedisDistributedTransactionStorage }) const latestUpdatedFlowLastCompensatingStepIndex = !latestUpdatedFlow.steps - ? 1 // There is no other execution, so the current execution is the latest + ? -1 : Object.values( (latestUpdatedFlow.steps as Record) ?? {} ) @@ -534,11 +534,11 @@ export class RedisDistributedTransactionStorage currentFlowLastInvokingStepIndex !== isLatestExecutionFinishedIndex const compensateShouldBeSkipped = - (latestUpdatedFlowLastCompensatingStepIndex === - isLatestExecutionFinishedIndex || - currentFlowLastCompensatingStepIndex < - latestUpdatedFlowLastCompensatingStepIndex) && - currentFlowLastCompensatingStepIndex !== isLatestExecutionFinishedIndex + currentFlowLastCompensatingStepIndex < + latestUpdatedFlowLastCompensatingStepIndex && + currentFlowLastCompensatingStepIndex !== isLatestExecutionFinishedIndex && + latestUpdatedFlowLastCompensatingStepIndex !== + isLatestExecutionFinishedIndex if ( (data.flow.state !== TransactionState.COMPENSATING && @@ -546,9 +546,16 @@ export class RedisDistributedTransactionStorage (data.flow.state === TransactionState.COMPENSATING && compensateShouldBeSkipped) || (latestUpdatedFlow.state === TransactionState.COMPENSATING && - currentFlow.state !== latestUpdatedFlow.state) + ![TransactionState.REVERTED, TransactionState.FAILED].includes( + currentFlow.state + ) && + currentFlow.state !== latestUpdatedFlow.state) || + (latestUpdatedFlow.state === TransactionState.REVERTED && + currentFlow.state !== TransactionState.REVERTED) || + (latestUpdatedFlow.state === TransactionState.FAILED && + currentFlow.state !== TransactionState.FAILED) ) { - throw new SkipExecutionError("already finished by another execution") + throw new SkipExecutionError("Already finished by another execution") } } } From 51b38f42363ede5b552753ff02b2c78766f7134f Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Mon, 10 Mar 2025 13:48:42 -0300 Subject: [PATCH 8/9] changeset --- .changeset/fifty-pigs-sit.md | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 .changeset/fifty-pigs-sit.md diff --git a/.changeset/fifty-pigs-sit.md b/.changeset/fifty-pigs-sit.md new file mode 100644 index 0000000000000..12fef9e5d405f --- /dev/null +++ b/.changeset/fifty-pigs-sit.md @@ -0,0 +1,8 @@ +--- +"@medusajs/workflow-engine-inmemory": patch +"@medusajs/workflow-engine-redis": patch +"@medusajs/orchestration": patch +"@medusajs/workflows-sdk": patch +--- + +fix: workflow retry interval race condition From 4756d605b4734288c94785c8d0d2b1ca6e815ae9 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Mon, 10 Mar 2025 15:26:46 -0300 Subject: [PATCH 9/9] test --- .../modules/__tests__/workflow-engine/tests.ts | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/integration-tests/modules/__tests__/workflow-engine/tests.ts b/integration-tests/modules/__tests__/workflow-engine/tests.ts index 8855260d15131..12e3c872756ae 100644 --- a/integration-tests/modules/__tests__/workflow-engine/tests.ts +++ b/integration-tests/modules/__tests__/workflow-engine/tests.ts @@ -185,7 +185,7 @@ export const workflowEngineTestSuite = ( hasAsyncSteps: true, hasFailedSteps: false, hasSkippedSteps: false, - hasWaitingSteps: false, + hasWaitingSteps: true, hasRevertedSteps: false, }), context: expect.objectContaining({ @@ -236,6 +236,13 @@ export const workflowEngineTestSuite = ( workflow_id: "my-workflow-name", transaction_id: "trx_123", state: "done", + execution: expect.objectContaining({ + hasAsyncSteps: true, + hasFailedSteps: false, + hasSkippedSteps: false, + hasWaitingSteps: false, + hasRevertedSteps: false, + }), context: expect.objectContaining({ data: expect.objectContaining({ invoke: expect.objectContaining({