Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move to production #4063

Merged
merged 15 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion k8s/analytics/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ images:
celeryWorker: eu.gcr.io/airqo-250220/airqo-analytics-celery-worker
reportJob: eu.gcr.io/airqo-250220/airqo-analytics-report-job
devicesSummaryJob: eu.gcr.io/airqo-250220/airqo-analytics-devices-summary-job
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
api:
name: airqo-analytics-api
label: analytics-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/auth-service/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-auth-api
tag: prod-ee15b958-1733833086
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-airqo.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/airqo-exceedance-job
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/exceedance/values-prod-kcca.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ app:
configmap: env-exceedance-production
image:
repository: eu.gcr.io/airqo-250220/kcca-exceedance-job
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
2 changes: 1 addition & 1 deletion k8s/predict/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ images:
predictJob: eu.gcr.io/airqo-250220/airqo-predict-job
trainJob: eu.gcr.io/airqo-250220/airqo-train-job
predictPlaces: eu.gcr.io/airqo-250220/airqo-predict-places-air-quality
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
api:
name: airqo-prediction-api
label: prediction-api
Expand Down
2 changes: 1 addition & 1 deletion k8s/spatial/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-spatial-api
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/website/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-website-api
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
tag: prod-75b96595-1734025664
tag: prod-85d64e95-1734081670
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion src/analytics/api/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ def simple_data_cleaning(cls, data: pd.DataFrame) -> pd.DataFrame:

data.loc[data["pm2_5_raw_value"] != 0, "pm2_5"] = np.nan

if (data["pm2_5_raw_value"] == 0).all():
if ((data["pm2_5_raw_value"] == 0) | (data["pm2_5_raw_value"].isna())).all():
data.drop(columns=["pm2_5_raw_value"], inplace=True)

zero_columns = data.loc[:, (data == 0).all()].columns
Expand Down
36 changes: 33 additions & 3 deletions src/workflows/airqo_etl_utils/bigquery_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,22 +604,52 @@ def reload_data(
self,
dataframe: pd.DataFrame,
table: str,
tenant: Tenant = Tenant.ALL,
network: str = "all",
start_date_time: str = None,
end_date_time: str = None,
where_fields: dict = None,
null_cols: list = None,
) -> None:
"""
Reloads data into a specified table in BigQuery by:
1. Deleting existing records in the table based on the provided date range,
network, and optional filtering criteria.
2. Inserting new records from the provided DataFrame.

Args:
dataframe (pd.DataFrame): The data to be reloaded into the table.
table (str): The target table in BigQuery.
network (str, optional): The network filter to be applied. Defaults to "all".
start_date_time (str, optional): The start of the date range for deletion.
If None, inferred from the DataFrame's earliest timestamp.
end_date_time (str, optional): The end of the date range for deletion.
If None, inferred from the DataFrame's latest timestamp.
where_fields (dict, optional): Additional fields and values for filtering rows to delete.
null_cols (list, optional): Columns to filter on `NULL` values during deletion.

Returns:
None: The function performs operations directly on the BigQuery table.

Raises:
ValueError: If `start_date_time` or `end_date_time` is not provided and the DataFrame has no `timestamp` column from which to derive the date range.
"""

if start_date_time is None or end_date_time is None:
data = dataframe.copy()
if "timestamp" not in dataframe.columns:
raise ValueError(
"The DataFrame must contain a 'timestamp' column to derive the date range."
)
data = (
dataframe.copy()
) # Work on a copy so the timestamp conversion below does not alter the caller's DataFrame.
data["timestamp"] = pd.to_datetime(data["timestamp"])
start_date_time = date_to_str(data["timestamp"].min())
end_date_time = date_to_str(data["timestamp"].max())

query = self.compose_query(
QueryType.DELETE,
table=table,
tenant=tenant,
network=network,
start_date_time=start_date_time,
end_date_time=end_date_time,
where_fields=where_fields,
Expand Down
68 changes: 42 additions & 26 deletions src/workflows/airqo_etl_utils/daily_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,47 @@
class DailyDataUtils:
@staticmethod
def average_data(data: pd.DataFrame) -> pd.DataFrame:
averaged_data = pd.DataFrame()
data["timestamp"] = data["timestamp"].apply(pd.to_datetime)

for _, by_tenant in data.groupby("tenant"):
tenant = by_tenant.iloc[0]["tenant"]
del by_tenant["tenant"]
for _, by_device in by_tenant.groupby("device_id"):
site_id = by_device.iloc[0]["site_id"]
device_id = by_device.iloc[0]["device_id"]
device_number = by_device.iloc[0]["device_number"]

del by_device["site_id"]
del by_device["device_id"]
del by_device["device_number"]

device_averages = by_device.resample("1D", on="timestamp").mean()
device_averages["timestamp"] = device_averages.index
device_averages["device_id"] = device_id
device_averages["site_id"] = site_id
device_averages["device_number"] = device_number
device_averages["tenant"] = tenant

averaged_data = pd.concat(
[averaged_data, device_averages], ignore_index=True
)
"""
Averages data in a pandas DataFrame on a daily basis for each device,
grouped by network and device ID. The function resamples data
to compute daily averages for numerical columns.

Args:
data (pd.DataFrame): A pandas DataFrame containing the following columns:
- "timestamp": Timestamps of the data.
- "network": The network the data belongs to.
- "device_id": Unique identifier for the device.
- "site_id": Unique identifier for the site associated with the device.
- "device_number": Device number.

Returns:
pd.DataFrame: A DataFrame containing daily averages of the numeric columns for each device,
together with the metadata columns "network", "device_id", "site_id",
and "device_number".
"""
data["timestamp"] = pd.to_datetime(data["timestamp"])

averaged_data_list = []

for (network, device_id), group in data.groupby(["network", "device_id"]):
network = group["network"].iloc[0]
site_id = group["site_id"].iloc[0]
device_number = group["device_number"].iloc[0]

device_averages = (
group.resample("1D", on="timestamp")
.mean(numeric_only=True)
.reset_index()
)

device_averages["network"] = network
device_averages["device_id"] = device_id
device_averages["site_id"] = site_id
device_averages["device_number"] = device_number

averaged_data_list.append(device_averages)

averaged_data = pd.concat(averaged_data_list, ignore_index=True)

return averaged_data

Expand Down Expand Up @@ -77,7 +93,7 @@ def cleanup_and_reload(
)

bigquery_api.reload_data(
tenant=Tenant.ALL,
network="all",
table=table,
dataframe=data,
start_date_time=start_date_time,
Expand Down
Loading