Skip to content

Commit afd53fd

Browse files
authored
Fetch asset event source dag run (#59090)
1 parent 38cf214 commit afd53fd

File tree

16 files changed

+476
-142
lines changed

16 files changed

+476
-142
lines changed

airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py

Lines changed: 76 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import logging
2121
from typing import Annotated
2222

23-
from fastapi import APIRouter, HTTPException, Query, status
23+
from cadwyn import VersionedAPIRouter
24+
from fastapi import HTTPException, Query, status
2425
from sqlalchemy import func, select
26+
from sqlalchemy.exc import NoResultFound
2527

2628
from airflow.api.common.trigger_dag import trigger_dag
2729
from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run
@@ -36,19 +38,54 @@
3638
from airflow.utils.state import DagRunState
3739
from airflow.utils.types import DagRunTriggeredByType
3840

39-
router = APIRouter()
40-
41+
router = VersionedAPIRouter()
4142

4243
log = logging.getLogger(__name__)
4344

4445

46+
@router.only_exists_in_older_versions
47+
@router.get("/{dag_id}/previous")
48+
def get_previous_dagrun_compat(
49+
dag_id: str,
50+
logical_date: UtcDateTime,
51+
session: SessionDep,
52+
state: DagRunState | None = None,
53+
):
54+
"""
55+
Redirect old previous dag run request to the new endpoint.
56+
57+
This endpoint must be put before ``get_dag_run`` so not to be shadowed.
58+
Newer client versions would not see this endpoint, and be routed to
59+
``get_dag_run`` below instead.
60+
"""
61+
return get_previous_dagrun(dag_id, logical_date, session, state)
62+
63+
64+
@router.get(
65+
"/{dag_id}/{run_id}",
66+
responses={status.HTTP_404_NOT_FOUND: {"description": "Dag run not found"}},
67+
)
68+
def get_dag_run(dag_id: str, run_id: str, session: SessionDep) -> DagRun:
69+
"""Get detail of a Dag run."""
70+
dr = session.scalar(select(DagRunModel).where(DagRunModel.dag_id == dag_id, DagRunModel.run_id == run_id))
71+
if dr is None:
72+
raise HTTPException(
73+
status.HTTP_404_NOT_FOUND,
74+
detail={
75+
"reason": "not_found",
76+
"message": f"Dag run with dag_id '{dag_id}' and run_id '{run_id}' was not found",
77+
},
78+
)
79+
return DagRun.model_validate(dr)
80+
81+
4582
@router.post(
4683
"/{dag_id}/{run_id}",
4784
status_code=status.HTTP_204_NO_CONTENT,
4885
responses={
49-
status.HTTP_400_BAD_REQUEST: {"description": "DAG has import errors and cannot be triggered"},
50-
status.HTTP_404_NOT_FOUND: {"description": "DAG not found for the given dag_id"},
51-
status.HTTP_409_CONFLICT: {"description": "DAG Run already exists for the given dag_id"},
86+
status.HTTP_400_BAD_REQUEST: {"description": "Dag has import errors and cannot be triggered"},
87+
status.HTTP_404_NOT_FOUND: {"description": "Dag not found for the given dag_id"},
88+
status.HTTP_409_CONFLICT: {"description": "Dag run already exists for the given dag_id"},
5289
HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload"},
5390
},
5491
)
@@ -57,21 +94,21 @@ def trigger_dag_run(
5794
run_id: str,
5895
payload: TriggerDAGRunPayload,
5996
session: SessionDep,
60-
):
61-
"""Trigger a DAG Run."""
97+
) -> None:
98+
"""Trigger a Dag run."""
6299
dm = session.scalar(select(DagModel).where(~DagModel.is_stale, DagModel.dag_id == dag_id).limit(1))
63100
if not dm:
64101
raise HTTPException(
65102
status.HTTP_404_NOT_FOUND,
66-
detail={"reason": "not_found", "message": f"DAG with dag_id: '{dag_id}' not found"},
103+
detail={"reason": "not_found", "message": f"Dag with dag_id: '{dag_id}' not found"},
67104
)
68105

69106
if dm.has_import_errors:
70107
raise HTTPException(
71108
status.HTTP_400_BAD_REQUEST,
72109
detail={
73110
"reason": "import_errors",
74-
"message": f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered",
111+
"message": f"Dag with dag_id '{dag_id}' has import errors and cannot be triggered",
75112
},
76113
)
77114

