From 5eeb09b036bf2bece4c7b705eea4e57ed82a6783 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Mon, 3 Mar 2025 22:57:24 +0300 Subject: [PATCH 01/15] Delete old specific code --- src/workflows/airqo_etl_utils/kcca_utils.py | 155 -------------------- src/workflows/dags/kcca_measurements.py | 143 ------------------ 2 files changed, 298 deletions(-) delete mode 100644 src/workflows/airqo_etl_utils/kcca_utils.py delete mode 100644 src/workflows/dags/kcca_measurements.py diff --git a/src/workflows/airqo_etl_utils/kcca_utils.py b/src/workflows/airqo_etl_utils/kcca_utils.py deleted file mode 100644 index 7303d8ea20..0000000000 --- a/src/workflows/airqo_etl_utils/kcca_utils.py +++ /dev/null @@ -1,155 +0,0 @@ -import numpy as np -import pandas as pd -import requests - -from .airqo_api import AirQoApi -from .bigquery_api import BigQueryApi -from .config import configuration -from .constants import DataSource, Frequency, DeviceCategory, DeviceNetwork -from .data_validator import DataValidationUtils -from .date import date_to_str -from .utils import Utils -from .datautils import DataUtils - -import logging - -logger = logging.getLogger(__name__) - - -class KccaUtils: - @staticmethod - def query_data_from_api(start_time: str, end_time: str): - api_url = f"{configuration.CLARITY_API_BASE_URL}measurements?startTime={start_time}&endTime={end_time}&outputFrequency=hour" - - headers = { - "x-api-key": configuration.CLARITY_API_KEY, - "Accept-Encoding": "gzip", - } - try: - results = requests.get(api_url, headers=headers) - if results.status_code != 200: - print(f"{results.content}") - return [] - return results.json() - except Exception as ex: - logger.exception(ex) - return [] - - @staticmethod - def extract_data(start_date_time: str, end_date_time: str) -> pd.DataFrame: - measurements = [] - dates = Utils.query_dates_array( - start_date_time=start_date_time, - end_date_time=end_date_time, - data_source=DataSource.CLARITY, - ) - - for start, end in dates: - range_measurements = KccaUtils.query_data_from_api(start, end) - measurements.extend(range_measurements) - - return pd.json_normalize(measurements) - - @staticmethod - def add_site_and_device_details(devices: pd.DataFrame, device_id) -> pd.Series: - """ - Retrieves site and device details for a given device ID from the provided DataFrame. - - This function filters the `devices` DataFrame to find a row matching the specified `device_id`. - If a matching device is found, it returns a pandas Series containing the `site_id` and - `device_number` associated with that device. If no matching device is found or an error occurs, - it returns a Series with None values. - - Args: - devices (pd.DataFrame): A DataFrame containing device information, including 'device_id'. - device_id: The ID of the device to search for in the DataFrame. - - Returns: - pd.Series: A Series containing 'site_id' and 'device_number' for the specified device ID, - or None values if the device is not found or an error occurs. 
- """ - try: - filtered_devices = devices.loc[devices.name == device_id] - if not filtered_devices.empty: - result = filtered_devices.iloc[0] - return pd.Series( - { - "site_id": result.get("site_id", None), - "device_number": result.get("device_number", None), - } - ) - except Exception as e: - logger.exception(f"An erro has occurred: {e}") - - logger.info("No matching device_id found.") - return pd.Series({"site_id": None, "device_number": None}) - - @staticmethod - def flatten_location_coordinates(coordinates: str): - try: - coordinates = coordinates.replace("[", "") - coordinates = coordinates.replace("]", "") - coordinates = coordinates.replace(" ", "") - coordinates = coordinates.split(",") - - return pd.Series({"latitude": coordinates[1], "longitude": coordinates[0]}) - except Exception as ex: - logger.exception(ex) - return pd.Series({"latitude": None, "longitude": None}) - - @staticmethod - def transform_data(data: pd.DataFrame) -> pd.DataFrame: - data.rename( - columns={ - "time": "timestamp", - "deviceCode": "device_id", - "characteristics.pm2_5ConcMass.value": "pm2_5", - "characteristics.pm2_5ConcMass.raw": "pm2_5_raw_value", - "characteristics.pm2_5ConcMass.calibratedValue": "pm2_5_calibrated_value", - "characteristics.pm10ConcMass.value": "pm10", - "characteristics.pm10ConcMass.raw": "pm10_raw_value", - "characteristics.pm10ConcMass.calibratedValue": "pm10_calibrated_value", - "characteristics.pm1ConcMass.value": "pm1", - "characteristics.pm1ConcMass.raw": "pm1_raw_value", - "characteristics.pm1ConcMass.calibratedValue": "pm1_calibrated_value", - "characteristics.no2Conc.value": "no2", - "characteristics.no2Conc.raw": "no2_raw_value", - "characteristics.no2Conc.calibratedValue": "no2_calibrated_value", - "characteristics.windSpeed.value": "wind_speed", - "characteristics.temperature.value": "temperature", - "characteristics.relHumid.value": "humidity", - "characteristics.altitude.value": "altitude", - }, - inplace=True, - ) - - data[["latitude", "longitude"]] = data["location.coordinates"].apply( - KccaUtils.flatten_location_coordinates - ) - - devices, _ = DataUtils.get_devices() - data[["site_id", "device_number"]] = data["device_id"].apply( - lambda device_id: KccaUtils.add_site_and_device_details( - devices=devices, device_id=device_id - ) - ) - - big_query_api = BigQueryApi() - required_cols = big_query_api.get_columns( - table=big_query_api.hourly_measurements_table - ) - - data = DataValidationUtils.fill_missing_columns(data=data, cols=required_cols) - data = data[required_cols] - - return DataValidationUtils.remove_outliers(data) - - @staticmethod - def process_latest_data(data: pd.DataFrame) -> pd.DataFrame: - data.loc[:, "network"] = str(DeviceNetwork.KCCA) - data.loc[:, "device_category"] = str(DeviceCategory.LOWCOST) - return data - - @staticmethod - def transform_data_for_api(data: pd.DataFrame) -> list: - return DataUtils.process_data_for_api(data) diff --git a/src/workflows/dags/kcca_measurements.py b/src/workflows/dags/kcca_measurements.py deleted file mode 100644 index 3f6f17c9a5..0000000000 --- a/src/workflows/dags/kcca_measurements.py +++ /dev/null @@ -1,143 +0,0 @@ -from airflow.decorators import dag, task -from datetime import datetime, timedelta, timezone - -from airqo_etl_utils.workflows_custom_utils import AirflowUtils -from airflow.exceptions import AirflowFailException -from airqo_etl_utils.config import configuration as Config -from airqo_etl_utils.constants import DeviceNetwork -from airqo_etl_utils.datautils import DataUtils - - -@dag( - 
"KCCA-Hourly-Measurements", - schedule=None, - default_args=AirflowUtils.dag_default_configs(), - catchup=False, - tags=["kcca", "hourly"], -) -def kcca_hourly_measurements(): - import pandas as pd - - @task() - def extract(): - from airqo_etl_utils.kcca_utils import KccaUtils - from airqo_etl_utils.date import date_to_str_hours - - hour_of_day = datetime.now(timezone.utc) - timedelta(hours=1) - start_date_time = date_to_str_hours(hour_of_day) - end_date_time = datetime.strftime(hour_of_day, "%Y-%m-%dT%H:59:59Z") - - return KccaUtils.extract_data( - start_date_time=start_date_time, end_date_time=end_date_time - ) - - @task() - def transform(data: pd.DataFrame): - from airqo_etl_utils.kcca_utils import KccaUtils - - return KccaUtils.transform_data(data) - - @task() - def send_to_api(data: pd.DataFrame): - from airqo_etl_utils.kcca_utils import KccaUtils - from airqo_etl_utils.airqo_api import AirQoApi - - kcca_data = KccaUtils.transform_data_for_api(data) - - airqo_api = AirQoApi() - airqo_api.save_events(measurements=kcca_data) - - @task() - def send_to_message_broker(data: pd.DataFrame, **kwargs): - from airqo_etl_utils.message_broker_utils import MessageBrokerUtils - - data = DataUtils.process_data_for_message_broker( - data=data, - ) - - if not data: - raise AirflowFailException( - "Processing for message broker failed. Please check if kafka is up and running." - ) - - MessageBrokerUtils.update_hourly_data_topic(data=data) - - @task() - def send_to_bigquery(data: pd.DataFrame): - from airqo_etl_utils.data_validator import DataValidationUtils - from airqo_etl_utils.bigquery_api import BigQueryApi - - big_query_api = BigQueryApi() - table = big_query_api.hourly_measurements_table - data["network"] = DeviceNetwork.KCCA.str - data = DataValidationUtils.process_for_big_query(dataframe=data, table=table) - big_query_api.load_data( - dataframe=data, - table=table, - ) - - @task() - def update_latest_data_topic(data: pd.DataFrame): - from airqo_etl_utils.kcca_utils import KccaUtils - from airqo_etl_utils.message_broker_utils import MessageBrokerUtils - - data = KccaUtils.process_latest_data(data=data) - - MessageBrokerUtils.update_hourly_data_topic(data=data) - - extracted_data = extract() - transformed_data = transform(extracted_data) - send_to_message_broker(transformed_data) - send_to_api(transformed_data) - update_latest_data_topic(transformed_data) - send_to_bigquery(transformed_data) - - -@dag( - "Kcca-Historical-Hourly-Measurements", - schedule=None, - default_args=AirflowUtils.dag_default_configs(), - catchup=False, - tags=["kcca", "hourly", "historical"], -) -def kcca_historical_hourly_measurements(): - import pandas as pd - - @task() - def extract(**kwargs): - from airqo_etl_utils.date import DateUtils - from airqo_etl_utils.kcca_utils import KccaUtils - - start_date_time, end_date_time = DateUtils.get_dag_date_time_values(**kwargs) - - return KccaUtils.extract_data( - start_date_time=start_date_time, end_date_time=end_date_time - ) - - @task() - def transform(data: pd.DataFrame): - from airqo_etl_utils.kcca_utils import KccaUtils - - return KccaUtils.transform_data(data) - - @task() - def send_to_bigquery(data: pd.DataFrame): - from airqo_etl_utils.data_validator import DataValidationUtils - from airqo_etl_utils.bigquery_api import BigQueryApi - - big_query_api = BigQueryApi() - table = big_query_api.hourly_measurements_table - data["network"] = DeviceNetwork.KCCA.str - data = DataValidationUtils.process_for_big_query(dataframe=data, table=table) - big_query_api.load_data( - 
-            dataframe=data,
-            table=table,
-        )
-
-    extracted_data = extract()
-    transformed_data = transform(extracted_data)
-    send_to_bigquery(transformed_data)
-
-
-kcca_hourly_measurements()
-kcca_historical_hourly_measurements()

From a7e184695914939ae43a513c15fa6185de83948a Mon Sep 17 00:00:00 2001
From: NicholasTurner23
Date: Mon, 3 Mar 2025 22:58:15 +0300
Subject: [PATCH 02/15] Add template code for clarity devices

---
 src/workflows/airqo_etl_utils/datautils.py | 125 +++++++++++++++++++++
 1 file changed, 125 insertions(+)

diff --git a/src/workflows/airqo_etl_utils/datautils.py b/src/workflows/airqo_etl_utils/datautils.py
index 0222b00745..4ccb2dd412 100644
--- a/src/workflows/airqo_etl_utils/datautils.py
+++ b/src/workflows/airqo_etl_utils/datautils.py
@@ -1004,3 +1004,128 @@ def process_data_for_message_broker(
             logger.exception(f"An error occured: {e}")
             return None
         return data
+
+    # Clarity
+    def _flatten_location_coordinates_clarity(coordinates: str) -> pd.Series:
+        """
+        Extracts latitude and longitude from a coordinate string.
+
+        The function expects a string representation of coordinates in the
+        format "[longitude, latitude]". It removes square brackets and spaces,
+        splits the values, and returns them as a Pandas Series.
+
+        Args:
+            coordinates(str): A string containing coordinates in the format "[longitude, latitude]".
+
+        Returns:
+            pd.Series: A Pandas Series with 'latitude' and 'longitude' as keys. Returns None for both values if an error occurs.
+
+        Example:
+            >>> _flatten_location_coordinates_clarity("[-73.935242, 40.730610]")
+            latitude     40.730610
+            longitude   -73.935242
+            dtype: object
+        """
+
+        try:
+            coords = coordinates.strip("[] ").split(",")
+            return pd.Series(
+                {"latitude": coords[1].strip(), "longitude": coords[0].strip()}
+            )
+        except Exception as ex:
+            logger.exception("Error parsing coordinates: %s", ex)
+            return pd.Series({"latitude": None, "longitude": None})
+
+    def _transform_clarity_data(data: pd.DataFrame) -> pd.DataFrame:
+        """
+        Transforms Clarity API data by renaming columns, extracting location details,
+        mapping devices, and ensuring required columns are present.
+
+        Args:
+            data(pd.DataFrame): The input data frame containing raw Clarity API data.
+
+        Returns:
+            pd.DataFrame: The transformed data frame with cleaned and formatted data.
+
+        Processing Steps:
+        1. Renames columns for consistency.
+        2. Extracts latitude and longitude from the `location.coordinates` field.
+        3. Adds site and device details from the `device_id` field.
+        4. Retrieves required columns from BigQuery and fills missing ones.
+        5. Removes outliers before returning the final dataset.
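+
+        Example (illustrative only; "A1234" is a hypothetical device code,
+        and a real run also needs cached devices and BigQuery column metadata):
+            >>> raw = pd.DataFrame(
+            ...     [
+            ...         {
+            ...             "time": "2025-03-03T10:00:00Z",
+            ...             "deviceCode": "A1234",
+            ...             "location.coordinates": "[32.58, 0.32]",
+            ...         }
+            ...     ]
+            ... )
+            >>> transformed = DataUtils._transform_clarity_data(raw)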
+ """ + data.rename( + columns={ + "time": "timestamp", + "deviceCode": "device_id", + "characteristics.pm2_5ConcMass.value": "pm2_5", + "characteristics.pm2_5ConcMass.raw": "pm2_5_raw_value", + "characteristics.pm2_5ConcMass.calibratedValue": "pm2_5_calibrated_value", + "characteristics.pm10ConcMass.value": "pm10", + "characteristics.pm10ConcMass.raw": "pm10_raw_value", + "characteristics.pm10ConcMass.calibratedValue": "pm10_calibrated_value", + "characteristics.pm1ConcMass.value": "pm1", + "characteristics.pm1ConcMass.raw": "pm1_raw_value", + "characteristics.pm1ConcMass.calibratedValue": "pm1_calibrated_value", + "characteristics.no2Conc.value": "no2", + "characteristics.no2Conc.raw": "no2_raw_value", + "characteristics.no2Conc.calibratedValue": "no2_calibrated_value", + "characteristics.windSpeed.value": "wind_speed", + "characteristics.temperature.value": "temperature", + "characteristics.relHumid.value": "humidity", + "characteristics.altitude.value": "altitude", + }, + inplace=True, + ) + + data[["latitude", "longitude"]] = data["location.coordinates"].apply( + DataUtils._flatten_location_coordinates + ) + + devices, _ = DataUtils.get_devices() + data[["site_id", "device_number"]] = data["device_id"].apply( + lambda device_id: DataUtils._add_site_and_device_details( + devices=devices, device_id=device_id + ) + ) + + big_query_api = BigQueryApi() + required_cols = big_query_api.get_columns( + table=big_query_api.hourly_measurements_table + ) + data = DataValidationUtils.fill_missing_columns(data=data, cols=required_cols) + data = data[required_cols] + + return DataValidationUtils.remove_outliers(data) + + def _add_site_and_device_details(devices: pd.DataFrame, device_id) -> pd.Series: + """ + Retrieves site and device details for a given device ID from the provided DataFrame. + + This function filters the `devices` DataFrame to find a row matching the specified `device_id`. + If a matching device is found, it returns a pandas Series containing the `site_id` and + `device_number` associated with that device. If no matching device is found or an error occurs, + it returns a Series with None values. + + Args: + devices (pd.DataFrame): A DataFrame containing device information, including 'device_id'. + device_id: The ID of the device to search for in the DataFrame. + + Returns: + pd.Series: A Series containing 'site_id' and 'device_number' for the specified device ID, + or None values if the device is not found or an error occurs. 
+ """ + try: + filtered_devices = devices.loc[devices.name == device_id] + if not filtered_devices.empty: + result = filtered_devices.iloc[0] + return pd.Series( + { + "site_id": result.get("site_id", None), + "device_number": result.get("device_number", None), + } + ) + except Exception as e: + logger.exception(f"An erro has occurred: {e}") + + logger.info("No matching device_id found.") + return pd.Series({"site_id": None, "device_number": None}) From 95f507f2daf6480cb40ed27433863d5069666f67 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 4 Mar 2025 14:18:41 +0300 Subject: [PATCH 03/15] Update auth service staging image tag to stage-ef828b9a-1741087033 --- k8s/auth-service/values-stage.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/auth-service/values-stage.yaml b/k8s/auth-service/values-stage.yaml index 9155023364..4929c8e90b 100644 --- a/k8s/auth-service/values-stage.yaml +++ b/k8s/auth-service/values-stage.yaml @@ -6,7 +6,7 @@ app: replicaCount: 2 image: repository: eu.gcr.io/airqo-250220/airqo-stage-auth-api - tag: stage-6410a447-1741021914 + tag: stage-ef828b9a-1741087033 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 82fcf251399b87f961f618e2d896adf54d45298a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 4 Mar 2025 14:19:38 +0300 Subject: [PATCH 04/15] Update data mgt production image tag to prod-cc4c9655-1741087107 --- k8s/data-mgt/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/data-mgt/values-prod.yaml b/k8s/data-mgt/values-prod.yaml index 13ffa77b90..acaf07bb47 100644 --- a/k8s/data-mgt/values-prod.yaml +++ b/k8s/data-mgt/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 2 image: repository: eu.gcr.io/airqo-250220/airqo-data-mgt-api - tag: prod-a8923ac0-1741034129 + tag: prod-cc4c9655-1741087107 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 061f40e9a31e81247b5f923e0e560c58cf844e84 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 4 Mar 2025 14:20:08 +0300 Subject: [PATCH 05/15] Update auth service production image tag to prod-cc4c9655-1741087107 --- k8s/auth-service/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/auth-service/values-prod.yaml b/k8s/auth-service/values-prod.yaml index 145c96e010..5a244e8fcd 100644 --- a/k8s/auth-service/values-prod.yaml +++ b/k8s/auth-service/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-auth-api - tag: prod-a8923ac0-1741034129 + tag: prod-cc4c9655-1741087107 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 83a595606a7739165de26f610fc74e5fc621a4dd Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 4 Mar 2025 14:20:17 +0300 Subject: [PATCH 06/15] Update device registry production image tag to prod-cc4c9655-1741087107 --- k8s/device-registry/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/device-registry/values-prod.yaml b/k8s/device-registry/values-prod.yaml index a5b205e4f7..bebe463a3d 100644 --- a/k8s/device-registry/values-prod.yaml +++ b/k8s/device-registry/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-device-registry-api - tag: prod-a8923ac0-1741034129 + tag: 
prod-cc4c9655-1741087107 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 0731ef0aed75644348eff9ff65a468b08f99465b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 4 Mar 2025 14:20:53 +0300 Subject: [PATCH 07/15] Update workflows prod image tag to prod-cc4c9655-1741087107 --- k8s/workflows/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/workflows/values-prod.yaml b/k8s/workflows/values-prod.yaml index d1b90b4b8c..86ff2bf59a 100644 --- a/k8s/workflows/values-prod.yaml +++ b/k8s/workflows/values-prod.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-redis containers: eu.gcr.io/airqo-250220/airqo-workflows - tag: prod-a8923ac0-1741034129 + tag: prod-cc4c9655-1741087107 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 71f26f2f1d273dbf161286e23cfa8ddc259faf65 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 4 Mar 2025 14:22:39 +0300 Subject: [PATCH 08/15] Update predict production image tag to prod-cc4c9655-1741087107 --- k8s/predict/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/predict/values-prod.yaml b/k8s/predict/values-prod.yaml index 56a9e26d1b..d52ff1c008 100644 --- a/k8s/predict/values-prod.yaml +++ b/k8s/predict/values-prod.yaml @@ -7,7 +7,7 @@ images: predictJob: eu.gcr.io/airqo-250220/airqo-predict-job trainJob: eu.gcr.io/airqo-250220/airqo-train-job predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality - tag: prod-a8923ac0-1741034129 + tag: prod-cc4c9655-1741087107 api: name: airqo-prediction-api label: prediction-api From 87557e32fd140dd528c3e95152665a1ddf1be7d2 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Tue, 4 Mar 2025 15:52:07 +0300 Subject: [PATCH 09/15] Clean up and optimize get devices --- src/workflows/airqo_etl_utils/datautils.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/src/workflows/airqo_etl_utils/datautils.py b/src/workflows/airqo_etl_utils/datautils.py index 4ccb2dd412..e2d83f9a6c 100644 --- a/src/workflows/airqo_etl_utils/datautils.py +++ b/src/workflows/airqo_etl_utils/datautils.py @@ -66,7 +66,27 @@ def get_devices( local_file_path, MetaDataType.DEVICES.str ) if not devices.empty: - devices["device_number"] = devices["device_number"].fillna(-1) + devices["device_number"] = ( + devices["device_number"].fillna(-1).astype(int) + ) + + if device_category: + devices = devices.loc[ + devices.device_category == device_category.str + ] + + if device_network: + devices = devices.loc[devices.network == device_network.str] + + keys = dict( + zip( + devices.loc[ + devices.network == "airqo", "device_number" + ].to_numpy(), + devices.loc[devices.network == "airqo", "key"].to_numpy(), + ) + ) + except Exception as e: logger.exception(f"No devices currently cached: {e}") From 20c3076fa7bd9bcccc055b10e5b64ce4afa9ca11 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Wed, 5 Mar 2025 12:39:47 +0300 Subject: [PATCH 10/15] Make data calibration modular --- src/workflows/airqo_etl_utils/airqo_utils.py | 215 ++++++++----------- 1 file changed, 93 insertions(+), 122 deletions(-) diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py index 264e502945..ea6607acb3 100644 --- a/src/workflows/airqo_etl_utils/airqo_utils.py +++ 
b/src/workflows/airqo_etl_utils/airqo_utils.py @@ -570,6 +570,58 @@ def map_site_ids_to_historical_data( @staticmethod def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame: + """ + Merges calibrated data back into the original dataset and computes raw PM values after calibration. + + Args: + data (pd.DataFrame): The raw sensor data. + groupby (str): The column to group by for model selection. + + Returns: + pd.DataFrame: The original dataset with calibrated PM2.5 and PM10 values. + """ + + data["timestamp"] = pd.to_datetime(data["timestamp"]) + + to_calibrate = data["network"] == "xxxx" + + calibrated_data = AirQoDataUtils._airqo_calibrate( + data.loc[to_calibrate], groupby + ) + + data.loc[ + to_calibrate, ["pm2_5_calibrated_value", "pm10_calibrated_value"] + ] = calibrated_data + + data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1) + data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1) + + data = data.assign( + pm2_5_calibrated_value=data.get("pm2_5_calibrated_value", np.nan), + pm10_calibrated_value=data.get("pm10_calibrated_value", np.nan), + ) + + data.loc[to_calibrate, "pm2_5"] = data.loc[ + to_calibrate, "pm2_5_calibrated_value" + ].fillna(data.loc[to_calibrate, "pm2_5_raw_value"]) + data.loc[to_calibrate, "pm10"] = data.loc[ + to_calibrate, "pm10_calibrated_value" + ].fillna(data.loc[to_calibrate, "pm10_raw_value"]) + + return data.drop( + columns=[ + "avg_pm2_5", + "avg_pm10", + "error_pm2_5", + "error_pm10", + "pm2_5_pm10", + "pm2_5_pm10_mod", + "hour", + ], + errors="ignore", + ) + + def _airqo_calibrate(data: pd.DataFrame, groupby: str) -> pd.DataFrame: """ Calibrates air quality sensor data by applying machine learning models to adjust sensor readings. @@ -587,29 +639,26 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame: data (pd.DataFrame): The raw air quality sensor data. Returns: - pd.DataFrame: The calibrated dataset with additional processed fields. + pd.DataFrame: A DataFrame with calibrated PM2.5 and PM10 values. """ + bucket = Config.FORECAST_MODELS_BUCKET + project_id = Config.GOOGLE_CLOUD_PROJECT_ID + calibrate_by: Dict[str, Union[CityModels, CountryModels]] = { + "city": CityModels, + "country": CountryModels, + } + + models: Union[CityModels, CountryModels] = calibrate_by.get( + groupby, CountryModels + ) sites = DataUtils.get_sites() if sites.empty: raise RuntimeError("Failed to fetch sites data from the cache/API") - bucket = Config.FORECAST_MODELS_BUCKET - project_id = Config.GOOGLE_CLOUD_PROJECT_ID - - data["timestamp"] = pd.to_datetime(data["timestamp"]) sites = sites[["site_id", groupby]] data = pd.merge(data, sites, on="site_id", how="left") - data.dropna(subset=["device_id", "timestamp"], inplace=True) - columns_to_fill = [ - "s1_pm2_5", - "s1_pm10", - "s2_pm2_5", - "s2_pm10", - "temperature", - "humidity", - ] input_variables = [ "avg_pm2_5", "avg_pm10", @@ -622,78 +671,34 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame: "pm2_5_pm10_mod", ] - # TODO: Need to opt for a different approach eg forward fill, can't do here as df only has data of last 1 hour. Perhaps use raw data only? - # Fill nas for the specified fields. 
- if "airqo" not in pd.unique(data.network): - data = data.reindex( - columns=data.columns.union(columns_to_fill, sort=False), - fill_value=np.nan, - ) - - to_calibrate = data["network"] == "airqo" - data_to_calibrate = data.loc[to_calibrate] - data_to_calibrate[columns_to_fill] = data_to_calibrate[columns_to_fill].fillna( - 0 - ) - # additional input columns for calibration - data_to_calibrate["avg_pm2_5"] = ( - data_to_calibrate[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2) - ) - data_to_calibrate["avg_pm10"] = ( - data_to_calibrate[["s1_pm10", "s2_pm10"]].mean(axis=1).round(2) - ) - data_to_calibrate["error_pm2_5"] = np.abs( - data_to_calibrate["s1_pm2_5"] - data_to_calibrate["s2_pm2_5"] - ) - data_to_calibrate["error_pm10"] = np.abs( - data_to_calibrate["s1_pm10"] - data_to_calibrate["s2_pm10"] - ) - data_to_calibrate["pm2_5_pm10"] = ( - data_to_calibrate["avg_pm2_5"] - data_to_calibrate["avg_pm10"] - ) - data_to_calibrate["pm2_5_pm10_mod"] = ( - data_to_calibrate["avg_pm2_5"] / data_to_calibrate["avg_pm10"] - ) - data_to_calibrate["hour"] = data_to_calibrate["timestamp"].dt.hour - - data_to_calibrate[input_variables] = data_to_calibrate[input_variables].replace( - [np.inf, -np.inf], 0 + data["avg_pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2) + data["avg_pm10"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1).round(2) + data["error_pm2_5"] = np.abs(data["s1_pm2_5"] - data["s2_pm2_5"]) + data["error_pm10"] = np.abs(data["s1_pm10"] - data["s2_pm10"]) + data["pm2_5_pm10"] = data["avg_pm2_5"] - data["avg_pm10"] + data["pm2_5_pm10_mod"] = data["avg_pm2_5"] / data["avg_pm10"] + data["hour"] = data["timestamp"].dt.hour + + data[input_variables] = data[input_variables].replace([np.inf, -np.inf], 0) + + default_rf_model = GCSUtils.get_trained_model_from_gcs( + project_name=project_id, + bucket_name=bucket, + source_blob_name=Utils.get_calibration_model_path(models.DEFAULT, "pm2_5"), ) - - calibrate_by: Dict[str, Union[CityModels, CountryModels]] = { - "city": CityModels, - "country": CountryModels, - } - - models: Union[CityModels, CountryModels] = calibrate_by.get( - groupby, CountryModels + default_lasso_model = GCSUtils.get_trained_model_from_gcs( + project_name=project_id, + bucket_name=bucket, + source_blob_name=Utils.get_calibration_model_path(models.DEFAULT, "pm10"), ) - - # Explicitly filter data to calibrate. At the moment, only calibrating on AirQo data. - - data_to_calibrate.dropna(subset=input_variables, inplace=True) - grouped_df = data_to_calibrate.groupby(groupby, dropna=False) - if not data_to_calibrate.empty: - default_rf_model = GCSUtils.get_trained_model_from_gcs( - project_name=project_id, - bucket_name=bucket, - source_blob_name=Utils.get_calibration_model_path( - models.DEFAULT, "pm2_5" - ), - ) - default_lasso_model = GCSUtils.get_trained_model_from_gcs( - project_name=project_id, - bucket_name=bucket, - source_blob_name=Utils.get_calibration_model_path( - models.DEFAULT, "pm10" - ), - ) - available_models = [c.value for c in models] - for groupedby, group in grouped_df: - # If the below condition fails, the rf_model and lasso_model default to the previously ones used and the ones set as "default" outside the forloop. 
+ calibrated_data = pd.DataFrame(index=data.index) + + for groupedby, group in data.groupby(groupby, dropna=False): + current_rf_model = default_rf_model + current_lasso_model = default_lasso_model if ( groupedby and not pd.isna(groupedby) @@ -720,49 +725,15 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame: ) current_rf_model = default_rf_model current_lasso_model = default_lasso_model - else: - current_rf_model = default_rf_model - current_lasso_model = default_lasso_model - - group["pm2_5_calibrated_value"] = current_rf_model.predict( - group[input_variables] - ) - group["pm10_calibrated_value"] = current_lasso_model.predict( - group[input_variables] - ) - data.loc[ - group.index, ["pm2_5_calibrated_value", "pm10_calibrated_value"] - ] = group[["pm2_5_calibrated_value", "pm10_calibrated_value"]] - - # Compute raw pm2_5 and pm10 values. - data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1) - data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1) - # Create calibrated columns if they don't exist - data["pm2_5_calibrated_value"] = data.get("pm2_5_calibrated_value", np.nan) - data["pm10_calibrated_value"] = data.get("pm10_calibrated_value", np.nan) - - # Assign calibrated values, falling back to raw values when missing - data.loc[to_calibrate, "pm2_5"] = data.loc[ - to_calibrate, "pm2_5_calibrated_value" - ].fillna(data.loc[to_calibrate, "pm2_5_raw_value"]) - data.loc[to_calibrate, "pm10"] = data.loc[ - to_calibrate, "pm10_calibrated_value" - ].fillna(data.loc[to_calibrate, "pm10_raw_value"]) + calibrated_data.loc[ + group.index, "pm2_5_calibrated_value" + ] = current_rf_model.predict(group[input_variables]) + calibrated_data.loc[ + group.index, "pm10_calibrated_value" + ] = current_lasso_model.predict(group[input_variables]) - return data.drop( - columns=[ - "avg_pm2_5", - "avg_pm10", - "error_pm2_5", - "error_pm10", - "pm2_5_pm10", - "pm2_5_pm10_mod", - "hour", - "city", - ], - errors="ignore", - ) + return calibrated_data @staticmethod def extract_devices_with_uncalibrated_data( From 57dcbab7e985b61fcc81cf32a96ba6fbb68866b9 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Wed, 5 Mar 2025 12:40:31 +0300 Subject: [PATCH 11/15] Use cached devices data --- src/workflows/airqo_etl_utils/airnow_utils.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/workflows/airqo_etl_utils/airnow_utils.py b/src/workflows/airqo_etl_utils/airnow_utils.py index dd264f9208..2f542b6227 100644 --- a/src/workflows/airqo_etl_utils/airnow_utils.py +++ b/src/workflows/airqo_etl_utils/airnow_utils.py @@ -8,7 +8,7 @@ from .data_validator import DataValidationUtils from .date import str_to_date, date_to_str from .utils import Utils - +from .datautils import DataUtils from .config import configuration import logging @@ -71,6 +71,9 @@ def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame: devices = AirQoApi().get_devices_by_network( device_network=DeviceNetwork.METONE, device_category=DeviceCategory.BAM ) + devices, _ = DataUtils.get_devices( + device_category=DeviceCategory.BAM, device_network=DeviceNetwork.METONE + ) bam_data = pd.DataFrame() if not devices: From 2687de70293f8a62bb50c55acc9db41a147a34c6 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Wed, 5 Mar 2025 12:41:56 +0300 Subject: [PATCH 12/15] Drop empty values --- src/workflows/airqo_etl_utils/datautils.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/workflows/airqo_etl_utils/datautils.py 
b/src/workflows/airqo_etl_utils/datautils.py index e2d83f9a6c..28da18d2b4 100644 --- a/src/workflows/airqo_etl_utils/datautils.py +++ b/src/workflows/airqo_etl_utils/datautils.py @@ -208,7 +208,10 @@ def extract_devices_data( is_airqo_network, "vapor_pressure" ].apply(DataValidationUtils.convert_pressure_values) - return devices_data + return devices_data.dropna( + subset=["pm2_5", "pm10", "s1_pm2_5", "s2_pm2_5", "s1_pm10", "s2_pm10"], + how="all", + ) @staticmethod def load_cached_data(local_file_path: str, file_name: str) -> pd.DataFrame: @@ -805,7 +808,7 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame: A pandas DataFrame object containing hourly averages of data. """ - data["timestamp"] = pd.to_datetime(data["timestamp"]) + data["timestamp"] = pd.to_datetime(data["timestamp"], format="mixed") group_metadata = ( data[["device_id", "site_id", "device_number", "network"]] @@ -821,7 +824,6 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame: .reset_index() ) aggregated = aggregated.merge(group_metadata, on="device_id", how="left") - return aggregated @staticmethod From 346e20c7783b0c4da984c9fbaf49ae10598cd58d Mon Sep 17 00:00:00 2001 From: Nicholas Bob Date: Wed, 5 Mar 2025 12:56:17 +0300 Subject: [PATCH 13/15] Update airnow_utils.py Clean up --- src/workflows/airqo_etl_utils/airnow_utils.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/workflows/airqo_etl_utils/airnow_utils.py b/src/workflows/airqo_etl_utils/airnow_utils.py index 2f542b6227..e11b71767b 100644 --- a/src/workflows/airqo_etl_utils/airnow_utils.py +++ b/src/workflows/airqo_etl_utils/airnow_utils.py @@ -68,9 +68,6 @@ def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame: Raises: ValueError: If no devices are found for the BAM network or if no data is returned for the specified date range. """ - devices = AirQoApi().get_devices_by_network( - device_network=DeviceNetwork.METONE, device_category=DeviceCategory.BAM - ) devices, _ = DataUtils.get_devices( device_category=DeviceCategory.BAM, device_network=DeviceNetwork.METONE ) From 01464120411a2c2ed99cb108c8f250893f4a4944 Mon Sep 17 00:00:00 2001 From: Nicholas Bob Date: Wed, 5 Mar 2025 12:57:46 +0300 Subject: [PATCH 14/15] Update airqo_utils.py Clean up --- src/workflows/airqo_etl_utils/airqo_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py index ea6607acb3..bd3eb7e2fc 100644 --- a/src/workflows/airqo_etl_utils/airqo_utils.py +++ b/src/workflows/airqo_etl_utils/airqo_utils.py @@ -583,7 +583,7 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame: data["timestamp"] = pd.to_datetime(data["timestamp"]) - to_calibrate = data["network"] == "xxxx" + to_calibrate = data["network"] == "airqo" calibrated_data = AirQoDataUtils._airqo_calibrate( data.loc[to_calibrate], groupby @@ -745,7 +745,7 @@ def extract_devices_with_uncalibrated_data( Args: start_date (str or datetime): The date for which to check missing uncalibrated data. table (str, optional): The name of the BigQuery table. Defaults to None, in which case the appropriate table is determined dynamically. - network (DeviceNetwork, optional): The device network to filter by. Defaults to DeviceNetwork.xxxx. + network (DeviceNetwork, optional): The device network to filter by. Defaults to DeviceNetwork.AIRQO. Returns: pd.DataFrame: A DataFrame containing the devices with missing uncalibrated data. 
From b4941b428eb21214c6c8a9c1fd83419cdf2a682d Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Wed, 5 Mar 2025 13:01:49 +0300 Subject: [PATCH 15/15] Clean up --- src/workflows/airqo_etl_utils/airqo_utils.py | 6 +++--- src/workflows/airqo_etl_utils/datautils.py | 8 ++++++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py index bd3eb7e2fc..910fea7ae1 100644 --- a/src/workflows/airqo_etl_utils/airqo_utils.py +++ b/src/workflows/airqo_etl_utils/airqo_utils.py @@ -262,7 +262,7 @@ def merge_aggregated_mobile_devices_data_and_weather_data( @staticmethod def restructure_airqo_mobile_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame: data["timestamp"] = pd.to_datetime(data["timestamp"]) - data["network"] = "airqo" + data["network"] = DeviceNetwork.AIRQO.str big_query_api = BigQueryApi() cols = big_query_api.get_columns( table=big_query_api.airqo_mobile_measurements_table @@ -468,7 +468,7 @@ def extract_devices_deployment_logs() -> pd.DataFrame: for _, device in devices.iterrows(): try: maintenance_logs = airqo_api.get_maintenance_logs( - network=device.get("network", "airqo"), + network=device.get("network", DeviceNetwork.AIRQO.str), device=device.get("name", None), activity_type="deployment", ) @@ -583,7 +583,7 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame: data["timestamp"] = pd.to_datetime(data["timestamp"]) - to_calibrate = data["network"] == "airqo" + to_calibrate = data["network"] == DeviceNetwork.AIRQO.str calibrated_data = AirQoDataUtils._airqo_calibrate( data.loc[to_calibrate], groupby diff --git a/src/workflows/airqo_etl_utils/datautils.py b/src/workflows/airqo_etl_utils/datautils.py index 28da18d2b4..f03aa12574 100644 --- a/src/workflows/airqo_etl_utils/datautils.py +++ b/src/workflows/airqo_etl_utils/datautils.py @@ -269,7 +269,11 @@ def _extract_device_api_data( api_data = [] data_source_api = DataSourcesApis() - if device_number and not np.isnan(device_number) and network == "airqo": + if ( + device_number + and not np.isnan(device_number) + and network == DeviceNetwork.AIRQO.str + ): for start, end in dates: data_, meta_data, data_available = data_source_api.thingspeak( device_number=int(device_number), @@ -282,7 +286,7 @@ def _extract_device_api_data( if api_data: mapping = config["mapping"][network] return DataUtils.map_and_extract_data(mapping, api_data), meta_data - elif network == "iqair": + elif network == DeviceNetwork.IQAIR.str: mapping = config["mapping"][network] try: iqair_data = data_source_api.iqair(device, resolution=resolution)