Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update metadata name(s) #4366

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions src/workflows/airqo_etl_utils/airqo_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ def get_networks(
return networks, exception_message

def get_devices_by_network(
self, device_network: str = None, device_category: DeviceCategory = None
self,
device_network: DeviceNetwork = None,
device_category: DeviceCategory = None,
) -> List[Dict[str, Any]]:
"""
Retrieve devices by network based on the specified device category.
Expand Down Expand Up @@ -288,15 +290,15 @@ def get_devices_by_network(
networks: List[str] = []
params: Dict = {}
if device_network:
networks.append({"net_name": device_network})
networks.append({"net_name": device_network.str})
else:
networks, error = self.get_networks()
if error:
logger.error(f"Error while fetching networks: {error}")
return devices

if device_category:
params["category"] = str(device_category)
params["category"] = device_category.str

if configuration.ENVIRONMENT == "production":
params["active"] = True
Expand Down
2 changes: 1 addition & 1 deletion src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def extract_mobile_low_cost_sensors_data(
end_date_time=value.get("end_date_time"),
device_numbers=[value.get("device_number")],
resolution=resolution,
device_category=DeviceCategory.LOW_COST,
device_category=DeviceCategory.LOWCOST,
)
if measurements.empty:
continue
Expand Down
10 changes: 5 additions & 5 deletions src/workflows/airqo_etl_utils/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ class DeviceCategory(Enum):
GENERAL -> All the sensors
"""

LOW_COST = 1
LOWCOST = 1
BAM = 2
LOW_COST_GAS = 3
GAS = 3
WEATHER = 4
GENERAL = 5
NONE = 20
Expand Down Expand Up @@ -49,9 +49,9 @@ def category_from_str(category: str):
category = category.lower()
if category == str(DeviceCategory.BAM).lower():
return DeviceCategory.BAM
elif category == str(DeviceCategory.LOW_COST_GAS).lower():
return DeviceCategory.LOW_COST_GAS
return DeviceCategory.LOW_COST
elif category == str(DeviceCategory.GAS).lower():
return DeviceCategory.GAS
return DeviceCategory.LOWCOST


class DeviceNetwork(Enum):
Expand Down
4 changes: 2 additions & 2 deletions src/workflows/airqo_etl_utils/data_warehouse_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def extract_hourly_low_cost_data(
},
inplace=True,
)
data["device_category"] = str(DeviceCategory.LOW_COST)
data["device_category"] = str(DeviceCategory.LOWCOST)
return DataWarehouseUtils.filter_valid_columns(data)

@staticmethod
Expand Down Expand Up @@ -119,7 +119,7 @@ def merge_datasets(
low_cost_data: pd.DataFrame,
sites_info: pd.DataFrame,
) -> pd.DataFrame:
low_cost_data.loc[:, "device_category"] = str(DeviceCategory.LOW_COST)
low_cost_data.loc[:, "device_category"] = str(DeviceCategory.LOWCOST)
bam_data.loc[:, "device_category"] = str(DeviceCategory.BAM)

airqo_data = low_cost_data.loc[low_cost_data["network"] == DeviceNetwork.AIRQO]
Expand Down
18 changes: 9 additions & 9 deletions src/workflows/airqo_etl_utils/datautils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def extract_devices_data(
)
)["device_number"] = devices["device_number"].fillna(-1)
except Exception as e:
logger.exception("No devices currently cached.")
logger.exception(f"No devices currently cached: {e}")
devices = pd.DataFrame()

