@@ -658,24 +658,29 @@ def extract_hourly_data(**kwargs) -> pd.DataFrame:
658
658
from airqo_etl_utils .date import date_to_str_hours
659
659
660
660
# Only used the first time
661
- start = kwargs .get ("params" , {}).get ("start_date" , "2021-01-01" )
662
- end_d = kwargs .get ("params" , {}).get ("end_date" , "2021-12-31" )
663
- end_d = datetime .strptime (end_d , "%Y-%m-%d" )
664
- end_dt = end_d .replace (hour = 23 , minute = 59 , second = 59 )
665
- end = datetime .strftime (end_dt , "%Y-%m-%dT%H:%M:%SZ" )
661
+ start = kwargs .get ("params" , {}).get ("start_date" , "2021-01-01T00:00:00Z" )
662
+ start = datetime .strptime (start , "%Y-%m-%dT%H:%M:%SZ" )
663
+ end = kwargs .get ("params" , {}).get ("end_date" , "2021-12-31T23:59:59Z" )
664
+ end = datetime .strptime (end , "%Y-%m-%dT%H:%M:%SZ" )
666
665
667
666
previous_date = kwargs ["ti" ].xcom_pull (key = "new_date" )
668
667
if not previous_date :
669
668
previous_date = start
670
669
671
- hour_of_day = previous_date + timedelta (hours = 1 )
672
-
673
- start_date_time = date_to_str_hours (previous_date )
670
+ hour_of_day = (
671
+ datetime .strptime (previous_date , "%Y-%m-%dT%H:%M:%SZ" )
672
+ if not isinstance (previous_date , datetime )
673
+ else previous_date
674
+ )
675
+ start_date_time = date_to_str_hours (hour_of_day )
674
676
end_date_time = datetime .strftime (hour_of_day , "%Y-%m-%dT%H:59:59Z" )
675
677
676
678
if start_date_time > end or end_date_time > end :
677
679
raise AirflowFailException (f"Run expired on { end } " )
678
680
681
+ if previous_date == start :
682
+ kwargs ["ti" ].xcom_push (key = "new_date" , value = hour_of_day )
683
+
679
684
return DataUtils .extract_data_from_bigquery (
680
685
DataType .AVERAGED ,
681
686
start_date_time = start_date_time ,
0 commit comments