Skip to content

Commit

Permalink
Optimize calibration
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasTurner23 committed Feb 27, 2025
1 parent 68e5f32 commit 17753de
Showing 1 changed file with 62 additions and 35 deletions.
97 changes: 62 additions & 35 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import ast
import numpy as np
import pandas as pd
from typing import List, Dict, Any, Union, Generator, Tuple
from typing import List, Dict, Any, Union, Generator

from .airqo_api import AirQoApi
from .bigquery_api import BigQueryApi
Expand Down Expand Up @@ -287,7 +287,6 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
["device_id", "site_id", "device_number", "network"]
].drop_duplicates("device_id")
group_metadata.set_index("device_id", inplace=True)

numeric_columns = data.select_dtypes(include=["number"]).columns
numeric_columns = numeric_columns.difference(["device_number"])
data_for_aggregation = data[["timestamp", "device_id"] + list(numeric_columns)]
Expand All @@ -301,7 +300,6 @@ def aggregate_low_cost_sensors_data(data: pd.DataFrame) -> pd.DataFrame:
except Exception as e:
logger.exception(f"An error occured: No data passed - {e}")
aggregated = pd.DataFrame(columns=data.columns)

return aggregated

@staticmethod
Expand Down Expand Up @@ -603,6 +601,7 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
sites = sites[["site_id", groupby]]
data = pd.merge(data, sites, on="site_id", how="left")
data.dropna(subset=["device_id", "timestamp"], inplace=True)

columns_to_fill = [
"s1_pm2_5",
"s1_pm10",
Expand All @@ -611,20 +610,6 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
"temperature",
"humidity",
]

# TODO: Need to opt for a different approach eg forward fill, can't do here as df only has data of last 1 hour. Perhaps use raw data only?
# Fill nas for the specified fields.
data[columns_to_fill] = data[columns_to_fill].fillna(0)

# additional input columns for calibration
data["avg_pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
data["avg_pm10"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1).round(2)
data["error_pm2_5"] = np.abs(data["s1_pm2_5"] - data["s2_pm2_5"])
data["error_pm10"] = np.abs(data["s1_pm10"] - data["s2_pm10"])
data["pm2_5_pm10"] = data["avg_pm2_5"] - data["avg_pm10"]
data["pm2_5_pm10_mod"] = data["avg_pm2_5"] / data["avg_pm10"]
data["hour"] = data["timestamp"].dt.hour

input_variables = [
"avg_pm2_5",
"avg_pm10",
Expand All @@ -637,33 +622,75 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
"pm2_5_pm10_mod",
]

# TODO: Need to opt for a different approach eg forward fill, can't do here as df only has data of last 1 hour. Perhaps use raw data only?
# Fill nas for the specified fields.
if "airqo" not in pd.unique(data.network):
data = data.reindex(
columns=data.columns.union(columns_to_fill, sort=False),
fill_value=np.nan,
)

to_calibrate = data["network"] == "airqo"
data_to_calibrate = data.loc[to_calibrate]
data_to_calibrate[columns_to_fill] = data_to_calibrate[columns_to_fill].fillna(
0
)

# additional input columns for calibration
data_to_calibrate["avg_pm2_5"] = (
data_to_calibrate[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
)
data_to_calibrate["avg_pm10"] = (
data_to_calibrate[["s1_pm10", "s2_pm10"]].mean(axis=1).round(2)
)
data_to_calibrate["error_pm2_5"] = np.abs(
data_to_calibrate["s1_pm2_5"] - data_to_calibrate["s2_pm2_5"]
)
data_to_calibrate["error_pm10"] = np.abs(
data_to_calibrate["s1_pm10"] - data_to_calibrate["s2_pm10"]
)
data_to_calibrate["pm2_5_pm10"] = (
data_to_calibrate["avg_pm2_5"] - data_to_calibrate["avg_pm10"]
)
data_to_calibrate["pm2_5_pm10_mod"] = (
data_to_calibrate["avg_pm2_5"] / data_to_calibrate["avg_pm10"]
)
data_to_calibrate["hour"] = data_to_calibrate["timestamp"].dt.hour

data_to_calibrate[input_variables] = data_to_calibrate[input_variables].replace(
[np.inf, -np.inf], 0
)

calibrate_by: Dict[str, Union[CityModels, CountryModels]] = {
"city": CityModels,
"country": CountryModels,
}

model: Union[CityModels, CountryModels] = calibrate_by.get(
models: Union[CityModels, CountryModels] = calibrate_by.get(
groupby, CountryModels
)
data[input_variables] = data[input_variables].replace([np.inf, -np.inf], 0)

# Explicitly filter data to calibrate. At the moment, only calibrating on AirQo data.
to_calibrate = data["network"] == "airqo"
data_to_calibrate = data.loc[to_calibrate]

data_to_calibrate.dropna(subset=input_variables, inplace=True)
grouped_df = data_to_calibrate.groupby(groupby, dropna=False)
default_rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(model.DEFAULT, "pm2_5"),
)
default_lasso_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(model.DEFAULT, "pm10"),
)
if not data_to_calibrate.empty:
default_rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(
models.DEFAULT, "pm2_5"
),
)
default_lasso_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(
models.DEFAULT, "pm10"
),
)

available_models = [c.value for c in model]
available_models = [c.value for c in models]

for groupedby, group in grouped_df:
# If the below condition fails, the rf_model and lasso_model default to the previously ones used and the ones set as "default" outside the forloop.
Expand All @@ -687,7 +714,6 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
groupedby.lower(), "pm10"
),
)

except Exception as e:
logger.exception(
f"Error getting custom model. Will default to generic one: {e}"
Expand Down Expand Up @@ -734,7 +760,8 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
"pm2_5_pm10_mod",
"hour",
"city",
]
],
errors="ignore",
)

@staticmethod
Expand Down Expand Up @@ -841,7 +868,7 @@ def extract_aggregate_calibrate_raw_data(
)
)
calibrated_data = AirQoDataUtils.calibrate_data(
data=air_weather_hourly_data, groupby="country"
data=air_weather_hourly_data, groupby="city"
)
except Exception as e:
logger.exception(f"An error occured: {e}")
Expand Down

0 comments on commit 17753de

Please sign in to comment.