Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update/calculate hourly airqualitydata using bigqdata #4484

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/workflows/airqo_etl_utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,7 @@ class Config:
MONGO_URI = os.getenv("MONGO_URI")
MONGO_DATABASE_NAME = os.getenv("MONGO_DATABASE_NAME", "airqo_db")
ENVIRONMENT = os.getenv("ENVIRONMENT")
CALIBRATEBY = os.getenv("CALIBRATEBY")
CALIBRATEBY = os.getenv("CALIBRATEBY", "country")

# Twitter bot
TWITTER_BOT_API_KEY = os.getenv("TWITTER_BOT_API_KEY")
Expand Down
52 changes: 0 additions & 52 deletions src/workflows/airqo_etl_utils/data_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.airqo_api import AirQoApi
from .datautils import DataUtils
from airqo_etl_utils.constants import ColumnDataType, Frequency, MetaDataType
from airqo_etl_utils.date import date_to_str
from typing import Any, Dict, List
Expand Down Expand Up @@ -206,57 +205,6 @@ def process_for_big_query(dataframe: pd.DataFrame, table: str) -> pd.DataFrame:

return dataframe[columns]

@staticmethod
def process_data_for_message_broker(
    data: pd.DataFrame,
    frequency: Frequency = Frequency.HOURLY,
    caller=None,
    topic=None,
) -> pd.DataFrame:
    """
    Prepares measurements for message broker consumption.

    The input is tagged with the given frequency, its timestamps are formatted as
    ``%Y-%m-%dT%H:%M:%SZ`` strings, ``device_id`` is renamed to ``device_name`` and
    device metadata from ``DataUtils.get_devices()`` is left-joined onto it.
    The caller's DataFrame is not modified; a new DataFrame is returned.

    Args:
        data (pd.DataFrame): The input data to be processed. Must contain a ``timestamp`` column.
        frequency (Frequency): The data frequency (e.g., hourly), defaults to Frequency.HOURLY.
        caller: Unused. Accepted only so call sites that still pass ``caller=`` keep working.
        topic: Unused. Accepted only so call sites that still pass ``topic=`` keep working.

    Returns:
        pd.DataFrame: The processed DataFrame ready for message broker consumption,
        or None when the device metadata could not be selected or merged (the error is logged).
    """
    # rename() without inplace returns a new frame, so none of the column
    # assignments below leak back into the DataFrame owned by the caller.
    data = data.rename(columns={"device_id": "device_name"})
    data["frequency"] = frequency.str
    # NOTE(review): a literal "Z" is appended without any timezone conversion;
    # this assumes the timestamps are already in UTC - confirm against producers.
    data["timestamp"] = pd.to_datetime(data["timestamp"]).dt.strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )

    # get_devices() returns a pair; only the devices frame is needed here.
    devices, _ = DataUtils.get_devices()

    try:
        # Non-inplace rename keeps the frame handed back by get_devices() untouched.
        devices = devices.rename(columns={"name": "device_name"})[
            [
                "device_name",
                "site_id",
                "device_latitude",
                "device_longitude",
                "network",
            ]
        ]

        # Left join: every measurement row is kept even without matching device metadata.
        data = pd.merge(
            left=data,
            right=devices,
            on=["device_name", "site_id", "network"],
            how="left",
        )
    except KeyError as e:
        logger.exception(
            f"KeyError: The key(s) '{e.args}' are not available in the returned devices data."
        )
        return None
    except Exception as e:
        logger.exception(f"An error occured: {e}")
        return None
    return data

@staticmethod
def convert_pressure_values(value):
try:
Expand Down
51 changes: 51 additions & 0 deletions src/workflows/airqo_etl_utils/datautils.py
Original file line number Diff line number Diff line change
Expand Up @@ -953,3 +953,54 @@ def get_devices_kafka(group_id: str) -> pd.DataFrame:
devices.drop_duplicates(subset=["device_id"], keep="last")

return devices

