Prevent Concurrent TaskInstance Tries in Scheduler HA #59234
+1
−0
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Fix Concurrency Issue: Prevent Concurrent TaskInstance Tries in Scheduler HA
Related issue: #57618
This PR aims to fix a sporadic but critical race condition in the HA scheduler setup that could lead to concurrent execution of the same Task Instance (TI) with sequential try numbers (e.g., Try #1 and Try #2 running simultaneously).
🐞 Problem Description
In a multi-scheduler environment, if two schedulers detected that a TI was ready to run (state$\mathbf{SCHEDULED}$ state.
None) at nearly the exact same moment, both would attempt to push the task into theThe race failure occurred due to the following two factors in the
DagRun.schedule_tis()logic:WHEREClause: TheUPDATEquery only usedTI.id.in_(id_chunk), lacking a final state check.try_numberLogic: Thetry_numberwas advanced (TI.try_number + 1) if the state was notUP_FOR_RESCHEDULE.When Scheduler B lost the race to Scheduler A:
WHEREclause matched the ID, and thescheduled_dttmfield changed, the update was successful (rowcount=1).CASElogic saw the newSCHEDULEDstate and incorrectly determined it should be advanced totry_number=2, thus corrupting the record and enabling a concurrent run.✅ Solution: Enforce Optimistic Concurrency Control
This PR enforces Optimistic Concurrency Control (OCC) by adding a restrictive
WHEREclause to the atomic update operation.The added condition is:
.where(TI.state.in_(SCHEDULEABLE_STATES))How this fixes the race:
WHEREclause holds true, and the update succeeds (rowcount=1).SCHEDULEABLE_STATES).WHEREclause fails to find a match, the query returnsThis ensures that the$\mathbf{None} \rightarrow \mathbf{SCHEDULED}$ transition, resolving the concurrent task execution bug.
try_numbercan no longer be corrupted during the