File tree Expand file tree Collapse file tree 1 file changed +4
-1
lines changed Expand file tree Collapse file tree 1 file changed +4
-1
lines changed Original file line number Diff line number Diff line change 10
10
11
11
from airflow .decorators import dag
12
12
from airflow .operators .bash import BashOperator
13
+ from airflow .operators .latest_only import LatestOnlyOperator
13
14
from airflow .utils .trigger_rule import TriggerRule
14
15
15
16
from airflow_dags .plugins .callbacks .slack import slack_message_callback
@@ -69,6 +70,8 @@ def update_operator(cadence_mins: int) -> BashOperator:
69
70
)
70
71
def sat_consumer_dag () -> None :
71
72
"""Dag to download and process satellite data from EUMETSAT."""
73
+ latest_op = LatestOnlyOperator (task_id = "determine_latest_run" )
74
+
72
75
setup_op = sat_consumer .setup_operator ()
73
76
74
77
consume_rss_op = sat_consumer .run_task_operator (
@@ -97,7 +100,7 @@ def sat_consumer_dag() -> None:
97
100
98
101
teardown_op = sat_consumer .teardown_operator ()
99
102
100
- setup_op >> consume_rss_op >> consume_odegree_op >> teardown_op
103
+ latest_op >> setup_op >> consume_rss_op >> consume_odegree_op >> teardown_op
101
104
102
105
sat_consumer_dag ()
103
106
You can’t perform that action at this time.
0 commit comments