Skip to content

Commit

Permalink
Merge pull request #4059 from airqo-platform/staging
Browse files Browse the repository at this point in the history
move to production
  • Loading branch information
Baalmart authored Dec 12, 2024
2 parents 48e4e72 + 1396ff4 commit 75b9659
Show file tree
Hide file tree
Showing 17 changed files with 79 additions and 72 deletions.
2 changes: 1 addition & 1 deletion k8s/analytics/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ images:
celeryWorker: eu.gcr.io/airqo-250220/airqo-analytics-celery-worker
reportJob: eu.gcr.io/airqo-250220/airqo-analytics-report-job
devicesSummaryJob: eu.gcr.io/airqo-250220/airqo-analytics-devices-summary-job
tag: prod-3cb7d03d-1734014240
tag: prod-48e4e72c-1734023346
api:
name: airqo-analytics-api
label: analytics-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-3cb7d03d-1734014240
tag: prod-48e4e72c-1734023346
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-stage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 2
image:
repository: eu.gcr.io/airqo-250220/airqo-stage-device-registry-api
tag: stage-a67d93c7-1733998369
tag: stage-04fa3ebc-1734023297
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-airqo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
tag: prod-3cb7d03d-1734014240
tag: prod-48e4e72c-1734023346
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-kcca.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
tag: prod-3cb7d03d-1734014240
tag: prod-48e4e72c-1734023346
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ images:
predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
trainJob: eu.gcr.io/airqo-250220/airqo-train-job
predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
tag: prod-3cb7d03d-1734014240
tag: prod-48e4e72c-1734023346
api:
name: airqo-prediction-api
label: prediction-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/spatial/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-spatial-api
tag: prod-3cb7d03d-1734014240
tag: prod-48e4e72c-1734023346
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/website/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-website-api
tag: prod-3cb7d03d-1734014240
tag: prod-48e4e72c-1734023346
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
tag: prod-3cb7d03d-1734014240
tag: prod-48e4e72c-1734023346
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
102 changes: 56 additions & 46 deletions src/analytics/api/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ class EventsModel(BasePyMongoModel):
BIGQUERY_COHORTS = f"`{CONFIGURATIONS.BIGQUERY_COHORTS}`"
BIGQUERY_COHORTS_DEVICES = f"`{CONFIGURATIONS.BIGQUERY_COHORTS_DEVICES}`"
BIGQUERY_SITES = f"`{CONFIGURATIONS.BIGQUERY_SITES}`"
BIGQUERY_SITES_SITES = f"`{CONFIGURATIONS.BIGQUERY_SITES_SITES}`"
BIGQUERY_DEVICES = f"`{CONFIGURATIONS.BIGQUERY_DEVICES}`"
BIGQUERY_DEVICES_DEVICES = f"`{CONFIGURATIONS.BIGQUERY_DEVICES_DEVICES}`"
DATA_EXPORT_DECIMAL_PLACES = CONFIGURATIONS.DATA_EXPORT_DECIMAL_PLACES

BIGQUERY_EVENTS = CONFIGURATIONS.BIGQUERY_EVENTS
Expand All @@ -51,28 +53,30 @@ def __init__(self, tenant):
"""
self.limit_mapper = {"pm2_5": 500.5, "pm10": 604.5, "no2": 2049}
self.sites_table = self.BIGQUERY_SITES
self.sites_sites_table = self.BIGQUERY_SITES_SITES
self.airqlouds_sites_table = self.BIGQUERY_AIRQLOUDS_SITES
self.devices_table = self.BIGQUERY_DEVICES
self.devices_devices_table = self.BIGQUERY_DEVICES_DEVICES
self.airqlouds_table = self.BIGQUERY_AIRQLOUDS
super().__init__(tenant, collection_name="events")

@property
def device_info_query(self):
"""Generates a device information query including site_id, tenant, and approximate location details."""
"""Generates a device information query including site_id, network, and approximate location details."""
return (
f"{self.devices_table}.site_id AS site_id, "
f"{self.devices_table}.tenant AS tenant "
f"{self.devices_devices_table}.site_id AS site_id, "
f"{self.devices_devices_table}.network AS network "
)

@property
def device_info_query_airqloud(self):
"""Generates a device information query specifically for airqlouds, excluding the site_id."""
return f"{self.devices_table}.tenant AS tenant "
return f"{self.devices_devices_table}.network AS network "

@property
def site_info_query(self):
"""Generates a site information query to retrieve site name and approximate location details."""
return f"{self.sites_table}.name AS site_name "
return f"{self.sites_sites_table}.name AS site_name "

@property
def airqloud_info_query(self):
Expand All @@ -92,8 +96,8 @@ def add_device_join(self, data_query, filter_clause=""):
"""
return (
f"SELECT {self.device_info_query}, data.* "
f"FROM {self.devices_table} "
f"RIGHT JOIN ({data_query}) data ON data.device_name = {self.devices_table}.device_id "
f"FROM {self.devices_devices_table} "
f"RIGHT JOIN ({data_query}) data ON data.device_name = {self.devices_devices_table}.device_id "
f"{filter_clause}"
)

