From b1bb106098b4290ed975aff27c70a2d58b0b2bf7 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Wed, 5 Feb 2025 19:15:49 +0300 Subject: [PATCH 1/2] Update metadata name(s) --- src/workflows/airqo_etl_utils/airqo_api.py | 8 +++++--- src/workflows/airqo_etl_utils/airqo_utils.py | 2 +- src/workflows/airqo_etl_utils/constants.py | 10 +++++----- .../airqo_etl_utils/data_warehouse_utils.py | 4 ++-- src/workflows/airqo_etl_utils/datautils.py | 18 +++++++++--------- src/workflows/airqo_etl_utils/kcca_utils.py | 2 +- src/workflows/dags/airqo_measurements.py | 18 +++++++++--------- 7 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/workflows/airqo_etl_utils/airqo_api.py b/src/workflows/airqo_etl_utils/airqo_api.py index 56958d99b7..9389b1e45c 100644 --- a/src/workflows/airqo_etl_utils/airqo_api.py +++ b/src/workflows/airqo_etl_utils/airqo_api.py @@ -240,7 +240,9 @@ def get_networks( return networks, exception_message def get_devices_by_network( - self, device_network: str = None, device_category: DeviceCategory = None + self, + device_network: DeviceNetwork = None, + device_category: DeviceCategory = None, ) -> List[Dict[str, Any]]: """ Retrieve devices by network based on the specified device category. @@ -288,7 +290,7 @@ def get_devices_by_network( networks: List[str] = [] params: Dict = {} if device_network: - networks.append({"net_name": device_network}) + networks.append({"net_name": device_network.str}) else: networks, error = self.get_networks() if error: @@ -296,7 +298,7 @@ def get_devices_by_network( return devices if device_category: - params["category"] = str(device_category) + params["category"] = device_category.str if configuration.ENVIRONMENT == "production": params["active"] = True diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py index 831cf93493..6f51e267e4 100644 --- a/src/workflows/airqo_etl_utils/airqo_utils.py +++ b/src/workflows/airqo_etl_utils/airqo_utils.py @@ -156,7 +156,7 @@ def extract_mobile_low_cost_sensors_data( end_date_time=value.get("end_date_time"), device_numbers=[value.get("device_number")], resolution=resolution, - device_category=DeviceCategory.LOW_COST, + device_category=DeviceCategory.LOWCOST, ) if measurements.empty: continue diff --git a/src/workflows/airqo_etl_utils/constants.py b/src/workflows/airqo_etl_utils/constants.py index be2e8c5c66..ea3b553109 100644 --- a/src/workflows/airqo_etl_utils/constants.py +++ b/src/workflows/airqo_etl_utils/constants.py @@ -11,9 +11,9 @@ class DeviceCategory(Enum): GENERAL -> All the sensors """ - LOW_COST = 1 + LOWCOST = 1 BAM = 2 - LOW_COST_GAS = 3 + GAS = 3 WEATHER = 4 GENERAL = 5 NONE = 20 @@ -49,9 +49,9 @@ def category_from_str(category: str): category = category.lower() if category == str(DeviceCategory.BAM).lower(): return DeviceCategory.BAM - elif category == str(DeviceCategory.LOW_COST_GAS).lower(): - return DeviceCategory.LOW_COST_GAS - return DeviceCategory.LOW_COST + elif category == str(DeviceCategory.GAS).lower(): + return DeviceCategory.GAS + return DeviceCategory.LOWCOST class DeviceNetwork(Enum): diff --git a/src/workflows/airqo_etl_utils/data_warehouse_utils.py b/src/workflows/airqo_etl_utils/data_warehouse_utils.py index cf36650a99..5529f547c9 100644 --- a/src/workflows/airqo_etl_utils/data_warehouse_utils.py +++ b/src/workflows/airqo_etl_utils/data_warehouse_utils.py @@ -69,7 +69,7 @@ def extract_hourly_low_cost_data( }, inplace=True, ) - data["device_category"] = str(DeviceCategory.LOW_COST) + data["device_category"] = str(DeviceCategory.LOWCOST) return DataWarehouseUtils.filter_valid_columns(data) @staticmethod @@ -119,7 +119,7 @@ def merge_datasets( low_cost_data: pd.DataFrame, sites_info: pd.DataFrame, ) -> pd.DataFrame: - low_cost_data.loc[:, "device_category"] = str(DeviceCategory.LOW_COST) + low_cost_data.loc[:, "device_category"] = str(DeviceCategory.LOWCOST) bam_data.loc[:, "device_category"] = str(DeviceCategory.BAM) airqo_data = low_cost_data.loc[low_cost_data["network"] == DeviceNetwork.AIRQO] diff --git a/src/workflows/airqo_etl_utils/datautils.py b/src/workflows/airqo_etl_utils/datautils.py index ba9b8a5e5e..ea983c9c70 100644 --- a/src/workflows/airqo_etl_utils/datautils.py +++ b/src/workflows/airqo_etl_utils/datautils.py @@ -59,7 +59,7 @@ def extract_devices_data( ) )["device_number"] = devices["device_number"].fillna(-1) except Exception as e: - logger.exception("No devices currently cached.") + logger.exception(f"No devices currently cached: {e}") devices = pd.DataFrame() keys = {} @@ -71,7 +71,7 @@ def extract_devices_data( if devices.empty: logger.exception("Failed to download or fetch devices.") raise RuntimeError("Failed to cached and api devices data.") - + print(devices) if not devices.empty and device_network: devices = devices.loc[devices.network == device_network.str] @@ -124,7 +124,7 @@ def load_cached_data(local_file_path: str, file_name: str) -> pd.DataFrame: if not file.exists(): download_file_from_gcs( bucket_name=Config.AIRFLOW_XCOM_BUCKET, - source_file=file_name, + source_file=file_name + ".csv", destination_file=local_file_path, ) data = pd.read_csv(local_file_path) if file.exists() else pd.DataFrame() @@ -135,15 +135,15 @@ def load_cached_data(local_file_path: str, file_name: str) -> pd.DataFrame: return pd.DataFrame() def _fetch_devices_from_api( - device_network: DeviceNetwork, device_category: DeviceCategory + device_network: DeviceNetwork = None, device_category: DeviceCategory = None ) -> pd.DataFrame: """Fetch devices from the API if the cached file is empty.""" airqo_api = AirQoApi() try: devices = pd.DataFrame( airqo_api.get_devices_by_network( - device_network=device_network.str, - device_category=device_category.str, + device_network=device_network, + device_category=device_category, ) ) devices["device_number"] = devices["device_number"].fillna(-1) @@ -662,11 +662,11 @@ def clean_low_cost_sensor_data( # Perform data check here: TODO Find a more structured and robust way to implement raw data quality checks. match device_category: - case DeviceCategory.LOW_COST_GAS: + case DeviceCategory.GAS: AirQoGxExpectations.from_pandas().gaseous_low_cost_sensor_raw_data_check( data ) - case DeviceCategory.LOW_COST: + case DeviceCategory.LOWCOST: AirQoGxExpectations.from_pandas().pm2_5_low_cost_sensor_raw_data(data) try: data.dropna(subset=["timestamp"], inplace=True) @@ -681,7 +681,7 @@ def clean_low_cost_sensor_data( subset=["timestamp", "device_id"], keep="first", inplace=True ) # TODO Find an appropriate place to put this - if device_category == DeviceCategory.LOW_COST: + if device_category == DeviceCategory.LOWCOST: is_airqo_network = data["network"] == "airqo" pm2_5_mean = data.loc[is_airqo_network, ["s1_pm2_5", "s2_pm2_5"]].mean( diff --git a/src/workflows/airqo_etl_utils/kcca_utils.py b/src/workflows/airqo_etl_utils/kcca_utils.py index 3e91cb10c1..8a36363ee5 100644 --- a/src/workflows/airqo_etl_utils/kcca_utils.py +++ b/src/workflows/airqo_etl_utils/kcca_utils.py @@ -132,7 +132,7 @@ def transform_data(data: pd.DataFrame) -> pd.DataFrame: @staticmethod def process_latest_data(data: pd.DataFrame) -> pd.DataFrame: data.loc[:, "network"] = str(DeviceNetwork.KCCA) - data.loc[:, "device_category"] = str(DeviceCategory.LOW_COST) + data.loc[:, "device_category"] = str(DeviceCategory.LOWCOST) return data @staticmethod diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py index 2891abe609..babbd43daf 100644 --- a/src/workflows/dags/airqo_measurements.py +++ b/src/workflows/dags/airqo_measurements.py @@ -184,14 +184,14 @@ def extract_raw_data(**kwargs): return DataUtils.extract_devices_data( start_date_time=start_date_time, end_date_time=end_date_time, - device_category=DeviceCategory.LOW_COST, + device_category=DeviceCategory.LOWCOST, resolution=Frequency.DAILY, ) @task() def clean_data_raw_data(data: pd.DataFrame): return DataUtils.clean_low_cost_sensor_data( - data=data, device_category=DeviceCategory.LOW_COST + data=data, device_category=DeviceCategory.LOWCOST ) @task(retries=3, retry_delay=timedelta(minutes=5)) @@ -360,14 +360,14 @@ def extract_raw_data(**kwargs) -> pd.DataFrame: return DataUtils.extract_devices_data( start_date_time=start_date_time, end_date_time=end_date_time, - device_category=DeviceCategory.LOW_COST, + device_category=DeviceCategory.LOWCOST, resolution=Frequency.RAW_LOW_COST, ) @task() def clean_data_raw_data(data: pd.DataFrame) -> pd.DataFrame: return DataUtils.clean_low_cost_sensor_data( - data=data, device_category=DeviceCategory.LOW_COST + data=data, device_category=DeviceCategory.LOWCOST ) @task() @@ -486,7 +486,7 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs): unique_str = str(now.date()) + "-" + str(now.hour) data = AirQoDataUtils.process_latest_data( - data=data, device_category=DeviceCategory.LOW_COST + data=data, device_category=DeviceCategory.LOWCOST ) data = DataValidationUtils.process_data_for_message_broker( data=data, @@ -551,7 +551,7 @@ def extract_raw_data(**kwargs): return DataUtils.extract_devices_data( start_date_time=start_date_time, end_date_time=end_date_time, - device_category=DeviceCategory.LOW_COST, + device_category=DeviceCategory.LOWCOST, ) @task( @@ -559,7 +559,7 @@ def extract_raw_data(**kwargs): ) def clean_data_raw_data(data: pd.DataFrame): return DataUtils.clean_low_cost_sensor_data( - data=data, device_category=DeviceCategory.LOW_COST + data=data, device_category=DeviceCategory.LOWCOST ) @task( @@ -610,7 +610,7 @@ def extract_raw_data(**kwargs) -> pd.DataFrame: return DataUtils.extract_devices_data( start_date_time=start_date_time, end_date_time=end_date_time, - device_category=DeviceCategory.LOW_COST_GAS, + device_category=DeviceCategory.GAS, device_network=DeviceNetwork.AIRQO, ) @@ -619,7 +619,7 @@ def extract_raw_data(**kwargs) -> pd.DataFrame: ) def clean_data_raw_data(data: pd.DataFrame): return DataUtils.clean_low_cost_sensor_data( - data=data, device_category=DeviceCategory.LOW_COST_GAS + data=data, device_category=DeviceCategory.GAS ) @task( From 187e37ab26972720ef2f535f0cb73a5bd1c0d192 Mon Sep 17 00:00:00 2001 From: Nicholas Bob Date: Wed, 5 Feb 2025 19:22:10 +0300 Subject: [PATCH 2/2] Update datautils.py Clean up --- src/workflows/airqo_etl_utils/datautils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/workflows/airqo_etl_utils/datautils.py b/src/workflows/airqo_etl_utils/datautils.py index ea983c9c70..49267825b9 100644 --- a/src/workflows/airqo_etl_utils/datautils.py +++ b/src/workflows/airqo_etl_utils/datautils.py @@ -71,7 +71,7 @@ def extract_devices_data( if devices.empty: logger.exception("Failed to download or fetch devices.") raise RuntimeError("Failed to cached and api devices data.") - print(devices) + if not devices.empty and device_network: devices = devices.loc[devices.network == device_network.str]