@@ -439,14 +439,11 @@ def download_from_bigquery(
439
439
weather_columns = []
440
440
for pollutant in pollutants :
441
441
442
- if pollutant == "raw" :
443
- key = pollutant
444
- else :
445
- key = f"{ pollutant } _{ data_type } "
446
-
442
+ key = f"{ pollutant } _{ data_type } "
447
443
pollutant_mapping = BIGQUERY_FREQUENCY_MAPPER .get (frequency , {}).get (
448
444
key , []
449
445
)
446
+
450
447
pollutant_columns .extend (
451
448
cls .get_columns (
452
449
cls ,
@@ -460,6 +457,10 @@ def download_from_bigquery(
460
457
461
458
# TODO Clean up by using the `get_columns` helper method
462
459
if pollutant in {"pm2_5" , "pm10" , "no2" }:
460
+ if data_type == "raw" :
461
+ # Add dummy column to fix union column number mismatch.
462
+ bam_pollutant_columns .append ("-1 as pm2_5" )
463
+
463
464
if frequency in ["weekly" , "monthly" , "yearly" ]:
464
465
bam_pollutant_columns .extend (
465
466
[f"ROUND(AVG({ pollutant } ), { decimal_places } ) AS { key } _value" ]
@@ -468,6 +469,7 @@ def download_from_bigquery(
468
469
bam_pollutant_columns .extend (
469
470
[f"ROUND({ pollutant } , { decimal_places } ) AS { key } _value" ]
470
471
)
472
+
471
473
# TODO Fix query when weather data is included. Currently failing
472
474
if weather_fields :
473
475
for field in weather_fields :
@@ -537,12 +539,55 @@ def download_from_bigquery(
537
539
drop_columns .append ("datetime" )
538
540
sorting_cols .append ("datetime" )
539
541
542
+ if data_type == "raw" :
543
+ cls .simple_data_cleaning (dataframe )
544
+
540
545
dataframe .drop_duplicates (subset = drop_columns , inplace = True , keep = "first" )
541
546
dataframe .sort_values (sorting_cols , ascending = True , inplace = True )
542
547
dataframe ["frequency" ] = frequency
543
548
dataframe = dataframe .replace (np .nan , None )
544
549
return dataframe
545
550
551
@classmethod
def simple_data_cleaning(cls, data: pd.DataFrame) -> pd.DataFrame:
    """
    Reconcile the "pm2_5" and "pm2_5_raw_value" columns of a DataFrame.

    The cleaning process, applied in place:
        1. Coerces "pm2_5" and "pm2_5_raw_value" to numeric; entries that
           cannot be parsed become NaN.
        2. Blanks "pm2_5" on every row where "pm2_5_raw_value" has data, so
           "pm2_5" is retained only where no raw value exists.
        3. Drops every column that ends up entirely empty (this includes
           "pm2_5" and "pm2_5_raw_value" themselves).

    Either column may be absent; the steps that need a missing column are
    skipped instead of raising ``KeyError``. An empty frame is returned
    untouched so that its column schema survives.

    Args:
        cls: Class reference (used in classmethods).
        data (pd.DataFrame): Frame to clean. It is modified in place.

    Returns:
        pd.DataFrame: The same ``data`` object, after cleaning.

    """
    raw_column = "pm2_5_raw_value"
    bam_column = "pm2_5"

    # With zero rows every column counts as "entirely empty"; dropping them
    # all would leave nothing for later column-based operations to use.
    if data.empty:
        return data

    for column in (raw_column, bam_column):
        if column in data.columns:
            data[column] = pd.to_numeric(data[column], errors="coerce")

    if raw_column in data.columns and bam_column in data.columns:
        # Keep "pm2_5" only where there is no raw reading. `where` fills the
        # other rows with NaN and upcasts integer columns to float as needed.
        data[bam_column] = data[bam_column].where(data[raw_column].isna())

    # A single pass removes an all-empty "pm2_5_raw_value" or "pm2_5" along
    # with any other empty column, so the masking above never has to look up
    # a column that was already dropped.
    data.dropna(how="all", axis=1, inplace=True)

    return data
590
+
546
591
@classmethod
547
592
def data_export_query (
548
593
cls ,
0 commit comments