From a98462edcbc091beefb125835527154abcd3b347 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 5 Mar 2025 22:47:23 +0300 Subject: [PATCH 1/9] Update data mgt production image tag to prod-35663377-1741203957 --- k8s/data-mgt/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/data-mgt/values-prod.yaml b/k8s/data-mgt/values-prod.yaml index 07164d4ccd..095a9fdf11 100644 --- a/k8s/data-mgt/values-prod.yaml +++ b/k8s/data-mgt/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 2 image: repository: eu.gcr.io/airqo-250220/airqo-data-mgt-api - tag: prod-4fb630c8-1741195697 + tag: prod-35663377-1741203957 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 0bf23e4d45b5da9df022d76444507d5534ee5964 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 5 Mar 2025 22:47:43 +0300 Subject: [PATCH 2/9] Update workflows staging image tag to stage-bc9720b8-1741203916 --- k8s/workflows/values-stage.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/workflows/values-stage.yaml b/k8s/workflows/values-stage.yaml index ec3f1f8454..90407dd164 100644 --- a/k8s/workflows/values-stage.yaml +++ b/k8s/workflows/values-stage.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-stage-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-stage-redis containers: eu.gcr.io/airqo-250220/airqo-stage-workflows - tag: stage-b3cf9be4-1741179758 + tag: stage-bc9720b8-1741203916 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 4090c01fdcb41abee785fe3527d74f073c23d90e Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 5 Mar 2025 22:47:52 +0300 Subject: [PATCH 3/9] Update device registry production image tag to prod-35663377-1741203957 --- k8s/device-registry/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/device-registry/values-prod.yaml b/k8s/device-registry/values-prod.yaml index bcc2fecebc..79b20d2f86 100644 --- a/k8s/device-registry/values-prod.yaml +++ b/k8s/device-registry/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-device-registry-api - tag: prod-4fb630c8-1741195697 + tag: prod-35663377-1741203957 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 089bb2e3139483091eeb2e5f7188f6d166ea840b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 5 Mar 2025 22:48:41 +0300 Subject: [PATCH 4/9] Update workflows prod image tag to prod-35663377-1741203957 --- k8s/workflows/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/workflows/values-prod.yaml b/k8s/workflows/values-prod.yaml index e6e51b9914..b6359d466d 100644 --- a/k8s/workflows/values-prod.yaml +++ b/k8s/workflows/values-prod.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-redis containers: eu.gcr.io/airqo-250220/airqo-workflows - tag: prod-4fb630c8-1741195697 + tag: prod-35663377-1741203957 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 8e0a0ee427aaa2bf2057050a724270f73b981fce Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Wed, 5 Mar 2025 22:50:21 +0300 Subject: [PATCH 5/9] Update predict production image tag to prod-35663377-1741203957 --- k8s/predict/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/predict/values-prod.yaml b/k8s/predict/values-prod.yaml index 8274b06fea..14b0be706a 100644 --- a/k8s/predict/values-prod.yaml +++ b/k8s/predict/values-prod.yaml @@ -7,7 +7,7 @@ images: predictJob: eu.gcr.io/airqo-250220/airqo-predict-job trainJob: eu.gcr.io/airqo-250220/airqo-train-job predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality - tag: prod-158790b4-1741195433 + tag: prod-35663377-1741203957 api: name: airqo-prediction-api label: prediction-api From 7509f0c8ff908a65ddb97cea8411b0cc4ae66a9d Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Thu, 6 Mar 2025 23:34:35 +0300 Subject: [PATCH 6/9] Clean up coordinates --- src/workflows/airqo_etl_utils/datautils.py | 37 ++++++++++------------ 1 file changed, 16 insertions(+), 21 deletions(-) diff --git a/src/workflows/airqo_etl_utils/datautils.py b/src/workflows/airqo_etl_utils/datautils.py index 94b3983642..a11ab42f9e 100644 --- a/src/workflows/airqo_etl_utils/datautils.py +++ b/src/workflows/airqo_etl_utils/datautils.py @@ -2,6 +2,7 @@ import pandas as pd from pathlib import Path import json +import ast from confluent_kafka import KafkaException from typing import List, Dict, Any, Union, Tuple, Optional @@ -1036,32 +1037,26 @@ def process_data_for_message_broker( # Clarity def _flatten_location_coordinates_clarity(coordinates: str) -> pd.Series: """ - Extracts latitude and longitude from a coordinate string. - - The function expects a string representation of coordinates in the format "[longitude, latitude]". It removes square brackets and spaces, splits - the values, and returns them as a Pandas Series. + Extracts latitude and longitude from a string representation of coordinates. Args: - coordinates(str): A string containing coordinates in the format "[longitude, latitude]". + coordinates(str): A string containing a list or tuple with two numeric values representing latitude and longitude (e.g., "[37.7749, -122.4194]"). Returns: - pd.Series: A Pandas Series with 'latitude' and 'longitude' as keys. Returns None for both values if an error occurs. - - Example: - >>> _flatten_location_coordinates("[-73.935242, 40.730610]") - latitude 40.730610 - longitude -73.935242 - dtype: object + pd.Series: A Pandas Series containing two values: + - latitude (float) at index 0 + - longitude (float) at index 1 + If parsing fails or the format is invalid, returns Series([None, None]). """ - try: - coords = coordinates.strip("[] ").split(",") - return pd.Series( - {"latitude": coords[1].strip(), "longitude": coords[0].strip()} - ) - except Exception as ex: - logger.exception("Error parsing coordinates: %s", ex) - return pd.Series({"latitude": None, "longitude": None}) + coords = ast.literal_eval(coordinates) + + if isinstance(coords, (list, tuple)) and len(coords) == 2: + return pd.Series(coords) + except (ValueError, SyntaxError): + logger.exception("Error occurred while cleaning up coordinates") + + return pd.Series([None, None]) def _transform_clarity_data(data: pd.DataFrame) -> pd.DataFrame: """ @@ -1106,7 +1101,7 @@ def _transform_clarity_data(data: pd.DataFrame) -> pd.DataFrame: ) data[["latitude", "longitude"]] = data["location.coordinates"].apply( - DataUtils._flatten_location_coordinates + DataUtils._flatten_location_coordinates_clarity ) devices, _ = DataUtils.get_devices() From eb3f1a9062535d055bf9f769ce524000185d5c60 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Thu, 6 Mar 2025 23:35:35 +0300 Subject: [PATCH 7/9] Remove repetitive ops --- src/workflows/airqo_etl_utils/airnow_utils.py | 55 ++++++++++--------- 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/src/workflows/airqo_etl_utils/airnow_utils.py b/src/workflows/airqo_etl_utils/airnow_utils.py index efd96b0681..a66745a2ec 100644 --- a/src/workflows/airqo_etl_utils/airnow_utils.py +++ b/src/workflows/airqo_etl_utils/airnow_utils.py @@ -2,42 +2,46 @@ import pandas as pd from .airnow_api import AirNowApi -from .airqo_api import AirQoApi from .constants import DataSource, DeviceCategory, Frequency, DeviceNetwork -from .data_validator import DataValidationUtils from .date import str_to_date, date_to_str from .utils import Utils +from .data_validator import DataValidationUtils from .datautils import DataUtils -from .config import configuration +from .config import configuration as Config import logging logger = logging.getLogger(__name__) class AirnowDataUtils: - @staticmethod - def parameter_column_name(parameter: str) -> str: - parameter = parameter.lower() - if parameter == "pm2.5": - return "pm2_5" - elif parameter == "pm10": - return "pm10" - elif parameter == "no2": - return "no2" - else: - raise Exception(f"Unknown parameter {parameter}") - @staticmethod def query_bam_data( api_key: str, start_date_time: str, end_date_time: str ) -> pd.DataFrame: + """ + Queries BAM (Beta Attenuation Monitor) data from the AirNow API within the given date range. + + This function converts the input date strings into the required format for the API, + retrieves air quality data, and returns it as a Pandas DataFrame. + + Args: + api_key(str): The API key required for authentication with AirNow. + start_date_time(str): The start datetime in string format (expected format: "YYYY-MM-DD HH:MM"). + end_date_time(str): The end datetime in string format (expected format: "YYYY-MM-DD HH:MM"). + + Returns: + pd.DataFrame: A DataFrame containing the air quality data retrieved from the AirNow API. + + Example: + >>> df = query_bam_data("your_api_key", "2024-03-01 00:00", "2024-03-02 23:59") + >>> print(df.head()) + """ airnow_api = AirNowApi() + date_format = "%Y-%m-%dT%H:%M" start_date_time = date_to_str( - str_to_date(start_date_time), str_format="%Y-%m-%dT%H:%M" - ) - end_date_time = date_to_str( - str_to_date(end_date_time), str_format="%Y-%m-%dT%H:%M" + str_to_date(start_date_time), str_format=date_format ) + end_date_time = date_to_str(str_to_date(end_date_time), str_format=date_format) data = airnow_api.get_data( api_key=api_key, @@ -46,7 +50,7 @@ def query_bam_data( end_date_time=end_date_time, ) - return pd.DataFrame(data) + return pd.DataFrame(data) if data else pd.DataFrame() @staticmethod def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame: @@ -78,7 +82,7 @@ def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame: if not dates: raise ValueError("Invalid or empty date range provided.") - api_key = configuration.US_EMBASSY_API_KEY + api_key = Config.US_EMBASSY_API_KEY all_device_data = [] device_data = [] @@ -141,10 +145,11 @@ def process_bam_data(data: pd.DataFrame) -> pd.DataFrame: logger.exception(f"Device with ID {device_id_} not found") continue - parameter_col_name = AirnowDataUtils.parameter_column_name( - row["Parameter"] - ) - if parameter_col_name in pollutant_value: + parameter_col_name = Config.device_config_mapping.get( + DeviceCategory.BAM.str, {} + ).get(row["Parameter"].lower(), None) + + if parameter_col_name and parameter_col_name in pollutant_value: pollutant_value[parameter_col_name] = row["Value"] if row["network"] != device_details.get("network"): From cdb4a953cf55c67d5fdfafdba359368d53c6145a Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Thu, 6 Mar 2025 23:36:59 +0300 Subject: [PATCH 8/9] Clean up --- src/workflows/airqo_etl_utils/config.py | 7 ++++- .../airqo_etl_utils/data_summary_utils.py | 26 +++++++++++++++++++ .../airqo_etl_utils/weather_data_utils.py | 4 ++- src/workflows/dags/data_summary.py | 1 + 4 files changed, 36 insertions(+), 2 deletions(-) diff --git a/src/workflows/airqo_etl_utils/config.py b/src/workflows/airqo_etl_utils/config.py index 0438596f6c..dad90baf96 100644 --- a/src/workflows/airqo_etl_utils/config.py +++ b/src/workflows/airqo_etl_utils/config.py @@ -305,6 +305,8 @@ class Config: "ts": "timestamp", } + AIRBEAM_BAM_FIELD_MAPPING = {"pm2.5": "pm2_5", "pm10": "pm10", "no2": "no2"} + DATA_RESOLUTION_MAPPING = { "iqair": {"hourly": "instant", "raw": "instant", "current": "current"} } @@ -363,7 +365,10 @@ class Config: device_config_mapping = { "bam": { "field_8_cols": list(AIRQO_BAM_MAPPING_NEW.get("field8", {}).values()), - "mapping": {"airqo": AIRQO_BAM_MAPPING_NEW}, + "mapping": { + "airqo": AIRQO_BAM_MAPPING_NEW, + "airbeam": AIRBEAM_BAM_FIELD_MAPPING, + }, "other_fields_cols": [], }, "gas": { diff --git a/src/workflows/airqo_etl_utils/data_summary_utils.py b/src/workflows/airqo_etl_utils/data_summary_utils.py index 5d7b4e9c5b..4634969a0a 100644 --- a/src/workflows/airqo_etl_utils/data_summary_utils.py +++ b/src/workflows/airqo_etl_utils/data_summary_utils.py @@ -4,6 +4,32 @@ class DataSummaryUtils: @staticmethod def compute_devices_summary(data: pd.DataFrame) -> pd.DataFrame: + """ + Computes a summary of device records from the given dataset. + + This function processes a DataFrame containing device readings, grouping data + by device and daily timestamps. It calculates the number of hourly records, + calibrated and uncalibrated records, and their respective percentages. + + Args: + data (pd.DataFrame): A DataFrame containing device data with at least the columns: + - timestamp(datetime-like string): The time of the record. + - device(str): The device identifier. + - site_id(str): The site identifier. + - pm2_5_calibrated_value(float): The calibrated PM2.5 value. + + Returns: + pd.DataFrame: A summary DataFrame with aggregated daily records for each device, + containing the following columns: + - timestamp(str): Date (YYYY-MM-DD) representing the aggregation period. + - device(str): Device identifier. + - site_id(str): Site identifier. + - hourly_records(int): Total records for that device on that date. + - calibrated_records(int): Count of non-null calibrated PM2.5 values. + - uncalibrated_records(int): Count of missing PM2.5 calibrated values. + - calibrated_percentage(float): Percentage of calibrated records. + - uncalibrated_percentage(float): Percentage of uncalibrated records. + """ devices_summary = pd.DataFrame() data["timestamp"] = pd.to_datetime(data["timestamp"]) data.drop_duplicates(subset=["device", "timestamp"], inplace=True) diff --git a/src/workflows/airqo_etl_utils/weather_data_utils.py b/src/workflows/airqo_etl_utils/weather_data_utils.py index 91618c4809..06fd1ebc2d 100644 --- a/src/workflows/airqo_etl_utils/weather_data_utils.py +++ b/src/workflows/airqo_etl_utils/weather_data_utils.py @@ -178,7 +178,9 @@ def fetch_openweathermap_data_for_sites(sites: pd.DataFrame) -> pd.DataFrame: for multiple sites in parallel batches. """ - def process_batch(batch_of_coordinates: List[Dict[str, Any]]): + def process_batch( + batch_of_coordinates: List[Dict[str, Any]] + ) -> List[Dict[str, Any]]: """ Fetches weather data from OpenWeatherMap API for a given list of sites. diff --git a/src/workflows/dags/data_summary.py b/src/workflows/dags/data_summary.py index e7a439eba4..527d197b72 100644 --- a/src/workflows/dags/data_summary.py +++ b/src/workflows/dags/data_summary.py @@ -47,3 +47,4 @@ def save_summary(data: pd.DataFrame): # data_summary() +# TODO This is not being used. Will be deleted with all it's utilities once a data health analytics dashboard is in place. From 146be9cb02eb78036669ef090b03233b140a2939 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Fri, 7 Mar 2025 10:34:01 +0300 Subject: [PATCH 9/9] Clean up data recalibration to ensure only airqo low cost sensors are picked --- src/workflows/airqo_etl_utils/airqo_utils.py | 15 ++++++++------- src/workflows/airqo_etl_utils/bigquery_api.py | 11 +++++++++-- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py index 580f3dc20e..e01927427d 100644 --- a/src/workflows/airqo_etl_utils/airqo_utils.py +++ b/src/workflows/airqo_etl_utils/airqo_utils.py @@ -2,7 +2,7 @@ import ast import numpy as np import pandas as pd -from typing import List, Dict, Any, Union, Generator +from typing import List, Dict, Any, Union, Generator, Optional from .airqo_api import AirQoApi from .bigquery_api import BigQueryApi @@ -738,15 +738,17 @@ def _airqo_calibrate(data: pd.DataFrame, groupby: str) -> pd.DataFrame: @staticmethod def extract_devices_with_uncalibrated_data( - start_date, table: str = None, network: DeviceNetwork = DeviceNetwork.AIRQO + start_date: str, + table: Optional[str] = None, + network: Optional[DeviceNetwork] = DeviceNetwork.AIRQO, ) -> pd.DataFrame: """ Extracts devices with uncalibrated data for a given start date from BigQuery. Args: - start_date (str or datetime): The date for which to check missing uncalibrated data. - table (str, optional): The name of the BigQuery table. Defaults to None, in which case the appropriate table is determined dynamically. - network (DeviceNetwork, optional): The device network to filter by. Defaults to DeviceNetwork.AIRQO. + start_date(datetime like string): The date for which to check missing uncalibrated data. + table(str, optional): The name of the BigQuery table. Defaults to None, in which case the appropriate table is determined dynamically. + network(DeviceNetwork, optional): The device network to filter by. Defaults to DeviceNetwork.AIRQO. Returns: pd.DataFrame: A DataFrame containing the devices with missing uncalibrated data. @@ -788,9 +790,8 @@ def extract_aggregate_calibrate_raw_data( # TODO Might have to change approach to group by device_id depending on performance. for _, row in devices.iterrows(): - end_date_time = datetime.strptime(row.timestamp, "%Y-%m-%d %H:%M:%S%z") end_date_time = DateUtils.format_datetime_by_unit_str( - end_date_time, "hours_end" + row.timestamp, "hours_end" ) raw_device_data = DataUtils.extract_data_from_bigquery( DataType.RAW, diff --git a/src/workflows/airqo_etl_utils/bigquery_api.py b/src/workflows/airqo_etl_utils/bigquery_api.py index 225c8704a7..72c682311d 100644 --- a/src/workflows/airqo_etl_utils/bigquery_api.py +++ b/src/workflows/airqo_etl_utils/bigquery_api.py @@ -1048,7 +1048,10 @@ def fetch_satellite_readings( logger.info(f"Error fetching data from bigquery", {e}) def generate_missing_data_query( - self, date: str, table: str, network: DeviceNetwork + self, + date: str, + table: str, + network: Optional[DeviceNetwork] = DeviceNetwork.AIRQO, ) -> str: """ Generates a BigQuery SQL query to find missing hourly air quality data for devices. @@ -1071,7 +1074,11 @@ def generate_missing_data_query( SELECT device_id, TIMESTAMP_TRUNC(timestamp, HOUR) AS timestamp FROM `{table}` WHERE - DATE(timestamp) = '{date}' + TIMESTAMP_TRUNC(timestamp, DAY) = '{date}' + AND s1_pm2_5 IS NOT NULL + AND s2_pm2_5 IS NOT NULL + AND s1_pm10 IS NOT NULL + AND s2_pm10 IS NOT NULL AND pm2_5_calibrated_value IS NULL AND network = '{network.str}' )