Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update/calculate hourly airqualitydata using bigqdata #4478

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 24 additions & 20 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@
from .airqo_api import AirQoApi
from .bigquery_api import BigQueryApi
from .config import configuration as Config
from .constants import DeviceCategory, DeviceNetwork, Frequency, CityModel, DataType
from .constants import (
DeviceCategory,
DeviceNetwork,
Frequency,
CityModel,
DataType,
CountryModels,
)
from .data_validator import DataValidationUtils
from .date import date_to_str, DateUtils
from .ml_utils import GCSUtils
Expand Down Expand Up @@ -564,7 +571,7 @@ def map_site_ids_to_historical_data(
return data

@staticmethod
def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
"""
Calibrates air quality sensor data by applying machine learning models to adjust sensor readings.

Expand Down Expand Up @@ -593,7 +600,7 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
project_id = Config.GOOGLE_CLOUD_PROJECT_ID

data["timestamp"] = pd.to_datetime(data["timestamp"])
sites = sites[["site_id", "city"]]
sites = sites[["site_id", groupby]]
data = pd.merge(data, sites, on="site_id", how="left")
data.dropna(subset=["device_id", "timestamp"], inplace=True)
columns_to_fill = [
Expand Down Expand Up @@ -635,55 +642,52 @@ def calibrate_data(data: pd.DataFrame) -> pd.DataFrame:
to_calibrate = data["network"] == "airqo"
data_to_calibrate = data.loc[to_calibrate]
data_to_calibrate.dropna(subset=input_variables, inplace=True)
grouped_df = data_to_calibrate.groupby("city", dropna=False)
grouped_df = data_to_calibrate.groupby(groupby, dropna=False)
default_rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(
CityModel.DEFAULT, "pm2_5"
CountryModels.DEFAULT, "pm2_5"
),
)
default_lasso_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(
CityModel.DEFAULT, "pm10"
CountryModels.DEFAULT, "pm10"
),
)
for city, group in grouped_df:

available_models = [c.value for c in CountryModels]

for country, group in grouped_df:
# If the below condition fails, the rf_model and lasso_model default to the previously ones used and the ones set as "default" outside the forloop.
if str(city).lower() in [c.str for c in CityModel]:
if country and country.lower() in available_models:
try:
current_rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(
city, "pm2_5"
country.lower(), "pm2_5"
),
)
current_lasso_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(city, "pm10"),
)
logger.info(
f"Got 1st models {current_rf_model} and {current_lasso_model} for {city}"
source_blob_name=Utils.get_calibration_model_path(
country.lower(), "pm10"
),
)

except Exception as e:
logger.exception(
f"Error getting custom model. Will default to generic one: {e}"
)
current_rf_model = default_rf_model
current_lasso_model = default_lasso_model
logger.info(
f"Got 2nd models {current_rf_model} and {current_lasso_model} for {city}"
)
else:
current_rf_model = default_rf_model
current_lasso_model = default_lasso_model
logger.info(
f"Got default models {current_rf_model} and {current_lasso_model} for {city}"
)

group["pm2_5_calibrated_value"] = current_rf_model.predict(
group[input_variables]
Expand Down Expand Up @@ -886,7 +890,7 @@ def extract_aggregate_calibrate_raw_data(
)
)
calibrated_data = AirQoDataUtils.calibrate_data(
data=air_weather_hourly_data
data=air_weather_hourly_data, groupby="country"
)
except Exception as e:
logger.exception(f"An error occured: {e}")
Expand Down
22 changes: 22 additions & 0 deletions src/workflows/airqo_etl_utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,25 @@ def str(self) -> str:
instance.str
"""
return self.__str__()


class CountryModels(Enum):
KENYA = "kenya"
UGANDA = "uganda"
NIGERIA = "nigeria"
GHANA = "ghana"
MADAGASCAR = "madagascar"
DEFAULT = "default"

def __str__(self) -> str:
return self.name.lower()

@property
def str(self) -> str:
"""
Returns the string representation of the CountryModels.
Usage:
instance.str
"""
return self.__str__()
Loading
Loading