Skip to content

Commit

Permalink
Merge pull request #4484 from NicholasTurner23/update/calculate_hourly_airqualitydata_using_bigqdata
Browse files Browse the repository at this point in the history

Update/calculate hourly airqualitydata using bigqdata
  • Loading branch information
Baalmart authored Feb 25, 2025
2 parents 6408611 + fd64459 commit 3508ebf
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 63 deletions.
2 changes: 1 addition & 1 deletion src/workflows/airqo_etl_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ class Config:
MONGO_URI = os.getenv("MONGO_URI")
MONGO_DATABASE_NAME = os.getenv("MONGO_DATABASE_NAME", "airqo_db")
ENVIRONMENT = os.getenv("ENVIRONMENT")
CALIBRATEBY = os.getenv("CALIBRATEBY")
CALIBRATEBY = os.getenv("CALIBRATEBY", "country")

# Twitter bot
TWITTER_BOT_API_KEY = os.getenv("TWITTER_BOT_API_KEY")
Expand Down
52 changes: 0 additions & 52 deletions src/workflows/airqo_etl_utils/data_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.airqo_api import AirQoApi
from .datautils import DataUtils
from airqo_etl_utils.constants import ColumnDataType, Frequency, MetaDataType
from airqo_etl_utils.date import date_to_str
from typing import Any, Dict, List
Expand Down Expand Up @@ -206,57 +205,6 @@ def process_for_big_query(dataframe: pd.DataFrame, table: str) -> pd.DataFrame:

return dataframe[columns]

@staticmethod
def process_data_for_message_broker(
    data: pd.DataFrame,
    frequency: Frequency = Frequency.HOURLY,
    *,
    caller: str = None,
    topic: str = None,
) -> pd.DataFrame:
    """
    Prepare a measurements DataFrame for publishing to the message broker.

    The function tags every row with the given frequency, rewrites ``timestamp``
    as a ``%Y-%m-%dT%H:%M:%SZ`` string, renames ``device_id`` to ``device_name``
    and left-joins device metadata (``device_latitude``, ``device_longitude``)
    from ``DataUtils.get_devices()`` on ``device_name``, ``site_id`` and ``network``.

    Neither the input DataFrame nor the devices DataFrame is modified in place.

    Args:
        data (pd.DataFrame): The input data to be processed. Must contain a ``timestamp`` column.
        frequency (Frequency): The data frequency (e.g., hourly), defaults to Frequency.HOURLY.
        caller (str): Accepted only so existing call sites that pass ``caller=`` keep working; not used.
        topic (str): Accepted only so existing call sites that pass ``topic=`` keep working; not used.

    Returns:
        pd.DataFrame: The processed DataFrame ready for message broker consumption,
        or ``None`` if the device columns are missing or the merge fails (the error is logged).
    """
    # Work on a copy so the caller's frame is left untouched, including on the
    # error paths below that return None.
    data = data.copy()

    data["frequency"] = frequency.str
    # The trailing "Z" is a literal in the format string; no timezone conversion is done here.
    data["timestamp"] = pd.to_datetime(data["timestamp"]).dt.strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )

    devices, _ = DataUtils.get_devices()

    data = data.rename(columns={"device_id": "device_name"})
    # Non in-place rename: the devices frame belongs to DataUtils.get_devices()
    # and must not be altered for other users of that result.
    devices = devices.rename(columns={"name": "device_name"})

    try:
        devices = devices[
            [
                "device_name",
                "site_id",
                "device_latitude",
                "device_longitude",
                "network",
            ]
        ]

        # Left join keeps every measurement row even when no device matches.
        data = pd.merge(
            left=data,
            right=devices,
            on=["device_name", "site_id", "network"],
            how="left",
        )
    except KeyError as e:
        # Raised by the column selection above or by a merge key missing on either side.
        logger.exception(
            f"KeyError: The key(s) '{e.args}' are not available in the returned devices data."
        )
        return None
    except Exception as e:
        logger.exception(f"An error occurred: {e}")
        return None

    return data

@staticmethod
def convert_pressure_values(value):
try:
Expand Down
51 changes: 51 additions & 0 deletions src/workflows/airqo_etl_utils/datautils.py
Original file line number Diff line number Diff line change
Expand Up @@ -953,3 +953,54 @@ def get_devices_kafka(group_id: str) -> pd.DataFrame:
devices.drop_duplicates(subset=["device_id"], keep="last")

return devices

