Skip to content

Conversation

ephraimbuddy
Copy link
Contributor

@ephraimbuddy ephraimbuddy commented Oct 15, 2025

DAG test should be able to run without serialized dag. This PR is to ensure serialized dag is not required when running dag test.

I used the triggering_user_name to exclude the check for serdag but I wonder if we should have a specific name to exclude serdag in the dagrun creation other than triggering_user_name. Please let me know

Closes: #56657

DAG test should be able to run without serialized dag. This PR
is to ensure serialized dag is not required when running dag test.

I used the triggering_user_name to exclude the check for serdag but
I wonder if we should have a specific name to exclude serdag in the
dagrun creation other than triggering_user_name. Please let me know
@ephraimbuddy ephraimbuddy added this to the Airflow 3.1.1 milestone Oct 15, 2025
@ephraimbuddy ephraimbuddy added the backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch label Oct 15, 2025
Copy link
Contributor

@tatiana tatiana left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ephraimbuddy Thanks a lot for working on this so quickly.
Would it be worth to update the PR description to mention it closes #56657?

# create the associated task instances
# state is None at the moment of creation
run.verify_integrity(session=session, dag_version_id=dag_version.id)
run.verify_integrity(session=session, dag_version_id=dag_version.id if dag_version else None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to run verify integrity at all if dag version is none? (I dont have the context loaded in my head right now)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We need to create the Task Instances

)
dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
if not dag_version:
if not dag_version and triggering_user_name != "dag_test":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this works, but i don't line this method of detection.

First question: why do we need a orm dag object anyway? (I think this is since that is where the scheduling logic is?)

Second question: since we know we aren't going to store anything here in the db: do we need to call this function, or would a specialised one that just creates the orm objects in memory fir the purposes of dag.test be better suited?

Copy link
Contributor Author

@ephraimbuddy ephraimbuddy Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First question: why do we need a orm dag object anyway? (I think this is since that is where the scheduling logic is?)

The ORM object here is used to ensure the dag version is linked to the created dag run and task instances. Since we don't need that in test, we can use the serdag in the test funtion itself:

scheduler_dag = SerializedDAG.deserialize_dag(SerializedDAG.serialize_dag(self)) # type: ignore[arg-type]
# Preserve callback functions from original Dag since they're lost during serialization
# and yes it is a hack for now! It is a tradeoff for code simplicity.
# Without it, we need "Scheduler Dag" (Serialized dag) for the scheduler bits
# -- dep check, scheduling tis
# and need real dag to get and run callbacks without having to load the dag model
# Scheduler DAG shouldn't have these attributes, but assigning them
# here is an easy hack to get this test() thing working.
scheduler_dag.on_success_callback = self.on_success_callback # type: ignore[attr-defined]
scheduler_dag.on_failure_callback = self.on_failure_callback # type: ignore[attr-defined]

Second question: since we know we aren't going to store anything here in the db: do we need to call this function, or would a specialised one that just creates the orm objects in memory fir the purposes of dag.test be better suited?

I think the serdag used in the dag.test method is enough. I can prevent calling the dag_version if triggering_user_name is dag_test. The dag_version is only used to ensure dagrun and created TI is linked to a version but tests can skip it

@ephraimbuddy
Copy link
Contributor Author

@ephraimbuddy Thanks a lot for working on this so quickly. Would it be worth to update the PR description to mention it closes #56657?

Yes. Done. Thanks

@ephraimbuddy
Copy link
Contributor Author

I noticed the task is failing when executing:

2025-10-15T12:54:56.204021Z [info     ] [DAG TEST] starting task_id=test map_index=-1 [airflow.sdk.definitions.dag] loc=dag.py:1336
2025-10-15T12:54:56.204589Z [info     ] [DAG TEST] running task <TaskInstance: my_dag.test manual__2025-10-15T12:54:56.173999+00:00 [scheduled]> [airflow.sdk.definitions.dag] loc=dag.py:1339
2025-10-15T12:54:56.210513Z [error    ] [DAG TEST] Error running task <TaskInstance: my_dag.test manual__2025-10-15T12:54:56.173999+00:00 [queued]> [airflow.sdk.definitions.dag] loc=dag.py:1386
Traceback (most recent call last):
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/dag.py", line 1349, in _run_task
    task_sdk_ti = TaskInstanceSDK(
  File "/usr/python/lib/python3.10/site-packages/pydantic/main.py", line 253, in __init__
    validated_self = self.__pydantic_validator__.validate_python(data, self_instance=self)
pydantic_core._pydantic_core.ValidationError: 1 validation error for TaskInstance
dag_version_id
  UUID input should be a string, bytes or UUID object [type=uuid_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.11/v/uuid_type
2025-10-15T12:54:56.225062Z [info     ] [DAG TEST] end task task_id=test map_index=-1 [airflow.sdk.definitions.dag] loc=dag.py:1393
2025-10-15T12:54:56.228413Z [info     ] Marking run <DagRun my_dag @ 2025-10-14 00:00:00+00:00: manual__2025-10-15T12:54:56.173999+00:00, state:running, queued_at: None. run_type: manual> failed [airflow.models.dagrun.DagRun] loc=dagrun.py:1177
Dag run  in failure state
Dag information:my_dag Run id: manual__2025-10-15T12:54:56.173999+00:00 Run type: manual
Failed with message: task_failure
2025-10-15T12:54:56.236897Z [info     ] DagRun Finished: dag_id=my_dag, logical_date=2025-10-14 00:00:00+00:00, run_id=manual__2025-10-15T12:54:56.173999+00:00, run_start_date=2025-10-14 00:00:00+00:00, run_end_date=2025-10-15 12:54:56.228541+00:00, run_duration=132896.228541, state=failed, run_type=manual, data_interval_start=2025-10-14 00:00:00+00:00, data_interval_end=2025-10-14 00:00:00+00:00, [airflow.models.dagrun.DagRun] loc=dagrun.py:1280
DagRun failed

@ephraimbuddy ephraimbuddy requested a review from ashb October 15, 2025 16:26
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:CLI area:serialization backport-to-v3-1-test Mark PR with this label to backport to v3-1-test branch

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Airflow dag.test() raises AirflowException: Cannot create DagRun for DAG because the dag is not serialized

3 participants