Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
carlos-r-l-rodrigues committed Mar 7, 2025
1 parent 8cda65a commit cfb27ed
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ import { isDefined } from "@medusajs/utils"
import { EventEmitter } from "events"
import { IDistributedTransactionStorage } from "./datastore/abstract-storage"
import { BaseInMemoryDistributedTransactionStorage } from "./datastore/base-in-memory-storage"
import { NonSerializableCheckPointError } from "./errors"
import { TransactionOrchestrator } from "./transaction-orchestrator"
import { TransactionStep, TransactionStepHandler } from "./transaction-step"
import {
TransactionFlow,
TransactionHandlerType,
TransactionState,
} from "./types"
import { NonSerializableCheckPointError } from "./errors"

/**
* @typedef TransactionMetadata
Expand Down Expand Up @@ -248,7 +248,6 @@ class DistributedTransaction extends EventEmitter {
return
}

await this.saveCheckpoint()
await DistributedTransaction.keyValueStore.scheduleRetry(
this,
step,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,6 @@ export class TransactionOrchestrator extends EventEmitter {
)
}

await transaction.saveCheckpoint()

this.emit(DistributedTransactionEvent.TIMEOUT, { transaction })

hasTimedOut = true
Expand Down Expand Up @@ -282,8 +280,6 @@ export class TransactionOrchestrator extends EventEmitter {
)
hasTimedOut = true

await transaction.saveCheckpoint()

this.emit(DistributedTransactionEvent.TIMEOUT, { transaction })
}
return hasTimedOut
Expand Down Expand Up @@ -458,7 +454,9 @@ export class TransactionOrchestrator extends EventEmitter {
transaction: DistributedTransactionType,
step: TransactionStep,
response: unknown
): Promise<void> {
): Promise<{
stopExecution: boolean
}> {
const hasStepTimedOut =
step.getStates().state === TransactionStepState.TIMEOUT

Expand Down Expand Up @@ -509,25 +507,35 @@ export class TransactionOrchestrator extends EventEmitter {
: DistributedTransactionEvent.STEP_SUCCESS
transaction.emit(eventName, { step, transaction })
}

return {
stopExecution: !shouldEmit,
}
}

private static async skipStep(
transaction: DistributedTransactionType,
step: TransactionStep
): Promise<void> {
): Promise<{
stopExecution: boolean
}> {
const hasStepTimedOut =
step.getStates().state === TransactionStepState.TIMEOUT

const flow = transaction.getFlow()
const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId)

if (!hasStepTimedOut) {
step.changeStatus(TransactionStepStatus.OK)
step.changeState(TransactionStepState.SKIPPED)
}

if (step.definition.async || options?.storeExecution) {
let shouldEmit = true
try {
await transaction.saveCheckpoint()
} catch (error) {
if (SkipExecutionError.isSkipExecutionError(error)) {
shouldEmit = false
} else {
throw error
}
}

const cleaningUp: Promise<unknown>[] = []
Expand All @@ -540,8 +548,14 @@ export class TransactionOrchestrator extends EventEmitter {

await promiseAll(cleaningUp)

const eventName = DistributedTransactionEvent.STEP_SKIPPED
transaction.emit(eventName, { step, transaction })
if (shouldEmit) {
const eventName = DistributedTransactionEvent.STEP_SKIPPED
transaction.emit(eventName, { step, transaction })
}

return {
stopExecution: !shouldEmit,
}
}

private static async setStepTimeout(
Expand Down Expand Up @@ -596,7 +610,15 @@ export class TransactionOrchestrator extends EventEmitter {
maxRetries: number = TransactionOrchestrator.DEFAULT_RETRIES,
isTimeout = false,
timeoutError?: TransactionStepTimeoutError | TransactionTimeoutError
): Promise<void> {
): Promise<{
stopExecution: boolean
}> {
if (SkipExecutionError.isSkipExecutionError(error)) {
return {
stopExecution: false,
}
}

step.failures++

if (isErrorLike(error)) {
Expand Down Expand Up @@ -682,6 +704,10 @@ export class TransactionOrchestrator extends EventEmitter {
: DistributedTransactionEvent.STEP_FAILURE
transaction.emit(eventName, { step, transaction })
}

return {
stopExecution: !shouldEmit,
}
}

private async executeNext(
Expand All @@ -690,8 +716,6 @@ export class TransactionOrchestrator extends EventEmitter {
let continueExecution = true

while (continueExecution) {
console.log("FLOW", transaction.modelId)

if (transaction.hasFinished()) {
return
}
Expand All @@ -710,27 +734,13 @@ export class TransactionOrchestrator extends EventEmitter {
continue
}

console.log(
"Remaining STEPS",
transaction.getFlow().modelId,
nextSteps.remaining,
nextSteps.next.map((step) => step.id),
nextSteps.current ?? "nothing"
)

if (nextSteps.remaining === 0) {
if (transaction.hasTimeout()) {
void transaction.clearTransactionTimeout()
}

// let shouldStop = false
// await transaction.saveCheckpoint()

console.log("FINISH", transaction.getFlow().transactionId)
this.emit(DistributedTransactionEvent.FINISH, { transaction })
// if (shouldStop) {
// return
// }
}

let hasSyncSteps = false
Expand Down Expand Up @@ -813,19 +823,18 @@ export class TransactionOrchestrator extends EventEmitter {
)
}

await TransactionOrchestrator.setStepFailure(
const ret = await TransactionOrchestrator.setStepFailure(
transaction,
step,
error,
endRetry ? 0 : step.definition.maxRetries
)

if (isAsync) {
await transaction.scheduleRetry(
step,
step.definition.retryInterval ?? 0
)
if (isAsync && !ret.stopExecution) {
await transaction.scheduleRetry(step, 0)
}

return ret
}

const traceData = {
Expand All @@ -851,16 +860,6 @@ export class TransactionOrchestrator extends EventEmitter {
] as Parameters<TransactionStepHandler>

if (!isAsync) {
// try {
// await transaction.saveCheckpoint()
// } catch (error) {
// if (SkipExecutionError.isSkipExecutionError(error)) {
// await transaction.clearStepTimeout(step)
// continueExecution = false
// continue
// }
// }

hasSyncSteps = true

const stepHandler = async () => {
Expand Down Expand Up @@ -898,12 +897,6 @@ export class TransactionOrchestrator extends EventEmitter {
)
})
.catch(async (error) => {
if (SkipExecutionError.isSkipExecutionError(error)) {
await transaction.clearStepTimeout(step)
continueExecution = false
return
}

const response = error?.getStepResponse?.()

if (this.hasExpired({ transaction, step }, Date.now())) {
Expand All @@ -921,10 +914,13 @@ export class TransactionOrchestrator extends EventEmitter {
endRetry: true,
response,
})

return
}

await setStepFailure(error, { response })
await setStepFailure(error, {
response,
})
})
)
} else {
Expand Down Expand Up @@ -979,20 +975,9 @@ export class TransactionOrchestrator extends EventEmitter {
}

// check nested flow
await transaction.scheduleRetry(
step,
step.definition.retryInterval ?? 0
)
await transaction.scheduleRetry(step, 0)
})
.catch(async (error) => {
console.log("ON Failure", error)

if (SkipExecutionError.isSkipExecutionError(error)) {
await transaction.clearStepTimeout(step)
continueExecution = false
return
}

const response = error?.getStepResponse?.()

if (
Expand All @@ -1002,20 +987,29 @@ export class TransactionOrchestrator extends EventEmitter {
endRetry: true,
response,
})

return
}

await setStepFailure(error, { response })

console.log("ON Failure")
await setStepFailure(error, {
response,
})
})
})
)
}
}

if (hasSyncSteps && options?.storeExecution) {
await transaction.saveCheckpoint()
try {
await transaction.saveCheckpoint()
} catch (error) {
if (SkipExecutionError.isSkipExecutionError(error)) {
break
} else {
throw error
}
}
}

await promiseAll(execution)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,6 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
)
},
async (transaction, stepContext) => {
console.log(
"BEFORE SUB WORKFLOW COMPENSATION",
stepContext.idempotencyKey
)

if (!transaction) {
return
}
Expand All @@ -227,10 +222,6 @@ export function createWorkflow<TData, TResult, THooks extends any[]>(
parentStepIdempotencyKey: stepContext.idempotencyKey,
},
})

console.log("RESPONSE FROM SUB WORKFLOW COMPENSATION", {
transaction,
})
}
)(input) as ReturnType<StepFunction<TData, TResult>>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})
})

it.only("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => {
it("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => {
const transactionId = "transaction_id"
const workflowId = "RACE_workflow-1"

Expand All @@ -128,7 +128,6 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
const step0 = createStep(
"RACE_step0",
async (_) => {
console.log("step0")
step0InvokeMock()
return new StepResponse("result from step 0")
},
Expand All @@ -143,7 +142,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
async (_) => {
console.log("step1")
step1InvokeMock()
await setTimeout(2000)
await setTimeout(500)
throw new Error("error from step 1")
},
() => {
Expand All @@ -153,7 +152,6 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
)

const step2 = createStep("RACE_step2", async (input: any) => {
console.log("step2")
step2InvokeMock()
return new StepResponse({ result: input })
})
Expand All @@ -170,7 +168,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
async: true,
compensateAsync: true,
backgroundExecution: true,
// retryIntervalAwaiting: 1,
retryIntervalAwaiting: 0.1,
})

const transformedResult = transform({ status }, (data) => {
Expand All @@ -191,7 +189,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
if (event.eventType === "onFinish") {
expect(step0InvokeMock).toHaveBeenCalledTimes(1)
expect(step0CompensateMock).toHaveBeenCalledTimes(1)
expect(step1InvokeMock).toHaveBeenCalledTimes(1)
expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(2)
expect(step1CompensateMock).toHaveBeenCalledTimes(1)
expect(step2InvokeMock).toHaveBeenCalledTimes(0)
expect(transformMock).toHaveBeenCalledTimes(0)
Expand Down

0 comments on commit cfb27ed

Please sign in to comment.