Merge pull request #4412 from NicholasTurner23/update/calculate_hourly_airqualitydata_using_bigqdata

Update/calculate hourly airqualitydata using bigqdata
Baalmart authored Feb 11, 2025
2 parents 878c3f6 + da1ef6c commit 39f00e8
Showing 2 changed files with 75 additions and 13 deletions.
84 changes: 73 additions & 11 deletions src/workflows/airqo_etl_utils/data_warehouse_utils.py
@@ -2,15 +2,13 @@

from .airqo_utils import AirQoDataUtils

# from .app_insights_utils import AirQoAppUtils
from .bigquery_api import BigQueryApi
from .constants import DeviceCategory, DeviceNetwork
from .data_validator import DataValidationUtils
from .weather_data_utils import WeatherDataUtils
from .constants import DataType, Frequency
from .datautils import DataUtils

from typing import Set
from typing import Set, Optional


class DataWarehouseUtils:
@@ -24,7 +22,7 @@ def filter_valid_columns(data: pd.DataFrame) -> pd.DataFrame:
new DataFrame containing only the columns that are common to both.
Args:
data (pd.DataFrame): The input DataFrame containing data to be filtered.
data(pd.DataFrame): The input DataFrame containing data to be filtered.
Returns:
pd.DataFrame: A new DataFrame containing only the valid columns that exist in the BigQuery data warehouse table.
@@ -42,7 +40,16 @@ def extract_hourly_bam_data(
start_date_time: str,
end_date_time: str,
) -> pd.DataFrame:
"""
Extracts hourly BAM (Beta Attenuation Monitor) data from BigQuery.
Args:
start_date_time(str): The start timestamp for data extraction in ISO format.
end_date_time(str): The end timestamp for data extraction in ISO format.
Returns:
pd.DataFrame: A DataFrame containing the extracted and processed BAM data.
"""
data = DataUtils.extract_data_from_bigquery(
DataType.AVERAGED,
start_date_time=start_date_time,
@@ -58,14 +65,24 @@ def extract_hourly_bam_data(
},
inplace=True,
)
data["device_category"] = str(DeviceCategory.BAM)
data["device_category"] = DeviceCategory.BAM.str
return DataWarehouseUtils.filter_valid_columns(data)
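The switch from str(DeviceCategory.BAM) to DeviceCategory.BAM.str suggests the DeviceCategory enum in constants.py exposes a .str property returning a stable lowercase label instead of the default enum repr. A minimal sketch of that pattern, assuming an Enum base; the real definition in constants.py may differ:

from enum import Enum

class DeviceCategory(Enum):
    # Hypothetical members; constants.py may define more categories or other values.
    LOWCOST = 1
    BAM = 2

    @property
    def str(self) -> str:
        # A stable, lowercase label suitable for a BigQuery column,
        # e.g. "bam" rather than the default "DeviceCategory.BAM".
        return self.name.lower()

# DeviceCategory.BAM.str   -> "bam"
# str(DeviceCategory.BAM)  -> "DeviceCategory.BAM" (default Enum __str__)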

@staticmethod
def extract_hourly_low_cost_data(
start_date_time: str,
end_date_time: str,
) -> pd.DataFrame:
"""
Extracts hourly averaged data for low-cost devices from BigQuery.
Args:
start_date_time (str): The start timestamp for data extraction in ISO format.
end_date_time (str): The end timestamp for data extraction in ISO format.
Returns:
pd.DataFrame: A DataFrame containing the extracted and processed low-cost device data.
"""
data = DataUtils.extract_data_from_bigquery(
DataType.AVERAGED,
start_date_time=start_date_time,
@@ -82,13 +99,23 @@ def extract_hourly_low_cost_data(
},
inplace=True,
)
data["device_category"] = str(DeviceCategory.LOWCOST)
data["device_category"] = DeviceCategory.LOWCOST.str
return DataWarehouseUtils.filter_valid_columns(data)

@staticmethod
def extract_hourly_weather_data(
start_date_time: str, end_date_time: str
) -> pd.DataFrame:
"""
Extracts hourly weather data from the weather data source.
Args:
start_date_time (str): The start timestamp for data extraction in ISO format.
end_date_time (str): The end timestamp for data extraction in ISO format.
Returns:
pd.DataFrame: A DataFrame containing the extracted hourly weather data.
"""
return WeatherDataUtils.extract_weather_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
@@ -97,7 +124,23 @@ def extract_hourly_weather_data(
)
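These extractors take the window as ISO-8601 strings. A hedged example of building an aligned one-hour window in the "%Y-%m-%dT%H:%M:%SZ" format the DAG below parses; the import path is assumed from the repo layout:

from datetime import datetime, timedelta, timezone

from airqo_etl_utils.data_warehouse_utils import DataWarehouseUtils  # path assumed

# Previous full UTC hour.
end = datetime.now(timezone.utc).replace(minute=0, second=0, microsecond=0)
start = end - timedelta(hours=1)

start_date_time = start.strftime("%Y-%m-%dT%H:%M:%SZ")
end_date_time = end.strftime("%Y-%m-%dT%H:%M:%SZ")

weather = DataWarehouseUtils.extract_hourly_weather_data(
    start_date_time=start_date_time,
    end_date_time=end_date_time,
)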

@staticmethod
def extract_sites_meta_data(network: DeviceNetwork = None) -> pd.DataFrame:
def extract_sites_meta_data(
network: Optional[DeviceNetwork] = None,
) -> pd.DataFrame:
"""
Extracts site metadata from the data source.
This function retrieves metadata for sites within a specified device network
(if provided), renames columns for consistency, and filters the resulting
DataFrame to retain only valid columns.
Args:
network (DeviceNetwork, optional): The device network to filter sites by.
Defaults to None, meaning all sites are retrieved.
Returns:
pd.DataFrame: A DataFrame containing site metadata with standardized column names.
"""
sites = DataUtils.get_sites(network=network)
sites.rename(
columns={
@@ -130,13 +173,32 @@ def merge_datasets(
low_cost_data: pd.DataFrame,
sites_info: pd.DataFrame,
) -> pd.DataFrame:
low_cost_data.loc[:, "device_category"] = str(DeviceCategory.LOWCOST)
bam_data.loc[:, "device_category"] = str(DeviceCategory.BAM)
airqo_data = low_cost_data.loc[low_cost_data["network"] == DeviceNetwork.AIRQO]
"""
Merges weather, BAM, low-cost sensor data, and site metadata into a unified dataset.
This function standardizes device category labels, separates AirQo and non-AirQo
data, merges AirQo data with weather data, and finally combines all datasets into
a single DataFrame with site metadata.
Args:
weather_data(pd.DataFrame): The weather dataset containing hourly measurements.
bam_data(pd.DataFrame): The BAM sensor dataset with air quality measurements.
low_cost_data(pd.DataFrame): The low-cost sensor dataset with air quality measurements.
sites_info(pd.DataFrame): Site metadata, including location and additional attributes.
Returns:
pd.DataFrame: A merged dataset containing device data (weather, BAM, and low-cost sensors) along with site information.
"""

low_cost_data.loc[:, "device_category"] = DeviceCategory.LOWCOST.str
bam_data.loc[:, "device_category"] = DeviceCategory.BAM.str

airqo_data = low_cost_data.loc[
low_cost_data["network"] == DeviceNetwork.AIRQO.str
]

non_airqo_data = low_cost_data.loc[
low_cost_data["network"] != DeviceNetwork.AIRQO
low_cost_data["network"] != DeviceNetwork.AIRQO.str
]
airqo_data = AirQoDataUtils.merge_aggregated_weather_data(
airqo_data=airqo_data, weather_data=weather_data
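Taken together, the refactored helpers feed merge_datasets. A sketch of the intended flow, assuming merge_datasets is a static method like the extractors above and that the import paths match the repo layout; the window values are illustrative:

from airqo_etl_utils.data_warehouse_utils import DataWarehouseUtils  # path assumed
from airqo_etl_utils.constants import DeviceNetwork  # path assumed

start, end = "2025-02-10T00:00:00Z", "2025-02-11T00:00:00Z"

hourly_low_cost = DataWarehouseUtils.extract_hourly_low_cost_data(start, end)
hourly_bam = DataWarehouseUtils.extract_hourly_bam_data(start, end)
hourly_weather = DataWarehouseUtils.extract_hourly_weather_data(start, end)
sites = DataWarehouseUtils.extract_sites_meta_data(network=DeviceNetwork.AIRQO)

merged = DataWarehouseUtils.merge_datasets(
    weather_data=hourly_weather,
    bam_data=hourly_bam,
    low_cost_data=hourly_low_cost,
    sites_info=sites,
)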
4 changes: 2 additions & 2 deletions src/workflows/dags/airqo_measurements.py
@@ -659,7 +659,7 @@ def extract_hourly_data(**kwargs) -> pd.DataFrame:

# Only used the first time
start = kwargs.get("params", {}).get("start_date", "2021-01-01T00:00:00Z")
start = datetime.strptime(start, "%%Y-%m-%dT%H:%M:%SZ")
start = datetime.strptime(start, "%Y-%m-%dT%H:%M:%SZ")
end = kwargs.get("params", {}).get("end_date", "2021-12-31T23:59:59Z")
end = datetime.strptime(end, "%Y-%m-%dT%H:%M:%SZ")

@@ -668,7 +668,7 @@ def extract_hourly_data(**kwargs) -> pd.DataFrame:
previous_date = start

hour_of_day = (
datetime.strptime(previous_date, "%%Y-%m-%dT%H:%M:%SZ")
datetime.strptime(previous_date, "%Y-%m-%dT%H:%M:%SZ")
if not isinstance(previous_date, datetime)
else previous_date
)
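The fix removes a doubled percent sign: in strptime, "%%" matches a literal "%" character, so the old format string raises ValueError on any real timestamp. A quick check:

from datetime import datetime

ts = "2021-01-01T00:00:00Z"

# Old format: "%%" expects a literal "%" in the input, which is absent.
try:
    datetime.strptime(ts, "%%Y-%m-%dT%H:%M:%SZ")
except ValueError as exc:
    print("old format fails:", exc)

# Corrected format parses as intended.
print(datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ"))  # 2021-01-01 00:00:00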
