From 68e5f3291cff5cd0d840ffc6cc5ba66d775df6c2 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Thu, 27 Feb 2025 15:51:43 +0300 Subject: [PATCH] Send recalibrated data to events api --- src/workflows/dags/airqo_measurements.py | 31 ++++++++++++++++++------ 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py index 99059605b9..8ae63ace51 100644 --- a/src/workflows/dags/airqo_measurements.py +++ b/src/workflows/dags/airqo_measurements.py @@ -1,6 +1,6 @@ from airflow.decorators import dag, task import pandas as pd -from typing import Tuple +from typing import Tuple, Generator from airqo_etl_utils.config import configuration as Config from airqo_etl_utils.workflows_custom_utils import AirflowUtils @@ -700,15 +700,17 @@ def extract_devices_missing_calibrated_data(**kwargs) -> Tuple[pd.DataFrame, str return devices @task(retries=3, retry_delay=timedelta(minutes=5)) - def extract_calibrate_send_hourly_measurements_to_bigquery( - devices: pd.DataFrame, + def extract_calibrate_data(devices: pd.DataFrame): + return AirQoDataUtils.extract_aggregate_calibrate_raw_data(devices) + + @task(retries=3, retry_delay=timedelta(minutes=5)) + def send_hourly_measurements_to_bigquery( + calibrated_data: Generator, ) -> None: big_query_api = BigQueryApi() - for calibrated_data in AirQoDataUtils.extract_aggregate_calibrate_raw_data( - devices - ): + for data_ in calibrated_data: data = DataUtils.format_data_for_bigquery( - calibrated_data, + data_, DataType.AVERAGED, DeviceCategory.GENERAL, Frequency.HOURLY, @@ -719,8 +721,21 @@ def extract_calibrate_send_hourly_measurements_to_bigquery( where_fields={"device_id": data.iloc[0].device_id}, ) + @task(retries=3, retry_delay=timedelta(minutes=5)) + def send_hourly_measurements_to_api(calibrated_data: Generator) -> None: + from airqo_etl_utils.airqo_api import AirQoApi + + data = DataUtils.process_data_for_api( + calibrated_data, frequency=Frequency.HOURLY + ) + + airqo_api = AirQoApi() + airqo_api.save_events(measurements=data) + devices = extract_devices_missing_calibrated_data() - extract_calibrate_send_hourly_measurements_to_bigquery(devices) + calibrated_data = extract_calibrate_data(devices) + send_hourly_measurements_to_bigquery(calibrated_data) + send_hourly_measurements_to_api(calibrated_data) airqo_historical_hourly_measurements()