@@ -90,7 +127,7 @@ def trigger_dag_run(
90127
status.HTTP_409_CONFLICT,
91128
detail={
92129
"reason": "already_exists",
93-
"message": f"A DAG Run already exists for DAG {dag_id} with run id {run_id}",
130+
"message": f"A run already exists for Dag '{dag_id}' with run_id '{run_id}'",
94131
},
95132
)
96133

@@ -99,8 +136,8 @@ def trigger_dag_run(
99136
"/{dag_id}/{run_id}/clear",
100137
status_code=status.HTTP_204_NO_CONTENT,
101138
responses={
102-
status.HTTP_400_BAD_REQUEST: {"description": "DAG has import errors and cannot be triggered"},
103-
status.HTTP_404_NOT_FOUND: {"description": "DAG not found for the given dag_id"},
139+
status.HTTP_400_BAD_REQUEST: {"description": "Dag has import errors and cannot be triggered"},
140+
status.HTTP_404_NOT_FOUND: {"description": "Dag not found for the given dag_id"},
104141
HTTP_422_UNPROCESSABLE_CONTENT: {"description": "Invalid payload"},
105142
},
106143
)
@@ -109,21 +146,21 @@ def clear_dag_run(
109146
run_id: str,
110147
session: SessionDep,
111148
dag_bag: DagBagDep,
112-
):
113-
"""Clear a DAG Run."""
149+
) -> None:
150+
"""Clear a Dag run."""
114151
dm = session.scalar(select(DagModel).where(~DagModel.is_stale, DagModel.dag_id == dag_id).limit(1))
115152
if not dm:
116153
raise HTTPException(
117154
status.HTTP_404_NOT_FOUND,
118-
detail={"reason": "not_found", "message": f"DAG with dag_id: '{dag_id}' not found"},
155+
detail={"reason": "not_found", "message": f"Dag with dag_id: '{dag_id}' not found"},
119156
)
120157

121158
if dm.has_import_errors:
122159
raise HTTPException(
123160
status.HTTP_400_BAD_REQUEST,
124161
detail={
125162
"reason": "import_errors",
126-
"message": f"DAG with dag_id: '{dag_id}' has import errors and cannot be triggered",
163+
"message": f"Dag with dag_id '{dag_id}' has import errors and cannot be triggered",
127164
},
128165
)
129166

