From c8def2396362d2c0312f19d351f03cb15addbedc Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Fri, 13 Dec 2024 13:03:23 +0300 Subject: [PATCH 1/4] Update daily measurements to use networks --- .../airqo_etl_utils/daily_data_utils.py | 66 ++++++++++++------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/src/workflows/airqo_etl_utils/daily_data_utils.py b/src/workflows/airqo_etl_utils/daily_data_utils.py index 804382c8e2..f12ff79390 100644 --- a/src/workflows/airqo_etl_utils/daily_data_utils.py +++ b/src/workflows/airqo_etl_utils/daily_data_utils.py @@ -8,31 +8,47 @@ class DailyDataUtils: @staticmethod def average_data(data: pd.DataFrame) -> pd.DataFrame: - averaged_data = pd.DataFrame() - data["timestamp"] = data["timestamp"].apply(pd.to_datetime) - - for _, by_tenant in data.groupby("tenant"): - tenant = by_tenant.iloc[0]["tenant"] - del by_tenant["tenant"] - for _, by_device in by_tenant.groupby("device_id"): - site_id = by_device.iloc[0]["site_id"] - device_id = by_device.iloc[0]["device_id"] - device_number = by_device.iloc[0]["device_number"] - - del by_device["site_id"] - del by_device["device_id"] - del by_device["device_number"] - - device_averages = by_device.resample("1D", on="timestamp").mean() - device_averages["timestamp"] = device_averages.index - device_averages["device_id"] = device_id - device_averages["site_id"] = site_id - device_averages["device_number"] = device_number - device_averages["tenant"] = tenant - - averaged_data = pd.concat( - [averaged_data, device_averages], ignore_index=True - ) + """ + Averages data in a pandas DataFrame on a daily basis for each device, + grouped by network and device ID. The function resamples data + to compute daily averages for numerical columns. + + Args: + data (pd.DataFrame): A pandas DataFrame containing the following columns: + - "timestamp": Timestamps of the data. + - "network": The network the data belongs to. + - "device_id": Unique identifier for the device. 
+ - "site_id": Unique identifier for the site associated with the device. + - "device_number": Device number. + + Returns: + pd.DataFrame: A DataFrame containing daily averages for each device, + including metadata columns such as "network", "device_id", "site_id", + and "device_number". + """ + data["timestamp"] = pd.to_datetime(data["timestamp"]) + + averaged_data_list = [] + + for (network, device_id), group in data.groupby(["network", "device_id"]): + network = group["network"].iloc[0] + site_id = group["site_id"].iloc[0] + device_number = group["device_number"].iloc[0] + + device_averages = ( + group.resample("1D", on="timestamp") + .mean(numeric_only=True) + .reset_index() + ) + + device_averages["network"] = network + device_averages["device_id"] = device_id + device_averages["site_id"] = site_id + device_averages["device_number"] = device_number + + averaged_data_list.append(device_averages) + + averaged_data = pd.concat(averaged_data_list, ignore_index=True) return averaged_data From a1766850d9060386cb9b686059f9f3bd38b30c80 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Fri, 13 Dec 2024 13:25:58 +0300 Subject: [PATCH 2/4] Remove empty values --- src/analytics/api/models/events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/analytics/api/models/events.py b/src/analytics/api/models/events.py index f4884aa24a..34dd31dc06 100644 --- a/src/analytics/api/models/events.py +++ b/src/analytics/api/models/events.py @@ -589,7 +589,7 @@ def simple_data_cleaning(cls, data: pd.DataFrame) -> pd.DataFrame: data.loc[data["pm2_5_raw_value"] != 0, "pm2_5"] = np.nan - if (data["pm2_5_raw_value"] == 0).all(): + if ((data["pm2_5_raw_value"] == 0) | (data["pm2_5_raw_value"].isna())).all(): data.drop(columns=["pm2_5_raw_value"], inplace=True) zero_columns = data.loc[:, (data == 0).all()].columns From c15a9a1b59315dac10bae7b8d41b4bbdec14fd77 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Fri, 13 Dec 2024 13:52:43 +0300 Subject: [PATCH 3/4]
Cleanup --- src/workflows/airqo_etl_utils/bigquery_api.py | 36 +++++++++++++++++-- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/src/workflows/airqo_etl_utils/bigquery_api.py b/src/workflows/airqo_etl_utils/bigquery_api.py index 761113a622..581982a13e 100644 --- a/src/workflows/airqo_etl_utils/bigquery_api.py +++ b/src/workflows/airqo_etl_utils/bigquery_api.py @@ -604,14 +604,44 @@ def reload_data( self, dataframe: pd.DataFrame, table: str, - tenant: Tenant = Tenant.ALL, + network: str = "all", start_date_time: str = None, end_date_time: str = None, where_fields: dict = None, null_cols: list = None, ) -> None: + """ + Reloads data into a specified table in BigQuery by: + 1. Deleting existing records in the table based on the provided date range, + network, and optional filtering criteria. + 2. Inserting new records from the provided DataFrame. + + Args: + dataframe (pd.DataFrame): The data to be reloaded into the table. + table (str): The target table in BigQuery. + network (str, optional): The network filter to be applied. Defaults to "all". + start_date_time (str, optional): The start of the date range for deletion. + If None, inferred from the DataFrame's earliest timestamp. + end_date_time (str, optional): The end of the date range for deletion. + If None, inferred from the DataFrame's latest timestamp. + where_fields (dict, optional): Additional fields and values for filtering rows to delete. + null_cols (list, optional): Columns to filter on `NULL` values during deletion. + + Returns: + None: The function performs operations directly on the BigQuery table. + + Raises: + ValueError: If `timestamp` column is missing in the DataFrame. + """ + if start_date_time is None or end_date_time is None: - data = dataframe.copy() + if "timestamp" not in dataframe.columns: + raise ValueError( + "The DataFrame must contain a 'timestamp' column to derive the date range." + ) + data = ( + dataframe.copy() + ) # Copy to avoid mutating the caller's DataFrame when normalising the timestamp column below. data["timestamp"] = pd.to_datetime(data["timestamp"]) start_date_time = date_to_str(data["timestamp"].min()) end_date_time = date_to_str(data["timestamp"].max()) @@ -619,7 +649,7 @@ def reload_data( query = self.compose_query( QueryType.DELETE, table=table, - tenant=tenant, + network=network, start_date_time=start_date_time, end_date_time=end_date_time, where_fields=where_fields, From c057a19e68857901e35f34344c1c4ba42d2c8023 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Fri, 13 Dec 2024 13:53:14 +0300 Subject: [PATCH 4/4] Cleanup --- src/workflows/airqo_etl_utils/daily_data_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/workflows/airqo_etl_utils/daily_data_utils.py b/src/workflows/airqo_etl_utils/daily_data_utils.py index f12ff79390..27ff183025 100644 --- a/src/workflows/airqo_etl_utils/daily_data_utils.py +++ b/src/workflows/airqo_etl_utils/daily_data_utils.py @@ -93,7 +93,7 @@ def cleanup_and_reload( ) bigquery_api.reload_data( - tenant=Tenant.ALL, + network="all", table=table, dataframe=data, start_date_time=start_date_time,