Skip to content

Commit

Permalink
Merge pull request #4062 from NicholasTurner23/update-fix/Clean_up
Browse files Browse the repository at this point in the history
Update fix/clean up
  • Loading branch information
Baalmart authored Dec 13, 2024
2 parents f0fb6ac + 688e416 commit 40bbbd5
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 30 deletions.
2 changes: 1 addition & 1 deletion src/analytics/api/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,7 +589,7 @@ def simple_data_cleaning(cls, data: pd.DataFrame) -> pd.DataFrame:

data.loc[data["pm2_5_raw_value"] != 0, "pm2_5"] = np.nan

if (data["pm2_5_raw_value"] == 0).all():
if ((data["pm2_5_raw_value"] == 0) | (data["pm2_5_raw_value"].isna())).all():
data.drop(columns=["pm2_5_raw_value"], inplace=True)

zero_columns = data.loc[:, (data == 0).all()].columns
Expand Down
36 changes: 33 additions & 3 deletions src/workflows/airqo_etl_utils/bigquery_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,22 +604,52 @@ def reload_data(
self,
dataframe: pd.DataFrame,
table: str,
tenant: Tenant = Tenant.ALL,
network: str = "all",
start_date_time: str = None,
end_date_time: str = None,
where_fields: dict = None,
null_cols: list = None,
) -> None:
"""
Reloads data into a specified table in BigQuery by:
1. Deleting existing records in the table based on the provided date range,
network, and optional filtering criteria.
2. Inserting new records from the provided DataFrame.
Args:
dataframe (pd.DataFrame): The data to be reloaded into the table.
table (str): The target table in BigQuery.
network (str, optional): The network filter to be applied. Defaults to "all".
start_date_time (str, optional): The start of the date range for deletion.
If None, inferred from the DataFrame's earliest timestamp.
end_date_time (str, optional): The end of the date range for deletion.
If None, inferred from the DataFrame's latest timestamp.
where_fields (dict, optional): Additional fields and values for filtering rows to delete.
null_cols (list, optional): Columns to filter on `NULL` values during deletion.
Returns:
None: The function performs operations directly on the BigQuery table.
Raises:
ValueError: If `timestamp` column is missing in the DataFrame.
"""

if start_date_time is None or end_date_time is None:
data = dataframe.copy()
if "timestamp" not in dataframe.columns:
raise ValueError(
"The DataFrame must contain a 'timestamp' column to derive the date range."
)
data = (
dataframe.copy()
) # Not sure why this dataframe is being copied. # Memory wastage?
data["timestamp"] = pd.to_datetime(data["timestamp"])
start_date_time = date_to_str(data["timestamp"].min())
end_date_time = date_to_str(data["timestamp"].max())

query = self.compose_query(
QueryType.DELETE,
table=table,
tenant=tenant,
network=network,
start_date_time=start_date_time,
end_date_time=end_date_time,
where_fields=where_fields,
Expand Down
68 changes: 42 additions & 26 deletions src/workflows/airqo_etl_utils/daily_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,47 @@
class DailyDataUtils:
@staticmethod
def average_data(data: pd.DataFrame) -> pd.DataFrame:
averaged_data = pd.DataFrame()
data["timestamp"] = data["timestamp"].apply(pd.to_datetime)

for _, by_tenant in data.groupby("tenant"):
tenant = by_tenant.iloc[0]["tenant"]
del by_tenant["tenant"]
for _, by_device in by_tenant.groupby("device_id"):
site_id = by_device.iloc[0]["site_id"]
device_id = by_device.iloc[0]["device_id"]
device_number = by_device.iloc[0]["device_number"]

del by_device["site_id"]
del by_device["device_id"]
del by_device["device_number"]

device_averages = by_device.resample("1D", on="timestamp").mean()
device_averages["timestamp"] = device_averages.index
device_averages["device_id"] = device_id
device_averages["site_id"] = site_id
device_averages["device_number"] = device_number
device_averages["tenant"] = tenant

averaged_data = pd.concat(
[averaged_data, device_averages], ignore_index=True
)
"""
Averages data in a pandas DataFrame on a daily basis for each device,
grouped by network and device ID. The function resamples data
to compute daily averages for numerical columns.
Args:
data (pd.DataFrame): A pandas DataFrame containing the following columns:
- "timestamp": Timestamps of the data.
- "network": The network the data belongs to.
- "device_id": Unique identifier for the device.
- "site_id": Unique identifier for the site associated with the device.
- "device_number": Device number.
Returns:
pd.DataFrame: A DataFrame containing daily averages for each device,
including metadata columns such as "tenant", "device_id", "site_id",
and "device_number".
"""
data["timestamp"] = pd.to_datetime(data["timestamp"])

averaged_data_list = []

for (network, device_id), group in data.groupby(["network", "device_id"]):
network = group["network"].iloc[0]
site_id = group["site_id"].iloc[0]
device_number = group["device_number"].iloc[0]

device_averages = (
group.resample("1D", on="timestamp")
.mean(numeric_only=True)
.reset_index()
)

device_averages["network"] = network
device_averages["device_id"] = device_id
device_averages["site_id"] = site_id
device_averages["device_number"] = device_number

averaged_data_list.append(device_averages)

averaged_data = pd.concat(averaged_data_list, ignore_index=True)

return averaged_data

Expand Down Expand Up @@ -77,7 +93,7 @@ def cleanup_and_reload(
)

bigquery_api.reload_data(
tenant=Tenant.ALL,
network="all",
table=table,
dataframe=data,
start_date_time=start_date_time,
Expand Down

0 comments on commit 40bbbd5

Please sign in to comment.