 from calitp_data_analysis import utils
 from segment_speed_utils import time_series_utils
-from shared_utils import gtfs_utils_v2, publish_utils
-from merge_data import merge_in_standardized_route_names
+from shared_utils import gtfs_utils_v2, portfolio_utils, publish_utils
+from merge_data import merge_in_standardized_route_names, PORTFOLIO_ORGANIZATIONS_DICT
 from update_vars import GTFS_DATA_DICT, SCHED_GCS, RT_SCHED_GCS

 sort_cols = ["schedule_gtfs_dataset_key", "service_date"]

 """
 Concatenating Functions
 """
-def concatenate_rt_vs_schedule_operator_metrics(
+def concatenate_schedule_operator_metrics(
     date_list: list
 ) -> pd.DataFrame:
-
-    FILE = f"{GTFS_DATA_DICT.rt_vs_schedule_tables.vp_operator_metrics}"
+    """
+    Get operator scheduled service stats on the
+    operator-service_date grain for certain dates.
+    """
+    FILE = GTFS_DATA_DICT.schedule_tables.operator_scheduled_stats

     df = time_series_utils.concatenate_datasets_across_dates(
         SCHED_GCS,
@@ -32,9 +35,31 @@ def concatenate_rt_vs_schedule_operator_metrics(

     return df

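Every `concatenate_*` function in this file delegates to `time_series_utils.concatenate_datasets_across_dates`. For readers outside the repo, a minimal sketch of what a helper like that might do, assuming each date's file lives at `{gcs_path}{file}_{date}.parquet` and gets a `service_date` column stamped on (both naming and stamping are illustrative assumptions, not the library's actual implementation):

```python
import pandas as pd

def concatenate_across_dates_sketch(
    gcs_path: str,
    file: str,
    date_list: list,
    data_type: str = "df",
    columns: list = None,
) -> pd.DataFrame:
    # Read one parquet per analysis date, stamp the date, then stack them.
    dfs = []
    for date in date_list:
        one_date = pd.read_parquet(
            f"{gcs_path}{file}_{date}.parquet",  # hypothetical naming scheme
            columns=columns,
        ).assign(service_date=pd.to_datetime(date))
        dfs.append(one_date)
    # data_type="gdf" would read with geopandas instead; omitted here.
    return pd.concat(dfs, axis=0, ignore_index=True)
```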
+def concatenate_rt_vs_schedule_operator_metrics(
+    date_list: list
+) -> pd.DataFrame:
+    """
+    Concatenate operator grain RT vs schedule metrics
+    across all dates we have.
+    """
+    FILE = GTFS_DATA_DICT.rt_vs_schedule_tables.vp_operator_metrics
+
+    df = time_series_utils.concatenate_datasets_across_dates(
+        RT_SCHED_GCS,
+        FILE,
+        date_list,
+        data_type="df",
+    ).sort_values(sort_cols).reset_index(drop=True)
+
+    return df
+
 def concatenate_operator_routes(
     date_list: list
 ) -> gpd.GeoDataFrame:
+    """
+    Concatenate operator route gdf (1 representative shape chosen)
+    across all dates we have.
+    """
     FILE = GTFS_DATA_DICT.schedule_tables.operator_routes

     df = time_series_utils.concatenate_datasets_across_dates(
@@ -44,20 +69,24 @@ def concatenate_operator_routes(
         data_type="gdf",
     ).sort_values(sort_cols).reset_index(drop=True)

+    # TODO: is there a short/long route? Can it be flagged per date
+    # as a new column here? (One possible approach is sketched below.)
+
     return df

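One way the TODO above could be answered: flag each route as short or long relative to that service date's median route length. A hedged sketch; the CRS choice and every column name other than `service_date` are assumptions for illustration:

```python
import geopandas as gpd

def flag_short_long_routes(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    # Measure route length in miles in a projected CRS (CA Albers, meters).
    gdf = gdf.assign(
        route_length_mi=gdf.geometry.to_crs("EPSG:3310").length / 1_609.34
    )
    # Compare each route to the median route length for its service date.
    median_by_date = gdf.groupby("service_date").route_length_mi.transform("median")
    return gdf.assign(
        route_length_flag=(gdf.route_length_mi > median_by_date).map(
            {True: "long", False: "short"}
        )
    )
```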
+
 def concatenate_crosswalks(
     date_list: list
 ) -> pd.DataFrame:
     """
     Get crosswalk and selected NTD columns for certain dates.
     """
-    FILE = f"{GTFS_DATA_DICT.schedule_tables.gtfs_key_crosswalk}"
+    FILE = GTFS_DATA_DICT.schedule_tables.gtfs_key_crosswalk

     ntd_cols = [
         "schedule_gtfs_dataset_key",
+        "name",
         "caltrans_district",
-        "counties_served",
+        # "counties_served",  # remove this and create our own column
         "service_area_sq_miles",
         "hq_city",
         "service_area_pop",
@@ -74,91 +103,96 @@ def concatenate_crosswalks(
             data_type="df",
             columns=ntd_cols
         )
-        .sort_values(["service_date"])
+        .sort_values(sort_cols)
         .reset_index(drop=True)
+    )
+
+    df = df.assign(
+        caltrans_district=df.caltrans_district.map(
+            portfolio_utils.CALTRANS_DISTRICT_DICT
+        )
+    ).pipe(
+        portfolio_utils.standardize_portfolio_organization_names,
+        PORTFOLIO_ORGANIZATIONS_DICT
     )

+    # To aggregate up to organization, group by
+    # name-service_date-portfolio_organization_name, because name indicates
+    # different feeds, so we want to sum those. (A sketch follows below.)
+
     return df
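The aggregation the comment above describes might look like the following sketch, where rows at the feed (`name`) grain roll up to one `portfolio_organization_name` per `service_date`; the metric columns being summed are hypothetical stand-ins:

```python
import pandas as pd

def aggregate_to_organization(df: pd.DataFrame) -> pd.DataFrame:
    # Hypothetical additive metrics carried at the feed (name) grain.
    sum_cols = ["operator_n_routes", "operator_n_trips"]
    # Summing across names collapses multiple feeds into one organization row.
    return (
        df.groupby(
            ["portfolio_organization_name", "service_date"],
            as_index=False,
        )[sum_cols]
        .sum()
    )
```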

-def concatenate_schedule_operator_metrics(
-    date_list: list
+def merge_data_sources_by_operator(
+    df_schedule: pd.DataFrame,
+    df_rt_sched: pd.DataFrame,
+    df_crosswalk: pd.DataFrame
 ) -> pd.DataFrame:
     """
-    Get spatial accuracy and vehicle positions per minute metrics on the
-    operator-service_date grain for certain dates.
-    """
-    FILE = GTFS_DATA_DICT.schedule_tables.operator_scheduled_stats
-
-    df = time_series_utils.concatenate_datasets_across_dates(
-        RT_SCHED_GCS,
-        FILE,
-        date_list,
-        data_type="df",
-    ).sort_values(sort_cols).reset_index(drop=True)
-
+    Merge schedule and rt_vs_schedule data,
+    which are all at operator-date grain.
+    This merged dataset will be used in GTFS digest visualizations.
+    """
+    df = pd.merge(
+        df_schedule,
+        df_rt_sched,
+        on=sort_cols,
+        how="left",
+    ).merge(
+        df_crosswalk,
+        on=sort_cols + ["name"],
+        how="inner"
+    )
+
     return df

+## TODO: move counties stuff here
+# swap order at the bottom since this needs to be created first
+def counties_served_by_operator(route_gdf_by_operator):
+    """
+    Take the input produced in concatenate_operator_routes and
+    get counties for each operator-date.
+    The df should only be operator-date-counties_served;
+    use this to merge into the crosswalk and replace the NTD column.
+    """
+
+    return
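A hedged sketch of how this stub might eventually be filled in, per its docstring: spatially join the representative route shapes to county polygons, then collapse to one `counties_served` string per operator-date. The county boundary GeoDataFrame and its `county_name` column are assumptions, not the repo's actual source:

```python
import geopandas as gpd
import pandas as pd

def counties_served_sketch(
    route_gdf_by_operator: gpd.GeoDataFrame,
    counties: gpd.GeoDataFrame,  # assumed to carry a county_name column
) -> pd.DataFrame:
    # Match each route shape to every county it intersects.
    joined = gpd.sjoin(
        route_gdf_by_operator.to_crs(counties.crs),
        counties[["county_name", "geometry"]],
        how="inner",
        predicate="intersects",
    )
    # Collapse to the operator-date-counties_served grain.
    return (
        joined.groupby(["schedule_gtfs_dataset_key", "service_date"])
        .county_name.agg(lambda s: ", ".join(sorted(s.unique())))
        .reset_index()
        .rename(columns={"county_name": "counties_served"})
    )
```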

 if __name__ == "__main__":

     from shared_utils import rt_dates

     analysis_date_list = (
-        rt_dates.y2024_dates + rt_dates.y2023_dates +
-        rt_dates.y2025_dates
+        rt_dates.y2025_dates + rt_dates.y2024_dates + rt_dates.y2023_dates
     )

     OPERATOR_PROFILE = GTFS_DATA_DICT.digest_tables.operator_profiles
     OPERATOR_ROUTE = GTFS_DATA_DICT.digest_tables.operator_routes_map

     public_feeds = gtfs_utils_v2.filter_to_public_schedule_gtfs_dataset_keys()

-    # Concat operator metrics.
-    op_sched_metrics = concatenate_schedule_operator_metrics(analysis_date_list)
-
-    # Concat operator profiles
-    op_rt_sched_metrics = concatenate_rt_vs_schedule_operator_metrics(analysis_date_list)
-
-    merge_cols = ["schedule_gtfs_dataset_key",
-                  "service_date"]
-
-    # Merge the two together
-    operator_profiles_df1 = pd.merge(op_sched_metrics,
-                                     op_rt_sched_metrics,
-                                     on=merge_cols,
-                                     how="outer")
-
+    # Concat operator grain for schedule metrics.
+    schedule_df = concatenate_schedule_operator_metrics(analysis_date_list)

+    # Concat operator grain for rt vs schedule metrics.
+    rt_schedule_df = concatenate_rt_vs_schedule_operator_metrics(
+        analysis_date_list)
+
     # Concat NTD/crosswalk
     crosswalk_df = concatenate_crosswalks(analysis_date_list)

-    # Merge in NTD data.
-    op_profiles_df2 = pd.merge(
-        operator_profiles_df1,
-        crosswalk_df,
-        on=merge_cols,
-        how="left"
+    operator_df = merge_data_sources_by_operator(
+        schedule_df,
+        rt_schedule_df,
+        crosswalk_df
+    ).pipe(
+        publish_utils.exclude_private_datasets,
+        col="schedule_gtfs_dataset_key",
+        public_gtfs_dataset_keys=public_feeds
     )

-    # Drop duplicates created after merging
-    # Add more stringent drop duplicate criteria
-    duplicate_cols = ["schedule_gtfs_dataset_key",
-                      "vp_per_min_agency",
-                      "spatial_accuracy_agency",
-                      "service_date",
-                      "organization_name",
-                      "caltrans_district"]
-
-    op_profiles_df3 = (
-        op_profiles_df2
-        .pipe(
-            publish_utils.exclude_private_datasets,
-            col="schedule_gtfs_dataset_key",
-            public_gtfs_dataset_keys=public_feeds
-        ).drop_duplicates(subset=duplicate_cols)
-        .reset_index(drop=True))
-
-    op_profiles_df3.to_parquet(
+    operator_df.to_parquet(
         f"{RT_SCHED_GCS}{OPERATOR_PROFILE}.parquet"
     )