
Merge pull request #4050 from NicholasTurner23/update-fix/Clean_up
Update fix/clean up
Baalmart authored Dec 12, 2024
2 parents 83a69df + 93fcf00 commit c43059f
Showing 4 changed files with 48 additions and 14 deletions.
40 changes: 31 additions & 9 deletions src/workflows/airqo_etl_utils/bigquery_api.py
@@ -118,19 +118,41 @@ def validate_data(
         float_columns=None,
         integer_columns=None,
     ) -> pd.DataFrame:
+        """
+        Validates and formats the data in the given DataFrame based on the schema of a specified table.
+
+        This function performs the following tasks:
+        1. Ensures the DataFrame contains the required columns as defined in the schema of the `table`.
+        2. Formats column data types (e.g., timestamp, float, integer) based on the table schema or provided arguments.
+        3. Removes duplicate rows, keeping the first occurrence.
+
+        Args:
+            self: Class instance, required for accessing schema-related methods.
+            dataframe (pd.DataFrame): The DataFrame to validate and format.
+            table (str): The name of the table whose schema is used for validation.
+            raise_exception (bool, optional): Whether to raise an exception if required columns are missing. Defaults to True.
+            date_time_columns (list, optional): List of columns to be formatted as datetime. If None, inferred from the schema.
+            float_columns (list, optional): List of columns to be formatted as float. If None, inferred from the schema.
+            integer_columns (list, optional): List of columns to be formatted as integer. If None, inferred from the schema.
+
+        Returns:
+            pd.DataFrame: A validated and formatted DataFrame with duplicates removed.
+
+        Raises:
+            Exception: If required columns are missing and `raise_exception` is set to True.
+        """
         valid_cols = self.get_columns(table=table)
         dataframe_cols = dataframe.columns.to_list()
 
         if set(valid_cols).issubset(set(dataframe_cols)):
             dataframe = dataframe[valid_cols]
         else:
-            print(f"Required columns {valid_cols}")
-            print(f"Dataframe columns {dataframe_cols}")
-            print(
-                f"Difference between required and received {list(set(valid_cols) - set(dataframe_cols))}"
-            )
+            missing_cols = list(set(valid_cols) - set(dataframe_cols))
+            logger.warning(f"Required columns {valid_cols}")
+            logger.warning(f"Dataframe columns {dataframe_cols}")
+            logger.warning(f"Missing columns {missing_cols}")
             if raise_exception:
-                raise Exception("Invalid columns")
+                raise Exception(f"Invalid columns {missing_cols}")
 
         date_time_columns = (
             date_time_columns
@@ -245,7 +267,7 @@ def load_data(
     def add_unique_id(dataframe: pd.DataFrame, id_column="unique_id") -> pd.DataFrame:
         dataframe[id_column] = dataframe.apply(
             lambda row: BigQueryApi.device_unique_col(
-                tenant=row["tenant"],
+                network=row["network"],
                 device_number=row["device_number"],
                 device_id=row["device_id"],
             ),
@@ -254,8 +276,8 @@ def add_unique_id(dataframe: pd.DataFrame, id_column="unique_id") -> pd.DataFrame:
         return dataframe
 
     @staticmethod
-    def device_unique_col(tenant: str, device_id: str, device_number: int):
-        return str(f"{tenant}:{device_id}:{device_number}").lower()
+    def device_unique_col(network: str, device_id: str, device_number: int):
+        return str(f"{network}:{device_id}:{device_number}").lower()
 
     def update_airqlouds(self, dataframe: pd.DataFrame, table=None) -> None:
         if table is None:
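Taken together, the bigquery_api.py changes replace the print calls with logger.warning, surface the missing columns in the raised exception, and rename device_unique_col's tenant parameter to network. Below is a minimal runnable sketch of the new behavior, assuming a hypothetical get_columns stand-in and made-up table and device values; the real method reads column names from the BigQuery table schema and also applies the dtype formatting the docstring describes.

import logging

import pandas as pd

logger = logging.getLogger(__name__)


def get_columns(table: str) -> list:
    # Hypothetical stand-in for BigQueryApi.get_columns(table=...).
    return ["network", "device_id", "device_number"]


def validate_data(dataframe: pd.DataFrame, table: str, raise_exception: bool = True) -> pd.DataFrame:
    # Mirrors the patched control flow: select the schema columns, log what
    # is missing, and include the missing list in the exception message.
    valid_cols = get_columns(table=table)
    dataframe_cols = dataframe.columns.to_list()
    if set(valid_cols).issubset(set(dataframe_cols)):
        dataframe = dataframe[valid_cols]
    else:
        missing_cols = list(set(valid_cols) - set(dataframe_cols))
        logger.warning(f"Required columns {valid_cols}")
        logger.warning(f"Dataframe columns {dataframe_cols}")
        logger.warning(f"Missing columns {missing_cols}")
        if raise_exception:
            raise Exception(f"Invalid columns {missing_cols}")
    # Duplicate removal, per the new docstring (keep the first occurrence).
    return dataframe.drop_duplicates(keep="first")


def device_unique_col(network: str, device_id: str, device_number: int) -> str:
    # Renamed parameter: ids are now network:device_id:device_number.
    return str(f"{network}:{device_id}:{device_number}").lower()


df = pd.DataFrame({"network": ["AirQo"], "device_id": ["AQ_001"], "device_number": [123]})
print(validate_data(df, table="devices"))
print(device_unique_col(network="AirQo", device_id="AQ_001", device_number=123))
# -> airqo:aq_001:123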
5 changes: 1 addition & 4 deletions src/workflows/airqo_etl_utils/meta_data_utils.py
@@ -14,7 +14,6 @@ def extract_devices_from_api(tenant: Tenant = Tenant.ALL) -> pd.DataFrame:
     dataframe = pd.json_normalize(devices)
     dataframe = dataframe[
         [
-            "tenant",
             "network",
             "latitude",
             "longitude",
@@ -25,8 +24,6 @@
             "description",
             "device_manufacturer",
             "device_category",
-            "approximate_latitude",
-            "approximate_longitude",
         ]
     ]
 
@@ -124,7 +121,7 @@ def extract_sites_from_api(network: str = "all") -> pd.DataFrame:
     dataframe = pd.json_normalize(sites)
     dataframe = dataframe[
         [
-            "tenant",
+            "network",
             "site_id",
             "latitude",
             "longitude",
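The meta_data_utils.py changes drop tenant (and, for devices, the approximate coordinates) from the columns kept after pd.json_normalize, standardizing on network. A sketch with an invented payload follows; only the columns visible in the diff are selected, and fields hidden by the collapsed hunks are omitted.

import pandas as pd

# Invented device record; real ones come from the devices API.
devices = [
    {
        "tenant": "airqo",  # still in the payload, no longer selected
        "network": "airqo",
        "latitude": 0.31,
        "longitude": 32.58,
        "description": "sample device",
        "device_manufacturer": "acme",
        "device_category": "lowcost",
    }
]

dataframe = pd.json_normalize(devices)
dataframe = dataframe[
    [
        "network",
        "latitude",
        "longitude",
        "description",
        "device_manufacturer",
        "device_category",
    ]
]
print(dataframe.columns.to_list())  # no "tenant", no approximate_* columns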
5 changes: 5 additions & 0 deletions src/workflows/airqo_etl_utils/schema/devices.json
@@ -63,5 +63,10 @@
     "name": "device_category",
     "type": "STRING",
     "mode": "NULLABLE"
+  },
+  {
+    "name": "last_updated",
+    "type": "TIMESTAMP",
+    "mode": "NULLABLE"
   }
 ]
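For context, a common way to materialize a schema file like this is to map each entry onto a bigquery.SchemaField. The loading code below is an assumption for illustration (the diff only adds the nullable last_updated TIMESTAMP to the schema file itself), and the path assumes the repository root as the working directory.

import json

from google.cloud import bigquery

with open("src/workflows/airqo_etl_utils/schema/devices.json") as f:
    fields = json.load(f)

# One SchemaField per JSON entry, including the new last_updated column.
schema = [
    bigquery.SchemaField(field["name"], field["type"], mode=field["mode"])
    for field in fields
]
print([f for f in schema if f.name == "last_updated"])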
12 changes: 11 additions & 1 deletion src/workflows/airqo_etl_utils/schema/sites.json
@@ -4,6 +4,11 @@
     "type": "STRING",
     "mode": "REQUIRED"
   },
+  {
+    "name": "network",
+    "type": "STRING",
+    "mode": "REQUIRED"
+  },
   {
     "name": "id",
     "type": "STRING",
@@ -68,5 +73,10 @@
     "name": "city",
     "type": "STRING",
     "mode": "NULLABLE"
+  },
+  {
+    "name": "last_updated",
+    "type": "DATE",
+    "mode": "NULLABLE"
   }
-]
\ No newline at end of file
+]
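Note the type split between the two schema files: devices.last_updated is a TIMESTAMP while sites.last_updated is a DATE, so site rows carry only a calendar date. A sketch with invented values shows the truncation a loader would apply.

import pandas as pd

sites = pd.DataFrame(
    {
        "network": ["airqo"],  # new REQUIRED STRING column
        "last_updated": [pd.Timestamp("2024-12-12T08:30:00Z")],
    }
)
# sites.json declares last_updated as DATE: keep only the date part.
# devices.json would keep the full TIMESTAMP instead.
sites["last_updated"] = pd.to_datetime(sites["last_updated"]).dt.date
print(sites)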
