Skip to content

Commit

Permalink
Merge pull request #4534 from NicholasTurner23/update/calculate_hourly_airqualitydata_using_bigqdata
Browse files Browse the repository at this point in the history

Update/calculate hourly airqualitydata using bigqdata
  • Loading branch information
Baalmart authored Mar 7, 2025
2 parents 8e0a0ee + 146be9c commit 2116d62
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 57 deletions.
55 changes: 30 additions & 25 deletions src/workflows/airqo_etl_utils/airnow_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,46 @@
import pandas as pd

from .airnow_api import AirNowApi
from .airqo_api import AirQoApi
from .constants import DataSource, DeviceCategory, Frequency, DeviceNetwork
from .data_validator import DataValidationUtils
from .date import str_to_date, date_to_str
from .utils import Utils
from .data_validator import DataValidationUtils
from .datautils import DataUtils
from .config import configuration
from .config import configuration as Config
import logging

logger = logging.getLogger(__name__)


class AirnowDataUtils:
    # Maps AirNow pollutant parameter names (lower-cased) to storage column names.
    _PARAMETER_COLUMNS = {"pm2.5": "pm2_5", "pm10": "pm10", "no2": "no2"}

    @staticmethod
    def parameter_column_name(parameter: str) -> str:
        """
        Map an AirNow pollutant parameter name to its storage column name.

        Args:
            parameter(str): Pollutant name as reported by AirNow
                (case-insensitive), e.g. "PM2.5", "PM10", "NO2".

        Returns:
            str: The corresponding column name, e.g. "pm2_5".

        Raises:
            ValueError: If the parameter is not a supported pollutant.
                ValueError subclasses Exception, so existing callers that
                catch Exception keep working.
        """
        parameter = parameter.lower()
        try:
            return AirnowDataUtils._PARAMETER_COLUMNS[parameter]
        except KeyError:
            raise ValueError(f"Unknown parameter {parameter}") from None

@staticmethod
def query_bam_data(
api_key: str, start_date_time: str, end_date_time: str
) -> pd.DataFrame:
"""
Queries BAM (Beta Attenuation Monitor) data from the AirNow API within the given date range.
This function converts the input date strings into the required format for the API,
retrieves air quality data, and returns it as a Pandas DataFrame.
Args:
api_key(str): The API key required for authentication with AirNow.
start_date_time(str): The start datetime in string format (expected format: "YYYY-MM-DD HH:MM").
end_date_time(str): The end datetime in string format (expected format: "YYYY-MM-DD HH:MM").
Returns:
pd.DataFrame: A DataFrame containing the air quality data retrieved from the AirNow API.
Example:
>>> df = query_bam_data("your_api_key", "2024-03-01 00:00", "2024-03-02 23:59")
>>> print(df.head())
"""
airnow_api = AirNowApi()
date_format = "%Y-%m-%dT%H:%M"
start_date_time = date_to_str(
str_to_date(start_date_time), str_format="%Y-%m-%dT%H:%M"
)
end_date_time = date_to_str(
str_to_date(end_date_time), str_format="%Y-%m-%dT%H:%M"
str_to_date(start_date_time), str_format=date_format
)
end_date_time = date_to_str(str_to_date(end_date_time), str_format=date_format)

