Skip to content

Commit

Permalink
Merge pull request #4415 from airqo-platform/staging
Browse files Browse the repository at this point in the history
move to production
  • Loading branch information
Baalmart authored Feb 11, 2025
2 parents 855515c + 96f29bf commit 85a0a84
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 21 deletions.
2 changes: 1 addition & 1 deletion k8s/auth-service/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-auth-api
tag: prod-dd79c919-1739257284
tag: prod-855515c6-1739261225
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/device-registry/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-device-registry-api
tag: prod-dd79c919-1739257284
tag: prod-855515c6-1739261225
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/spatial/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ app:
replicaCount: 3
image:
repository: eu.gcr.io/airqo-250220/airqo-spatial-api
tag: prod-dd79c919-1739257284
tag: prod-855515c6-1739261225
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-redis
containers: eu.gcr.io/airqo-250220/airqo-workflows
tag: prod-dd79c919-1739257284
tag: prod-855515c6-1739261225
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
2 changes: 1 addition & 1 deletion k8s/workflows/values-stage.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ images:
initContainer: eu.gcr.io/airqo-250220/airqo-stage-workflows-xcom
redisContainer: eu.gcr.io/airqo-250220/airqo-stage-redis
containers: eu.gcr.io/airqo-250220/airqo-stage-workflows
tag: stage-f00a37c2-1739257230
tag: stage-39f00e8a-1739261191
nameOverride: ''
fullnameOverride: ''
podAnnotations: {}
Expand Down
58 changes: 49 additions & 9 deletions src/workflows/airqo_etl_utils/daily_data_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

from airqo_etl_utils.bigquery_api import BigQueryApi
from airqo_etl_utils.data_validator import DataValidationUtils
from airqo_etl_utils.constants import DataType, Frequency, DeviceCategory
from airqo_etl_utils.config import configuration as Config
from typing import Optional


class DailyDataUtils:
Expand Down Expand Up @@ -53,33 +56,70 @@ def average_data(data: pd.DataFrame) -> pd.DataFrame:

@staticmethod
def cleanup_and_reload(
data: pd.DataFrame, start_date_time=None, end_date_time=None
):
data: pd.DataFrame,
start_date_time: Optional[str] = None,
end_date_time: Optional[str] = None,
) -> None:
"""
Cleans up the input dataset by removing duplicates and reloads the processed data into BigQuery.
This function:
- Converts the "timestamp" column to a datetime format.
- Removes duplicate entries based on "device_number", "device_id", and "timestamp".
- Validates and processes the data to match BigQuery requirements.
- Reloads the cleaned dataset into the BigQuery daily measurements table.
Args:
data(pd.DataFrame): The input dataset containing device measurements.
start_date_time(str, optional): The start timestamp for data reloading (ISO format).
end_date_time(str, optional): The end timestamp for data reloading (ISO format).
Returns:
None
"""
data["timestamp"] = data["timestamp"].apply(pd.to_datetime)
data = data.drop_duplicates(
subset=["device_number", "device_id", "timestamp"], keep="first"
data.drop_duplicates(
subset=["device_number", "device_id", "timestamp"],
keep="first",
inplace=True,
)

bigquery_api = BigQueryApi()
table = bigquery_api.daily_measurements_table

source = Config.DataSource.get(DataType.AVERAGED)
table = source.get(DeviceCategory.GENERAL).get(Frequency.DAILY)

data = DataValidationUtils.process_for_big_query(
dataframe=data,
table=table,
)
bigquery_api.reload_data(
network="all",
table=table,
dataframe=data,
start_date_time=start_date_time,
end_date_time=end_date_time,
)

@staticmethod
def save_data(data: pd.DataFrame):
bigquery_api = BigQueryApi()
def save_data(data: pd.DataFrame) -> None:
"""
Processes and saves the given dataset to BigQuery.
This function:
- Retrieves the BigQuery daily measurements table.
- Validates and processes the dataset to ensure it meets BigQuery requirements.
- Loads the processed data into the designated BigQuery table.
Args:
data (pd.DataFrame): The dataset containing measurement data to be stored.
table = bigquery_api.daily_measurements_table
Returns:
None
"""
bigquery_api = BigQueryApi()

source = Config.DataSource.get(DataType.AVERAGED)
table = source.get(DeviceCategory.GENERAL).get(Frequency.DAILY)
data = DataValidationUtils.process_for_big_query(
dataframe=data,
table=table,
Expand Down
12 changes: 6 additions & 6 deletions src/workflows/airqo_etl_utils/datautils.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,22 +308,22 @@ def extract_data_from_bigquery(
end_date_time: str,
frequency: Frequency,
device_category: DeviceCategory,
device_network: DeviceNetwork = None,
dynamic_query: bool = False,
remove_outliers: bool = True,
device_network: Optional[DeviceNetwork] = None,
dynamic_query: Optional[bool] = False,
remove_outliers: Optional[bool] = True,
) -> pd.DataFrame:
"""
Extracts data from BigQuery within a specified time range and frequency,
with an optional filter for the device network. The data is cleaned to remove outliers.
Args:
datatype(str): The type of data to extract determined by the source data asset.
datatype(DataType): The type of data to extract determined by the source data asset.
start_date_time(str): The start of the time range for data extraction, in ISO 8601 format.
end_date_time(str): The end of the time range for data extraction, in ISO 8601 format.
frequency(Frequency): The frequency of the data to be extracted, e.g., RAW or HOURLY.
device_network(DeviceNetwork, optional): The network to filter devices, default is None (no filter).
dynamic_query (bool, optional): Determines the type of data returned. If True, returns averaged data grouped by `device_number`, `device_id`, and `site_id`. If False, returns raw data without aggregation. Defaults to False.
remove_outliers (bool, optional): If True, removes outliers from the extracted data. Defaults to True.
dynamic_query(bool, optional): Determines the type of data returned. If True, returns averaged data grouped by `device_number`, `device_id`, and `site_id`. If False, returns raw data without aggregation. Defaults to False.
remove_outliers(bool, optional): If True, removes outliers from the extracted data. Defaults to True.
Returns:
pd.DataFrame: A pandas DataFrame containing the cleaned data from BigQuery.
Expand Down
2 changes: 1 addition & 1 deletion src/workflows/dags/airqo_measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ def extract_hourly_data(**kwargs) -> pd.DataFrame:
start_date_time = date_to_str_hours(hour_of_day)
end_date_time = datetime.strftime(hour_of_day, "%Y-%m-%dT%H:59:59Z")

if start_date_time > end or end_date_time > end:
if hour_of_day > end or (hour_of_day + timedelta(hours=1)) > end:
raise AirflowFailException(f"Run expired on {end}")

if previous_date == start:
Expand Down

0 comments on commit 85a0a84

Please sign in to comment.