diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py b/airflow/api_connexion/endpoints/dag_endpoint.py index 6119d68be4942..a301699b380db 100644 --- a/airflow/api_connexion/endpoints/dag_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_endpoint.py @@ -29,7 +29,7 @@ from airflow import DAG from airflow.api_connexion import security from airflow.api_connexion.exceptions import AlreadyExists, BadRequest, NotFound -from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters, get_query_count +from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters from airflow.api_connexion.schemas.dag_schema import ( DAGCollection, dag_detail_schema, @@ -41,6 +41,7 @@ from airflow.models.dag import DagModel, DagTag from airflow.security import permissions from airflow.utils.airflow_flask_app import get_airflow_app +from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py b/airflow/api_connexion/endpoints/dag_run_endpoint.py index cd27dc0f8cf7c..490923c6c381f 100644 --- a/airflow/api_connexion/endpoints/dag_run_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py @@ -40,7 +40,6 @@ check_limit, format_datetime, format_parameters, - get_query_count, ) from airflow.api_connexion.schemas.dag_run_schema import ( DAGRunCollection, @@ -63,6 +62,7 @@ from airflow.models import DagModel, DagRun from airflow.security import permissions from airflow.utils.airflow_flask_app import get_airflow_app +from airflow.utils.db import get_query_count from airflow.utils.log.action_logger import action_event_from_permission from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import DagRunState diff --git a/airflow/api_connexion/endpoints/dag_warning_endpoint.py b/airflow/api_connexion/endpoints/dag_warning_endpoint.py index dd5f8dd94805d..1031d532a00f5 100644 --- a/airflow/api_connexion/endpoints/dag_warning_endpoint.py +++ b/airflow/api_connexion/endpoints/dag_warning_endpoint.py @@ -20,7 +20,7 @@ from sqlalchemy.orm import Session from airflow.api_connexion import security -from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters, get_query_count +from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters from airflow.api_connexion.schemas.dag_warning_schema import ( DagWarningCollection, dag_warning_collection_schema, @@ -28,6 +28,7 @@ from airflow.api_connexion.types import APIResponse from airflow.models.dagwarning import DagWarning as DagWarningModel from airflow.security import permissions +from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session diff --git a/airflow/api_connexion/endpoints/dataset_endpoint.py b/airflow/api_connexion/endpoints/dataset_endpoint.py index 62c15634fb148..71012b59a2e72 100644 --- a/airflow/api_connexion/endpoints/dataset_endpoint.py +++ b/airflow/api_connexion/endpoints/dataset_endpoint.py @@ -21,7 +21,7 @@ from airflow.api_connexion import security from airflow.api_connexion.exceptions import NotFound -from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters, get_query_count +from airflow.api_connexion.parameters import apply_sorting, check_limit, format_parameters from airflow.api_connexion.schemas.dataset_schema import ( DatasetCollection, DatasetEventCollection, @@ -32,6 +32,7 @@ from airflow.api_connexion.types import APIResponse from airflow.models.dataset import DatasetEvent, DatasetModel from airflow.security import permissions +from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 6ddc0f59d0f35..55db7ef8b9377 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -27,7 +27,7 @@ from airflow.api_connexion import security from airflow.api_connexion.endpoints.request_dict import get_json_request_dict from airflow.api_connexion.exceptions import BadRequest, NotFound -from airflow.api_connexion.parameters import format_datetime, format_parameters, get_query_count +from airflow.api_connexion.parameters import format_datetime, format_parameters from airflow.api_connexion.schemas.task_instance_schema import ( TaskInstanceCollection, TaskInstanceReferenceCollection, @@ -48,6 +48,7 @@ from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances from airflow.security import permissions from airflow.utils.airflow_flask_app import get_airflow_app +from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session from airflow.utils.state import DagRunState, TaskInstanceState diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py b/airflow/api_connexion/endpoints/xcom_endpoint.py index 708eae57b2327..2ea4db79f7aaf 100644 --- a/airflow/api_connexion/endpoints/xcom_endpoint.py +++ b/airflow/api_connexion/endpoints/xcom_endpoint.py @@ -24,13 +24,14 @@ from airflow.api_connexion import security from airflow.api_connexion.exceptions import BadRequest, NotFound -from airflow.api_connexion.parameters import check_limit, format_parameters, get_query_count +from airflow.api_connexion.parameters import check_limit, format_parameters from airflow.api_connexion.schemas.xcom_schema import XComCollection, xcom_collection_schema, xcom_schema from airflow.api_connexion.types import APIResponse from airflow.models import DagRun as DR, XCom from airflow.security import permissions from airflow.settings import conf from airflow.utils.airflow_flask_app import get_airflow_app +from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session diff --git a/airflow/api_connexion/parameters.py b/airflow/api_connexion/parameters.py index 0c4e1b91af13a..f4f55cfecd378 100644 --- a/airflow/api_connexion/parameters.py +++ b/airflow/api_connexion/parameters.py @@ -21,9 +21,8 @@ from functools import wraps from typing import Any, Callable, Container, TypeVar, cast -import sqlalchemy.orm from pendulum.parsing import ParserError -from sqlalchemy import func, select, text +from sqlalchemy import text from sqlalchemy.sql import Select from airflow.api_connexion.exceptions import BadRequest @@ -126,9 +125,3 @@ def apply_sorting( else: order_by = f"{lstriped_orderby} asc" return query.order_by(text(order_by)) - - -def get_query_count(query_stmt: sqlalchemy.sql.selectable.Select, session: sqlalchemy.orm.Session) -> int: - """Get count of query.""" - count_stmt = select(func.count()).select_from(query_stmt.order_by(None).subquery()) - return session.scalar(count_stmt) diff --git a/airflow/utils/db.py b/airflow/utils/db.py index eb4c863dc3e3c..eb94792aa5095 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -45,6 +45,7 @@ from alembic.runtime.environment import EnvironmentContext from alembic.script import ScriptDirectory from sqlalchemy.orm import Query, Session + from sqlalchemy.sql.selectable import Select from airflow.models.base import Base from airflow.models.connection import Connection @@ -1872,3 +1873,9 @@ def get_sqla_model_classes(): return [mapper.class_ for mapper in Base.registry.mappers] except AttributeError: return Base._decl_class_registry.values() + + +def get_query_count(query_stmt: Select, session: Session) -> int: + """Get count of query.""" + count_stmt = select(func.count()).select_from(query_stmt.order_by(None).subquery()) + return session.scalar(count_stmt) diff --git a/airflow/www/views.py b/airflow/www/views.py index 6a9648b1cb8be..d175b7be50795 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -83,7 +83,6 @@ set_dag_run_state_to_success, set_state, ) -from airflow.api_connexion.parameters import get_query_count from airflow.configuration import AIRFLOW_CONFIG, auth_manager, conf from airflow.datasets import Dataset from airflow.exceptions import ( @@ -118,6 +117,7 @@ from airflow.utils.airflow_flask_app import get_airflow_app from airflow.utils.dag_edges import dag_edges from airflow.utils.dates import infer_time_unit, scale_time_units +from airflow.utils.db import get_query_count from airflow.utils.docs import get_doc_url_for_provider, get_docs_url from airflow.utils.helpers import alchemy_to_dict, exactly_one from airflow.utils.log import secrets_masker