Skip to content

Commit

Permalink
Send recalibrated data to events api
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasTurner23 committed Feb 27, 2025
1 parent 7da68b6 commit 68e5f32
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions src/workflows/dags/airqo_measurements.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from airflow.decorators import dag, task
import pandas as pd
from typing import Tuple
from typing import Tuple, Generator

from airqo_etl_utils.config import configuration as Config
from airqo_etl_utils.workflows_custom_utils import AirflowUtils
Expand Down Expand Up @@ -700,15 +700,17 @@ def extract_devices_missing_calibrated_data(**kwargs) -> Tuple[pd.DataFrame, str
return devices

@task(retries=3, retry_delay=timedelta(minutes=5))
def extract_calibrate_send_hourly_measurements_to_bigquery(
devices: pd.DataFrame,
def extract_calibrate_data(devices: pd.DataFrame):
return AirQoDataUtils.extract_aggregate_calibrate_raw_data(devices)

@task(retries=3, retry_delay=timedelta(minutes=5))
def send_hourly_measurements_to_bigquery(
calibrated_data: Generator,
) -> None:
big_query_api = BigQueryApi()
for calibrated_data in AirQoDataUtils.extract_aggregate_calibrate_raw_data(
devices
):
for data_ in calibrated_data:
data = DataUtils.format_data_for_bigquery(
calibrated_data,
data_,
DataType.AVERAGED,
DeviceCategory.GENERAL,
Frequency.HOURLY,
Expand All @@ -719,8 +721,21 @@ def extract_calibrate_send_hourly_measurements_to_bigquery(
where_fields={"device_id": data.iloc[0].device_id},
)

@task(retries=3, retry_delay=timedelta(minutes=5))
def send_hourly_measurements_to_api(calibrated_data: Generator) -> None:
from airqo_etl_utils.airqo_api import AirQoApi

data = DataUtils.process_data_for_api(
calibrated_data, frequency=Frequency.HOURLY
)

airqo_api = AirQoApi()
airqo_api.save_events(measurements=data)

devices = extract_devices_missing_calibrated_data()
extract_calibrate_send_hourly_measurements_to_bigquery(devices)
calibrated_data = extract_calibrate_data(devices)
send_hourly_measurements_to_bigquery(calibrated_data)
send_hourly_measurements_to_api(calibrated_data)


airqo_historical_hourly_measurements()
Expand Down

0 comments on commit 68e5f32

Please sign in to comment.