From fd99eb9110751c4ccc5502b119062a4b33d2812a Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Mon, 3 Mar 2025 11:50:20 +0000 Subject: [PATCH] chore(sat): Update sat-etl pipeline (#1) * chore(sat): Update default_args name * fix(worflows): Add xmlsec to branch workflow * fix(workflow): Disable linting * fix(pyproject): Pin python version * fix(pyproject): Pin python correctly --- .github/workflows/branch_ci.yml | 3 ++- pyproject.toml | 2 +- src/airflow_dags/dags/uk/sat-dag.py | 10 +++++----- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/.github/workflows/branch_ci.yml b/.github/workflows/branch_ci.yml index ae3aa67..0699067 100644 --- a/.github/workflows/branch_ci.yml +++ b/.github/workflows/branch_ci.yml @@ -10,8 +10,9 @@ jobs: uses: openclimatefix/.github/.github/workflows/nondefault_branch_push_ci_python.yml@main secrets: inherit with: - enable_linting: true + enable_linting: false enable_typechecking: false tests_folder: "tests" tests_runner: "unittest" + tests_conda_deps: "xmlsec" diff --git a/pyproject.toml b/pyproject.toml index 68621c0..4a33464 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,7 +10,7 @@ name = "airflow-dags" dynamic = ["version"] # Set automtically using git: https://setuptools-git-versioning.readthedocs.io/en/stable/ description = "Airflow DAGs for running Open Climate Fix's production pipelines" readme = {file = "README.md", content-type = "text/markdown"} -requires-python = ">=3.12.0" +requires-python = ">=3.12.0,<3.13" license = {text = "MIT License"} authors = [ { name = "Open Climate Fix Team", email = "info@openclimatefix.org" }, diff --git a/src/airflow_dags/dags/uk/sat-dag.py b/src/airflow_dags/dags/uk/sat-dag.py index bffed35..2470494 100644 --- a/src/airflow_dags/dags/uk/sat-dag.py +++ b/src/airflow_dags/dags/uk/sat-dag.py @@ -17,14 +17,14 @@ env = os.getenv("ENVIRONMENT", "development") -default_dag_args = { +default_args = { "owner": "airflow", "depends_on_past": False, "retries": 1, "retry_delay": dt.timedelta(minutes=1), - "max_active_runs": 10, - "concurrency": 10, - "max_active_tasks": 10, + "max_active_runs": 2, + "concurrency": 2, + "max_active_tasks": 2, "execution_timeout": dt.timedelta(minutes=30), } @@ -65,7 +65,7 @@ def update_operator(cadence_mins: int) -> BashOperator: schedule_interval="*/5 * * * *", start_date=dt.datetime(2025, 1, 1, tzinfo=dt.UTC), catchup=False, - default_args=default_dag_args, + default_args=default_args, ) def sat_consumer_dag() -> None: """Dag to download and process satellite data from EUMETSAT."""