  from typing import Literal, Optional

  from calitp_data_analysis import utils
+ from calitp_data_analysis.geography_utils import WGS84

  from segment_speed_utils import gtfs_schedule_wrangling, segment_calcs, time_series_utils
  from shared_utils import publish_utils, time_helpers
@@ -43,8 +44,60 @@ def import_singleday_segment_speeds(
      return df


+ def export_segment_geometry(
+     year: str,
+ ):
+     """
+     Dedupe segment geometries on identifier columns,
+     since geometries may differ slightly across service dates.
+     Visual inspection shows start and end points might be
+     slightly different but still capture the same corridor.
+
+     Big Blue Bus: stop_pair = "1115__187"
+     In 2024 there are 4 rows, but they are essentially the same,
+     so keep the most recent row.
+     """
+     SEGMENTS_FILE = GTFS_DATA_DICT.rt_stop_times.segments_file
+     EXPORT_FILE = GTFS_DATA_DICT.rt_stop_times.segments_year_file
+
+     keep_cols = [
+         "schedule_gtfs_dataset_key",
+         "route_id", "direction_id",
+         "stop_pair",
+     ]
+
+     dates_in_year = [
+         date for date in rt_dates.all_dates if year in date
+     ]
+
+     # stack segments across the year's dates, keep the most recent
+     # geometry per segment key, and project to WGS84 for export
+     df = time_series_utils.concatenate_datasets_across_dates(
+         SEGMENT_GCS,
+         SEGMENTS_FILE,
+         dates_in_year,
+         columns = keep_cols + ["geometry"],
+         data_type = "gdf",
+         get_pandas = False,
+     ).sort_values(
+         "service_date", ascending = False
+     ).drop(
+         columns = "service_date"
+     ).drop_duplicates(
+         subset = keep_cols
+     ).reset_index(drop = True).to_crs(WGS84)
+
+     df = df.compute()
+
+     df.to_parquet(
+         f"{SEGMENT_GCS}{EXPORT_FILE}_{year}.parquet",
+     )
+
+     print(f"exported stop segments for year {year}")
+
+     return
+
+
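A minimal sketch (not part of the commit) of the sort-then-dedupe pattern above, with hypothetical values: sorting descending on service_date before drop_duplicates keeps the most recent row per segment key.

    import pandas as pd

    toy = pd.DataFrame({
        "stop_pair": ["1115__187", "1115__187", "200__201"],
        "service_date": ["2024-03-13", "2024-01-17", "2024-03-13"],
        "geometry_label": ["corridor v2", "corridor v1", "other segment"],  # stand-in for real geometries
    })

    deduped = toy.sort_values(
        "service_date", ascending=False
    ).drop_duplicates(subset=["stop_pair"])
    # keeps "corridor v2" and "other segment": one row per stop_pair,
    # preferring the most recent service_date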
  def annual_time_of_day_averages(
-     analysis_date_list: list,
+     year: str,
      segment_type: Literal[SEGMENT_TYPES],
      config_path: Optional = GTFS_DATA_DICT
  ):
@@ -63,6 +116,7 @@ def annual_time_of_day_averages(
      dict_inputs = config_path[segment_type]

      SPEED_FILE = dict_inputs["segment_timeofday"]
+     SEGMENTS_YEAR_FILE = dict_inputs["segments_year_file"]
      EXPORT_FILE = dict_inputs["segment_timeofday_weekday_year"]

      SEGMENT_COLS = [*dict_inputs["segment_cols"]]
@@ -71,6 +125,10 @@ def annual_time_of_day_averages(
      OPERATOR_COLS = ["schedule_gtfs_dataset_key"]
      CROSSWALK_COLS = [*dict_inputs.crosswalk_cols]

+     # keep only the service dates that fall within this year
+     analysis_date_list = [
+         date for date in rt_dates.all_dates if year in date
+     ]
+
      df = import_singleday_segment_speeds(
          SEGMENT_GCS,
          SPEED_FILE,
@@ -80,50 +138,44 @@ def annual_time_of_day_averages(
      ).pipe(
          time_helpers.add_quarter
      )
-
-     avg_speeds = segment_calcs.calculate_weighted_averages(
-         df,
-         OPERATOR_COLS + SEGMENT_COLS_NO_GEOM + ["time_of_day", "weekday_weekend", "year"],
-         metric_cols = ["p20_mph", "p50_mph", "p80_mph"],
-         weight_col = "n_trips"
-     ).persist()

-     publish_utils.if_exists_then_delete(
-         f"{SEGMENT_GCS}{EXPORT_FILE}"
-     )
+     group_cols = OPERATOR_COLS + SEGMENT_COLS_NO_GEOM + [
+         "time_of_day", "weekday_weekend", "year"]

-     avg_speeds.to_parquet(
-         f"{SEGMENT_GCS}{EXPORT_FILE}",
-         partition_on = "time_of_day"
-     )
-     '''
-     speeds_gdf = delayed(segment_calcs.merge_in_segment_geometry)(
-         avg_speeds,
-         analysis_date_list,
-         segment_type,
-         SEGMENT_COLS
-     ).pipe(
-         gtfs_schedule_wrangling.merge_operator_identifiers,
+     speed_cols = ["p20_mph", "p50_mph", "p80_mph"]
+     weight_col = "n_trips"
+
+     # dask builds the task graph before computing, so declare the
+     # output schema via meta, reusing the input dtypes
+     orig_dtypes = df[group_cols + speed_cols + [weight_col]].dtypes.to_dict()
+
+     avg_speeds = df.map_partitions(
+         segment_calcs.calculate_weighted_averages,
+         group_cols,
+         metric_cols = speed_cols,
+         weight_col = weight_col,
+         meta = {
+             **orig_dtypes,
+         },
+         align_dataframes = False
+     ).compute().pipe(
+         gtfs_schedule_wrangling.merge_operator_identifiers,
          analysis_date_list,
          columns = CROSSWALK_COLS
      )
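Here map_partitions applies the aggregation to each dask partition independently, and meta declares the output schema so the graph can be built without computing (groups spanning partitions are only fully collapsed if the data is partitioned by the group keys). calculate_weighted_averages is not shown in this diff; for a trip-weighted average its core is presumably along these lines (a sketch, with the function body assumed):

    import pandas as pd

    def weighted_averages_sketch(
        df: pd.DataFrame, group_cols: list,
        metric_cols: list, weight_col: str
    ) -> pd.DataFrame:
        # weighted mean per group: sum(metric * n_trips) / sum(n_trips)
        weighted = df.assign(
            **{c: df[c] * df[weight_col] for c in metric_cols}
        )
        out = weighted.groupby(group_cols)[metric_cols + [weight_col]].sum().reset_index()
        for c in metric_cols:
            out[c] = out[c] / out[weight_col]
        return out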

-     utils.geoparquet_gcs_export(
-         speeds_gdf,
-         SEGMENT_GCS,
-         EXPORT_FILE
+     avg_speeds.to_parquet(
+         f"{SEGMENT_GCS}{EXPORT_FILE}_{year}.parquet"
      )
-     '''

      end = datetime.datetime.now()

      logger.info(
-         f"{segment_type}: weekday/time-of-day averages for {analysis_date_list} "
+         f"{segment_type}: weekday/time-of-day averages for {year} "
          f"execution time: {end - start}"
      )

      return

+
  if __name__ == "__main__":

      from shared_utils import rt_dates
@@ -135,9 +187,18 @@ def annual_time_of_day_averages(
          format = "{time:YYYY-MM-DD at HH:mm:ss} | {level} | {message}",
          level = "INFO")

+     # isolate segments per year to allow for export;
+     # rerun previous years when necessary
+     for year in ["2025"]:
+
+         export_segment_geometry(year)

-     annual_time_of_day_averages(
-         rt_dates.all_dates,
-         segment_type = "rt_stop_times",
-     )
+         annual_time_of_day_averages(
+             year,
+             segment_type = "rt_stop_times",
+         )
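Both export_segment_geometry and annual_time_of_day_averages pick out a year's worth of dates with a substring test, which relies on rt_dates.all_dates holding ISO-formatted date strings (an assumption here; the values below are hypothetical):

    all_dates = ["2024-12-11", "2025-01-15", "2025-02-12"]  # hypothetical values
    dates_in_2025 = [d for d in all_dates if "2025" in d]
    # -> ["2025-01-15", "2025-02-12"]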