Skip to content

Commit

Permalink
Merge pull request #4525 from airqo-platform/staging
Browse files Browse the repository at this point in the history
move to production
  • Loading branch information
Baalmart authored Mar 5, 2025
2 parents cc4c965 + 281249c commit abb3251
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 438 deletions.
2 changes: 1 addition & 1 deletion k8s/auth-service/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-auth-api
tag: prod-a8923ac0-1741034129
tag: prod-cc4c9655-1741087107
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/auth-service/values-stage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 2
image:
repository: eu.gcr.io/airqo-250220/airqo-stage-auth-api
tag: stage-6410a447-1741021914
tag: stage-ef828b9a-1741087033
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/data-mgt/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 2
image:
repository: eu.gcr.io/airqo-250220/airqo-data-mgt-api
tag: prod-a8923ac0-1741034129
tag: prod-cc4c9655-1741087107
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-a8923ac0-1741034129
tag: prod-cc4c9655-1741087107
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ images:
predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
trainJob: eu.gcr.io/airqo-250220/airqo-train-job
predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
tag: prod-a8923ac0-1741034129
tag: prod-cc4c9655-1741087107
api:
name: airqo-prediction-api
label: prediction-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
tag: prod-a8923ac0-1741034129
tag: prod-cc4c9655-1741087107
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
6 changes: 3 additions & 3 deletions src/workflows/airqo_etl_utils/airnow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from .data_validator import DataValidationUtils
from .date import str_to_date, date_to_str
from .utils import Utils

from .datautils import DataUtils
from .config import configuration
import logging

