Skip to content

Commit

Permalink
Merge pull request #4063 from airqo-platform/staging
Browse files Browse the repository at this point in the history
move to production
  • Loading branch information
Baalmart authored Dec 13, 2024
2 parents 85d64e9 + 40bbbd5 commit 875fce3
Show file tree
Hide file tree
Showing 12 changed files with 85 additions and 39 deletions.
2 changes: 1 addition & 1 deletion k8s/analytics/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ images:
celeryWorker: eu.gcr.io/airqo-250220/airqo-analytics-celery-worker
reportJob: eu.gcr.io/airqo-250220/airqo-analytics-report-job
devicesSummaryJob: eu.gcr.io/airqo-250220/airqo-analytics-devices-summary-job
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
api:
name: airqo-analytics-api
label: analytics-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/auth-service/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-auth-api
tag: prod-ee15b958-1733833086
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-airqo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-kcca.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ images:
predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
trainJob: eu.gcr.io/airqo-250220/airqo-train-job
predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
api:
name: airqo-prediction-api
label: prediction-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/spatial/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-spatial-api
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/website/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-website-api
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion src/analytics/api/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ def simple_data_cleaning(cls, data: pd.DataFrame) -> pd.DataFrame:

data.loc[data["pm2_5_raw_value"] != 0, "pm2_5"] = np.nan

if (data["pm2_5_raw_value"] == 0).all():
if ((data["pm2_5_raw_value"] == 0) | (data["pm2_5_raw_value"].isna())).all():
data.drop(columns=["pm2_5_raw_value"], inplace=True)

zero_columns = data.loc[:, (data == 0).all()].columns
Expand Down
36 changes: 33 additions & 3 deletions src/workflows/airqo_etl_utils/bigquery_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,22 +604,52 @@ def reload_data(
self,
dataframe: pd.DataFrame,
table: str,
tenant: Tenant = Tenant.ALL,
network: str = "all",
start_date_time: str = None,
end_date_time: str = None,
where_fields: dict = None,
null_cols: list = None,
) -> None:
"""
Reloads data into a specified table in BigQuery by:
1. Deleting existing records in the table based on the provided date range,
network, and optional filtering criteria.
2. Inserting new records from the provided DataFrame.
Args:
dataframe (pd.DataFrame): The data to be reloaded into the table.
table (str): The target table in BigQuery.
network (str, optional): The network filter to be applied. Defaults to "all".
start_date_time (str, optional): The start of the date range for deletion.
If None, inferred from the DataFrame's earliest timestamp.
end_date_time (str, optional): The end of the date range for deletion.
If None, inferred from the DataFrame's latest timestamp.
where_fields (dict, optional): Additional fields and values for filtering rows to delete.
null_cols (list, optional): Columns to filter on `NULL` values during deletion.
Returns:
None: The function performs operations directly on the BigQuery table.
Raises:
ValueError: If `timestamp` column is missing in the DataFrame.
"""

if start_date_time is None or end_date_time is None:
data = dataframe.copy()
if "timestamp" not in dataframe.columns:
raise ValueError(
"The DataFrame must contain a 'timestamp' column to derive the date range."
)
data = (
dataframe.copy()
) # Not sure why this dataframe is being copied. # Memory wastage?
data["timestamp"] = pd.to_datetime(data["timestamp"])
start_date_time = date_to_str(data["timestamp"].min())
end_date_time = date_to_str(data["timestamp"].max())

query = self.compose_query(
QueryType.DELETE,
table=table,
tenant=tenant,
network=network,
start_date_time=start_date_time,
end_date_time=end_date_time,
where_fields=where_fields,
Expand Down
68 changes: 42 additions & 26 deletions src/workflows/airqo_etl_utils/daily_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,47 @@
class DailyDataUtils:
@staticmethod
def average_data(data: pd.DataFrame) -> pd.DataFrame:
    """
    Compute daily averages of numeric columns for each (network, device_id) group.

    The data is resampled into 1-day bins on the "timestamp" column and the
    mean of all numeric columns is taken per device. The metadata columns
    ("network", "device_id", "site_id", "device_number") are re-attached to
    the averaged rows afterwards.

    Args:
        data (pd.DataFrame): A DataFrame containing the columns:
            - "timestamp": parseable datetimes of the measurements.
            - "network": the network the data belongs to.
            - "device_id": unique identifier for the device.
            - "site_id": unique identifier for the site associated with the device.
            - "device_number": device number.

    Returns:
        pd.DataFrame: One row per device per day with daily means of the
        numeric columns, including the metadata columns "network",
        "device_id", "site_id" and "device_number". An empty DataFrame is
        returned when `data` has no rows.

    Note:
        The "timestamp" column of the passed-in DataFrame is converted to
        datetime in place.
    """
    data["timestamp"] = pd.to_datetime(data["timestamp"])

    averaged_data_list = []

    # The groupby key tuple already carries network and device_id; the
    # original re-read `network` from the group redundantly.
    for (network, device_id), group in data.groupby(["network", "device_id"]):
        site_id = group["site_id"].iloc[0]
        device_number = group["device_number"].iloc[0]

        device_averages = (
            group.resample("1D", on="timestamp")
            .mean(numeric_only=True)  # drop non-numeric cols from the mean
            .reset_index()
        )

        device_averages["network"] = network
        device_averages["device_id"] = device_id
        device_averages["site_id"] = site_id
        device_averages["device_number"] = device_number

        averaged_data_list.append(device_averages)

    # pd.concat raises ValueError on an empty list; guard for empty input.
    if not averaged_data_list:
        return pd.DataFrame()

    return pd.concat(averaged_data_list, ignore_index=True)

Expand Down Expand Up @@ -77,7 +93,7 @@ def cleanup_and_reload(
)

bigquery_api.reload_data(
tenant=Tenant.ALL,
network="all",
table=table,
dataframe=data,
start_date_time=start_date_time,
Expand Down

0 comments on commit 875fce3

Please sign in to comment.