Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move to production #4420

Merged
merged 9 commits into from
Feb 11, 2025
2 changes: 1 addition & 1 deletion k8s/auth-service/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-auth-api
tag: prod-85a0a843-1739265138
tag: prod-2919fc05-1739268252
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-85a0a843-1739265138
tag: prod-2919fc05-1739268252
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/spatial/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-spatial-api
tag: prod-85a0a843-1739265138
tag: prod-2919fc05-1739268252
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
tag: prod-85a0a843-1739265138
tag: prod-2919fc05-1739268252
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-stage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-stage-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-stage-redis
containers: eu.gcr.io/airqo-250220/airqo-stage-workflows
tag: stage-96f29bf1-1739265098
tag: stage-44f5f46e-1739268213
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
29 changes: 12 additions & 17 deletions src/workflows/dags/airqo_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
send_raw_measurements_to_bigquery_doc,
extract_raw_airqo_gaseous_data_doc,
extract_historical_device_measurements_doc,
extract_hourly_old_historical_data_doc,
)
from airqo_etl_utils.constants import DeviceNetwork, DeviceCategory, Frequency, DataType
from datetime import datetime, timedelta
Expand Down Expand Up @@ -648,39 +649,30 @@ def send_raw_measurements_to_bigquery(airqo_data: pd.DataFrame):
)
def airqo_bigquery_data_measurements_to_api():
import pandas as pd
from airflow.models import Variable
from airqo_etl_utils.date import date_to_str_hours

@task(
doc_md=extract_hourly_old_historical_data_doc,
provide_context=True,
retries=3,
retry_delay=timedelta(minutes=5),
)
def extract_hourly_data(**kwargs) -> pd.DataFrame:
from airqo_etl_utils.date import date_to_str_hours

# Only used the first time
start = kwargs.get("params", {}).get("start_date", "2021-01-01T00:00:00Z")
start = datetime.strptime(start, "%Y-%m-%dT%H:%M:%SZ")
start = kwargs.get("params", {}).get("start_date", "2021-01-01T01:00:00Z")
end = kwargs.get("params", {}).get("end_date", "2021-12-31T23:59:59Z")
end = datetime.strptime(end, "%Y-%m-%dT%H:%M:%SZ")

previous_date = kwargs["ti"].xcom_pull(key="new_date")
if not previous_date:
previous_date = start
previous_date = Variable.get("new_date_2021", default_var=start)

hour_of_day = (
datetime.strptime(previous_date, "%Y-%m-%dT%H:%M:%SZ")
if not isinstance(previous_date, datetime)
else previous_date
)
hour_of_day = datetime.strptime(previous_date, "%Y-%m-%dT%H:%M:%SZ")
start_date_time = date_to_str_hours(hour_of_day)
end_date_time = datetime.strftime(hour_of_day, "%Y-%m-%dT%H:59:59Z")

if hour_of_day > end or (hour_of_day + timedelta(hours=1)) > end:
raise AirflowFailException(f"Run expired on {end}")

if previous_date == start:
kwargs["ti"].xcom_push(key="new_date", value=hour_of_day)

return DataUtils.extract_data_from_bigquery(
DataType.AVERAGED,
start_date_time=start_date_time,
Expand All @@ -697,8 +689,11 @@ def send_hourly_measurements_to_api(data: pd.DataFrame, **kwargs):

airqo_api = AirQoApi()
airqo_api.save_events(measurements=data)
previous_date = kwargs["ti"].xcom_pull(key="new_date")
kwargs["ti"].xcom_push(key="new_date", value=previous_date + timedelta(hours=1))
previous_date = datetime.strptime(
Variable.get("new_date_2021"), "%Y-%m-%dT%H:%M:%SZ"
)
previous_date = date_to_str_hours(previous_date + timedelta(hours=1))
Variable.set("new_date_2021", previous_date)

hourly_data = extract_hourly_data()
send_hourly_measurements_to_api(hourly_data)
Expand Down
11 changes: 11 additions & 0 deletions src/workflows/dags/task_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,14 @@
#### Notes
- <a href="https://airqo.africa/" target="_blank">AirQo</a>
"""

extract_hourly_old_historical_data_doc = """
#### Purpose
Extracts hourly averaged data from BigQuery for a specified time window.

- This task function retrieves the start and end date-times from the Airflow parameters, then uses an Airflow Variable to obtain the last processed timestamp (defaulting to the start date if not set).
- It computes the current hourly window by converting the retrieved timestamp into a start and end string, with the end time representing the last second of the hour. If the computed hour exceeds the specified end date, an AirflowFailException is raised to halt further processing.

#### Notes
- <a href="https://airqo.africa/" target="_blank">AirQo</a>
"""
Loading