From ab6292c402a975d7e36b7858061b8db35adf9071 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 11 Feb 2025 11:08:48 +0300 Subject: [PATCH 1/6] Update auth service production image tag to prod-855515c6-1739261225 --- k8s/auth-service/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/auth-service/values-prod.yaml b/k8s/auth-service/values-prod.yaml index c9c1f41dca..da1479c0ba 100644 --- a/k8s/auth-service/values-prod.yaml +++ b/k8s/auth-service/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-auth-api - tag: prod-dd79c919-1739257284 + tag: prod-855515c6-1739261225 nameOverride: '' fullnameOverride: '' podAnnotations: {} From e81a04301216fbbf746b478247cfe95d7854744b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 11 Feb 2025 11:08:59 +0300 Subject: [PATCH 2/6] Update device registry production image tag to prod-855515c6-1739261225 --- k8s/device-registry/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/device-registry/values-prod.yaml b/k8s/device-registry/values-prod.yaml index 4d62100d4b..c71ca6d1d3 100644 --- a/k8s/device-registry/values-prod.yaml +++ b/k8s/device-registry/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-device-registry-api - tag: prod-dd79c919-1739257284 + tag: prod-855515c6-1739261225 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 1bf66b37311905a240107a69941f16d3675fcff6 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 11 Feb 2025 11:09:12 +0300 Subject: [PATCH 3/6] Update workflows staging image tag to stage-39f00e8a-1739261191 --- k8s/workflows/values-stage.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/workflows/values-stage.yaml b/k8s/workflows/values-stage.yaml index d16f1d46be..cb7f4dd5c1 100644 --- a/k8s/workflows/values-stage.yaml +++ b/k8s/workflows/values-stage.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-stage-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-stage-redis containers: eu.gcr.io/airqo-250220/airqo-stage-workflows - tag: stage-f00a37c2-1739257230 + tag: stage-39f00e8a-1739261191 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 67845999fd9292075c0427d120d2b17bc2539649 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 11 Feb 2025 11:09:53 +0300 Subject: [PATCH 4/6] Update spatial production image tag to prod-855515c6-1739261225 --- k8s/spatial/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/spatial/values-prod.yaml b/k8s/spatial/values-prod.yaml index 92958d1ffb..0502f5d385 100644 --- a/k8s/spatial/values-prod.yaml +++ b/k8s/spatial/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-spatial-api - tag: prod-dd79c919-1739257284 + tag: prod-855515c6-1739261225 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 728f2707d31d4132f7bc5944d4e139742defedc7 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 11 Feb 2025 11:09:57 +0300 Subject: [PATCH 5/6] Update workflows prod image tag to prod-855515c6-1739261225 --- k8s/workflows/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/workflows/values-prod.yaml b/k8s/workflows/values-prod.yaml index fc206c6dc6..a3eab5e476 100644 --- a/k8s/workflows/values-prod.yaml +++ b/k8s/workflows/values-prod.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-redis containers: eu.gcr.io/airqo-250220/airqo-workflows - tag: prod-dd79c919-1739257284 + tag: prod-855515c6-1739261225 nameOverride: '' fullnameOverride: '' podAnnotations: {} From a576f6c65df4381332dc49c6b87435b50fe11879 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Tue, 11 Feb 2025 12:02:49 +0300 Subject: [PATCH 6/6] Clean up and documentation --- .../airqo_etl_utils/daily_data_utils.py | 58 ++++++++++++++++--- src/workflows/airqo_etl_utils/datautils.py | 12 ++-- src/workflows/dags/airqo_measurements.py | 2 +- 3 files changed, 56 insertions(+), 16 deletions(-) diff --git a/src/workflows/airqo_etl_utils/daily_data_utils.py b/src/workflows/airqo_etl_utils/daily_data_utils.py index 1b575f4bbb..79f936f412 100644 --- a/src/workflows/airqo_etl_utils/daily_data_utils.py +++ b/src/workflows/airqo_etl_utils/daily_data_utils.py @@ -2,6 +2,9 @@ from airqo_etl_utils.bigquery_api import BigQueryApi from airqo_etl_utils.data_validator import DataValidationUtils +from airqo_etl_utils.constants import DataType, Frequency, DeviceCategory +from airqo_etl_utils.config import configuration as Config +from typing import Optional class DailyDataUtils: @@ -53,21 +56,44 @@ def average_data(data: pd.DataFrame) -> pd.DataFrame: @staticmethod def cleanup_and_reload( - data: pd.DataFrame, start_date_time=None, end_date_time=None - ): + data: pd.DataFrame, + start_date_time: Optional[str] = None, + end_date_time: Optional[str] = None, + ) -> None: + """ + Cleans up the input dataset by removing duplicates and reloads the processed data into BigQuery. + + This function: + - Converts the "timestamp" column to a datetime format. + - Removes duplicate entries based on "device_number", "device_id", and "timestamp". + - Validates and processes the data to match BigQuery requirements. + - Reloads the cleaned dataset into the BigQuery daily measurements table. + + Args: + data(pd.DataFrame): The input dataset containing device measurements. + start_date_time(str, optional): The start timestamp for data reloading (ISO format). + end_date_time(str, optional): The end timestamp for data reloading (ISO format). + + Returns: + None + """ data["timestamp"] = data["timestamp"].apply(pd.to_datetime) - data = data.drop_duplicates( - subset=["device_number", "device_id", "timestamp"], keep="first" + data.drop_duplicates( + subset=["device_number", "device_id", "timestamp"], + keep="first", + inplace=True, ) bigquery_api = BigQueryApi() - table = bigquery_api.daily_measurements_table + + source = Config.DataSource.get(DataType.AVERAGED) + table = source.get(DeviceCategory.GENERAL).get(Frequency.DAILY) + data = DataValidationUtils.process_for_big_query( dataframe=data, table=table, ) bigquery_api.reload_data( - network="all", table=table, dataframe=data, start_date_time=start_date_time, @@ -75,11 +101,25 @@ def cleanup_and_reload( ) @staticmethod - def save_data(data: pd.DataFrame): - bigquery_api = BigQueryApi() + def save_data(data: pd.DataFrame) -> None: + """ + Processes and saves the given dataset to BigQuery. + + This function: + - Retrieves the BigQuery daily measurements table. + - Validates and processes the dataset to ensure it meets BigQuery requirements. + - Loads the processed data into the designated BigQuery table. + + Args: + data (pd.DataFrame): The dataset containing measurement data to be stored. - table = bigquery_api.daily_measurements_table + Returns: + None + """ + bigquery_api = BigQueryApi() + source = Config.DataSource.get(DataType.AVERAGED) + table = source.get(DeviceCategory.GENERAL).get(Frequency.DAILY) data = DataValidationUtils.process_for_big_query( dataframe=data, table=table, diff --git a/src/workflows/airqo_etl_utils/datautils.py b/src/workflows/airqo_etl_utils/datautils.py index 39356c22ea..1343cf5559 100644 --- a/src/workflows/airqo_etl_utils/datautils.py +++ b/src/workflows/airqo_etl_utils/datautils.py @@ -308,22 +308,22 @@ def extract_data_from_bigquery( end_date_time: str, frequency: Frequency, device_category: DeviceCategory, - device_network: DeviceNetwork = None, - dynamic_query: bool = False, - remove_outliers: bool = True, + device_network: Optional[DeviceNetwork] = None, + dynamic_query: Optional[bool] = False, + remove_outliers: Optional[bool] = True, ) -> pd.DataFrame: """ Extracts data from BigQuery within a specified time range and frequency, with an optional filter for the device network. The data is cleaned to remove outliers. Args: - datatype(str): The type of data to extract determined by the source data asset. + datatype(DataType): The type of data to extract determined by the source data asset. start_date_time(str): The start of the time range for data extraction, in ISO 8601 format. end_date_time(str): The end of the time range for data extraction, in ISO 8601 format. frequency(Frequency): The frequency of the data to be extracted, e.g., RAW or HOURLY. device_network(DeviceNetwork, optional): The network to filter devices, default is None (no filter). - dynamic_query (bool, optional): Determines the type of data returned. If True, returns averaged data grouped by `device_number`, `device_id`, and `site_id`. If False, returns raw data without aggregation. Defaults to False. - remove_outliers (bool, optional): If True, removes outliers from the extracted data. Defaults to True. + dynamic_query(bool, optional): Determines the type of data returned. If True, returns averaged data grouped by `device_number`, `device_id`, and `site_id`. If False, returns raw data without aggregation. Defaults to False. + remove_outliers(bool, optional): If True, removes outliers from the extracted data. Defaults to True. Returns: pd.DataFrame: A pandas DataFrame containing the cleaned data from BigQuery. diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py index b21c5304e3..06114a7cc6 100644 --- a/src/workflows/dags/airqo_measurements.py +++ b/src/workflows/dags/airqo_measurements.py @@ -675,7 +675,7 @@ def extract_hourly_data(**kwargs) -> pd.DataFrame: start_date_time = date_to_str_hours(hour_of_day) end_date_time = datetime.strftime(hour_of_day, "%Y-%m-%dT%H:59:59Z") - if start_date_time > end or end_date_time > end: + if hour_of_day > end or (hour_of_day + timedelta(hours=1)) > end: raise AirflowFailException(f"Run expired on {end}") if previous_date == start: