11"""
2- Concatenate batched vehicle positions.
2+ Concatenate batched vehicle positions.
33"""
4- import dask .dataframe as dd
5- import dask_geopandas as dg
4+
65import datetime
6+ import sys
7+ from functools import cache
8+
9+ import dask .dataframe as dd
710import gcsfs
811import geopandas as gpd
912import pandas as pd
1013import shapely
11- import sys
12-
13- from dask import delayed , compute
14+ from calitp_data_analysis import utils
15+ from calitp_data_analysis .gcs_geopandas import GCSGeoPandas
16+ from calitp_data_analysis .gcs_pandas import GCSPandas
17+ from dask import compute , delayed
1418from loguru import logger
15-
1619from shared_utils import schedule_rt_utils
17- from calitp_data_analysis import utils
1820from update_vars import GTFS_DATA_DICT , SEGMENT_GCS
1921
22+
23+ @cache
24+ def gcs_pandas ():
25+ return GCSPandas ()
26+
27+
28+ @cache
29+ def gcs_geopandas ():
30+ return GCSGeoPandas ()
31+
32+
2033fs = gcsfs .GCSFileSystem ()
2134
35+
2236def concat_batches (analysis_date : str ) -> dd .DataFrame :
2337 """
2438 Append individual operator vehicle position parquets together
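The new `@cache`-wrapped accessors above memoize one `GCSPandas`/`GCSGeoPandas` instance per process, so repeated calls share a single client rather than re-constructing it. A minimal sketch of the same pattern with a hypothetical stand-in class (`ExpensiveClient` is illustrative, not part of `calitp_data_analysis`):

```python
from functools import cache


class ExpensiveClient:
    """Stand-in for a wrapper that is costly to construct (auth, connections)."""

    def __init__(self):
        print("constructing client...")


@cache
def get_client() -> ExpensiveClient:
    # functools.cache stores the first return value; later calls
    # skip the body and hand back the same instance.
    return ExpensiveClient()


assert get_client() is get_client()  # "constructing client..." prints once
```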
@@ -27,19 +41,16 @@ def concat_batches(analysis_date: str) -> dd.DataFrame:
 
     fs_list = fs.ls(f"{SEGMENT_GCS}")
 
-    vp_files = [i for i in fs_list if "vp_raw" in i
-                and f"{analysis_date}_batch" in i]
-
-    delayed_dfs = [delayed(pd.read_parquet)(f"gs://{f}")
-                   for f in vp_files]
-
+    vp_files = [i for i in fs_list if "vp_raw" in i and f"{analysis_date}_batch" in i]
+
+    delayed_dfs = [delayed(gcs_pandas().read_parquet)(f"gs://{f}") for f in vp_files]
+
     ddf = dd.from_delayed(delayed_dfs)
-
+
     ddf = schedule_rt_utils.localize_timestamp_col(
-        ddf,
-        ["location_timestamp"] # add moving_timestamp with new mart table
+        ddf, ["location_timestamp"]  # add moving_timestamp with new mart table
     )
-
+
     return ddf
 
 
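The hunk above keeps the existing lazy-read structure: one `dask.delayed` task per batch file, stitched into a single lazy DataFrame with `dd.from_delayed`, with `pd.read_parquet` swapped for the cached `GCSPandas` reader. A self-contained sketch of that pattern against local files (file names are illustrative):

```python
import dask.dataframe as dd
import pandas as pd
from dask import delayed

# Write a few tiny batch files so the example runs standalone.
paths = [f"batch_{i}.parquet" for i in range(3)]
for i, p in enumerate(paths):
    pd.DataFrame({"trip_id": [f"t{i}"], "speed": [float(i)]}).to_parquet(p)

# One lazy task per file; nothing is read yet.
parts = [delayed(pd.read_parquet)(p) for p in paths]

# from_delayed stitches the lazy parts into one dask DataFrame;
# the reads run in parallel only when the graph is computed.
ddf = dd.from_delayed(parts)
print(ddf.compute())
```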
@@ -49,53 +60,44 @@ def vp_into_gdf(df: pd.DataFrame) -> gpd.GeoDataFrame:
     """
     # Drop Nones or else shapely will error
     df2 = df[df.location.notna()].reset_index(drop=True)
-
+
     geom = [shapely.wkt.loads(x) for x in df2.location]
 
-    gdf = gpd.GeoDataFrame(
-        df2, geometry=geom,
-        crs="EPSG:4326").drop(columns="location")
-
+    gdf = gpd.GeoDataFrame(df2, geometry=geom, crs="EPSG:4326").drop(columns="location")
+
     return gdf
 
 
 def remove_batched_parquets(analysis_date: str):
     """
-    Remove the batches of parquet downloads.
+    Remove the batches of parquet downloads.
     These have file name pattern of *_batch*.
     """
     fs_list = fs.ls(f"{SEGMENT_GCS}")
-
-    vp_files = [
-        i for i in fs_list if "vp_raw" in i
-        and f"{analysis_date}_batch" in i
-    ]
-
-    concat_file = [i for i in fs_list if
-                   f"{analysis_date}_concat" in i
-                   ]
-
+
+    vp_files = [i for i in fs_list if "vp_raw" in i and f"{analysis_date}_batch" in i]
+
+    concat_file = [i for i in fs_list if f"{analysis_date}_concat" in i]
+
     for f in vp_files:
         fs.rm(f)
-
+
     for f in concat_file:
         fs.rm(f, recursive=True)
-
-
+
+
 if __name__ == "__main__":
-
+
     from update_vars import analysis_date_list
 
     LOG_FILE = "./logs/download_vp_v2.log"
     logger.add(LOG_FILE, retention="3 months")
-    logger.add(sys.stderr,
-               format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
-               level="INFO")
-
+    logger.add(sys.stderr, format="{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}", level="INFO")
+
     RAW_VP = GTFS_DATA_DICT.speeds_tables.raw_vp
-
+
     for analysis_date in analysis_date_list:
-
+
         logger.info(f"Analysis date: {analysis_date}")
 
         start = datetime.datetime.now()
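`vp_into_gdf` (reformatted in the hunk above) converts the WKT strings in `location` into a geometry column: drop the nulls that would make `shapely.wkt.loads` raise, parse each string, and build a `GeoDataFrame` in EPSG:4326. A minimal self-contained version with made-up coordinates:

```python
import geopandas as gpd
import pandas as pd
import shapely.wkt

df = pd.DataFrame(
    {
        "trip_id": ["a", "b", "c"],
        "location": ["POINT (-118.24 34.05)", None, "POINT (-122.27 37.80)"],
    }
)

# Drop Nones first: shapely.wkt.loads errors on non-string input.
df2 = df[df.location.notna()].reset_index(drop=True)

geom = [shapely.wkt.loads(x) for x in df2.location]

# WGS84 lon/lat; the location column is redundant once parsed.
gdf = gpd.GeoDataFrame(df2, geometry=geom, crs="EPSG:4326").drop(columns="location")
print(gdf)
```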
@@ -106,36 +108,34 @@ def remove_batched_parquets(analysis_date: str):
         time1 = datetime.datetime.now()
         logger.info(f"concat and filter batched data: {time1 - start}")
 
-        concatenated_vp_df.to_parquet(
-            f"{SEGMENT_GCS}{RAW_VP}_{analysis_date}_concat",
-            partition_on="gtfs_dataset_key")
+        gcs_pandas().data_frame_to_parquet(
+            concatenated_vp_df, f"{SEGMENT_GCS}{RAW_VP}_{analysis_date}_concat", partition_on="gtfs_dataset_key"
+        )
 
         time2 = datetime.datetime.now()
         logger.info(f"export concatenated vp: {time2 - time1}")
 
         # Delete objects once it's saved out
         # Loop to save out multiple dates of vp may cause kernel to crash
         del concatenated_vp_df
-
+
         # Import concatenated tabular vp and make it a gdf
-        vp = delayed(pd.read_parquet)(
-            f"{SEGMENT_GCS}{RAW_VP}_{analysis_date}_concat/"
-        ).reset_index(drop=True)
+        # Stripping gs:// from the supplied path avoids a pyarrow error?
+        logger.info(f"path: {SEGMENT_GCS[5:]}{RAW_VP}_{analysis_date}_concat/")
+        vp = delayed(gcs_pandas().read_parquet)(f"{SEGMENT_GCS[5:]}{RAW_VP}_{analysis_date}_concat/").reset_index(
+            drop=True
+        )
 
         vp_gdf = delayed(vp_into_gdf)(vp)
-
+
         vp_gdf = compute(vp_gdf)[0]
 
-        utils.geoparquet_gcs_export(
-            vp_gdf,
-            SEGMENT_GCS,
-            f"{RAW_VP}_{analysis_date}"
-        )
+        utils.geoparquet_gcs_export(vp_gdf, SEGMENT_GCS, f"{RAW_VP}_{analysis_date}")
 
         remove_batched_parquets(analysis_date)
-        logger.info(f"remove batched parquets")
+        logger.info("remove batched parquets")
 
         end = datetime.datetime.now()
         logger.info(f"execution time: {end - start}")
-
+
         del vp_gdf
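The reimport in the last hunk reads the partitioned `_concat` dataset with the `gs://` scheme sliced off via `SEGMENT_GCS[5:]`; the in-diff comment is itself unsure why the prefixed form trips pyarrow, so treat this as a workaround rather than documented behavior. A sketch of what the slice does, assuming `SEGMENT_GCS` has the usual `gs://bucket/prefix/` shape (the bucket name below is illustrative):

```python
SEGMENT_GCS = "gs://some-bucket/rt_segment_speeds/"  # illustrative value

# "gs://" is exactly five characters, so [5:] leaves the bare
# bucket-relative path that the wrapper's parquet reader accepts here.
bare_path = SEGMENT_GCS[5:]
assert bare_path == "some-bucket/rt_segment_speeds/"
```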