Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ecsoperator): Fetch secret ARNs #10

Merged
merged 2 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ disallow_untyped_defs = true
module = [
"botocore.session",
"botocore.client",
"boto3",
"fsspec",
"fsspec.implementations",
"fsspec.implementations.local",
Expand Down
27 changes: 20 additions & 7 deletions src/airflow_dags/plugins/operators/ecs_run_task_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@

import dataclasses
import os
from collections.abc import Callable
from collections.abc import Callable, Sequence
from typing import Any, ClassVar

import boto3
from airflow.providers.amazon.aws.operators.ecs import (
EcsDeregisterTaskDefinitionOperator,
EcsRegisterTaskDefinitionOperator,
EcsRunTaskOperator,
)
from airflow.utils.trigger_rule import TriggerRule

# These should probably be templated instead of top-level, see
# https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code
ENV: str = os.getenv("ENVIRONMENT", "development")
SENTRY_DSN: str = os.getenv("SENTRY_DSN", "")
AWS_ACCOUNT_ID: str = os.getenv("AWS_ACCOUNT_ID", "")
ECS_SUBNET: str = os.getenv("ECS_SUBNET", "")
ECS_SECURITY_GROUP: str = os.getenv("ECS_SECURITY_GROUP", "")
ECS_EXECUTION_ROLE_ARN: str = os.getenv("ECS_EXECUTION_ROLE_ARN", "")
AWS_REGION: str = os.getenv("AWS_REGION", "eu-west-1")
AWS_REGION: str = os.getenv("AWS_DEFAULT_REGION", "eu-west-1")
AWS_OWNER_ID: str = os.getenv("AWS_OWNER_ID", "")

@dataclasses.dataclass
class ECSOperatorGen:
Expand All @@ -33,7 +36,10 @@ class ECSOperatorGen:
container_env: dict[str, str] = dataclasses.field(default_factory=dict)
"""The environment variables to pass to the container."""
container_secret_env: dict[str, list[str]] = dataclasses.field(default_factory=dict)
"""Map of AWS secret arns to keys within the secret to pas to the container."""
"""Map of AWS secret names to keys within the secret to pas to the container.

The secret ARN is fetched from the secret name via boto3.
"""
container_cpu: int = 1024
"""The CPU size of the container in milli-units."""
container_memory: int = 2048
Expand Down Expand Up @@ -70,6 +76,7 @@ def __post_init__(self) -> None:
if self.domain not in ["uk", "india"]:
raise ValueError(f"Domain must be one of ['uk', 'india'], got {self.domain}")


@property
def cluster_region_tuple(self) -> tuple[str, str]:
"""Return the name of the ECS cluster and its region."""
Expand All @@ -78,8 +85,14 @@ def cluster_region_tuple(self) -> tuple[str, str]:
return f"Nowcasting-{ENV}", "eu-west-1"

def setup_operator(self) -> EcsRegisterTaskDefinitionOperator:
"""Create an Airflow operator to register an ECS task definition."""
"""Create an Airflow operator to register an ECS task definition.

Secrets are not passed through the environment directly but through
the `secrets` key in the container definition to prevent exposure in
the task definition.
"""
_, region = self.cluster_region_tuple

return EcsRegisterTaskDefinitionOperator(
family=self.name,
task_id=f"register_{self.name}",
Expand All @@ -93,8 +106,8 @@ def setup_operator(self) -> EcsRegisterTaskDefinitionOperator:
for k, v in (self.container_env | self._default_env).items()
],
"secrets": [
{"name": key, "valueFrom": f"arn:aws:secretsmanager:"\
f"{region}:{AWS_ACCOUNT_ID}:secret:{secret}:{key}::",
{"name": key, "valueFrom": "arn:aws:secretsmanager"\
f":{region}:{AWS_OWNER_ID}:secret:{key}::",
} for secret, keys in self.container_secret_env.items()
for key in keys
],
Expand Down
Loading