diff --git a/src/workflows/airqo_etl_utils/date.py b/src/workflows/airqo_etl_utils/date.py index 7c7897b621..b9596f22c3 100644 --- a/src/workflows/airqo_etl_utils/date.py +++ b/src/workflows/airqo_etl_utils/date.py @@ -1,5 +1,5 @@ from datetime import datetime, timedelta, timezone -from typing import Tuple +from typing import Tuple, Optional, Dict, Any import logging @@ -7,12 +7,12 @@ class DateUtils: - day_start_date_time_format = "%Y-%m-%dT00:00:00Z" - day_end_date_time_format = "%Y-%m-%dT11:59:59Z" - hour_date_time_format = "%Y-%m-%dT%H:00:00Z" + day_start_date_time_format: str = "%Y-%m-%dT00:00:00Z" + day_end_date_time_format: str = "%Y-%m-%dT11:59:59Z" + hour_date_time_format: str = "%Y-%m-%dT%H:00:00Z" @staticmethod - def date_to_str(date: datetime, unit: str = None) -> str: + def date_to_str(date: datetime, unit: Optional[str] = None) -> str: """ Returns a string formatted datetime. """ @@ -27,10 +27,10 @@ def date_to_str(date: datetime, unit: str = None) -> str: @staticmethod def get_dag_date_time_values( - historical: bool = False, - days: int = None, - hours: int = None, - **kwargs, + historical: Optional[bool] = False, + days: Optional[int] = None, + hours: Optional[int] = None, + **kwargs: Dict[str, Any], ) -> Tuple[str, str]: """ Formats start and end dates. @@ -83,16 +83,33 @@ def get_dag_date_time_values( return start_date_time, end_date_time @staticmethod - def get_query_date_time_values(hours=1, days=0, **kwargs): + def get_query_date_time_values( + hours: Optional[int] = 1, days: Optional[int] = 0, **kwargs: Dict[str, Any] + ) -> Tuple[str, str]: + """ + Calculates query start and end datetime values based on the DAG run's execution date. - execution_date = kwargs["dag_run"].execution_date + This function determines the time window for querying data. If the `days` parameter is non-zero, + the time window is computed using days; otherwise, the `hours` parameter is used. The start datetime + is computed by subtracting the appropriate delta from the execution date, and the end datetime is computed + by adding the same delta to the start datetime. The final datetimes are formatted as strings using + the `date_to_str_hours` function. + + Args: + hours(int, optional): The number of hours to use for the time window when `days` is zero. Defaults to 1. + days(int, optional): The number of days to use for the time window. If non-zero, this value overrides `hours`. Defaults to 0. + **kwargs: Additional keyword arguments. Expected to contain a "dag_run" key with an `execution_date` attribute. - start_date_time = execution_date - timedelta(hours=hours) - end_date_time = start_date_time + timedelta(hours=hours) + Returns: + tuple: A tuple containing two strings: + - The formatted start datetime. + - The formatted end datetime. + """ + execution_date = kwargs["dag_run"].execution_date - if days != 0: - start_date_time = execution_date - timedelta(days=days) - end_date_time = start_date_time + timedelta(days=days) + delta = timedelta(days=days) if days else timedelta(hours=hours) + start_date_time = execution_date - delta + end_date_time = start_date_time + delta return date_to_str_hours(start_date_time), date_to_str_hours(end_date_time) diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py index 9681e39882..3d19811d04 100644 --- a/src/workflows/dags/airqo_measurements.py +++ b/src/workflows/dags/airqo_measurements.py @@ -665,6 +665,8 @@ def extract_hourly_data(**kwargs) -> pd.DataFrame: end = datetime.strptime(end, "%Y-%m-%dT%H:%M:%SZ") previous_date = Variable.get("new_date_2021", default_var=start) + if previous_date == start: + Variable.set("new_date_2021", previous_date) hour_of_day = datetime.strptime(previous_date, "%Y-%m-%dT%H:%M:%SZ") start_date_time = date_to_str_hours(hour_of_day)