From 8525401ffbab30f5f1b99ec4b5df13bb52b41e41 Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Tue, 4 Mar 2025 11:36:54 +0000 Subject: [PATCH 1/2] fix(ecsoperator): Fetch secret ARNs --- pyproject.toml | 1 + .../operators/ecs_run_task_operator.py | 22 ++++++++++++++----- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 4a33464..56e606a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -85,6 +85,7 @@ disallow_untyped_defs = true module = [ "botocore.session", "botocore.client", + "boto3", "fsspec", "fsspec.implementations", "fsspec.implementations.local", diff --git a/src/airflow_dags/plugins/operators/ecs_run_task_operator.py b/src/airflow_dags/plugins/operators/ecs_run_task_operator.py index 33eca68..30e9947 100644 --- a/src/airflow_dags/plugins/operators/ecs_run_task_operator.py +++ b/src/airflow_dags/plugins/operators/ecs_run_task_operator.py @@ -5,6 +5,7 @@ from collections.abc import Callable from typing import Any, ClassVar +import boto3 from airflow.providers.amazon.aws.operators.ecs import ( EcsDeregisterTaskDefinitionOperator, EcsRegisterTaskDefinitionOperator, @@ -14,7 +15,6 @@ ENV: str = os.getenv("ENVIRONMENT", "development") SENTRY_DSN: str = os.getenv("SENTRY_DSN", "") -AWS_ACCOUNT_ID: str = os.getenv("AWS_ACCOUNT_ID", "") ECS_SUBNET: str = os.getenv("ECS_SUBNET", "") ECS_SECURITY_GROUP: str = os.getenv("ECS_SECURITY_GROUP", "") ECS_EXECUTION_ROLE_ARN: str = os.getenv("ECS_EXECUTION_ROLE_ARN", "") @@ -33,7 +33,10 @@ class ECSOperatorGen: container_env: dict[str, str] = dataclasses.field(default_factory=dict) """The environment variables to pass to the container.""" container_secret_env: dict[str, list[str]] = dataclasses.field(default_factory=dict) - """Map of AWS secret arns to keys within the secret to pas to the container.""" + """Map of AWS secret names to keys within the secret to pas to the container. + + The secret ARN is fetched from the secret name via boto3. + """ container_cpu: int = 1024 """The CPU size of the container in milli-units.""" container_memory: int = 2048 @@ -70,6 +73,7 @@ def __post_init__(self) -> None: if self.domain not in ["uk", "india"]: raise ValueError(f"Domain must be one of ['uk', 'india'], got {self.domain}") + @property def cluster_region_tuple(self) -> tuple[str, str]: """Return the name of the ECS cluster and its region.""" @@ -78,8 +82,15 @@ def cluster_region_tuple(self) -> tuple[str, str]: return f"Nowcasting-{ENV}", "eu-west-1" def setup_operator(self) -> EcsRegisterTaskDefinitionOperator: - """Create an Airflow operator to register an ECS task definition.""" + """Create an Airflow operator to register an ECS task definition. + + Secrets are not passed through the environment directly but through + the `secrets` key in the container definition to prevent exposure in + the task definition. + """ _, region = self.cluster_region_tuple + sc = boto3.client("secretsmanager", region_name=region) + return EcsRegisterTaskDefinitionOperator( family=self.name, task_id=f"register_{self.name}", @@ -93,8 +104,9 @@ def setup_operator(self) -> EcsRegisterTaskDefinitionOperator: for k, v in (self.container_env | self._default_env).items() ], "secrets": [ - {"name": key, "valueFrom": f"arn:aws:secretsmanager:"\ - f"{region}:{AWS_ACCOUNT_ID}:secret:{secret}:{key}::", + { + "name": key, + "valueFrom": sc.get_secret_value(SecretId=secret)["ARN"], } for secret, keys in self.container_secret_env.items() for key in keys ], From 29fa2f617f8e6b92f2a2a0fd2b7617fa0151476c Mon Sep 17 00:00:00 2001 From: devsjc <47188100+devsjc@users.noreply.github.com> Date: Tue, 4 Mar 2025 12:05:42 +0000 Subject: [PATCH 2/2] fix(ecsoperator): Get secret arn from ownerid --- .../plugins/operators/ecs_run_task_operator.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/airflow_dags/plugins/operators/ecs_run_task_operator.py b/src/airflow_dags/plugins/operators/ecs_run_task_operator.py index 30e9947..4da7a86 100644 --- a/src/airflow_dags/plugins/operators/ecs_run_task_operator.py +++ b/src/airflow_dags/plugins/operators/ecs_run_task_operator.py @@ -2,7 +2,7 @@ import dataclasses import os -from collections.abc import Callable +from collections.abc import Callable, Sequence from typing import Any, ClassVar import boto3 @@ -13,12 +13,15 @@ ) from airflow.utils.trigger_rule import TriggerRule +# These should probably be templated instead of top-level, see +# https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code ENV: str = os.getenv("ENVIRONMENT", "development") SENTRY_DSN: str = os.getenv("SENTRY_DSN", "") ECS_SUBNET: str = os.getenv("ECS_SUBNET", "") ECS_SECURITY_GROUP: str = os.getenv("ECS_SECURITY_GROUP", "") ECS_EXECUTION_ROLE_ARN: str = os.getenv("ECS_EXECUTION_ROLE_ARN", "") -AWS_REGION: str = os.getenv("AWS_REGION", "eu-west-1") +AWS_REGION: str = os.getenv("AWS_DEFAULT_REGION", "eu-west-1") +AWS_OWNER_ID: str = os.getenv("AWS_OWNER_ID", "") @dataclasses.dataclass class ECSOperatorGen: @@ -89,7 +92,6 @@ def setup_operator(self) -> EcsRegisterTaskDefinitionOperator: the task definition. """ _, region = self.cluster_region_tuple - sc = boto3.client("secretsmanager", region_name=region) return EcsRegisterTaskDefinitionOperator( family=self.name, @@ -104,9 +106,8 @@ def setup_operator(self) -> EcsRegisterTaskDefinitionOperator: for k, v in (self.container_env | self._default_env).items() ], "secrets": [ - { - "name": key, - "valueFrom": sc.get_secret_value(SecretId=secret)["ARN"], + {"name": key, "valueFrom": "arn:aws:secretsmanager"\ + f":{region}:{AWS_OWNER_ID}:secret:{key}::", } for secret, keys in self.container_secret_env.items() for key in keys ],