keys = {}
Expand All @@ -71,7 +71,7 @@ def extract_devices_data(
if devices.empty:
logger.exception("Failed to download or fetch devices.")
raise RuntimeError("Failed to cached and api devices data.")

print(devices)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Replace print statement with proper logging.

Using print statements in production code is not recommended. Consider using the logger instead.

-        print(devices)
+        logger.debug(f"Loaded devices: {devices}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
print(devices)
logger.debug(f"Loaded devices: {devices}")

if not devices.empty and device_network:
devices = devices.loc[devices.network == device_network.str]

Expand Down Expand Up @@ -124,7 +124,7 @@ def load_cached_data(local_file_path: str, file_name: str) -> pd.DataFrame:
if not file.exists():
download_file_from_gcs(
bucket_name=Config.AIRFLOW_XCOM_BUCKET,
source_file=file_name,
source_file=file_name + ".csv",
destination_file=local_file_path,
)
data = pd.read_csv(local_file_path) if file.exists() else pd.DataFrame()
Expand All @@ -135,15 +135,15 @@ def load_cached_data(local_file_path: str, file_name: str) -> pd.DataFrame:
return pd.DataFrame()

def _fetch_devices_from_api(
device_network: DeviceNetwork, device_category: DeviceCategory
device_network: DeviceNetwork = None, device_category: DeviceCategory = None
) -> pd.DataFrame:
"""Fetch devices from the API if the cached file is empty."""
airqo_api = AirQoApi()
try:
devices = pd.DataFrame(
airqo_api.get_devices_by_network(
device_network=device_network.str,
device_category=device_category.str,
device_network=device_network,
device_category=device_category,
)
)
devices["device_number"] = devices["device_number"].fillna(-1)
Expand Down Expand Up @@ -662,11 +662,11 @@ def clean_low_cost_sensor_data(

# Perform data check here: TODO Find a more structured and robust way to implement raw data quality checks.
match device_category:
case DeviceCategory.LOW_COST_GAS:
case DeviceCategory.GAS:
AirQoGxExpectations.from_pandas().gaseous_low_cost_sensor_raw_data_check(
data
)
case DeviceCategory.LOW_COST:
case DeviceCategory.LOWCOST:
AirQoGxExpectations.from_pandas().pm2_5_low_cost_sensor_raw_data(data)
try:
data.dropna(subset=["timestamp"], inplace=True)
Expand All @@ -681,7 +681,7 @@ def clean_low_cost_sensor_data(
subset=["timestamp", "device_id"], keep="first", inplace=True
)
# TODO Find an appropriate place to put this
if device_category == DeviceCategory.LOW_COST:
if device_category == DeviceCategory.LOWCOST:
is_airqo_network = data["network"] == "airqo"

pm2_5_mean = data.loc[is_airqo_network, ["s1_pm2_5", "s2_pm2_5"]].mean(
Expand Down
2 changes: 1 addition & 1 deletion src/workflows/airqo_etl_utils/kcca_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def transform_data(data: pd.DataFrame) -> pd.DataFrame:
@staticmethod
def process_latest_data(data: pd.DataFrame) -> pd.DataFrame:
data.loc[:, "network"] = str(DeviceNetwork.KCCA)
data.loc[:, "device_category"] = str(DeviceCategory.LOW_COST)
data.loc[:, "device_category"] = str(DeviceCategory.LOWCOST)
return data

@staticmethod
Expand Down
18 changes: 9 additions & 9 deletions src/workflows/dags/airqo_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,14 +184,14 @@ def extract_raw_data(**kwargs):
return DataUtils.extract_devices_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
device_category=DeviceCategory.LOW_COST,
device_category=DeviceCategory.LOWCOST,
resolution=Frequency.DAILY,
)

@task()
def clean_data_raw_data(data: pd.DataFrame):
return DataUtils.clean_low_cost_sensor_data(
data=data, device_category=DeviceCategory.LOW_COST
data=data, device_category=DeviceCategory.LOWCOST
)

@task(retries=3, retry_delay=timedelta(minutes=5))
Expand Down Expand Up @@ -360,14 +360,14 @@ def extract_raw_data(**kwargs) -> pd.DataFrame:
return DataUtils.extract_devices_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
device_category=DeviceCategory.LOW_COST,
device_category=DeviceCategory.LOWCOST,
resolution=Frequency.RAW_LOW_COST,
)

@task()
def clean_data_raw_data(data: pd.DataFrame) -> pd.DataFrame:
return DataUtils.clean_low_cost_sensor_data(
data=data, device_category=DeviceCategory.LOW_COST
data=data, device_category=DeviceCategory.LOWCOST
)

@task()
Expand Down Expand Up @@ -486,7 +486,7 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
unique_str = str(now.date()) + "-" + str(now.hour)

data = AirQoDataUtils.process_latest_data(
data=data, device_category=DeviceCategory.LOW_COST
data=data, device_category=DeviceCategory.LOWCOST
)
data = DataValidationUtils.process_data_for_message_broker(
data=data,
Expand Down Expand Up @@ -551,15 +551,15 @@ def extract_raw_data(**kwargs):
return DataUtils.extract_devices_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
device_category=DeviceCategory.LOW_COST,
device_category=DeviceCategory.LOWCOST,
)

@task(
doc_md=clean_data_raw_data_doc,
)
def clean_data_raw_data(data: pd.DataFrame):
return DataUtils.clean_low_cost_sensor_data(
data=data, device_category=DeviceCategory.LOW_COST
data=data, device_category=DeviceCategory.LOWCOST
)

@task(
Expand Down Expand Up @@ -610,7 +610,7 @@ def extract_raw_data(**kwargs) -> pd.DataFrame:
return DataUtils.extract_devices_data(
start_date_time=start_date_time,
end_date_time=end_date_time,
device_category=DeviceCategory.LOW_COST_GAS,
device_category=DeviceCategory.GAS,
device_network=DeviceNetwork.AIRQO,
)

Expand All @@ -619,7 +619,7 @@ def extract_raw_data(**kwargs) -> pd.DataFrame:
)
def clean_data_raw_data(data: pd.DataFrame):
return DataUtils.clean_low_cost_sensor_data(
data=data, device_category=DeviceCategory.LOW_COST_GAS
data=data, device_category=DeviceCategory.GAS
)

@task(
Expand Down
Loading