from typing import Literal, Optional

from calitp_data_analysis import utils
from calitp_data_analysis.geography_utils import WGS84

from segment_speed_utils import gtfs_schedule_wrangling, segment_calcs, time_series_utils
from shared_utils import publish_utils, time_helpers
@@ -43,8 +44,60 @@ def import_singleday_segment_speeds(
4344 return df
4445
4546
def export_segment_geometry(
    year: str,
):
    """
    Dedupe segment geometries using identifier columns,
    since geometries may slightly differ across service dates.

    Visual inspection shows start and endpoints might be
    slightly different but still capture the same corridor.

    Big Blue Bus: stop_pair = "1115__187"
    In 2024, there are 4 rows, but the 4 rows are basically the same,
    so let's keep the most recent row.

    Parameters
    ----------
    year : str
        4-digit year (e.g. "2025"); matched as a substring against
        the dates in rt_dates.all_dates.

    Side effects: writes one parquet to
    {SEGMENT_GCS}{EXPORT_FILE}_{year}.parquet and prints a status line.
    """
    SEGMENTS_FILE = GTFS_DATA_DICT.rt_stop_times.segments_file
    EXPORT_FILE = GTFS_DATA_DICT.rt_stop_times.segments_year_file

    # Columns that identify a segment; geometry is deliberately excluded
    # from the dedupe key because near-identical corridors differ slightly.
    keep_cols = [
        "schedule_gtfs_dataset_key",
        "route_id", "direction_id",
        "stop_pair",
    ]

    dates_in_year = [
        date for date in rt_dates.all_dates if year in date
    ]

    # Lazy (dask) concatenation across all dates in the year.
    # Sort descending by service_date so drop_duplicates (keep="first")
    # retains the most recent geometry for each segment.
    df = time_series_utils.concatenate_datasets_across_dates(
        SEGMENT_GCS,
        SEGMENTS_FILE,
        dates_in_year,
        columns=keep_cols + ["geometry"],
        data_type="gdf",
        get_pandas=False,
    ).sort_values(
        "service_date", ascending=False
    ).drop(
        columns="service_date"
    ).drop_duplicates(
        subset=keep_cols
    ).reset_index(drop=True).to_crs(WGS84)

    # Materialize the lazy frame before export.
    df = df.compute()

    df.to_parquet(
        f"{SEGMENT_GCS}{EXPORT_FILE}_{year}.parquet",
    )

    print(f"exported stop segments for year {year}")

    return
4699def annual_time_of_day_averages (
47- analysis_date_list : list ,
100+ year : str ,
48101 segment_type : Literal [SEGMENT_TYPES ],
49102 config_path : Optional = GTFS_DATA_DICT
50103):
@@ -63,6 +116,7 @@ def annual_time_of_day_averages(
63116 dict_inputs = config_path [segment_type ]
64117
65118 SPEED_FILE = dict_inputs ["segment_timeofday" ]
119+ SEGMENTS_YEAR_FILE = dict_inputs ["segments_year_file" ]
66120 EXPORT_FILE = dict_inputs ["segment_timeofday_weekday_year" ]
67121
68122 SEGMENT_COLS = [* dict_inputs ["segment_cols" ]]
@@ -71,6 +125,10 @@ def annual_time_of_day_averages(
71125 OPERATOR_COLS = ["schedule_gtfs_dataset_key" ]
72126 CROSSWALK_COLS = [* dict_inputs .crosswalk_cols ]
73127
128+ analysis_date_list = [
129+ date for date in rt_dates .all_dates if year in date
130+ ]
131+
74132 df = import_singleday_segment_speeds (
75133 SEGMENT_GCS ,
76134 SPEED_FILE ,
@@ -98,44 +156,26 @@ def annual_time_of_day_averages(
98156 ** orig_dtypes ,
99157 },
100158 align_dataframes = False
101- ).compute ()
102-
103-
104- publish_utils .if_exists_then_delete (
105- f"{ SEGMENT_GCS } { EXPORT_FILE } "
106- )
107-
108- avg_speeds .to_parquet (
109- f"{ SEGMENT_GCS } { EXPORT_FILE } .parquet" ,
110- )
111- '''
112- speeds_gdf = segment_calcs.merge_in_segment_geometry(
113- avg_speeds,
114- analysis_date_list,
115- segment_type,
116- SEGMENT_COLS
117- ).pipe(
118- gtfs_schedule_wrangling.merge_operator_identifiers,
159+ ).compute ().pipe (
160+ gtfs_schedule_wrangling .merge_operator_identifiers ,
119161 analysis_date_list ,
120162 columns = CROSSWALK_COLS
121163 )
122164
123- utils.geoparquet_gcs_export(
124- speeds_gdf,
125- SEGMENT_GCS,
126- EXPORT_FILE
165+ avg_speeds .to_parquet (
166+ f"{ SEGMENT_GCS } { EXPORT_FILE } _{ year } .parquet"
127167 )
128- '''
129168
130169 end = datetime .datetime .now ()
131170
132171 logger .info (
133- f"{ segment_type } : weekday/time-of-day averages for { analysis_date_list } "
172+ f"{ segment_type } : weekday/time-of-day averages for { year } "
134173 f"execution time: { end - start } "
135174 )
136175
137176 return
138177
178+
139179if __name__ == "__main__" :
140180
141181 from shared_utils import rt_dates
@@ -147,9 +187,18 @@ def annual_time_of_day_averages(
147187 format = "{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}" ,
148188 level = "INFO" )
149189
190+ # isolate segments per year to allow for export
191+ # rerun previous years when necessary
192+ for year in ["2025" ]:
193+
194+ export_segment_geometry (year )
150195
151- annual_time_of_day_averages (
152- rt_dates .all_dates ,
153- segment_type = "rt_stop_times" ,
154- )
196+ annual_time_of_day_averages (
197+ year ,
198+ segment_type = "rt_stop_times" ,
199+ )
200+
201+
202+
203+
155204