Expand Down Expand Up @@ -68,8 +68,8 @@ def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame:
Raises:
ValueError: If no devices are found for the BAM network or if no data is returned for the specified date range.
"""
devices = AirQoApi().get_devices_by_network(
device_network=DeviceNetwork.METONE, device_category=DeviceCategory.BAM
devices, _ = DataUtils.get_devices(
device_category=DeviceCategory.BAM, device_network=DeviceNetwork.METONE
)
bam_data = pd.DataFrame()

Expand Down
221 changes: 96 additions & 125 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ def merge_aggregated_mobile_devices_data_and_weather_data(
@staticmethod
def restructure_airqo_mobile_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame:
data["timestamp"] = pd.to_datetime(data["timestamp"])
data["network"] = "airqo"
data["network"] = DeviceNetwork.AIRQO.str
big_query_api = BigQueryApi()
cols = big_query_api.get_columns(
table=big_query_api.airqo_mobile_measurements_table
Expand Down Expand Up @@ -468,7 +468,7 @@ def extract_devices_deployment_logs() -> pd.DataFrame:
for _, device in devices.iterrows():
try:
maintenance_logs = airqo_api.get_maintenance_logs(
network=device.get("network", "airqo"),
network=device.get("network", DeviceNetwork.AIRQO.str),
device=device.get("name", None),
activity_type="deployment",
)
Expand Down Expand Up @@ -570,6 +570,58 @@ def map_site_ids_to_historical_data(

@staticmethod
def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
    """
    Merges calibrated data back into the original dataset and computes raw PM values after calibration.

    Only rows belonging to the AirQo network are calibrated; rows from other
    networks pass through with their original pm2_5/pm10 values untouched.

    Args:
        data (pd.DataFrame): The raw sensor data. Expected to contain at least
            `timestamp`, `network`, `s1_pm2_5`, `s2_pm2_5`, `s1_pm10`, `s2_pm10`.
        groupby (str): The column to group by for model selection
            (e.g. "city" or "country" — passed through to _airqo_calibrate).

    Returns:
        pd.DataFrame: The original dataset with calibrated PM2.5 and PM10 values
        and the intermediate calibration-input columns dropped.
    """

    # Normalize timestamps up front; downstream calibration derives the hour from this.
    data["timestamp"] = pd.to_datetime(data["timestamp"])

    # Boolean mask selecting only AirQo-network rows — the only rows we calibrate.
    to_calibrate = data["network"] == DeviceNetwork.AIRQO.str

    # Run the ML calibration on the AirQo subset only. The helper returns a frame
    # indexed like its input, so label-based .loc assignment below lines up by index.
    calibrated_data = AirQoDataUtils._airqo_calibrate(
        data.loc[to_calibrate], groupby
    )

    # Write the calibrated columns back onto the matching rows of the full frame.
    data.loc[
        to_calibrate, ["pm2_5_calibrated_value", "pm10_calibrated_value"]
    ] = calibrated_data

    # Raw values are the mean of the two redundant sensors (NaN-safe via .mean).
    data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
    data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)

    # Ensure the calibrated columns exist even when no row was calibrated
    # (e.g. no AirQo rows): .get falls back to a scalar NaN in that case.
    data = data.assign(
        pm2_5_calibrated_value=data.get("pm2_5_calibrated_value", np.nan),
        pm10_calibrated_value=data.get("pm10_calibrated_value", np.nan),
    )

    # Final pm2_5/pm10 for calibrated rows: prefer the calibrated value,
    # fall back to the raw sensor mean where calibration produced NaN.
    data.loc[to_calibrate, "pm2_5"] = data.loc[
        to_calibrate, "pm2_5_calibrated_value"
    ].fillna(data.loc[to_calibrate, "pm2_5_raw_value"])
    data.loc[to_calibrate, "pm10"] = data.loc[
        to_calibrate, "pm10_calibrated_value"
    ].fillna(data.loc[to_calibrate, "pm10_raw_value"])

    # Drop the intermediate feature columns used only as model inputs;
    # errors="ignore" keeps this safe when a column was never created.
    return data.drop(
        columns=[
            "avg_pm2_5",
            "avg_pm10",
            "error_pm2_5",
            "error_pm10",
            "pm2_5_pm10",
            "pm2_5_pm10_mod",
            "hour",
        ],
        errors="ignore",
    )

def _airqo_calibrate(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
"""
Calibrates air quality sensor data by applying machine learning models to adjust sensor readings.
Expand All @@ -587,29 +639,26 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
data (pd.DataFrame): The raw air quality sensor data.
Returns:
pd.DataFrame: The calibrated dataset with additional processed fields.
pd.DataFrame: A DataFrame with calibrated PM2.5 and PM10 values.
"""
bucket = Config.FORECAST_MODELS_BUCKET
project_id = Config.GOOGLE_CLOUD_PROJECT_ID
calibrate_by: Dict[str, Union[CityModels, CountryModels]] = {
"city": CityModels,
"country": CountryModels,
}

models: Union[CityModels, CountryModels] = calibrate_by.get(
groupby, CountryModels
)

sites = DataUtils.get_sites()
if sites.empty:
raise RuntimeError("Failed to fetch sites data from the cache/API")

bucket = Config.FORECAST_MODELS_BUCKET
project_id = Config.GOOGLE_CLOUD_PROJECT_ID

data["timestamp"] = pd.to_datetime(data["timestamp"])
sites = sites[["site_id", groupby]]
data = pd.merge(data, sites, on="site_id", how="left")
data.dropna(subset=["device_id", "timestamp"], inplace=True)

columns_to_fill = [
"s1_pm2_5",
"s1_pm10",
"s2_pm2_5",
"s2_pm10",
"temperature",
"humidity",
]
input_variables = [
"avg_pm2_5",
"avg_pm10",
Expand All @@ -622,78 +671,34 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
"pm2_5_pm10_mod",
]

# TODO: Opt for a different approach, e.g. forward fill. It cannot be done here because the DataFrame only holds the last 1 hour of data. Perhaps use raw data only?
# Fill nas for the specified fields.
if "airqo" not in pd.unique(data.network):
data = data.reindex(
columns=data.columns.union(columns_to_fill, sort=False),
fill_value=np.nan,
)

to_calibrate = data["network"] == "airqo"
data_to_calibrate = data.loc[to_calibrate]
data_to_calibrate[columns_to_fill] = data_to_calibrate[columns_to_fill].fillna(
0
)

# additional input columns for calibration
data_to_calibrate["avg_pm2_5"] = (
data_to_calibrate[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
)
data_to_calibrate["avg_pm10"] = (
data_to_calibrate[["s1_pm10", "s2_pm10"]].mean(axis=1).round(2)
)
data_to_calibrate["error_pm2_5"] = np.abs(
data_to_calibrate["s1_pm2_5"] - data_to_calibrate["s2_pm2_5"]
)
data_to_calibrate["error_pm10"] = np.abs(
data_to_calibrate["s1_pm10"] - data_to_calibrate["s2_pm10"]
)
data_to_calibrate["pm2_5_pm10"] = (
data_to_calibrate["avg_pm2_5"] - data_to_calibrate["avg_pm10"]
)
data_to_calibrate["pm2_5_pm10_mod"] = (
data_to_calibrate["avg_pm2_5"] / data_to_calibrate["avg_pm10"]
)
data_to_calibrate["hour"] = data_to_calibrate["timestamp"].dt.hour

data_to_calibrate[input_variables] = data_to_calibrate[input_variables].replace(
[np.inf, -np.inf], 0
data["avg_pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
data["avg_pm10"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1).round(2)
data["error_pm2_5"] = np.abs(data["s1_pm2_5"] - data["s2_pm2_5"])
data["error_pm10"] = np.abs(data["s1_pm10"] - data["s2_pm10"])
data["pm2_5_pm10"] = data["avg_pm2_5"] - data["avg_pm10"]
data["pm2_5_pm10_mod"] = data["avg_pm2_5"] / data["avg_pm10"]
data["hour"] = data["timestamp"].dt.hour

data[input_variables] = data[input_variables].replace([np.inf, -np.inf], 0)

default_rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(models.DEFAULT, "pm2_5"),
)

calibrate_by: Dict[str, Union[CityModels, CountryModels]] = {
"city": CityModels,
"country": CountryModels,
}

models: Union[CityModels, CountryModels] = calibrate_by.get(
groupby, CountryModels
default_lasso_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(models.DEFAULT, "pm10"),
)

# Explicitly filter data to calibrate. At the moment, only calibrating on AirQo data.

data_to_calibrate.dropna(subset=input_variables, inplace=True)
grouped_df = data_to_calibrate.groupby(groupby, dropna=False)
if not data_to_calibrate.empty:
default_rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(
models.DEFAULT, "pm2_5"
),
)
default_lasso_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(
models.DEFAULT, "pm10"
),
)

available_models = [c.value for c in models]

for groupedby, group in grouped_df:
# If the condition below fails, rf_model and lasso_model fall back to the previously used models, i.e. the ones set as "default" outside the for loop.
calibrated_data = pd.DataFrame(index=data.index)

for groupedby, group in data.groupby(groupby, dropna=False):
current_rf_model = default_rf_model
current_lasso_model = default_lasso_model
if (
groupedby
and not pd.isna(groupedby)
Expand All @@ -720,49 +725,15 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
)
current_rf_model = default_rf_model
current_lasso_model = default_lasso_model
else:
current_rf_model = default_rf_model
current_lasso_model = default_lasso_model

group["pm2_5_calibrated_value"] = current_rf_model.predict(
group[input_variables]
)
group["pm10_calibrated_value"] = current_lasso_model.predict(
group[input_variables]
)
data.loc[
group.index, ["pm2_5_calibrated_value", "pm10_calibrated_value"]
] = group[["pm2_5_calibrated_value", "pm10_calibrated_value"]]

# Compute raw pm2_5 and pm10 values.
data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)

# Create calibrated columns if they don't exist
data["pm2_5_calibrated_value"] = data.get("pm2_5_calibrated_value", np.nan)
data["pm10_calibrated_value"] = data.get("pm10_calibrated_value", np.nan)

# Assign calibrated values, falling back to raw values when missing
data.loc[to_calibrate, "pm2_5"] = data.loc[
to_calibrate, "pm2_5_calibrated_value"
].fillna(data.loc[to_calibrate, "pm2_5_raw_value"])
data.loc[to_calibrate, "pm10"] = data.loc[
to_calibrate, "pm10_calibrated_value"
].fillna(data.loc[to_calibrate, "pm10_raw_value"])
calibrated_data.loc[
group.index, "pm2_5_calibrated_value"
] = current_rf_model.predict(group[input_variables])
calibrated_data.loc[
group.index, "pm10_calibrated_value"
] = current_lasso_model.predict(group[input_variables])

return data.drop(
columns=[
"avg_pm2_5",
"avg_pm10",
"error_pm2_5",
"error_pm10",
"pm2_5_pm10",
"pm2_5_pm10_mod",
"hour",
"city",
],
errors="ignore",
)
return calibrated_data

@staticmethod
def extract_devices_with_uncalibrated_data(
Expand All @@ -774,7 +745,7 @@ def extract_devices_with_uncalibrated_data(
Args:
start_date (str or datetime): The date for which to check missing uncalibrated data.
table (str, optional): The name of the BigQuery table. Defaults to None, in which case the appropriate table is determined dynamically.
network (DeviceNetwork, optional): The device network to filter by. Defaults to DeviceNetwork.xxxx.
network (DeviceNetwork, optional): The device network to filter by. Defaults to DeviceNetwork.AIRQO.
Returns:
pd.DataFrame: A DataFrame containing the devices with missing uncalibrated data.
Expand Down
Loading

0 comments on commit abb3251

Please sign in to comment.