@@ -133,7 +170,7 @@ def clear_dag_run(
133170
if dag_run is None:
134171
raise HTTPException(
135172
status.HTTP_404_NOT_FOUND,
136-
detail={"reason": "not_found", "message": f"DAG run with run_id: '{run_id}' not found"},
173+
detail={"reason": "not_found", "message": f"Dag run with run_id: '{run_id}' not found"},
137174
)
138175
dag = get_dag_for_run(dag_bag, dag_run=dag_run, session=session)
139176

@@ -142,29 +179,27 @@ def clear_dag_run(
142179

143180
@router.get(
144181
"/{dag_id}/{run_id}/state",
145-
responses={
146-
status.HTTP_404_NOT_FOUND: {"description": "DAG not found for the given dag_id"},
147-
},
182+
responses={status.HTTP_404_NOT_FOUND: {"description": "Dag run not found"}},
148183
)
149184
def get_dagrun_state(
150185
dag_id: str,
151186
run_id: str,
152187
session: SessionDep,
153188
) -> DagRunStateResponse:
154-
"""Get a DAG Run State."""
155-
dag_run = session.scalar(
156-
select(DagRunModel).where(DagRunModel.dag_id == dag_id, DagRunModel.run_id == run_id)
157-
)
158-
if dag_run is None:
189+
"""Get a Dag run State."""
190+
try:
191+
state = session.scalars(
192+
select(DagRunModel.state).where(DagRunModel.dag_id == dag_id, DagRunModel.run_id == run_id)
193+
).one()
194+
except NoResultFound:
159195
raise HTTPException(
160196
status.HTTP_404_NOT_FOUND,
161197
detail={
162198
"reason": "not_found",
163-
"message": f"The DagRun with dag_id: `{dag_id}` and run_id: `{run_id}` was not found",
199+
"message": f"Dag run with dag_id '{dag_id}' and run_id '{run_id}' was not found",
164200
},
165201
)
166-
167-
return DagRunStateResponse(state=dag_run.state)
202+
return DagRunStateResponse(state=state)
168203

169204

170205
@router.get("/count", status_code=status.HTTP_200_OK)
@@ -175,46 +210,33 @@ def get_dr_count(
175210
run_ids: Annotated[list[str] | None, Query()] = None,
176211
states: Annotated[list[str] | None, Query()] = None,
177212
) -> int:
178-
"""Get the count of DAG runs matching the given criteria."""
179-
query = select(func.count()).select_from(DagRunModel).where(DagRunModel.dag_id == dag_id)
180-
213+
"""Get the count of Dag runs matching the given criteria."""
214+
stmt = select(func.count()).select_from(DagRunModel).where(DagRunModel.dag_id == dag_id)
181215
if logical_dates:
182-
query = query.where(DagRunModel.logical_date.in_(logical_dates))
183-
216+
stmt = stmt.where(DagRunModel.logical_date.in_(logical_dates))
184217
if run_ids:
185-
query = query.where(DagRunModel.run_id.in_(run_ids))
186-
218+
stmt = stmt.where(DagRunModel.run_id.in_(run_ids))
187219
if states:
188-
query = query.where(DagRunModel.state.in_(states))
220+
stmt = stmt.where(DagRunModel.state.in_(states))
221+
return session.scalar(stmt) or 0
189222

190-
count = session.scalar(query)
191-
return count or 0
192223

193-
194-
@router.get("/{dag_id}/previous", status_code=status.HTTP_200_OK)
224+
@router.get("/previous", status_code=status.HTTP_200_OK)
195225
def get_previous_dagrun(
196226
dag_id: str,
197227
logical_date: UtcDateTime,
198228
session: SessionDep,
199229
state: Annotated[DagRunState | None, Query()] = None,
200230
) -> DagRun | None:
201-
"""Get the previous DAG run before the given logical date, optionally filtered by state."""
202-
query = (
231+
"""Get the previous Dag run before the given logical date, optionally filtered by state."""
232+
stmt = (
203233
select(DagRunModel)
204-
.where(
205-
DagRunModel.dag_id == dag_id,
206-
DagRunModel.logical_date < logical_date,
207-
)
234+
.where(DagRunModel.dag_id == dag_id, DagRunModel.logical_date < logical_date)
208235
.order_by(DagRunModel.logical_date.desc())
209236
.limit(1)
210237
)
211-
212238
if state:
213-
query = query.where(DagRunModel.state == state)
214-
215-
dag_run = session.scalar(query)
216-
217-
if not dag_run:
239+
stmt = stmt.where(DagRunModel.state == state)
240+
if not (dag_run := session.scalar(stmt)):
218241
return None
219-
220242
return DagRun.model_validate(dag_run)

airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,14 @@
2929
from airflow.api_fastapi.execution_api.versions.v2025_10_27 import MakeDagRunConfNullable
3030
from airflow.api_fastapi.execution_api.versions.v2025_11_05 import AddTriggeringUserNameField
3131
from airflow.api_fastapi.execution_api.versions.v2025_11_07 import AddPartitionKeyField
32+
from airflow.api_fastapi.execution_api.versions.v2025_12_08 import (
33+
AddDagRunDetailEndpoint,
34+
MovePreviousRunEndpoint,
35+
)
3236

3337
bundle = VersionBundle(
3438
HeadVersion(),
39+
Version("2025-12-08", MovePreviousRunEndpoint, AddDagRunDetailEndpoint),
3540
Version("2025-11-07", AddPartitionKeyField),
3641
Version("2025-11-05", AddTriggeringUserNameField),
3742
Version("2025-10-27", MakeDagRunConfNullable),
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
from __future__ import annotations
19+
20+
from cadwyn import VersionChange, endpoint
21+
22+
23+
class MovePreviousRunEndpoint(VersionChange):
24+
"""Add new previous-run endpoint and migrate old endpoint."""
25+
26+
description = __doc__
27+
28+
instructions_to_migrate_to_previous_version = (
29+
endpoint("/dag-runs/previous", ["GET"]).didnt_exist,
30+
endpoint("/dag-runs/{dag_id}/previous", ["GET"]).existed,
31+
)
32+
33+
34+
class AddDagRunDetailEndpoint(VersionChange):
35+
"""Add dag run detail endpoint."""
36+
37+
description = __doc__
38+
39+
instructions_to_migrate_to_previous_version = (
40+
endpoint("/dag-runs/{dag_id}/{run_id}", ["GET"]).didnt_exist,
41+
)

0 commit comments

Comments
 (0)