Skip to content

Commit

Permalink
Merge pull request #4478 from NicholasTurner23/update/calculate_hourl…
Browse files Browse the repository at this point in the history
…y_airqualitydata_using_bigqdata

Update/calculate hourly airqualitydata using bigqdata
  • Loading branch information
Baalmart authored Feb 25, 2025
2 parents 58091d1 + 71bfda4 commit df9f410
Show file tree
Hide file tree
Showing 8 changed files with 228 additions and 66 deletions.
44 changes: 24 additions & 20 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@
from .airqo_api import AirQoApi
from .bigquery_api import BigQueryApi
from .config import configuration as Config
from .constants import DeviceCategory, DeviceNetwork, Frequency, CityModel, DataType
from .constants import (
DeviceCategory,
DeviceNetwork,
Frequency,
CityModel,
DataType,
CountryModels,
)
from .data_validator import DataValidationUtils
from .date import date_to_str, DateUtils
from .ml_utils import GCSUtils
Expand Down Expand Up @@ -564,7 +571,7 @@ def map_site_ids_to_historical_data(
return data

@staticmethod
def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
"""
Calibrates air quality sensor data by applying machine learning models to adjust sensor readings.
Expand Down Expand Up @@ -593,7 +600,7 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
project_id = Config.GOOGLE_CLOUD_PROJECT_ID

data["timestamp"] = pd.to_datetime(data["timestamp"])
sites = sites[["site_id", "city"]]
sites = sites[["site_id", groupby]]
data = pd.merge(data, sites, on="site_id", how="left")
data.dropna(subset=["device_id", "timestamp"], inplace=True)
columns_to_fill = [
Expand Down Expand Up @@ -635,55 +642,52 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
to_calibrate = data["network"] == "airqo"
data_to_calibrate = data.loc[to_calibrate]
data_to_calibrate.dropna(subset=input_variables, inplace=True)
grouped_df = data_to_calibrate.groupby("city", dropna=False)
grouped_df = data_to_calibrate.groupby(groupby, dropna=False)
default_rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(
CityModel.DEFAULT, "pm2_5"
CountryModels.DEFAULT, "pm2_5"
),
)
default_lasso_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(
CityModel.DEFAULT, "pm10"
CountryModels.DEFAULT, "pm10"
),
)
for city, group in grouped_df:

available_models = [c.value for c in CountryModels]

for country, group in grouped_df:
# If the condition below fails, rf_model and lasso_model fall back to the default models loaded before the for loop.
if str(city).lower() in [c.str for c in CityModel]:
if country and country.lower() in available_models:
try:
current_rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(
city, "pm2_5"
country.lower(), "pm2_5"
),
)
current_lasso_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(city, "pm10"),
)
logger.info(
f"Got 1st models {current_rf_model} and {current_lasso_model} for {city}"
source_blob_name=Utils.get_calibration_model_path(
country.lower(), "pm10"
),
)

except Exception as e:
logger.exception(
f"Error getting custom model. Will default to generic one: {e}"
)
current_rf_model = default_rf_model
current_lasso_model = default_lasso_model
logger.info(
f"Got 2nd models {current_rf_model} and {current_lasso_model} for {city}"
)
else:
current_rf_model = default_rf_model
current_lasso_model = default_lasso_model
logger.info(
f"Got default models {current_rf_model} and {current_lasso_model} for {city}"
)

group["pm2_5_calibrated_value"] = current_rf_model.predict(
group[input_variables]
Expand Down Expand Up @@ -886,7 +890,7 @@ def extract_aggregate_calibrate_raw_data(
)
)
calibrated_data = AirQoDataUtils.calibrate_data(
data=air_weather_hourly_data
data=air_weather_hourly_data, groupby="country"
)
except Exception as e:
logger.exception(f"An error occured: {e}")
Expand Down
22 changes: 22 additions & 0 deletions src/workflows/airqo_etl_utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,25 @@ def str(self) -> str:
instance.str
"""
return self.__str__()


class CountryModels(Enum):
    """
    Country keys recognised when choosing a calibration model.

    Every member's value is the lowercase country name; ``DEFAULT`` is the
    key of the generic model used when no country-specific one applies.
    """

    KENYA = "kenya"
    UGANDA = "uganda"
    NIGERIA = "nigeria"
    GHANA = "ghana"
    MADAGASCAR = "madagascar"
    DEFAULT = "default"

    def __str__(self) -> str:
        """Render the member as its name in lowercase, e.g. ``"kenya"``."""
        lowered_name = self.name.lower()
        return lowered_name

    @property
    def str(self) -> str:
        """
        Lowercase string form of this member (same text as ``str(member)``).

        Usage:
            instance.str
        """
        return self.name.lower()
Loading

0 comments on commit df9f410

Please sign in to comment.