move to production #4525

Merged Mar 5, 2025 (17 commits)
2 changes: 1 addition & 1 deletion k8s/auth-service/values-prod.yaml
@@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-auth-api
- tag: prod-a8923ac0-1741034129
+ tag: prod-cc4c9655-1741087107
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/auth-service/values-stage.yaml
@@ -6,7 +6,7 @@ app:
replicaCount: 2
image:
repository: eu.gcr.io/airqo-250220/airqo-stage-auth-api
- tag: stage-6410a447-1741021914
+ tag: stage-ef828b9a-1741087033
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/data-mgt/values-prod.yaml
@@ -6,7 +6,7 @@ app:
replicaCount: 2
image:
repository: eu.gcr.io/airqo-250220/airqo-data-mgt-api
- tag: prod-a8923ac0-1741034129
+ tag: prod-cc4c9655-1741087107
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
@@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
- tag: prod-a8923ac0-1741034129
+ tag: prod-cc4c9655-1741087107
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
@@ -7,7 +7,7 @@ images:
predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
trainJob: eu.gcr.io/airqo-250220/airqo-train-job
predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
- tag: prod-a8923ac0-1741034129
+ tag: prod-cc4c9655-1741087107
api:
name: airqo-prediction-api
label: prediction-api
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
@@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
- tag: prod-a8923ac0-1741034129
+ tag: prod-cc4c9655-1741087107
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
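All six values files bump only the image tag, which follows an environment-shortSHA-Unix-timestamp pattern (for example prod-cc4c9655-1741087107). A minimal sketch for decoding such a tag, assuming only that convention and nothing else from the repository:

from datetime import datetime, timezone

def parse_image_tag(tag: str) -> dict:
    # Illustrative helper, not part of the repository: splits
    # "<env>-<short sha>-<unix timestamp>" into its parts.
    env, sha, ts = tag.rsplit("-", 2)
    return {
        "environment": env,
        "commit": sha,
        "built_at": datetime.fromtimestamp(int(ts), tz=timezone.utc),
    }

# parse_image_tag("prod-cc4c9655-1741087107")["built_at"] -> 2025-03-04 UTC,
# i.e. the day before this PR was merged.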
6 changes: 3 additions & 3 deletions src/workflows/airqo_etl_utils/airnow_utils.py
@@ -8,7 +8,7 @@
from .data_validator import DataValidationUtils
from .date import str_to_date, date_to_str
from .utils import Utils

from .datautils import DataUtils
from .config import configuration
import logging

@@ -68,8 +68,8 @@ def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame:
Raises:
ValueError: If no devices are found for the BAM network or if no data is returned for the specified date range.
"""
- devices = AirQoApi().get_devices_by_network(
-     device_network=DeviceNetwork.METONE, device_category=DeviceCategory.BAM
+ devices, _ = DataUtils.get_devices(
+     device_category=DeviceCategory.BAM, device_network=DeviceNetwork.METONE
)
bam_data = pd.DataFrame()

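The call site above now unpacks a tuple from DataUtils.get_devices instead of calling AirQoApi().get_devices_by_network directly. A small usage sketch of the new path, with import locations assumed (only datautils is confirmed by the diff) and the wrapper function purely illustrative:

import pandas as pd

from airqo_etl_utils.constants import DeviceCategory, DeviceNetwork  # assumed location
from airqo_etl_utils.datautils import DataUtils


def load_bam_devices() -> pd.DataFrame:
    # DataUtils.get_devices returns a (devices, extra) tuple per the diff;
    # the second element is not needed here.
    devices, _ = DataUtils.get_devices(
        device_category=DeviceCategory.BAM, device_network=DeviceNetwork.METONE
    )
    if devices.empty:
        raise ValueError("No devices found for the BAM network")
    return devices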
221 changes: 96 additions & 125 deletions src/workflows/airqo_etl_utils/airqo_utils.py
@@ -262,7 +262,7 @@ def merge_aggregated_mobile_devices_data_and_weather_data(
@staticmethod
def restructure_airqo_mobile_data_for_bigquery(data: pd.DataFrame) -> pd.DataFrame:
data["timestamp"] = pd.to_datetime(data["timestamp"])
data["network"] = "airqo"
data["network"] = DeviceNetwork.AIRQO.str
big_query_api = BigQueryApi()
cols = big_query_api.get_columns(
table=big_query_api.airqo_mobile_measurements_table
@@ -468,7 +468,7 @@ def extract_devices_deployment_logs() -> pd.DataFrame:
for _, device in devices.iterrows():
try:
maintenance_logs = airqo_api.get_maintenance_logs(
- network=device.get("network", "airqo"),
+ network=device.get("network", DeviceNetwork.AIRQO.str),
device=device.get("name", None),
activity_type="deployment",
)
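Both of the hunks above swap the hard-coded "airqo" string for DeviceNetwork.AIRQO.str. The real enum lives elsewhere in the package; a minimal sketch of the shape this usage implies, for orientation only:

from enum import Enum


class DeviceNetwork(Enum):
    # Illustrative only; the actual enum may define more members and helpers.
    AIRQO = "airqo"
    METONE = "metone"

    @property
    def str(self) -> str:
        # Lower-case string form written into the "network" column.
        return self.value


assert DeviceNetwork.AIRQO.str == "airqo"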
@@ -570,6 +570,58 @@ def map_site_ids_to_historical_data(

@staticmethod
def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
"""
Merges calibrated data back into the original dataset and computes raw PM values after calibration.

Args:
data (pd.DataFrame): The raw sensor data.
groupby (str): The column to group by for model selection.

Returns:
pd.DataFrame: The original dataset with calibrated PM2.5 and PM10 values.
"""

data["timestamp"] = pd.to_datetime(data["timestamp"])

to_calibrate = data["network"] == DeviceNetwork.AIRQO.str

calibrated_data = AirQoDataUtils._airqo_calibrate(
data.loc[to_calibrate], groupby
)

data.loc[
to_calibrate, ["pm2_5_calibrated_value", "pm10_calibrated_value"]
] = calibrated_data

data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)

data = data.assign(
pm2_5_calibrated_value=data.get("pm2_5_calibrated_value", np.nan),
pm10_calibrated_value=data.get("pm10_calibrated_value", np.nan),
)

data.loc[to_calibrate, "pm2_5"] = data.loc[
to_calibrate, "pm2_5_calibrated_value"
].fillna(data.loc[to_calibrate, "pm2_5_raw_value"])
data.loc[to_calibrate, "pm10"] = data.loc[
to_calibrate, "pm10_calibrated_value"
].fillna(data.loc[to_calibrate, "pm10_raw_value"])

return data.drop(
columns=[
"avg_pm2_5",
"avg_pm10",
"error_pm2_5",
"error_pm10",
"pm2_5_pm10",
"pm2_5_pm10_mod",
"hour",
],
errors="ignore",
)

def _airqo_calibrate(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
"""
Calibrates air quality sensor data by applying machine learning models to adjust sensor readings.

@@ -587,29 +639,26 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
data (pd.DataFrame): The raw air quality sensor data.

Returns:
- pd.DataFrame: The calibrated dataset with additional processed fields.
+ pd.DataFrame: A DataFrame with calibrated PM2.5 and PM10 values.
"""
bucket = Config.FORECAST_MODELS_BUCKET
project_id = Config.GOOGLE_CLOUD_PROJECT_ID
calibrate_by: Dict[str, Union[CityModels, CountryModels]] = {
"city": CityModels,
"country": CountryModels,
}

models: Union[CityModels, CountryModels] = calibrate_by.get(
groupby, CountryModels
)

sites = DataUtils.get_sites()
if sites.empty:
raise RuntimeError("Failed to fetch sites data from the cache/API")

bucket = Config.FORECAST_MODELS_BUCKET
project_id = Config.GOOGLE_CLOUD_PROJECT_ID

data["timestamp"] = pd.to_datetime(data["timestamp"])
sites = sites[["site_id", groupby]]
data = pd.merge(data, sites, on="site_id", how="left")
data.dropna(subset=["device_id", "timestamp"], inplace=True)

columns_to_fill = [
"s1_pm2_5",
"s1_pm10",
"s2_pm2_5",
"s2_pm10",
"temperature",
"humidity",
]
input_variables = [
"avg_pm2_5",
"avg_pm10",
@@ -622,78 +671,34 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
"pm2_5_pm10_mod",
]

# TODO: Need to opt for a different approach, e.g. forward fill; can't do that here as the df only has data for the last 1 hour. Perhaps use raw data only?
# Fill NAs for the specified fields.
if "airqo" not in pd.unique(data.network):
data = data.reindex(
columns=data.columns.union(columns_to_fill, sort=False),
fill_value=np.nan,
)

to_calibrate = data["network"] == "airqo"
data_to_calibrate = data.loc[to_calibrate]
data_to_calibrate[columns_to_fill] = data_to_calibrate[columns_to_fill].fillna(
0
)

# additional input columns for calibration
data_to_calibrate["avg_pm2_5"] = (
data_to_calibrate[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
)
data_to_calibrate["avg_pm10"] = (
data_to_calibrate[["s1_pm10", "s2_pm10"]].mean(axis=1).round(2)
)
data_to_calibrate["error_pm2_5"] = np.abs(
data_to_calibrate["s1_pm2_5"] - data_to_calibrate["s2_pm2_5"]
)
data_to_calibrate["error_pm10"] = np.abs(
data_to_calibrate["s1_pm10"] - data_to_calibrate["s2_pm10"]
)
data_to_calibrate["pm2_5_pm10"] = (
data_to_calibrate["avg_pm2_5"] - data_to_calibrate["avg_pm10"]
)
data_to_calibrate["pm2_5_pm10_mod"] = (
data_to_calibrate["avg_pm2_5"] / data_to_calibrate["avg_pm10"]
)
data_to_calibrate["hour"] = data_to_calibrate["timestamp"].dt.hour

data_to_calibrate[input_variables] = data_to_calibrate[input_variables].replace(
[np.inf, -np.inf], 0
data["avg_pm2_5"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1).round(2)
data["avg_pm10"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1).round(2)
data["error_pm2_5"] = np.abs(data["s1_pm2_5"] - data["s2_pm2_5"])
data["error_pm10"] = np.abs(data["s1_pm10"] - data["s2_pm10"])
data["pm2_5_pm10"] = data["avg_pm2_5"] - data["avg_pm10"]
data["pm2_5_pm10_mod"] = data["avg_pm2_5"] / data["avg_pm10"]
data["hour"] = data["timestamp"].dt.hour

data[input_variables] = data[input_variables].replace([np.inf, -np.inf], 0)

default_rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(models.DEFAULT, "pm2_5"),
)

calibrate_by: Dict[str, Union[CityModels, CountryModels]] = {
"city": CityModels,
"country": CountryModels,
}

models: Union[CityModels, CountryModels] = calibrate_by.get(
groupby, CountryModels
default_lasso_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(models.DEFAULT, "pm10"),
)

# Explicitly filter data to calibrate. At the moment, only calibrating on AirQo data.

data_to_calibrate.dropna(subset=input_variables, inplace=True)
grouped_df = data_to_calibrate.groupby(groupby, dropna=False)
if not data_to_calibrate.empty:
default_rf_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(
models.DEFAULT, "pm2_5"
),
)
default_lasso_model = GCSUtils.get_trained_model_from_gcs(
project_name=project_id,
bucket_name=bucket,
source_blob_name=Utils.get_calibration_model_path(
models.DEFAULT, "pm10"
),
)

available_models = [c.value for c in models]

for groupedby, group in grouped_df:
# If the condition below fails, rf_model and lasso_model default to the previously used ones, or to the models set as "default" outside the for loop.
calibrated_data = pd.DataFrame(index=data.index)

for groupedby, group in data.groupby(groupby, dropna=False):
current_rf_model = default_rf_model
current_lasso_model = default_lasso_model
if (
groupedby
and not pd.isna(groupedby)
Expand All @@ -720,49 +725,15 @@ def calibrate_data(data: pd.DataFrame, groupby: str) -> pd.DataFrame:
)
current_rf_model = default_rf_model
current_lasso_model = default_lasso_model
else:
current_rf_model = default_rf_model
current_lasso_model = default_lasso_model

group["pm2_5_calibrated_value"] = current_rf_model.predict(
group[input_variables]
)
group["pm10_calibrated_value"] = current_lasso_model.predict(
group[input_variables]
)
data.loc[
group.index, ["pm2_5_calibrated_value", "pm10_calibrated_value"]
] = group[["pm2_5_calibrated_value", "pm10_calibrated_value"]]

# Compute raw pm2_5 and pm10 values.
data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)

# Create calibrated columns if they don't exist
data["pm2_5_calibrated_value"] = data.get("pm2_5_calibrated_value", np.nan)
data["pm10_calibrated_value"] = data.get("pm10_calibrated_value", np.nan)

# Assign calibrated values, falling back to raw values when missing
data.loc[to_calibrate, "pm2_5"] = data.loc[
to_calibrate, "pm2_5_calibrated_value"
].fillna(data.loc[to_calibrate, "pm2_5_raw_value"])
data.loc[to_calibrate, "pm10"] = data.loc[
to_calibrate, "pm10_calibrated_value"
].fillna(data.loc[to_calibrate, "pm10_raw_value"])
calibrated_data.loc[
group.index, "pm2_5_calibrated_value"
] = current_rf_model.predict(group[input_variables])
calibrated_data.loc[
group.index, "pm10_calibrated_value"
] = current_lasso_model.predict(group[input_variables])

return data.drop(
columns=[
"avg_pm2_5",
"avg_pm10",
"error_pm2_5",
"error_pm10",
"pm2_5_pm10",
"pm2_5_pm10_mod",
"hour",
"city",
],
errors="ignore",
)
return calibrated_data

@staticmethod
def extract_devices_with_uncalibrated_data(
Expand All @@ -774,7 +745,7 @@ def extract_devices_with_uncalibrated_data(
Args:
start_date (str or datetime): The date for which to check missing uncalibrated data.
table (str, optional): The name of the BigQuery table. Defaults to None, in which case the appropriate table is determined dynamically.
- network (DeviceNetwork, optional): The device network to filter by. Defaults to DeviceNetwork.xxxx.
+ network (DeviceNetwork, optional): The device network to filter by. Defaults to DeviceNetwork.AIRQO.

Returns:
pd.DataFrame: A DataFrame containing the devices with missing uncalibrated data.
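For readers skimming the refactor: calibrate_data now delegates model inference for AirQo-network rows to the new _airqo_calibrate helper and then merges the predictions back, falling back to the raw sensor averages when no calibrated value is available. A minimal, self-contained sketch of that merge-and-fallback step (column names follow the diff; everything else, including the toy data, is illustrative):

import numpy as np
import pandas as pd

# Toy frame mimicking the columns used in calibrate_data above.
data = pd.DataFrame(
    {
        "network": ["airqo", "airqo"],
        "s1_pm2_5": [10.0, 12.0],
        "s2_pm2_5": [14.0, 12.0],
        "s1_pm10": [20.0, 22.0],
        "s2_pm10": [24.0, 22.0],
        # Pretend _airqo_calibrate produced values for the first row only.
        "pm2_5_calibrated_value": [11.5, np.nan],
        "pm10_calibrated_value": [21.0, np.nan],
    }
)

to_calibrate = data["network"] == "airqo"

# Raw values are the mean of the two PM sensors.
data["pm2_5_raw_value"] = data[["s1_pm2_5", "s2_pm2_5"]].mean(axis=1)
data["pm10_raw_value"] = data[["s1_pm10", "s2_pm10"]].mean(axis=1)

# Calibrated value wins; otherwise fall back to the raw average.
data.loc[to_calibrate, "pm2_5"] = data.loc[
    to_calibrate, "pm2_5_calibrated_value"
].fillna(data.loc[to_calibrate, "pm2_5_raw_value"])
data.loc[to_calibrate, "pm10"] = data.loc[
    to_calibrate, "pm10_calibrated_value"
].fillna(data.loc[to_calibrate, "pm10_raw_value"])

print(data[["pm2_5", "pm10"]])  # row 0 -> 11.5 / 21.0, row 1 -> 12.0 / 22.0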