diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py
index c2e35ac1dd..264e502945 100644
--- a/src/workflows/airqo_etl_utils/airqo_utils.py
+++ b/src/workflows/airqo_etl_utils/airqo_utils.py
@@ -2,7 +2,7 @@
 import ast
 import numpy as np
 import pandas as pd
-from typing import List, Dict, Any, Union, Generator, Tuple
+from typing import List, Dict, Any, Union, Generator
 
 from .airqo_api import AirQoApi
 from .bigquery_api import BigQueryApi
@@ -287,7 +287,6 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
             ["device_id", "site_id", "device_number", "network"]
         ].drop_duplicates("device_id")
         group_metadata.set_index("device_id", inplace=True)
-
         numeric_columns = data.select_dtypes(include=["number"]).columns
         numeric_columns = numeric_columns.difference(["device_number"])
         data_for_aggregation = data[["timestamp", "device_id"] + list(numeric_columns)]
@@ -301,7 +300,6 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
         except Exception as e:
             logger.exception(f"An error occured: No data passed - {e}")
             aggregated = pd.DataFrame(columns=data.columns)
-
         return aggregated
 
     @staticmethod
@@ -603,6 +601,7 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
         sites = sites[["site_id", groupby]]
         data = pd.merge(data, sites, on="site_id", how="left")
         data.dropna(subset=["device_id", "timestamp"], inplace=True)
+
         columns_to_fill = [
             "s1_pm2_5",
             "s1_pm10",
@@ -611,20 +610,6 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
             "temperature",
             "humidity",
         ]
-
-        # TODO: Need to opt for a different approach eg forward fill, can't do here as df only has data of last 1 hour. Perhaps use raw data only?
-        # Fill nas for the specified fields.
-        data[columns_to_fill] = data[columns_to_fill].fillna(0)
-
-        # additional input columns for calibration
-        data["avg_pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
-        data["avg_pm10"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1).round(2)
-        data["error_pm2_5"] = np.abs(data["s1_pm2_5"] - data["s2_pm2_5"])
-        data["error_pm10"] = np.abs(data["s1_pm10"] - data["s2_pm10"])
-        data["pm2_5_pm10"] = data["avg_pm2_5"] - data["avg_pm10"]
-        data["pm2_5_pm10_mod"] = data["avg_pm2_5"] / data["avg_pm10"]
-        data["hour"] = data["timestamp"].dt.hour
-
         input_variables = [
             "avg_pm2_5",
             "avg_pm10",
@@ -637,33 +622,75 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
             "pm2_5_pm10_mod",
         ]
 
+        # TODO: Need to opt for a different approach eg forward fill, can't do here as df only has data of last 1 hour. Perhaps use raw data only?
+        # Fill nas for the specified fields.
+        if "airqo" not in pd.unique(data.network):
+            data = data.reindex(
+                columns=data.columns.union(columns_to_fill, sort=False),
+                fill_value=np.nan,
+            )
+
+        to_calibrate = data["network"] == "airqo"
+        data_to_calibrate = data.loc[to_calibrate]
+        data_to_calibrate[columns_to_fill] = data_to_calibrate[columns_to_fill].fillna(
+            0
+        )
+
+        # additional input columns for calibration
+        data_to_calibrate["avg_pm2_5"] = (
+            data_to_calibrate[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
+        )
+        data_to_calibrate["avg_pm10"] = (
+            data_to_calibrate[["s1_pm10", "s2_pm10"]].mean(axis=1).round(2)
+        )
+        data_to_calibrate["error_pm2_5"] = np.abs(
+            data_to_calibrate["s1_pm2_5"] - data_to_calibrate["s2_pm2_5"]
+        )
+        data_to_calibrate["error_pm10"] = np.abs(
+            data_to_calibrate["s1_pm10"] - data_to_calibrate["s2_pm10"]
+        )
+        data_to_calibrate["pm2_5_pm10"] = (
+            data_to_calibrate["avg_pm2_5"] - data_to_calibrate["avg_pm10"]
+        )
+        data_to_calibrate["pm2_5_pm10_mod"] = (
+            data_to_calibrate["avg_pm2_5"] / data_to_calibrate["avg_pm10"]
+        )
+        data_to_calibrate["hour"] = data_to_calibrate["timestamp"].dt.hour
+
+        data_to_calibrate[input_variables] = data_to_calibrate[input_variables].replace(
+            [np.inf, -np.inf], 0
+        )
+
         calibrate_by: Dict[str, Union[CityModels, CountryModels]] = {
             "city": CityModels,
             "country": CountryModels,
         }
-        model: Union[CityModels, CountryModels] = calibrate_by.get(
+        models: Union[CityModels, CountryModels] = calibrate_by.get(
             groupby, CountryModels
         )
 
-        data[input_variables] = data[input_variables].replace([np.inf, -np.inf], 0)
         # Explicitly filter data to calibrate. At the moment, only calibrating on AirQo data.
-        to_calibrate = data["network"] == "airqo"
-        data_to_calibrate = data.loc[to_calibrate]
+
         data_to_calibrate.dropna(subset=input_variables, inplace=True)
 
         grouped_df = data_to_calibrate.groupby(groupby, dropna=False)
 
-        default_rf_model = GCSUtils.get_trained_model_from_gcs(
-            project_name=project_id,
-            bucket_name=bucket,
-            source_blob_name=Utils.get_calibration_model_path(model.DEFAULT, "pm2_5"),
-        )
-        default_lasso_model = GCSUtils.get_trained_model_from_gcs(
-            project_name=project_id,
-            bucket_name=bucket,
-            source_blob_name=Utils.get_calibration_model_path(model.DEFAULT, "pm10"),
-        )
+        if not data_to_calibrate.empty:
+            default_rf_model = GCSUtils.get_trained_model_from_gcs(
+                project_name=project_id,
+                bucket_name=bucket,
+                source_blob_name=Utils.get_calibration_model_path(
+                    models.DEFAULT, "pm2_5"
+                ),
+            )
+            default_lasso_model = GCSUtils.get_trained_model_from_gcs(
+                project_name=project_id,
+                bucket_name=bucket,
+                source_blob_name=Utils.get_calibration_model_path(
+                    models.DEFAULT, "pm10"
+                ),
+            )
 
-        available_models = [c.value for c in model]
+        available_models = [c.value for c in models]
         for groupedby, group in grouped_df:
             # If the below condition fails, the rf_model and lasso_model default to the previously ones used and the ones set as "default" outside the forloop.
@@ -687,7 +714,6 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
                             groupedby.lower(), "pm10"
                         ),
                     )
-
                 except Exception as e:
                     logger.exception(
                         f"Error getting custom model. Will default to generic one: {e}"
@@ -734,7 +760,8 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
                 "pm2_5_pm10_mod",
                 "hour",
                 "city",
-            ]
+            ],
+            errors="ignore",
         )
 
     @staticmethod
@@ -841,7 +868,7 @@ def extract_aggregate_calibrate_raw_data(
             )
         )
         calibrated_data = AirQoDataUtils.calibrate_data(
-            data=air_weather_hourly_data, groupby="country"
+            data=air_weather_hourly_data, groupby="city"
         )
     except Exception as e:
         logger.exception(f"An error occured: {e}")
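Note on the calibrate_data changes: the derived model inputs (sensor averages, inter-sensor errors, their difference and ratio, and hour of day) are now computed only for rows with network == "airqo", and infinities from the pm2_5/pm10 ratio are zeroed before the calibration models run. The standalone sketch below is not part of the patch; the toy DataFrame and its values are made up for illustration, and only the column names follow the diff. It shows roughly what that feature-derivation step produces.

import numpy as np
import pandas as pd

# Toy frame mimicking hourly low-cost sensor data; values are illustrative only.
df = pd.DataFrame(
    {
        "network": ["airqo", "other"],
        "timestamp": pd.to_datetime(["2024-01-01 10:00", "2024-01-01 10:00"]),
        "s1_pm2_5": [23.1, np.nan],
        "s2_pm2_5": [24.7, np.nan],
        "s1_pm10": [40.2, np.nan],
        "s2_pm10": [41.0, np.nan],
    }
)

# Only AirQo rows are calibrated; .copy() keeps pandas from warning about
# writes to a slice of the original frame.
subset = df.loc[df["network"] == "airqo"].copy()

# Derived calibration inputs mirroring the patch.
subset["avg_pm2_5"] = subset[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
subset["avg_pm10"] = subset[["s1_pm10", "s2_pm10"]].mean(axis=1).round(2)
subset["error_pm2_5"] = (subset["s1_pm2_5"] - subset["s2_pm2_5"]).abs()
subset["error_pm10"] = (subset["s1_pm10"] - subset["s2_pm10"]).abs()
subset["pm2_5_pm10"] = subset["avg_pm2_5"] - subset["avg_pm10"]
subset["pm2_5_pm10_mod"] = subset["avg_pm2_5"] / subset["avg_pm10"]
subset["hour"] = subset["timestamp"].dt.hour

# A zero avg_pm10 would make the ratio infinite; like the patch, map +/-inf to 0.
subset = subset.replace([np.inf, -np.inf], 0)
print(subset)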