Skip to content

Commit

Permalink
feat(nowcasting-dev): Add satellite-consumer and airflow dag (#803)
Browse files Browse the repository at this point in the history
* feat(nowcasting-dev): Add satellite-consumer and airflow dag

* fix(nowcasting-dev): Add sat variable

* fix(dag): Remove error message on rss failure
  • Loading branch information
devsjc authored Feb 26, 2025
1 parent 1ccfcc1 commit 8054613
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 1 deletion.
96 changes: 96 additions & 0 deletions terraform/modules/services/airflow/dags/uk/sat-dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""Dag to download and process satellite data from EUMETSAT.
Consists of two tasks made from the same ECS operator,
one for RSS data and one for Odegree data.
The 0degree data task only runs if the RSS data task fails.
"""

import os
import datetime as dt
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator, EcsRegisterTaskDefinitionOperator
from airflow.decorators import dag
from airflow.utils.trigger_rule import TriggerRule

from airflow.operators.latest_only import LatestOnlyOperator
from utils.slack import slack_message_callback

# Deployment environment name, read from ENVIRONMENT; falls back to "development" when unset.
env = os.environ.get("ENVIRONMENT", "development")

# Default arguments handed to the DAG via `default_args`.
default_dag_args = dict(
    owner="airflow",
    depends_on_past=False,
    # Retry once, one minute after a failure.
    retries=1,
    retry_delay=dt.timedelta(minutes=1),
    max_active_runs=10,
    concurrency=10,
    max_active_tasks=10,
    # Give up on any single task attempt after half an hour.
    execution_timeout=dt.timedelta(minutes=30),
)
# Keyword arguments unpacked into every EcsRunTaskOperator in this file.
default_task_args = dict(
    # The cluster name embeds the deployment environment.
    cluster=f"Nowcasting-{env}",
    task_definition="satellite-consumer",
    launch_type="FARGATE",
    task_concurrency=1,
    network_configuration={
        "awsvpcConfiguration": {
            # Subnet and security group come from the worker's environment variables.
            "subnets": [os.getenv("ECS_SUBNET")],
            "securityGroups": [os.getenv("ECS_SECURITY_GROUP")],
            "assignPublicIp": "ENABLED",
        },
    },
    awslogs_group="/aws/ecs/consumer/sat",
    awslogs_stream_prefix="streaming/sat-consumer",
    awslogs_region="eu-west-1",
)

# Empty tuple; not referenced anywhere else in the visible part of this file.
satellite_error_message = ()

@dag(
    dag_id="uk-satellite-consumer",
    description=__doc__,
    schedule_interval="*/5 * * * *",
    # NOTE(review): this start_date is recomputed every time the file is parsed.
    # Airflow's guidance is to use a fixed start_date; left unchanged here so the
    # scheduling behaviour of this DAG is not altered by this fix.
    start_date=dt.datetime.now(tz=dt.timezone.utc) - dt.timedelta(hours=0.5),
    catchup=False,
    default_args=default_dag_args,
)
def sat_consumer_dag():
    # Builds the satellite consumer DAG: latest_only >> RSS task >> odegree task.
    # The odegree task uses TriggerRule.ALL_FAILED, so it only runs when the RSS
    # task has failed; if it also fails, a Slack message is sent.
    #
    # Plain comments are used instead of a function docstring on purpose: a
    # docstring on a @dag-decorated function is picked up by Airflow as the
    # DAG's doc_md, which would change what is displayed for this DAG.

    def _container_overrides(satellite: str) -> dict:
        """Return the ECS ``overrides`` payload for one satellite source.

        Args:
            satellite: Value for the container's ``SATCONS_SATELLITE`` variable.

        Returns:
            A dict suitable for the ``overrides`` argument of ``EcsRunTaskOperator``.
        """
        return {
            "containerOverrides": [{
                "name": "satellite-consumer",
                # ECS expects environment overrides as a list of name/value
                # pairs; a plain {NAME: value} mapping is rejected by the API.
                "environment": [
                    {"name": "SATCONS_SATELLITE", "value": satellite},
                    # f-string so the bucket name carries the real environment
                    # rather than the literal text "{env}".
                    {
                        "name": "SATCONS_WORKDIR",
                        "value": f"s3://nowcasting-sat-{env}/testdata",
                    },
                ],
            }],
        }

    # Skip everything downstream unless this is the most recent scheduled run.
    latest_only = LatestOnlyOperator(task_id="latest_only")

    sat_consumer_rss = EcsRunTaskOperator(
        task_id="satellite-consumer-rss",
        overrides=_container_overrides("rss"),
        **default_task_args,
    )

    sat_consumer_odegree = EcsRunTaskOperator(
        task_id="satellite-consumer-odegree",
        # Fallback path: only triggered when the upstream RSS task failed.
        trigger_rule=TriggerRule.ALL_FAILED,
        overrides=_container_overrides("odegree"),
        on_failure_callback=slack_message_callback((
            "⚠️ The task {{ ti.task_id }} failed to collect odegree satellite data. "
            "The forecast will automatically move over to PVNET-ECMWF "
            "which doesn't need satellite data. "
            "Forecast quality may be impacted, but no out-of-hours support is required. "
            "Please log in an incident log. "
        )),
        **default_task_args,
    )

    latest_only >> sat_consumer_rss >> sat_consumer_odegree

# Call the @dag-decorated function at import time so the DAG object is actually created.
sat_consumer_dag()

35 changes: 35 additions & 0 deletions terraform/nowcasting/development/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ The components are:
3.8 - PVLive Consumer - GSP Day After
3.9 - PVLive Consumer - National Day After
3.10 - NESO Forecast Consumer
3.11 - Satellite Consumer (new)
4.1 - Metrics
4.2 - Cloudcasting app
4.3 - Forecast National XG
Expand Down Expand Up @@ -504,6 +505,40 @@ module "neso-forecast-consumer" {
container-command = []
}

# 3.11
# Satellite consumer task, declared through the shared ecs_task module.
module "satellite-consumer" {
  source = "../../modules/services/ecs_task"

  ecs-task_name               = "satellite-consumer"
  ecs-task_type               = "consumer"
  ecs-task_execution_role_arn = module.ecs.ecs_task_execution_role_arn
  ecs-task_size = {
    cpu    = 1024
    memory = 2048
  }

  aws-region      = var.region
  aws-environment = local.environment

  # Satellite bucket together with the access policy attached for it.
  s3-buckets = [
    {
      id                = module.s3.s3-sat-bucket.id
      access_policy_arn = module.s3.iam-policy-s3-sat-write.arn
    },
  ]

  # Plain environment variables set on the container.
  container-env_vars = [
    { name = "SATCONS_COMMAND", value = "CONSUME" },
    { name = "SATCONS_RESCALE", value = "true" },
  ]

  # Values read from the satellite consumer secret.
  container-secret_vars = [
    {
      secret_policy_arn = aws_secretsmanager_secret.satellite_consumer_secret.arn
      values            = ["EUMETSAT_CONSUMER_KEY", "EUMETSAT_CONSUMER_SECRET"]
    },
  ]

  container-tag      = var.satellite_consumer_version
  container-name     = "openclimatefix/satellite-consumer"
  container-registry = "ghcr.io"
  container-command  = []
}

# 4.1
module "metrics" {
source = "../../modules/services/ecs_task"
Expand Down
8 changes: 7 additions & 1 deletion terraform/nowcasting/development/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,10 @@ variable "cloudcasting_app_version" {
type = string
description = "The version of the cloucasting app forecast consumer"
default = "0.0.2"
}
}

# Image tag used as `container-tag` by the satellite-consumer module.
variable "satellite_consumer_version" {
  description = "The version of the satellite consumer"
  type        = string
  default     = "0.0.2"
}

0 comments on commit 8054613

Please sign in to comment.