Skip to content

Commit

Permalink
Merge pull request #4420 from airqo-platform/staging
Browse files Browse the repository at this point in the history
move to production
  • Loading branch information
Baalmart authored Feb 11, 2025
2 parents 2919fc0 + f58b6a7 commit dc24e7c
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 22 deletions.
2 changes: 1 addition & 1 deletion k8s/auth-service/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-auth-api
tag: prod-85a0a843-1739265138
tag: prod-2919fc05-1739268252
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-85a0a843-1739265138
tag: prod-2919fc05-1739268252
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/spatial/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-spatial-api
tag: prod-85a0a843-1739265138
tag: prod-2919fc05-1739268252
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
tag: prod-85a0a843-1739265138
tag: prod-2919fc05-1739268252
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-stage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-stage-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-stage-redis
containers: eu.gcr.io/airqo-250220/airqo-stage-workflows
tag: stage-96f29bf1-1739265098
tag: stage-44f5f46e-1739268213
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
29 changes: 12 additions & 17 deletions src/workflows/dags/airqo_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
send_raw_measurements_to_bigquery_doc,
extract_raw_airqo_gaseous_data_doc,
extract_historical_device_measurements_doc,
extract_hourly_old_historical_data_doc,
)
from airqo_etl_utils.constants import DeviceNetwork, DeviceCategory, Frequency, DataType
from datetime import datetime, timedelta
Expand Down Expand Up @@ -648,39 +649,30 @@ def send_raw_measurements_to_bigquery(airqo_data: pd.DataFrame):
)
def airqo_bigquery_data_measurements_to_api():
import pandas as pd
from airflow.models import Variable
from airqo_etl_utils.date import date_to_str_hours

@task(
doc_md=extract_hourly_old_historical_data_doc,
provide_context=True,
retries=3,
retry_delay=timedelta(minutes=5),
)
def extract_hourly_data(**kwargs) -> pd.DataFrame:
from airqo_etl_utils.date import date_to_str_hours

# Only used the first time
start = kwargs.get("params", {}).get("start_date", "2021-01-01T00:00:00Z")
start = datetime.strptime(start, "%Y-%m-%dT%H:%M:%SZ")
start = kwargs.get("params", {}).get("start_date", "2021-01-01T01:00:00Z")
end = kwargs.get("params", {}).get("end_date", "2021-12-31T23:59:59Z")
end = datetime.strptime(end, "%Y-%m-%dT%H:%M:%SZ")

previous_date = kwargs["ti"].xcom_pull(key="new_date")
if not previous_date:
previous_date = start
previous_date = Variable.get("new_date_2021", default_var=start)

hour_of_day = (
datetime.strptime(previous_date, "%Y-%m-%dT%H:%M:%SZ")
if not isinstance(previous_date, datetime)
else previous_date
)
hour_of_day = datetime.strptime(previous_date, "%Y-%m-%dT%H:%M:%SZ")
start_date_time = date_to_str_hours(hour_of_day)
end_date_time = datetime.strftime(hour_of_day, "%Y-%m-%dT%H:59:59Z")

if hour_of_day > end or (hour_of_day + timedelta(hours=1)) > end:
raise AirflowFailException(f"Run expired on {end}")

if previous_date == start:
kwargs["ti"].xcom_push(key="new_date", value=hour_of_day)

return DataUtils.extract_data_from_bigquery(
DataType.AVERAGED,
start_date_time=start_date_time,
Expand All @@ -697,8 +689,11 @@ def send_hourly_measurements_to_api(data: pd.DataFrame, **kwargs):

airqo_api = AirQoApi()
airqo_api.save_events(measurements=data)
previous_date = kwargs["ti"].xcom_pull(key="new_date")
kwargs["ti"].xcom_push(key="new_date", value=previous_date + timedelta(hours=1))
previous_date = datetime.strptime(
Variable.get("new_date_2021"), "%Y-%m-%dT%H:%M:%SZ"
)
previous_date = date_to_str_hours(previous_date + timedelta(hours=1))
Variable.set("new_date_2021", previous_date)

hourly_data = extract_hourly_data()
send_hourly_measurements_to_api(hourly_data)
Expand Down
11 changes: 11 additions & 0 deletions src/workflows/dags/task_docs.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,14 @@
#### Notes
- <a href="https://airqo.africa/" target="_blank">AirQo</a>
"""

extract_hourly_old_historical_data_doc = """
#### Purpose
Extracts hourly averaged data from BigQuery for a specified time window.
- This task function retrieves the start and end date-times from the Airflow parameters, then uses an Airflow Variable to obtain the last processed timestamp (defaulting to the start date if not set).
- It computes the current hourly window by converting the retrieved timestamp into a start and end string, with the end time representing the last second of the hour. If the computed hour exceeds the specified end date, an AirflowFailException is raised to halt further processing.
#### Notes
- <a href="https://airqo.africa/" target="_blank">AirQo</a>
"""

0 comments on commit dc24e7c

Please sign in to comment.