Skip to content

Commit

Permalink
fix checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
adrien2p committed Mar 7, 2025
1 parent 3cd9ae9 commit 8cda65a
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -472,9 +472,6 @@ export class TransactionOrchestrator extends EventEmitter {
)
}

const flow = transaction.getFlow()
const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId)

if (!hasStepTimedOut) {
step.changeStatus(TransactionStepStatus.OK)
}
Expand All @@ -485,8 +482,15 @@ export class TransactionOrchestrator extends EventEmitter {
step.changeState(TransactionStepState.DONE)
}

if (step.definition.async || options?.storeExecution) {
let shouldEmit = true
try {
await transaction.saveCheckpoint()
} catch (error) {
if (SkipExecutionError.isSkipExecutionError(error)) {
shouldEmit = false
} else {
throw error
}
}

const cleaningUp: Promise<unknown>[] = []
Expand All @@ -499,10 +503,12 @@ export class TransactionOrchestrator extends EventEmitter {

await promiseAll(cleaningUp)

const eventName = step.isCompensating()
? DistributedTransactionEvent.COMPENSATE_STEP_SUCCESS
: DistributedTransactionEvent.STEP_SUCCESS
transaction.emit(eventName, { step, transaction })
if (shouldEmit) {
const eventName = step.isCompensating()
? DistributedTransactionEvent.COMPENSATE_STEP_SUCCESS
: DistributedTransactionEvent.STEP_SUCCESS
transaction.emit(eventName, { step, transaction })
}
}

private static async skipStep(
Expand Down Expand Up @@ -605,7 +611,6 @@ export class TransactionOrchestrator extends EventEmitter {
}

const flow = transaction.getFlow()
const options = TransactionOrchestrator.getWorkflowOptions(flow.modelId)

const cleaningUp: Promise<unknown>[] = []

Expand Down Expand Up @@ -654,8 +659,15 @@ export class TransactionOrchestrator extends EventEmitter {
}
}

if (step.definition.async || options?.storeExecution) {
let shouldEmit = true
try {
await transaction.saveCheckpoint()
} catch (error) {
if (SkipExecutionError.isSkipExecutionError(error)) {
shouldEmit = false
} else {
throw error
}
}

if (step.hasRetryScheduled()) {
Expand All @@ -664,10 +676,12 @@ export class TransactionOrchestrator extends EventEmitter {

await promiseAll(cleaningUp)

const eventName = step.isCompensating()
? DistributedTransactionEvent.COMPENSATE_STEP_FAILURE
: DistributedTransactionEvent.STEP_FAILURE
transaction.emit(eventName, { step, transaction })
if (shouldEmit) {
const eventName = step.isCompensating()
? DistributedTransactionEvent.COMPENSATE_STEP_FAILURE
: DistributedTransactionEvent.STEP_FAILURE
transaction.emit(eventName, { step, transaction })
}
}

private async executeNext(
Expand Down Expand Up @@ -696,17 +710,27 @@ export class TransactionOrchestrator extends EventEmitter {
continue
}

console.log("Remaining STEPS", nextSteps.remaining)
console.log(
"Remaining STEPS",
transaction.getFlow().modelId,
nextSteps.remaining,
nextSteps.next.map((step) => step.id),
nextSteps.current ?? "nothing"
)

if (nextSteps.remaining === 0) {
if (transaction.hasTimeout()) {
void transaction.clearTransactionTimeout()
}

await transaction.saveCheckpoint()
// let shouldStop = false
// await transaction.saveCheckpoint()

console.log("FINISH", transaction.getFlow().transactionId)
this.emit(DistributedTransactionEvent.FINISH, { transaction })
// if (shouldStop) {
// return
// }
}

let hasSyncSteps = false
Expand Down Expand Up @@ -827,14 +851,15 @@ export class TransactionOrchestrator extends EventEmitter {
] as Parameters<TransactionStepHandler>

if (!isAsync) {
try {
await transaction.saveCheckpoint()
} catch (error) {
if (SkipExecutionError.isSkipExecutionError(error)) {
continueExecution = false
continue
}
}
// try {
// await transaction.saveCheckpoint()
// } catch (error) {
// if (SkipExecutionError.isSkipExecutionError(error)) {
// await transaction.clearStepTimeout(step)
// continueExecution = false
// continue
// }
// }

hasSyncSteps = true

Expand Down Expand Up @@ -873,8 +898,8 @@ export class TransactionOrchestrator extends EventEmitter {
)
})
.catch(async (error) => {
console.log("ON Failure SYNC", error)
if (SkipExecutionError.isSkipExecutionError(error)) {
await transaction.clearStepTimeout(step)
continueExecution = false
return
}
Expand Down Expand Up @@ -963,6 +988,7 @@ export class TransactionOrchestrator extends EventEmitter {
console.log("ON Failure", error)

if (SkipExecutionError.isSkipExecutionError(error)) {
await transaction.clearStepTimeout(step)
continueExecution = false
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({

it.only("should prevent race continuation of the workflow compensation during retryIntervalAwaiting in background execution", (done) => {
const transactionId = "transaction_id"
const workflowId = "RACE_workflow-1"

const step0InvokeMock = jest.fn()
const step0CompensateMock = jest.fn()
Expand All @@ -132,6 +133,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
return new StepResponse("result from step 0")
},
() => {
console.log("step0 compensate")
step0CompensateMock()
}
)
Expand All @@ -145,6 +147,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
throw new Error("error from step 1")
},
() => {
console.log("step1 compensate")
step1CompensateMock()
}
)
Expand All @@ -160,14 +163,14 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
return new WorkflowResponse(status)
})

createWorkflow("RACE_workflow-1", function () {
createWorkflow(workflowId, function () {
const build = step0()

const status = subWorkflow.runAsStep({} as any).config({
async: true,
compensateAsync: true,
backgroundExecution: true,
retryIntervalAwaiting: 1,
// retryIntervalAwaiting: 1,
})

const transformedResult = transform({ status }, (data) => {
Expand All @@ -182,14 +185,14 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})

void workflowOrcModule.subscribe({
workflowId: "workflow-1",
workflowId: workflowId,
transactionId,
subscriber: (event) => {
if (event.eventType === "onFinish") {
expect(step0InvokeMock).toHaveBeenCalledTimes(1)
expect(step0CompensateMock).toHaveBeenCalledTimes(1)
expect(step1InvokeMock.mock.calls.length).toBeGreaterThan(1)
expect(step1CompensateMock.mock.calls.length).toBeGreaterThan(1)
expect(step1InvokeMock).toHaveBeenCalledTimes(1)
expect(step1CompensateMock).toHaveBeenCalledTimes(1)
expect(step2InvokeMock).toHaveBeenCalledTimes(0)
expect(transformMock).toHaveBeenCalledTimes(0)
done()
Expand All @@ -198,7 +201,7 @@ moduleIntegrationTestRunner<IWorkflowEngineService>({
})

workflowOrcModule
.run("workflow-1", { transactionId })
.run(workflowId, { transactionId })
.then(({ result }) => {
expect(result).toBe("result from step 0")
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,12 +231,11 @@ export class InMemoryDistributedTransactionStorage
currentFlowLastInvokingStepIndex !== isLatestExecutionFinishedIndex

const compensateShouldBeSkipped =
latestUpdatedFlowLastCompensatingStepIndex ===
(latestUpdatedFlowLastCompensatingStepIndex ===
isLatestExecutionFinishedIndex ||
(currentFlowLastCompensatingStepIndex <
latestUpdatedFlowLastCompensatingStepIndex &&
latestUpdatedFlowLastCompensatingStepIndex !==
isLatestExecutionFinishedIndex)
currentFlowLastCompensatingStepIndex <
latestUpdatedFlowLastCompensatingStepIndex) &&
currentFlowLastCompensatingStepIndex !== isLatestExecutionFinishedIndex

if (
(data.flow.state !== TransactionState.COMPENSATING &&
Expand All @@ -246,6 +245,14 @@ export class InMemoryDistributedTransactionStorage
(latestUpdatedFlow.state === TransactionState.COMPENSATING &&
currentFlow.state !== latestUpdatedFlow.state)
) {
console.log("skipping execution", {
currentState: data.flow.state,
latestUpdatedState: latestUpdatedFlow.state,
latestUpdatedFlowLastCompensatingStepIndex,
currentFlowLastCompensatingStepIndex,
currentFlowLastInvokingStepIndex,
latestUpdatedFlowLastInvokingStepIndex,
})
/**
* If the latest execution is ahead of the current execution in terms of completion then we
* should skip to prevent multiple completion/execution of the same step. The same goes for
Expand Down

0 comments on commit 8cda65a

Please sign in to comment.