diff --git a/src/workflows/airqo_etl_utils/data_warehouse_utils.py b/src/workflows/airqo_etl_utils/data_warehouse_utils.py index 50b2e4a8cf..8d092d3c0e 100644 --- a/src/workflows/airqo_etl_utils/data_warehouse_utils.py +++ b/src/workflows/airqo_etl_utils/data_warehouse_utils.py @@ -2,15 +2,13 @@ from .airqo_utils import AirQoDataUtils -# from .app_insights_utils import AirQoAppUtils from .bigquery_api import BigQueryApi from .constants import DeviceCategory, DeviceNetwork -from .data_validator import DataValidationUtils from .weather_data_utils import WeatherDataUtils from .constants import DataType, Frequency from .datautils import DataUtils -from typing import Set +from typing import Set, Optional class DataWarehouseUtils: @@ -24,7 +22,7 @@ def filter_valid_columns(data: pd.DataFrame) -> pd.DataFrame: new DataFrame containing only the columns that are common to both. Args: - data (pd.DataFrame): The input DataFrame containing data to be filtered. + data(pd.DataFrame): The input DataFrame containing data to be filtered. Returns: pd.DataFrame: A new DataFrame containing only the valid columns that exist in the BigQuery data warehouse table. @@ -42,7 +40,16 @@ def extract_hourly_bam_data( start_date_time: str, end_date_time: str, ) -> pd.DataFrame: + """ + Extracts hourly BAM (Beta Attenuation Monitor) data from BigQuery. + + Args: + start_date_time(str): The start timestamp for data extraction in ISO format. + end_date_time(str): The end timestamp for data extraction in ISO format. + Returns: + pd.DataFrame: A DataFrame containing the extracted and processed BAM data. + """ data = DataUtils.extract_data_from_bigquery( DataType.AVERAGED, start_date_time=start_date_time, @@ -58,7 +65,7 @@ def extract_hourly_bam_data( }, inplace=True, ) - data["device_category"] = str(DeviceCategory.BAM) + data["device_category"] = DeviceCategory.BAM.str return DataWarehouseUtils.filter_valid_columns(data) @staticmethod @@ -66,6 +73,16 @@ def extract_hourly_low_cost_data( start_date_time: str, end_date_time: str, ) -> pd.DataFrame: + """ + Extracts hourly averaged data for low-cost devices from BigQuery. + + Args: + start_date_time (str): The start timestamp for data extraction in ISO format. + end_date_time (str): The end timestamp for data extraction in ISO format. + + Returns: + pd.DataFrame: A DataFrame containing the extracted and processed low-cost device data. + """ data = DataUtils.extract_data_from_bigquery( DataType.AVERAGED, start_date_time=start_date_time, @@ -82,13 +99,23 @@ def extract_hourly_low_cost_data( }, inplace=True, ) - data["device_category"] = str(DeviceCategory.LOWCOST) + data["device_category"] = DeviceCategory.LOWCOST.str return DataWarehouseUtils.filter_valid_columns(data) @staticmethod def extract_hourly_weather_data( start_date_time: str, end_date_time: str ) -> pd.DataFrame: + """ + Extracts hourly weather data from the weather data source. + + Args: + start_date_time (str): The start timestamp for data extraction in ISO format. + end_date_time (str): The end timestamp for data extraction in ISO format. + + Returns: + pd.DataFrame: A DataFrame containing the extracted hourly weather data. + """ return WeatherDataUtils.extract_weather_data( start_date_time=start_date_time, end_date_time=end_date_time, @@ -97,7 +124,23 @@ def extract_hourly_weather_data( ) @staticmethod - def extract_sites_meta_data(network: DeviceNetwork = None) -> pd.DataFrame: + def extract_sites_meta_data( + network: Optional[DeviceNetwork] = None, + ) -> pd.DataFrame: + """ + Extracts site metadata from the data source. + + This function retrieves metadata for sites within a specified device network + (if provided), renames columns for consistency, and filters the resulting + DataFrame to retain only valid columns. + + Args: + network (DeviceNetwork, optional): The device network to filter sites by. + Defaults to None, meaning all sites are retrieved. + + Returns: + pd.DataFrame: A DataFrame containing site metadata with standardized column names. + """ sites = DataUtils.get_sites(network=network) sites.rename( columns={ @@ -130,13 +173,32 @@ def merge_datasets( low_cost_data: pd.DataFrame, sites_info: pd.DataFrame, ) -> pd.DataFrame: - low_cost_data.loc[:, "device_category"] = str(DeviceCategory.LOWCOST) - bam_data.loc[:, "device_category"] = str(DeviceCategory.BAM) + """ + Merges weather, BAM, low-cost sensor data, and site metadata into a unified dataset. - airqo_data = low_cost_data.loc[low_cost_data["network"] == DeviceNetwork.AIRQO] + This function standardizes device category labels, separates AirQo and non-AirQo + data, merges AirQo data with weather data, and finally combines all datasets into + a single DataFrame with site metadata. + + Args: + weather_data(pd.DataFrame): The weather dataset containing hourly measurements. + bam_data(pd.DataFrame): The BAM sensor dataset with air quality measurements. + low_cost_data(pd.DataFrame): The low-cost sensor dataset with air quality measurements. + sites_info(pd.DataFrame): Site metadata, including location and additional attributes. + + Returns: + pd.DataFrame: A merged dataset containing device data (weather, BAM, and low-cost sensors) along with site information. + """ + + low_cost_data.loc[:, "device_category"] = DeviceCategory.LOWCOST.str + bam_data.loc[:, "device_category"] = DeviceCategory.BAM.str + + airqo_data = low_cost_data.loc[ + low_cost_data["network"] == DeviceNetwork.AIRQO.str + ] non_airqo_data = low_cost_data.loc[ - low_cost_data["network"] != DeviceNetwork.AIRQO + low_cost_data["network"] != DeviceNetwork.AIRQO.str ] airqo_data = AirQoDataUtils.merge_aggregated_weather_data( airqo_data=airqo_data, weather_data=weather_data diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py index 7c895c8ac2..b21c5304e3 100644 --- a/src/workflows/dags/airqo_measurements.py +++ b/src/workflows/dags/airqo_measurements.py @@ -659,7 +659,7 @@ def extract_hourly_data(**kwargs) -> pd.DataFrame: # Only used the first time start = kwargs.get("params", {}).get("start_date", "2021-01-01T00:00:00Z") - start = datetime.strptime(start, "%%Y-%m-%dT%H:%M:%SZ") + start = datetime.strptime(start, "%Y-%m-%dT%H:%M:%SZ") end = kwargs.get("params", {}).get("end_date", "2021-12-31T23:59:59Z") end = datetime.strptime(end, "%Y-%m-%dT%H:%M:%SZ") @@ -668,7 +668,7 @@ def extract_hourly_data(**kwargs) -> pd.DataFrame: previous_date = start hour_of_day = ( - datetime.strptime(previous_date, "%%Y-%m-%dT%H:%M:%SZ") + datetime.strptime(previous_date, "%Y-%m-%dT%H:%M:%SZ") if not isinstance(previous_date, datetime) else previous_date )