From 060e8739944982293aa63f1ba5c4eb77ecc882fd Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 11 Feb 2025 04:54:34 +0300 Subject: [PATCH 1/6] Update device registry staging image tag to stage-627aaca8-1739238787 --- k8s/device-registry/values-stage.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/device-registry/values-stage.yaml b/k8s/device-registry/values-stage.yaml index ff0d641302..28a6f6362f 100644 --- a/k8s/device-registry/values-stage.yaml +++ b/k8s/device-registry/values-stage.yaml @@ -6,7 +6,7 @@ app: replicaCount: 2 image: repository: eu.gcr.io/airqo-250220/airqo-stage-device-registry-api - tag: stage-83e69174-1738960327 + tag: stage-627aaca8-1739238787 nameOverride: '' fullnameOverride: '' podAnnotations: {} From d6a5d19267f790a350c92c4f0886e1a85b82e464 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 11 Feb 2025 04:55:24 +0300 Subject: [PATCH 2/6] Update auth service production image tag to prod-ae87cecf-1739238823 --- k8s/auth-service/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/auth-service/values-prod.yaml b/k8s/auth-service/values-prod.yaml index a316449ecf..8fdc983dfd 100644 --- a/k8s/auth-service/values-prod.yaml +++ b/k8s/auth-service/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-auth-api - tag: prod-fbd4c147-1739233652 + tag: prod-ae87cecf-1739238823 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 82cd5bee1994ede7e6d02835787b84682ea59176 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 11 Feb 2025 04:55:40 +0300 Subject: [PATCH 3/6] Update device registry production image tag to prod-ae87cecf-1739238823 --- k8s/device-registry/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/device-registry/values-prod.yaml b/k8s/device-registry/values-prod.yaml index 06647c2463..af11f31361 100644 --- a/k8s/device-registry/values-prod.yaml +++ b/k8s/device-registry/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-device-registry-api - tag: prod-e7170088-1739227245 + tag: prod-ae87cecf-1739238823 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 18c5cc0b75d18e53ef2e4fc62f7effafec1dd07b Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 11 Feb 2025 04:56:25 +0300 Subject: [PATCH 4/6] Update workflows prod image tag to prod-ae87cecf-1739238823 --- k8s/workflows/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/workflows/values-prod.yaml b/k8s/workflows/values-prod.yaml index 61d396fa23..f34b9514af 100644 --- a/k8s/workflows/values-prod.yaml +++ b/k8s/workflows/values-prod.yaml @@ -10,7 +10,7 @@ images: initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom redisContainer: eu.gcr.io/airqo-250220/airqo-redis containers: eu.gcr.io/airqo-250220/airqo-workflows - tag: prod-fbd4c147-1739233652 + tag: prod-ae87cecf-1739238823 nameOverride: '' fullnameOverride: '' podAnnotations: {} From bcd0628c60f4b9958f17b71eb1d595859ae64556 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 11 Feb 2025 04:56:27 +0300 Subject: [PATCH 5/6] Update spatial production image tag to prod-ae87cecf-1739238823 --- k8s/spatial/values-prod.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/k8s/spatial/values-prod.yaml b/k8s/spatial/values-prod.yaml index df8203fe9a..23bc14fdd3 100644 --- a/k8s/spatial/values-prod.yaml +++ b/k8s/spatial/values-prod.yaml @@ -6,7 +6,7 @@ app: replicaCount: 3 image: repository: eu.gcr.io/airqo-250220/airqo-spatial-api - tag: prod-fbd4c147-1739233652 + tag: prod-ae87cecf-1739238823 nameOverride: '' fullnameOverride: '' podAnnotations: {} From 980489304de8a83fe2b5d3b3420069d3b43d8e34 Mon Sep 17 00:00:00 2001 From: NicholasTurner23 Date: Tue, 11 Feb 2025 09:55:17 +0300 Subject: [PATCH 6/6] Clean up datetime implementation and sanitize csv data --- src/workflows/airqo_etl_utils/airqo_utils.py | 4 ++-- src/workflows/airqo_etl_utils/datautils.py | 2 +- .../airqo_etl_utils/meta_data_utils.py | 3 ++- .../airqo_etl_utils/weather_data_utils.py | 3 ++- src/workflows/dags/airqo_measurements.py | 21 ++++++++++++------- 5 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/workflows/airqo_etl_utils/airqo_utils.py b/src/workflows/airqo_etl_utils/airqo_utils.py index b6c895b362..4ec7009bb5 100644 --- a/src/workflows/airqo_etl_utils/airqo_utils.py +++ b/src/workflows/airqo_etl_utils/airqo_utils.py @@ -14,6 +14,7 @@ from .datautils import DataUtils from .weather_data_utils import WeatherDataUtils from typing import List, Dict, Any, Union +import ast import logging @@ -392,8 +393,7 @@ def merge_aggregated_weather_data( "distance": station.get("distance", None), } for _, site in sites.iterrows() - for station in site.get("weather_stations", []) - if isinstance(station, dict) + for station in ast.literal_eval(site.get("weather_stations", [])) ] sites_df = pd.DataFrame(sites_info) diff --git a/src/workflows/airqo_etl_utils/datautils.py b/src/workflows/airqo_etl_utils/datautils.py index 4183b1144a..39356c22ea 100644 --- a/src/workflows/airqo_etl_utils/datautils.py +++ b/src/workflows/airqo_etl_utils/datautils.py @@ -78,7 +78,7 @@ def get_devices( return devices, keys @staticmethod - def get_sites(network: DeviceNetwork = None) -> pd.DataFrame: + def get_sites(network: Optional[DeviceNetwork] = None) -> pd.DataFrame: """ Retrieve sites data. diff --git a/src/workflows/airqo_etl_utils/meta_data_utils.py b/src/workflows/airqo_etl_utils/meta_data_utils.py index 4915fce663..72a496ab92 100644 --- a/src/workflows/airqo_etl_utils/meta_data_utils.py +++ b/src/workflows/airqo_etl_utils/meta_data_utils.py @@ -8,6 +8,7 @@ from .weather_data_utils import WeatherDataUtils from datetime import datetime, timezone from typing import Optional +import ast class MetaDataUtils: @@ -271,7 +272,7 @@ def update_nearest_weather_stations( { "site_id": site.get("site_id"), "network": site.get("network"), - "weather_stations": site.get("weather_stations"), + "weather_stations": ast.literal_eval(site.get("weather_stations")), } for site in updated_sites ] diff --git a/src/workflows/airqo_etl_utils/weather_data_utils.py b/src/workflows/airqo_etl_utils/weather_data_utils.py index 7bd96819b8..7a66b77de0 100644 --- a/src/workflows/airqo_etl_utils/weather_data_utils.py +++ b/src/workflows/airqo_etl_utils/weather_data_utils.py @@ -11,6 +11,7 @@ from .openweather_api import OpenWeatherApi from .tahmo_api import TahmoApi from .utils import Utils +import ast from typing import List, Dict, Optional @@ -141,7 +142,7 @@ def query_raw_data_from_tahmo( sites = DataUtils.get_sites() station_codes = [] for _, site in sites.iterrows(): - weather_stations = site.get("weather_stations", []) + weather_stations = ast.literal_eval(site.get("weather_stations", [])) station_codes.extend( weather_station.get("code", "") for weather_station in weather_stations diff --git a/src/workflows/dags/airqo_measurements.py b/src/workflows/dags/airqo_measurements.py index f452ca25b1..7c895c8ac2 100644 --- a/src/workflows/dags/airqo_measurements.py +++ b/src/workflows/dags/airqo_measurements.py @@ -658,24 +658,29 @@ def extract_hourly_data(**kwargs) -> pd.DataFrame: from airqo_etl_utils.date import date_to_str_hours # Only used the first time - start = kwargs.get("params", {}).get("start_date", "2021-01-01") - end_d = kwargs.get("params", {}).get("end_date", "2021-12-31") - end_d = datetime.strptime(end_d, "%Y-%m-%d") - end_dt = end_d.replace(hour=23, minute=59, second=59) - end = datetime.strftime(end_dt, "%Y-%m-%dT%H:%M:%SZ") + start = kwargs.get("params", {}).get("start_date", "2021-01-01T00:00:00Z") + start = datetime.strptime(start, "%%Y-%m-%dT%H:%M:%SZ") + end = kwargs.get("params", {}).get("end_date", "2021-12-31T23:59:59Z") + end = datetime.strptime(end, "%Y-%m-%dT%H:%M:%SZ") previous_date = kwargs["ti"].xcom_pull(key="new_date") if not previous_date: previous_date = start - hour_of_day = previous_date + timedelta(hours=1) - - start_date_time = date_to_str_hours(previous_date) + hour_of_day = ( + datetime.strptime(previous_date, "%%Y-%m-%dT%H:%M:%SZ") + if not isinstance(previous_date, datetime) + else previous_date + ) + start_date_time = date_to_str_hours(hour_of_day) end_date_time = datetime.strftime(hour_of_day, "%Y-%m-%dT%H:59:59Z") if start_date_time > end or end_date_time > end: raise AirflowFailException(f"Run expired on {end}") + if previous_date == start: + kwargs["ti"].xcom_push(key="new_date", value=hour_of_day) + return DataUtils.extract_data_from_bigquery( DataType.AVERAGED, start_date_time=start_date_time,