data = airnow_api.get_data(
api_key=api_key,
Expand All @@ -46,7 +50,7 @@ def query_bam_data(
end_date_time=end_date_time,
)

return pd.DataFrame(data)
return pd.DataFrame(data) if data else pd.DataFrame()

@staticmethod
def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame:
Expand Down Expand Up @@ -78,7 +82,7 @@ def extract_bam_data(start_date_time: str, end_date_time: str) -> pd.DataFrame:
if not dates:
raise ValueError("Invalid or empty date range provided.")

api_key = configuration.US_EMBASSY_API_KEY
api_key = Config.US_EMBASSY_API_KEY

all_device_data = []
device_data = []
Expand Down Expand Up @@ -141,10 +145,11 @@ def process_bam_data(data: pd.DataFrame) -> pd.DataFrame:
logger.exception(f"Device with ID {device_id_} not found")
continue

parameter_col_name = AirnowDataUtils.parameter_column_name(
row["Parameter"]
)
if parameter_col_name in pollutant_value:
parameter_col_name = Config.device_config_mapping.get(
DeviceCategory.BAM.str, {}
).get(row["Parameter"].lower(), None)

if parameter_col_name and parameter_col_name in pollutant_value:
pollutant_value[parameter_col_name] = row["Value"]

if row["network"] != device_details.get("network"):
Expand Down
15 changes: 8 additions & 7 deletions src/workflows/airqo_etl_utils/airqo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import ast
import numpy as np
import pandas as pd
from typing import List, Dict, Any, Union, Generator
from typing import List, Dict, Any, Union, Generator, Optional

from .airqo_api import AirQoApi
from .bigquery_api import BigQueryApi
Expand Down Expand Up @@ -738,15 +738,17 @@ def _airqo_calibrate(data: pd.DataFrame, groupby: str) -> pd.DataFrame:

@staticmethod
def extract_devices_with_uncalibrated_data(
start_date, table: str = None, network: DeviceNetwork = DeviceNetwork.AIRQO
start_date: str,
table: Optional[str] = None,
network: Optional[DeviceNetwork] = DeviceNetwork.AIRQO,
) -> pd.DataFrame:
"""
Extracts devices with uncalibrated data for a given start date from BigQuery.
Args:
start_date (str or datetime): The date for which to check missing uncalibrated data.
table (str, optional): The name of the BigQuery table. Defaults to None, in which case the appropriate table is determined dynamically.
network (DeviceNetwork, optional): The device network to filter by. Defaults to DeviceNetwork.AIRQO.
start_date(datetime like string): The date for which to check missing uncalibrated data.
table(str, optional): The name of the BigQuery table. Defaults to None, in which case the appropriate table is determined dynamically.
network(DeviceNetwork, optional): The device network to filter by. Defaults to DeviceNetwork.AIRQO.
Returns:
pd.DataFrame: A DataFrame containing the devices with missing uncalibrated data.
Expand Down Expand Up @@ -788,9 +790,8 @@ def extract_aggregate_calibrate_raw_data(

# TODO Might have to change approach to group by device_id depending on performance.
for _, row in devices.iterrows():
end_date_time = datetime.strptime(row.timestamp, "%Y-%m-%d %H:%M:%S%z")
end_date_time = DateUtils.format_datetime_by_unit_str(
end_date_time, "hours_end"
row.timestamp, "hours_end"
)
raw_device_data = DataUtils.extract_data_from_bigquery(
DataType.RAW,
Expand Down
11 changes: 9 additions & 2 deletions src/workflows/airqo_etl_utils/bigquery_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,10 @@ def fetch_satellite_readings(
logger.info(f"Error fetching data from bigquery", {e})

def generate_missing_data_query(
self, date: str, table: str, network: DeviceNetwork
self,
date: str,
table: str,
network: Optional[DeviceNetwork] = DeviceNetwork.AIRQO,
) -> str:
"""
Generates a BigQuery SQL query to find missing hourly air quality data for devices.
Expand All @@ -1071,7 +1074,11 @@ def generate_missing_data_query(
SELECT device_id, TIMESTAMP_TRUNC(timestamp, HOUR) AS timestamp
FROM `{table}`
WHERE
DATE(timestamp) = '{date}'
TIMESTAMP_TRUNC(timestamp, DAY) = '{date}'
AND s1_pm2_5 IS NOT NULL
AND s2_pm2_5 IS NOT NULL
AND s1_pm10 IS NOT NULL
AND s2_pm10 IS NOT NULL
AND pm2_5_calibrated_value IS NULL
AND network = '{network.str}'
)
Expand Down
7 changes: 6 additions & 1 deletion src/workflows/airqo_etl_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ class Config:
"ts": "timestamp",
}

AIRBEAM_BAM_FIELD_MAPPING = {"pm2.5": "pm2_5", "pm10": "pm10", "no2": "no2"}