Expand All @@ -110,8 +114,8 @@ def add_device_join_to_airqlouds(self, data_query, filter_clause=""):
"""
return (
f"SELECT {self.device_info_query_airqloud}, data.* "
f"FROM {self.devices_table} "
f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.devices_table}.site_id "
f"FROM {self.devices_devices_table} "
f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.devices_devices_table}.site_id "
f"{filter_clause}"
)

Expand All @@ -127,8 +131,8 @@ def add_site_join(self, data_query):
"""
return (
f"SELECT {self.site_info_query}, data.* "
f"FROM {self.sites_table} "
f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.sites_table}.id "
f"FROM {self.sites_sites_table} "
f"RIGHT JOIN ({data_query}) data ON data.site_id = {self.sites_sites_table}.id "
)

def add_airqloud_join(self, data_query):
Expand Down Expand Up @@ -194,23 +198,23 @@ def get_device_query(
including BAM data if applicable.
"""
query = (
f"{pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name "
f"{pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_devices_table}.name AS device_name "
f"FROM {data_table} "
f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {data_table}.device_id "
f"JOIN {self.devices_devices_table} ON {self.devices_devices_table}.device_id = {data_table}.device_id "
f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) "
f"AND {self.devices_devices_table}.device_id IN UNNEST(@filter_value) "
)
if frequency in ["weekly", "monthly", "yearly"]:
query += " GROUP BY ALL"

query = self.add_site_join(query)
if frequency in ["hourly", "weekly", "monthly", "yearly"]:
bam_query = (
f"{bam_pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_table}.name AS device_name "
f"{bam_pollutants_query}, {time_grouping}, {self.device_info_query}, {self.devices_devices_table}.name AS device_name "
f"FROM {self.BIGQUERY_BAM_DATA} "
f"JOIN {self.devices_table} ON {self.devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id "
f"JOIN {self.devices_devices_table} ON {self.devices_devices_table}.device_id = {self.BIGQUERY_BAM_DATA}.device_id "
f"WHERE {self.BIGQUERY_BAM_DATA}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"AND {self.devices_table}.device_id IN UNNEST(@filter_value) "
f"AND {self.devices_devices_table}.device_id IN UNNEST(@filter_value) "
)
if frequency in ["weekly", "monthly", "yearly"]:
bam_query += " GROUP BY ALL"
Expand Down Expand Up @@ -247,9 +251,9 @@ def get_site_query(
query = (
f"{pollutants_query}, {time_grouping}, {self.site_info_query}, {data_table}.device_id AS device_name "
f"FROM {data_table} "
f"JOIN {self.sites_table} ON {self.sites_table}.id = {data_table}.site_id "
f"JOIN {self.sites_sites_table} ON {self.sites_sites_table}.id = {data_table}.site_id "
f"WHERE {data_table}.timestamp BETWEEN '{start_date}' AND '{end_date}' "
f"AND {self.sites_table}.id IN UNNEST(@filter_value) "
f"AND {self.sites_sites_table}.id IN UNNEST(@filter_value) "
)
if frequency in ["weekly", "monthly", "yearly"]:
query += " GROUP BY ALL"
Expand Down Expand Up @@ -538,7 +542,7 @@ def download_from_bigquery(
else:
drop_columns.append("datetime")
sorting_cols.append("datetime")

dataframe.to_csv("raw_data50.csv")
if data_type == "raw":
cls.simple_data_cleaning(dataframe)

Expand All @@ -551,39 +555,45 @@ def download_from_bigquery(
@classmethod
def simple_data_cleaning(cls, data: pd.DataFrame) -> pd.DataFrame:
"""
Perform data cleaning on a pandas DataFrame to handle specific conditions
related to "pm2_5" and "pm2_5_raw_value" columns.
The cleaning process includes:
1. Ensuring correct numeric data types for "pm2_5" and "pm2_5_raw_value".
2. Removing "pm2_5" values where "pm2_5_raw_value" has data.
3. Dropping the "pm2_5_raw_value" column if it has no data at all.
4. Retaining "pm2_5" values where "pm2_5_raw_value" has no data, and removing
"pm2_5" values where "pm2_5_raw_value" has data.
5. Dropping any column (including "pm2_5" and "pm2_5_raw_value") if it is
entirely empty.
Args:
cls: Class reference (used in classmethods).
data (pd.DataFrame): Input pandas DataFrame with "pm2_5" and
"pm2_5_raw_value" columns.
Returns:
pd.DataFrame: Cleaned DataFrame with updates applied in place.
Perform data cleaning on a pandas DataFrame to handle specific conditions
related to "pm2_5" and "pm2_5_raw_value" columns.
The cleaning process includes:
1. Ensuring correct numeric data types for "pm2_5" and "pm2_5_raw_value".
2. Removing "pm2_5" values where "pm2_5_raw_value" values are not 0s.
3. Dropping the "pm2_5_raw_value" column if all its values are 0s.
4. Retaining "pm2_5" values where "pm2_5_raw_value" values are all 0s.
5. Dropping any column (including "pm2_5" and "pm2_5_raw_value") if all values are 0s.
Args:
cls: Class reference (used in classmethods).
data (pd.DataFrame): Input pandas DataFrame with "pm2_5" and
"pm2_5_raw_value" columns.
Returns:
pd.DataFrame: Cleaned DataFrame with updates applied in place.
Raises:
ValueError: If "pm2_5" or "pm2_5_raw_value" columns are missing.
"""
data["pm2_5_raw_value"] = pd.to_numeric(
data["pm2_5_raw_value"], errors="coerce"
required_columns = ["pm2_5", "pm2_5_raw_value"]

missing_columns = [col for col in required_columns if col not in data.columns]
if missing_columns:
raise ValueError(f"Missing required columns: {missing_columns}")

numeric_columns = ["pm2_5", "pm2_5_raw_value"]
data[numeric_columns] = data[numeric_columns].apply(
pd.to_numeric, errors="coerce"
)
data["pm2_5"] = pd.to_numeric(data["pm2_5"], errors="coerce")

data.loc[~data["pm2_5_raw_value"].isna(), "pm2_5"] = np.nan
data.loc[data["pm2_5_raw_value"] != 0, "pm2_5"] = np.nan

if data["pm2_5_raw_value"].isna().all():
if (data["pm2_5_raw_value"] == 0).all():
data.drop(columns=["pm2_5_raw_value"], inplace=True)

data["pm2_5"] = data["pm2_5"].where(data["pm2_5_raw_value"].isna(), np.nan)

zero_columns = data.loc[:, (data == 0).all()].columns
data.drop(columns=zero_columns, inplace=True)
data.dropna(how="all", axis=1, inplace=True)

return data
Expand Down
2 changes: 2 additions & 0 deletions src/analytics/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ class Config:
CACHE_REDIS_URL = f"redis://{env_var('REDIS_SERVER')}:{env_var('REDIS_PORT')}"

BIGQUERY_DEVICES = env_var("BIGQUERY_DEVICES")
BIGQUERY_DEVICES_DEVICES = env_var("BIGQUERY_DEVICES_DEVICES")
BIGQUERY_SITES = env_var("BIGQUERY_SITES")
BIGQUERY_SITES_SITES = env_var("BIGQUERY_SITES_SITES")
BIGQUERY_AIRQLOUDS_SITES = env_var("BIGQUERY_AIRQLOUDS_SITES")
BIGQUERY_AIRQLOUDS = env_var("BIGQUERY_AIRQLOUDS")
DATA_EXPORT_DECIMAL_PLACES = os.getenv("DATA_EXPORT_DECIMAL_PLACES", 2)
Expand Down
2 changes: 2 additions & 0 deletions src/workflows/airqo_etl_utils/bigquery_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def __init__(self):
self.raw_weather_table = configuration.BIGQUERY_RAW_WEATHER_TABLE
self.consolidated_data_table = configuration.BIGQUERY_ANALYTICS_TABLE
self.sites_table = configuration.BIGQUERY_SITES_TABLE
self.sites_sites_table = configuration.BIGQUERY_SITES_SITES_TABLE
self.airqlouds_table = configuration.BIGQUERY_AIRQLOUDS_TABLE
self.airqlouds_sites_table = configuration.BIGQUERY_AIRQLOUDS_SITES_TABLE
self.grids_table = configuration.BIGQUERY_GRIDS_TABLE
Expand All @@ -54,6 +55,7 @@ def __init__(self):
self.cohorts_devices_table = configuration.BIGQUERY_COHORTS_DEVICES_TABLE
self.sites_meta_data_table = configuration.BIGQUERY_SITES_META_DATA_TABLE
self.devices_table = configuration.BIGQUERY_DEVICES_TABLE
self.devices_devices_table = configuration.BIGQUERY_DEVICES_DEVICES_TABLE
self.devices_summary_table = configuration.BIGQUERY_DEVICES_SUMMARY_TABLE
self.openweathermap_table = configuration.BIGQUERY_OPENWEATHERMAP_TABLE
self.satellite_data_table = configuration.BIGQUERY_SATELLITE_DATA_TABLE
Expand Down
4 changes: 4 additions & 0 deletions src/workflows/airqo_etl_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ class Config:

# Meta data
BIGQUERY_DEVICES_TABLE = os.getenv("BIGQUERY_DEVICES_TABLE")
BIGQUERY_DEVICES_DEVICES_TABLE = os.getenv("BIGQUERY_DEVICES_DEVICES_TABLE")
BIGQUERY_DEVICES_DATA_TABLE = os.getenv("BIGQUERY_DEVICES_DATA_TABLE")
BIGQUERY_SITES_TABLE = os.getenv("BIGQUERY_SITES_TABLE")
BIGQUERY_SITES_SITES_TABLE = os.getenv("BIGQUERY_SITES_SITES_TABLE")
BIGQUERY_SITES_META_DATA_TABLE = os.getenv("BIGQUERY_SITES_META_DATA_TABLE")
BIGQUERY_AIRQLOUDS_TABLE = os.getenv("BIGQUERY_AIRQLOUDS_TABLE")
BIGQUERY_AIRQLOUDS_SITES_TABLE = os.getenv("BIGQUERY_AIRQLOUDS_SITES_TABLE")
Expand Down Expand Up @@ -371,9 +373,11 @@ class Config:
BIGQUERY_GRIDS_SITES_TABLE: "grids_sites.json",
BIGQUERY_COHORTS_DEVICES_TABLE: "cohorts_devices.json",
BIGQUERY_SITES_TABLE: "sites.json",
BIGQUERY_SITES_SITES_TABLE: "sites.json",
BIGQUERY_SITES_META_DATA_TABLE: "sites_meta_data.json",
SENSOR_POSITIONS_TABLE: "sensor_positions.json",
BIGQUERY_DEVICES_TABLE: "devices.json",
BIGQUERY_DEVICES_DEVICES_TABLE: "devices.json",
BIGQUERY_CLEAN_RAW_MOBILE_EVENTS_TABLE: "mobile_measurements.json",
BIGQUERY_UNCLEAN_RAW_MOBILE_EVENTS_TABLE: "mobile_measurements.json",
BIGQUERY_AIRQO_MOBILE_EVENTS_TABLE: "airqo_mobile_measurements.json",
Expand Down
3 changes: 1 addition & 2 deletions src/workflows/airqo_etl_utils/meta_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ def extract_sites_from_api(network: str = "all") -> pd.DataFrame:
"approximate_latitude",
"approximate_longitude",
"name",
"location",
"search_name",
"location_name",
"description",
Expand All @@ -147,7 +146,7 @@ def extract_sites_from_api(network: str = "all") -> pd.DataFrame:
},
inplace=True,
)

dataframe["last_updated"] = datetime.now(timezone.utc)
dataframe = DataValidationUtils.remove_outliers(dataframe)

return dataframe
Expand Down
4 changes: 2 additions & 2 deletions src/workflows/airqo_etl_utils/schema/devices.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
{
"name": "device_id",
"type": "STRING",
"mode": "NULLABLE"
"mode": "REQUIRED"
},
{
"name": "device_number",
Expand Down Expand Up @@ -52,6 +52,6 @@
{
"name": "last_updated",
"type": "TIMESTAMP",
"mode": "NULLABLE"
"mode": "REQUIRED"
}
]
12 changes: 1 addition & 11 deletions src/workflows/airqo_etl_utils/schema/sites.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
[
{
"name": "tenant",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "network",
"type": "STRING",
Expand Down Expand Up @@ -39,11 +34,6 @@
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "location",
"type": "STRING",
"mode": "NULLABLE"
},
{
"name": "display_name",
"type": "STRING",
Expand Down Expand Up @@ -76,7 +66,7 @@
},
{
"name": "last_updated",
"type": "DATE",
"type": "TIMESTAMP",
"mode": "NULLABLE"
}
]
Loading

0 comments on commit 75b9659

Please sign in to comment.