1+ """
2+ Average segment speeds over longer time periods,
3+ a quarter or a year.
4+ """
import dask.dataframe as dd
import geopandas as gpd
import pandas as pd

from dask import delayed, compute

from calitp_data_analysis import utils
from segment_speed_utils import time_series_utils
from update_vars import SEGMENT_GCS, GTFS_DATA_DICT

def segment_speeds_one_day(
    segment_type: str,
    analysis_date: str,
    segment_cols: list,
    org_cols: list
):
1621 """
17- Concatenate several single day averages
18- and we'll take the average over that.
22+ Concatenate segment geometry (from rt_segment_speeds)
23+ for all the dates we have
24+ and get it to route-direction-segment grain
25+ """
    speed_file = GTFS_DATA_DICT[segment_type]["route_dir_single_segment"]
    segment_file = GTFS_DATA_DICT[segment_type]["segments_file"]

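    # Read one day's speed percentiles and stamp each row with its service date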
    speeds_df = pd.read_parquet(
        f"{SEGMENT_GCS}{speed_file}_{analysis_date}.parquet",
        columns = segment_cols + org_cols + [
            "schedule_gtfs_dataset_key",
            "p20_mph", "p50_mph", "p80_mph", "n_trips"]
    ).assign(
        service_date = pd.to_datetime(analysis_date)
    )

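    # Segment geometry for the same date, deduplicated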
    segment_gdf = gpd.read_parquet(
        f"{SEGMENT_GCS}{segment_file}_{analysis_date}.parquet",
        columns = segment_cols + [
            "schedule_gtfs_dataset_key", "geometry"]
    ).drop_duplicates().reset_index(drop=True)

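    # Merge on every column the speeds and segments share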
    merge_cols = [c for c in speeds_df.columns if c in segment_gdf.columns]

    df = pd.merge(
        segment_gdf[merge_cols + ["geometry"]].drop_duplicates(),
        speeds_df,
        on = merge_cols,
        how = "inner"
    )

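    # Pull out year and quarter for downstream grouping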
    df = df.assign(
        year = df.service_date.dt.year,
        quarter = df.service_date.dt.quarter,
    )

    return df


def get_aggregation(df: pd.DataFrame, group_cols: list):
    """
    Aggregate across days: take the mean of the
    p20/p50/p80 speeds and sum the number of trips
    across those days.
    """
    speed_cols = [c for c in df.columns if "_mph" in c]

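    # Average each *_mph percentile column and sum trip counts across days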
    df2 = (df
           .groupby(group_cols)
           .agg({
               **{c: "mean" for c in speed_cols},
               "n_trips": "sum"
           }).reset_index()
          )

    return df2


def average_by_time(date_list: list, time_cols: list):
    """
    Average single-day speeds across all the dates given,
    grouped by time_cols (e.g. ["year"] or ["year", "quarter"]),
    and attach segment geometry.
    """
    # These columns define a segment: route-direction-stop_pair
    segment_stop_cols = [
        "route_id", "direction_id",
        "stop_pair",
    ]

    # Other columns we need from speeds that are not in segments
    org_cols = [
        "stop_pair_name",
        "time_period",
        "name",  # use name, not schedule_gtfs_dataset_key, which can differ over time
        "caltrans_district", "organization_source_record_id",
        "organization_name", "base64_url"
    ]

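    # One delayed dataframe per date; nothing is read until compute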
    delayed_dfs = [
        delayed(segment_speeds_one_day)(
            "stop_segments",
            one_date,
            segment_stop_cols,
            org_cols
        ) for one_date in date_list
    ]

    ddf = dd.from_delayed(delayed_dfs)

    group_cols = [
        c for c in segment_stop_cols + org_cols
        if c not in ["schedule_gtfs_dataset_key"]
    ] + time_cols

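    # Aggregate speeds across dates, then materialize the result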
    speed_averages = get_aggregation(ddf, group_cols)
    speed_averages = speed_averages.compute()

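    # Keep one geometry row per segment per time period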
    segment_geom = ddf[
        ["name", "geometry"] + segment_stop_cols + time_cols
    ].drop_duplicates().compute()

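    # Attach the averaged speeds back onto segment geometry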
    speed_gdf = pd.merge(
        segment_geom,
        speed_averages,
        on = ["name"] + segment_stop_cols + time_cols,
        how = "inner"
    )

    return speed_gdf


if __name__ == "__main__":
    import datetime
    from shared_utils import rt_dates

    segment_type = "stop_segments"
    EXPORT = GTFS_DATA_DICT[segment_type]["route_dir_multi_segment"]
    all_dates = rt_dates.y2024_dates + rt_dates.y2023_dates
    '''
    # quarter averages take x min
    speeds_by_quarter = average_by_time(all_dates, ["year", "quarter"])

    utils.geoparquet_gcs_export(
        speeds_by_quarter,
        SEGMENT_GCS,
        f"{EXPORT}_quarter"
    )
    del speeds_by_quarter
    '''
    # year averages take 14 min
    t0 = datetime.datetime.now()
    speeds_by_year = average_by_time(all_dates, ["year"])

    utils.geoparquet_gcs_export(
        speeds_by_year,
        SEGMENT_GCS,
        f"{EXPORT}_year"
    )
    t1 = datetime.datetime.now()
    print(f"execution: {t1 - t0}")