From cfb27ed14523e39f4a87ae5ae19fdb19cffe27c2 Mon Sep 17 00:00:00 2001 From: "Carlos R. L. Rodrigues" Date: Fri, 7 Mar 2025 16:41:57 -0300 Subject: [PATCH] cleanup --- .../transaction/distributed-transaction.ts | 3 +- .../transaction/transaction-orchestrator.ts | 130 +++++++++--------- .../src/utils/composer/create-workflow.ts | 9 -- .../integration-tests/__tests__/index.spec.ts | 10 +- 4 files changed, 67 insertions(+), 85 deletions(-) diff --git a/packages/core/orchestration/src/transaction/distributed-transaction.ts b/packages/core/orchestration/src/transaction/distributed-transaction.ts index afa5d280eb2f7..e2256860f1320 100644 --- a/packages/core/orchestration/src/transaction/distributed-transaction.ts +++ b/packages/core/orchestration/src/transaction/distributed-transaction.ts @@ -2,6 +2,7 @@ import { isDefined } from "@medusajs/utils" import { EventEmitter } from "events" import { IDistributedTransactionStorage } from "./datastore/abstract-storage" import { BaseInMemoryDistributedTransactionStorage } from "./datastore/base-in-memory-storage" +import { NonSerializableCheckPointError } from "./errors" import { TransactionOrchestrator } from "./transaction-orchestrator" import { TransactionStep, TransactionStepHandler } from "./transaction-step" import { @@ -9,7 +10,6 @@ import { TransactionHandlerType, TransactionState, } from "./types" -import { NonSerializableCheckPointError } from "./errors" /** * @typedef TransactionMetadata @@ -248,7 +248,6 @@ class DistributedTransaction extends EventEmitter { return } - await this.saveCheckpoint() await DistributedTransaction.keyValueStore.scheduleRetry( this, step, diff --git a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts index 4e1787a32f3ba..9376969c937f0 100644 --- a/packages/core/orchestration/src/transaction/transaction-orchestrator.ts +++ b/packages/core/orchestration/src/transaction/transaction-orchestrator.ts @@ -253,8 +253,6 @@ export class TransactionOrchestrator extends EventEmitter { ) } - await transaction.saveCheckpoint() - this.emit(DistributedTransactionEvent.TIMEOUT, { transaction }) hasTimedOut = true @@ -282,8 +280,6 @@ export class TransactionOrchestrator extends EventEmitter { ) hasTimedOut = true - await transaction.saveCheckpoint() - this.emit(DistributedTransactionEvent.TIMEOUT, { transaction }) } return hasTimedOut @@ -458,7 +454,9 @@ export class TransactionOrchestrator extends EventEmitter { transaction: DistributedTransactionType, step: TransactionStep, response: unknown - ): Promise { + ): Promise<{ + stopExecution: boolean + }> { const hasStepTimedOut = step.getStates().state === TransactionStepState.TIMEOUT @@ -509,25 +507,35 @@ export class TransactionOrchestrator extends EventEmitter { : DistributedTransactionEvent.STEP_SUCCESS transaction.emit(eventName, { step, transaction }) } + + return { + stopExecution: !shouldEmit, + } } private static async skipStep( transaction: DistributedTransactionType, step: TransactionStep - ): Promise { + ): Promise<{ + stopExecution: boolean + }> { const hasStepTimedOut = step.getStates().state === TransactionStepState.TIMEOUT - const flow = transaction.getFlow() - const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId) - if (!hasStepTimedOut) { step.changeStatus(TransactionStepStatus.OK) step.changeState(TransactionStepState.SKIPPED) } - if (step.definition.async || options?.storeExecution) { + let shouldEmit = true + try { await transaction.saveCheckpoint() + } catch (error) { + if (SkipExecutionError.isSkipExecutionError(error)) { + shouldEmit = false + } else { + throw error + } } const cleaningUp: Promise[] = [] @@ -540,8 +548,14 @@ export class TransactionOrchestrator extends EventEmitter { await promiseAll(cleaningUp) - const eventName = DistributedTransactionEvent.STEP_SKIPPED - transaction.emit(eventName, { step, transaction }) + if (shouldEmit) { + const eventName = DistributedTransactionEvent.STEP_SKIPPED + transaction.emit(eventName, { step, transaction }) + } + + return { + stopExecution: !shouldEmit, + } } private static async setStepTimeout( @@ -596,7 +610,15 @@ export class TransactionOrchestrator extends EventEmitter { maxRetries: number = TransactionOrchestrator.DEFAULT_RETRIES, isTimeout = false, timeoutError?: TransactionStepTimeoutError | TransactionTimeoutError - ): Promise { + ): Promise<{ + stopExecution: boolean + }> { + if (SkipExecutionError.isSkipExecutionError(error)) { + return { + stopExecution: false, + } + } + step.failures++ if (isErrorLike(error)) { @@ -682,6 +704,10 @@ export class TransactionOrchestrator extends EventEmitter { : DistributedTransactionEvent.STEP_FAILURE transaction.emit(eventName, { step, transaction }) } + + return { + stopExecution: !shouldEmit, + } } private async executeNext( @@ -690,8 +716,6 @@ export class TransactionOrchestrator extends EventEmitter { let continueExecution = true while (continueExecution) { - console.log("FLOW", transaction.modelId) - if (transaction.hasFinished()) { return } @@ -710,27 +734,13 @@ export class TransactionOrchestrator extends EventEmitter { continue } - console.log( - "Remaining STEPS", - transaction.getFlow().modelId, - nextSteps.remaining, - nextSteps.next.map((step) => step.id), - nextSteps.current ?? "nothing" - ) - if (nextSteps.remaining === 0) { if (transaction.hasTimeout()) { void transaction.clearTransactionTimeout() } - // let shouldStop = false - // await transaction.saveCheckpoint() - console.log("FINISH", transaction.getFlow().transactionId) this.emit(DistributedTransactionEvent.FINISH, { transaction }) - // if (shouldStop) { - // return - // } } let hasSyncSteps = false @@ -813,19 +823,18 @@ export class TransactionOrchestrator extends EventEmitter { ) } - await TransactionOrchestrator.setStepFailure( + const ret = await TransactionOrchestrator.setStepFailure( transaction, step, error, endRetry ? 0 : step.definition.maxRetries ) - if (isAsync) { - await transaction.scheduleRetry( - step, - step.definition.retryInterval ?? 0 - ) + if (isAsync && !ret.stopExecution) { + await transaction.scheduleRetry(step, 0) } + + return ret } const traceData = { @@ -851,16 +860,6 @@ export class TransactionOrchestrator extends EventEmitter { ] as Parameters if (!isAsync) { - // try { - // await transaction.saveCheckpoint() - // } catch (error) { - // if (SkipExecutionError.isSkipExecutionError(error)) { - // await transaction.clearStepTimeout(step) - // continueExecution = false - // continue - // } - // } - hasSyncSteps = true const stepHandler = async () => { @@ -898,12 +897,6 @@ export class TransactionOrchestrator extends EventEmitter { ) }) .catch(async (error) => { - if (SkipExecutionError.isSkipExecutionError(error)) { - await transaction.clearStepTimeout(step) - continueExecution = false - return - } - const response = error?.getStepResponse?.() if (this.hasExpired({ transaction, step }, Date.now())) { @@ -921,10 +914,13 @@ export class TransactionOrchestrator extends EventEmitter { endRetry: true, response, }) + return } - await setStepFailure(error, { response }) + await setStepFailure(error, { + response, + }) }) ) } else { @@ -979,20 +975,9 @@ export class TransactionOrchestrator extends EventEmitter { } // check nested flow - await transaction.scheduleRetry( - step, - step.definition.retryInterval ?? 0 - ) + await transaction.scheduleRetry(step, 0) }) .catch(async (error) => { - console.log("ON Failure", error) - - if (SkipExecutionError.isSkipExecutionError(error)) { - await transaction.clearStepTimeout(step) - continueExecution = false - return - } - const response = error?.getStepResponse?.() if ( @@ -1002,12 +987,13 @@ export class TransactionOrchestrator extends EventEmitter { endRetry: true, response, }) + return } - await setStepFailure(error, { response }) - - console.log("ON Failure") + await setStepFailure(error, { + response, + }) }) }) ) @@ -1015,7 +1001,15 @@ export class TransactionOrchestrator extends EventEmitter { } if (hasSyncSteps && options?.storeExecution) { - await transaction.saveCheckpoint() + try { + await transaction.saveCheckpoint() + } catch (error) { + if (SkipExecutionError.isSkipExecutionError(error)) { + break + } else { + throw error + } + } } await promiseAll(execution) diff --git a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts index 494d3abfbba25..18d9eecf8d748 100644 --- a/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts +++ b/packages/core/workflows-sdk/src/utils/composer/create-workflow.ts @@ -207,11 +207,6 @@ export function createWorkflow( ) }, async (transaction, stepContext) => { - console.log( - "BEFORE SUB WORKFLOW COMPENSATION", - stepContext.idempotencyKey - ) - if (!transaction) { return } @@ -227,10 +222,6 @@ export function createWorkflow( parentStepIdempotencyKey: stepContext.idempotencyKey, }, }) - - console.log("RESPONSE FROM SUB WORKFLOW COMPENSATION", { - transaction, - }) } )(input) as ReturnType> diff --git a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts index cf4719ab0f66a..d3b192982a418 100644 --- a/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts +++ b/packages/modules/workflow-engine-inmemory/integration-tests/__tests__/index.spec.ts @@ -114,7 +114,7 @@ moduleIntegrationTestRunner({ }) }) - it.only("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { + it("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => { const transactionId = "transaction_id" const workflowId = "RACE_workflow-1" @@ -128,7 +128,6 @@ moduleIntegrationTestRunner({ const step0 = createStep( "RACE_step0", async (_) => { - console.log("step0") step0InvokeMock() return new StepResponse("result from step 0") }, @@ -143,7 +142,7 @@ moduleIntegrationTestRunner({ async (_) => { console.log("step1") step1InvokeMock() - await setTimeout(2000) + await setTimeout(500) throw new Error("error from step 1") }, () => { @@ -153,7 +152,6 @@ moduleIntegrationTestRunner({ ) const step2 = createStep("RACE_step2", async (input: any) => { - console.log("step2") step2InvokeMock() return new StepResponse({ result: input }) }) @@ -170,7 +168,7 @@ moduleIntegrationTestRunner({ async: true, compensateAsync: true, backgroundExecution: true, - // retryIntervalAwaiting: 1, + retryIntervalAwaiting: 0.1, }) const transformedResult = transform({ status }, (data) => { @@ -191,7 +189,7 @@ moduleIntegrationTestRunner({ if (event.eventType === "onFinish") { expect(step0InvokeMock).toHaveBeenCalledTimes(1) expect(step0CompensateMock).toHaveBeenCalledTimes(1) - expect(step1InvokeMock).toHaveBeenCalledTimes(1) + expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(2) expect(step1CompensateMock).toHaveBeenCalledTimes(1) expect(step2InvokeMock).toHaveBeenCalledTimes(0) expect(transformMock).toHaveBeenCalledTimes(0)