Skip to content

Commit

Permalink
Updates/cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasTurner23 committed Dec 11, 2024
1 parent b82fe30 commit 0183031
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/workflows/airqo_etl_utils/data_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ def process_for_big_query(dataframe: pd.DataFrame, table: str) -> pd.DataFrame:
@staticmethod
def process_data_for_message_broker(
data: pd.DataFrame,
topic: str,
caller: str,
topic: str = None,
frequency: Frequency = Frequency.HOURLY,
) -> pd.DataFrame:
"""
Expand Down
4 changes: 2 additions & 2 deletions src/workflows/dags/airnow.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):

data = DataValidationUtils.process_data_for_message_broker(
data=data,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
)
if not data:
raise AirflowFailException(
Expand Down Expand Up @@ -136,8 +136,8 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):

data = DataValidationUtils.process_data_for_message_broker(
data=data,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
)

if not data:
Expand Down
2 changes: 1 addition & 1 deletion src/workflows/dags/airqo_bam_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
)
data = DataValidationUtils.process_data_for_message_broker(
data=data,
topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
)

if not data:
Expand Down
6 changes: 3 additions & 3 deletions src/workflows/dags/airqo_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ def send_hourly_measurements_to_message_broker(

data = DataValidationUtils.process_data_for_message_broker(
data=data,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
)

if not data:
Expand Down Expand Up @@ -402,8 +402,8 @@ def send_hourly_measurements_to_message_broker(data: pd.DataFrame, **kwargs):

data = DataValidationUtils.process_data_for_message_broker(
data=data,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
)

if not data:
Expand Down Expand Up @@ -453,8 +453,8 @@ def update_latest_data_topic(data: pd.DataFrame, **kwargs):
)
data = DataValidationUtils.process_data_for_message_broker(
data=data,
topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id + unique_str,
topic=configuration.AVERAGED_HOURLY_MEASUREMENTS_TOPIC,
)

if not data:
Expand Down
2 changes: 1 addition & 1 deletion src/workflows/dags/kcca_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ def send_to_message_broker(data: pd.DataFrame, **kwargs):

data = DataValidationUtils.process_data_for_message_broker(
data=data,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
caller=kwargs["dag"].dag_id,
topic=configuration.HOURLY_MEASUREMENTS_TOPIC,
)

if not data:
Expand Down

0 comments on commit 0183031

Please sign in to comment.