@staticmethod
def process_data_for_message_broker(
    data: pd.DataFrame,
    frequency: Frequency = Frequency.HOURLY,
    caller=None,
    topic=None,
) -> pd.DataFrame:
    """
    Prepares measurements for message broker consumption.

    The input is tagged with the given frequency, its timestamps are formatted as
    ``%Y-%m-%dT%H:%M:%SZ`` strings, ``device_id`` is renamed to ``device_name`` and
    device metadata from ``DataUtils.get_devices()`` is left-joined onto it.
    The caller's DataFrame is not modified; a new DataFrame is returned.

    Args:
        data (pd.DataFrame): The input data to be processed. Must contain a ``timestamp`` column.
        frequency (Frequency): The data frequency (e.g., hourly), defaults to Frequency.HOURLY.
        caller: Unused. Accepted only so call sites that still pass ``caller=`` keep working.
        topic: Unused. Accepted only so call sites that still pass ``topic=`` keep working.

    Returns:
        pd.DataFrame: The processed DataFrame ready for message broker consumption,
        or None when the device metadata could not be selected or merged (the error is logged).
    """
    # rename() without inplace returns a new frame, so none of the column
    # assignments below leak back into the DataFrame owned by the caller.
    data = data.rename(columns={"device_id": "device_name"})
    data["frequency"] = frequency.str
    # NOTE(review): a literal "Z" is appended without any timezone conversion;
    # this assumes the timestamps are already in UTC - confirm against producers.
    data["timestamp"] = pd.to_datetime(data["timestamp"]).dt.strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )

    # get_devices() returns a pair; only the devices frame is needed here.
    devices, _ = DataUtils.get_devices()

    try:
        # Non-inplace rename keeps the frame handed back by get_devices() untouched.
        devices = devices.rename(columns={"name": "device_name"})[
            [
                "device_name",
                "site_id",
                "device_latitude",
                "device_longitude",
                "network",
            ]
        ]

        # Left join: every measurement row is kept even without matching device metadata.
        data = pd.merge(
            left=data,
            right=devices,
            on=["device_name", "site_id", "network"],
            how="left",
        )
    except KeyError as e:
        logger.exception(
            f"KeyError: The key(s) '{e.args}' are not available in the returned devices data."
        )
        return None
    except Exception as e:
        logger.exception(f"An error occured: {e}")
        return None
    return data
7 changes: 3 additions & 4 deletions src/workflows/dags/airnow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from airqo_etl_utils.date import DateUtils
from airqo_etl_utils.datautils import DataUtils
from airqo_etl_utils.workflows_custom_utils import AirflowUtils
from datetime import timedelta
from airqo_etl_utils.config import configuration as Config
Expand Down Expand Up @@ -37,13 +38,12 @@ def process_data(data: pd.DataFrame):
@task(retries=3, retry_delay=timedelta(minutes=5))
def send_to_message_broker(data: pd.DataFrame, **kwargs):
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
from airqo_etl_utils.data_validator import DataValidationUtils
from datetime import datetime

now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour) + "-" + str(now.second)

data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=Config.HOURLY_MEASUREMENTS_TOPIC,
Expand Down Expand Up @@ -117,13 +117,12 @@ def process_data(data: pd.DataFrame):
@task(retries=3, retry_delay=timedelta(minutes=5))
def send_to_message_broker(data: pd.DataFrame, **kwargs):
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils
from airqo_etl_utils.data_validator import DataValidationUtils
from datetime import datetime

now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour) + "-" + str(now.second)

data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=Config.HOURLY_MEASUREMENTS_TOPIC,
Expand Down
2 changes: 1 addition & 1 deletion src/workflows/dags/airqo_bam_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
data = AirQoDataUtils.process_latest_data(
data=data, device_category=DeviceCategory.BAM
)
data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
Expand Down
6 changes: 3 additions & 3 deletions src/workflows/dags/airqo_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def send_hourly_measurements_to_message_broker(
now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour) + "-" + str(now.second)

data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=Config.HOURLY_MEASUREMENTS_TOPIC,
Expand Down Expand Up @@ -435,7 +435,7 @@ def send_hourly_measurements_to_message_broker(data: pd.DataFrame, **kwargs):
now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour) + "-" + str(now.second)

data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=Config.HOURLY_MEASUREMENTS_TOPIC,
Expand Down Expand Up @@ -482,7 +482,7 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
data = AirQoDataUtils.process_latest_data(
data=data, device_category=DeviceCategory.LOWCOST
)
data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=Config.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
Expand Down
4 changes: 2 additions & 2 deletions src/workflows/dags/kcca_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from airflow.exceptions import AirflowFailException
from airqo_etl_utils.config import configuration as Config
from airqo_etl_utils.constants import DeviceNetwork
from airqo_etl_utils.datautils import DataUtils


@dag(
Expand Down Expand Up @@ -48,12 +49,11 @@ def send_to_api(data: pd.DataFrame):

@task()
def send_to_message_broker(data: pd.DataFrame, **kwargs):
from airqo_etl_utils.data_validator import DataValidationUtils
from airqo_etl_utils.message_broker_utils import MessageBrokerUtils

now = datetime.now()
unique_str = str(now.date()) + "-" + str(now.hour) + "-" + str(now.second)
data = DataValidationUtils.process_data_for_message_broker(
data = DataUtils.process_data_for_message_broker(
data=data,
caller=kwargs["dag"].dag_id + unique_str,
topic=Config.HOURLY_MEASUREMENTS_TOPIC,
Expand Down
Loading