@staticmethod
def process_data_for_message_broker(
    data: pd.DataFrame,
    frequency: Frequency = Frequency.HOURLY,
    *,
    caller: str = None,
    topic: str = None,
) -> pd.DataFrame:
    """
    Prepare a measurements DataFrame for publishing to the message broker.

    The function tags every row with the given frequency, rewrites ``timestamp``
    as a ``%Y-%m-%dT%H:%M:%SZ`` string, renames ``device_id`` to ``device_name``
    and left-joins device metadata (``device_latitude``, ``device_longitude``)
    from ``DataUtils.get_devices()`` on ``device_name``, ``site_id`` and ``network``.

    Neither the input DataFrame nor the devices DataFrame is modified in place.

    Args:
        data (pd.DataFrame): The input data to be processed. Must contain a ``timestamp`` column.
        frequency (Frequency): The data frequency (e.g., hourly), defaults to Frequency.HOURLY.
        caller (str): Accepted only so existing call sites that pass ``caller=`` keep working; not used.
        topic (str): Accepted only so existing call sites that pass ``topic=`` keep working; not used.

    Returns:
        pd.DataFrame: The processed DataFrame ready for message broker consumption,
        or ``None`` if the device columns are missing or the merge fails (the error is logged).
    """
    # Work on a copy so the caller's frame is left untouched, including on the
    # error paths below that return None.
    data = data.copy()

    data["frequency"] = frequency.str
    # The trailing "Z" is a literal in the format string; no timezone conversion is done here.
    data["timestamp"] = pd.to_datetime(data["timestamp"]).dt.strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )

    devices, _ = DataUtils.get_devices()

    data = data.rename(columns={"device_id": "device_name"})
    # Non in-place rename: the devices frame belongs to DataUtils.get_devices()
    # and must not be altered for other users of that result.
    devices = devices.rename(columns={"name": "device_name"})

    try:
        devices = devices[
            [
                "device_name",
                "site_id",
                "device_latitude",
                "device_longitude",
                "network",
            ]
        ]

        # Left join keeps every measurement row even when no device matches.
        data = pd.merge(
            left=data,
            right=devices,
            on=["device_name", "site_id", "network"],
            how="left",
        )
    except KeyError as e:
        # Raised by the column selection above or by a merge key missing on either side.
        logger.exception(
            f"KeyError: The key(s) '{e.args}' are not available in the returned devices data."
        )
        return None
    except Exception as e:
        logger.exception(f"An error occurred: {e}")
        return None

    return data
7 changes: 3 additions & 4 deletions src/workflows/dags/airnow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airqo_etl_utils.date import DateUtils
from airqo_etl_utils.datautils import DataUtils
from airqo_etl_utils.workflows_custom_utils import AirflowUtils
from datetime import timedelta
from airqo_etl_utils.config import configuration as Config
Expand Down Expand Up @@ -37,13 +38,12 @@ def process_data(data: pd.DataFrame):
@task(retries=3, retry_delay=timedelta(minutes=5))
def send_to_message_broker(data: pd.DataFrame, **kwargs):
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
from airqo_etl_utils.data_validator import DataValidationUtils
from datetime import datetime

now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour) + "-" + str(now.second)

data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=Config.HOURLY_MEASUREMENTS_TOPIC,
Expand Down Expand Up @@ -117,13 +117,12 @@ def process_data(data: pd.DataFrame):
@task(retries=3, retry_delay=timedelta(minutes=5))
def send_to_message_broker(data: pd.DataFrame, **kwargs):
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
from airqo_etl_utils.data_validator import DataValidationUtils
from datetime import datetime

now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour) + "-" + str(now.second)

data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=Config.HOURLY_MEASUREMENTS_TOPIC,
Expand Down
2 changes: 1 addition & 1 deletion src/workflows/dags/airqo_bam_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
data = AirQoDataUtils.process_latest_data(
data=data, device_category=DeviceCategory.BAM
)
data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
Expand Down
6 changes: 3 additions & 3 deletions src/workflows/dags/airqo_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def send_hourly_measurements_to_message_broker(
now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour) + "-" + str(now.second)

data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=Config.HOURLY_MEASUREMENTS_TOPIC,
Expand Down Expand Up @@ -435,7 +435,7 @@ def send_hourly_measurements_to_message_broker(data: pd.DataFrame, **kwargs):
now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour) + "-" + str(now.second)

data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=Config.HOURLY_MEASUREMENTS_TOPIC,
Expand Down Expand Up @@ -482,7 +482,7 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
data = AirQoDataUtils.process_latest_data(
data=data, device_category=DeviceCategory.LOWCOST
)
data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=Config.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
Expand Down
4 changes: 2 additions & 2 deletions src/workflows/dags/kcca_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from airflow.exceptions import AirflowFailException
from airqo_etl_utils.config import configuration as Config
from airqo_etl_utils.constants import DeviceNetwork
from airqo_etl_utils.datautils import DataUtils


@dag(
Expand Down Expand Up @@ -48,12 +49,11 @@ def send_to_api(data: pd.DataFrame):

@task()
def send_to_message_broker(data: pd.DataFrame, **kwargs):
from airqo_etl_utils.data_validator import DataValidationUtils
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils

now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour) + "-" + str(now.second)
data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=Config.HOURLY_MEASUREMENTS_TOPIC,
Expand Down

0 comments on commit 3508ebf

Please sign in to comment.