DATA_RESOLUTION_MAPPING = {
"iqair": {"hourly": "instant", "raw": "instant", "current": "current"}
}
Expand Down Expand Up @@ -363,7 +365,10 @@ class Config:
device_config_mapping = {
"bam": {
"field_8_cols": list(AIRQO_BAM_MAPPING_NEW.get("field8", {}).values()),
"mapping": {"airqo": AIRQO_BAM_MAPPING_NEW},
"mapping": {
"airqo": AIRQO_BAM_MAPPING_NEW,
"airbeam": AIRBEAM_BAM_FIELD_MAPPING,
},
"other_fields_cols": [],
},
"gas": {
Expand Down
26 changes: 26 additions & 0 deletions src/workflows/airqo_etl_utils/data_summary_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,32 @@
class DataSummaryUtils:
@staticmethod
def compute_devices_summary(data: pd.DataFrame) -> pd.DataFrame:
"""
Computes a summary of device records from the given dataset.
This function processes a DataFrame containing device readings, grouping data
by device and daily timestamps. It calculates the number of hourly records,
calibrated and uncalibrated records, and their respective percentages.
Args:
data (pd.DataFrame): A DataFrame containing device data with at least the columns:
- timestamp(datetime-like string): The time of the record.
- device(str): The device identifier.
- site_id(str): The site identifier.
- pm2_5_calibrated_value(float): The calibrated PM2.5 value.
Returns:
pd.DataFrame: A summary DataFrame with aggregated daily records for each device,
containing the following columns:
- timestamp(str): Date (YYYY-MM-DD) representing the aggregation period.
- device(str): Device identifier.
- site_id(str): Site identifier.
- hourly_records(int): Total records for that device on that date.
- calibrated_records(int): Count of non-null calibrated PM2.5 values.
- uncalibrated_records(int): Count of missing PM2.5 calibrated values.
- calibrated_percentage(float): Percentage of calibrated records.
- uncalibrated_percentage(float): Percentage of uncalibrated records.
"""
devices_summary = pd.DataFrame()
data["timestamp"] = pd.to_datetime(data["timestamp"])
data.drop_duplicates(subset=["device", "timestamp"], inplace=True)
Expand Down
37 changes: 16 additions & 21 deletions src/workflows/airqo_etl_utils/datautils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pandas as pd
from pathlib import Path
import json
import ast
from confluent_kafka import KafkaException
from typing import List, Dict, Any, Union, Tuple, Optional

Expand Down Expand Up @@ -1036,32 +1037,26 @@ def process_data_for_message_broker(
# Clarity
def _flatten_location_coordinates_clarity(coordinates: str) -> pd.Series:
"""
Extracts latitude and longitude from a coordinate string.
The function expects a string representation of coordinates in the format "[longitude, latitude]". It removes square brackets and spaces, splits
the values, and returns them as a Pandas Series.
Extracts latitude and longitude from a string representation of coordinates.
Args:
coordinates(str): A string containing coordinates in the format "[longitude, latitude]".
coordinates(str): A string containing a list or tuple with two numeric values representing latitude and longitude (e.g., "[37.7749, -122.4194]").
Returns:
pd.Series: A Pandas Series with 'latitude' and 'longitude' as keys. Returns None for both values if an error occurs.
Example:
>>> _flatten_location_coordinates("[-73.935242, 40.730610]")
latitude 40.730610
longitude -73.935242
dtype: object
pd.Series: A Pandas Series containing two values:
- latitude (float) at index 0
- longitude (float) at index 1
If parsing fails or the format is invalid, returns Series([None, None]).
"""

try:
coords = coordinates.strip("[] ").split(",")
return pd.Series(
{"latitude": coords[1].strip(), "longitude": coords[0].strip()}
)
except Exception as ex:
logger.exception("Error parsing coordinates: %s", ex)
return pd.Series({"latitude": None, "longitude": None})
coords = ast.literal_eval(coordinates)

if isinstance(coords, (list, tuple)) and len(coords) == 2:
return pd.Series(coords)
except (ValueError, SyntaxError):
logger.exception("Error occurred while cleaning up coordinates")

return pd.Series([None, None])

def _transform_clarity_data(data: pd.DataFrame) -> pd.DataFrame:
"""
Expand Down Expand Up @@ -1106,7 +1101,7 @@ def _transform_clarity_data(data: pd.DataFrame) -> pd.DataFrame:
)

data[["latitude", "longitude"]] = data["location.coordinates"].apply(
DataUtils._flatten_location_coordinates
DataUtils._flatten_location_coordinates_clarity
)

devices, _ = DataUtils.get_devices()
Expand Down
4 changes: 3 additions & 1 deletion src/workflows/airqo_etl_utils/weather_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ def fetch_openweathermap_data_for_sites(sites: pd.DataFrame) -> pd.DataFrame:
for multiple sites in parallel batches.
"""

def process_batch(batch_of_coordinates: List[Dict[str, Any]]):
def process_batch(
batch_of_coordinates: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
"""
Fetches weather data from OpenWeatherMap API for a given list of sites.
Expand Down
1 change: 1 addition & 0 deletions src/workflows/dags/data_summary.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,4 @@ def save_summary(data: pd.DataFrame):


# data_summary()
# TODO This is not being used. It will be deleted, along with all its utilities, once a data health analytics dashboard is in place.

0 comments on commit 2116d62

Please sign in to comment.