"""
Grab stashed trips parquet file.

Add in service hours, narrow it down to 1 trip per route,
and subset it to parallel routes only.

Create a df that has the selected trip per route
with all the stop info (and stop sequence) attached.
"""
import datetime
import os

import dask.dataframe as dd
import dask_geopandas as dg
import geopandas as gpd
import pandas as pd

# Must be set BEFORE the calitp-backed imports below so their BigQuery
# client picks up the 130 GB scan limit.
os.environ["CALITP_BQ_MAX_BYTES"] = str(130_000_000_000)

import shared_utils
from bus_service_utils import utils

# Analysis date for the Q2 2022 PMAC snapshot.
ANALYSIS_DATE = shared_utils.rt_dates.PMAC["Q2_2022"]
# GCS folder holding the cached/compiled warehouse views.
COMPILED_CACHED = f"{shared_utils.rt_utils.GCS_FILE_PATH}compiled_cached_views/"
def grab_service_hours(selected_date: str,
                       valid_trip_keys: list) -> None:
    """
    Query daily trips (with service hours) for the selected date and
    stash the result as a parquet in GCS.

    Parameters
    ----------
    selected_date : str
        Service date to query.
    valid_trip_keys : list
        trip_key values to keep (filters the warehouse query).

    Writes f"{utils.GCS_FILE_PATH}service_hours_{selected_date}.parquet";
    returns nothing.
    """
    daily_service_hours = shared_utils.gtfs_utils.get_trips(
        selected_date=selected_date,
        itp_id_list=None,
        # Keep more columns (route_id, shape_id, direction_id) so the
        # Metrolink fix can be propagated.
        trip_cols=["calitp_itp_id", "service_date", "trip_key",
                   "route_id", "shape_id", "direction_id", "service_hours"],
        get_df=True,
        custom_filtering={"trip_key": valid_trip_keys},
    )

    daily_service_hours.to_parquet(
        f"{utils.GCS_FILE_PATH}service_hours_{selected_date}.parquet")
def merge_trips_with_service_hours(selected_date: str) -> pd.DataFrame:
    """
    Attach service_hours to the cached trips table for the selected date.

    Reads the stashed trips parquet and the service-hours parquet written
    by grab_service_hours(), inner-joins them on trip_key, and returns the
    materialized (computed) pandas DataFrame.
    """
    trips = dd.read_parquet(
        f"{COMPILED_CACHED}trips_{selected_date}.parquet")

    daily_service_hours = pd.read_parquet(
        f"{utils.GCS_FILE_PATH}service_hours_{selected_date}.parquet")

    df = dd.merge(
        trips,
        daily_service_hours[["trip_key", "service_hours"]],
        on="trip_key",
        how="inner",
    ).compute()

    return df
def select_one_trip(df: pd.DataFrame) -> pd.DataFrame:
    """
    Narrow down to exactly one representative trip per operator-route.

    Picks the trip whose service_hours is closest to the route's 25th
    percentile (lower service hours means a faster trip, i.e. one run
    during free-flowing traffic). Ties are broken by lowest trip_id.

    Parameters
    ----------
    df : pd.DataFrame
        Trips with at least calitp_itp_id, route_id, trip_id,
        service_hours columns.

    Returns
    -------
    pd.DataFrame
        One row per (calitp_itp_id, route_id); helper columns dropped.
    """
    # Across trip_ids for the same route_id there are differing
    # max_stop_sequence values, so max_stop_sequence can't be used to find
    # the longest route length. Use service hours instead.
    group_cols = ["calitp_itp_id", "route_id"]

    # 25th percentile of service hours per route (lower means faster!).
    # The groupby ruins the index and throws an error on direct assignment,
    # so merge it back in as a separate df.
    quantile = (df.groupby(group_cols)["service_hours"]
                .quantile(0.25).reset_index()
                .rename(columns={"service_hours": "p25"})
                )

    df = pd.merge(df, quantile,
                  on=group_cols,
                  how="inner",
                  validate="m:1"
                  )

    # Distance of each trip from the route's 25th percentile; the trip(s)
    # attaining the per-route minimum are the candidates.
    df["difference"] = (df.service_hours - df.p25).abs()
    df["min_diff"] = df.groupby(group_cols)["difference"].transform("min")

    # Vectorized flag instead of a row-wise apply: same 1/0 result, runs at
    # C speed, and avoids attribute-style access to a column named
    # "difference" (which risks colliding with a pandas method name).
    df["faster_trip"] = (df["difference"] == df["min_diff"]).astype(int)

    # If multiple trips tie for a route, sort and drop duplicates so a
    # single deterministic trip (lowest trip_id) survives.
    df2 = (df[df.faster_trip == 1]
           .sort_values(group_cols + ["trip_id"],
                        ascending=[True, True, True])
           .drop_duplicates(subset=group_cols)
           .drop(columns=["faster_trip", "difference", "min_diff", "p25"])
           .reset_index(drop=True)
           )

    return df2
def grab_stops_for_trip_selected(trip_df: dd.DataFrame,
                                 selected_date: str) -> gpd.GeoDataFrame:
    """
    Attach stop-level info (stop_id, stop_sequence, stop geometry) to the
    selected trips.

    Parameters
    ----------
    trip_df : dd.DataFrame
        One selected trip per route (output of select_one_trip).
    selected_date : str
        Date suffix of the cached stop_times / stops parquets.

    Returns
    -------
    gpd.GeoDataFrame
        One row per stop on each selected trip, in WGS84, sorted by
        operator / route / trip / stop_sequence.
    """
    stop_times = dd.read_parquet(
        f"{COMPILED_CACHED}st_{selected_date}.parquet")

    stop_times_for_trip = dd.merge(
        trip_df,
        stop_times[["trip_key", "stop_id", "stop_sequence"]],
        on="trip_key",
        how="inner"
    )

    # Add in stop geom
    stops = dg.read_parquet(
        f"{COMPILED_CACHED}stops_{selected_date}.parquet"
    )

    stop_times_with_geom = dd.merge(
        stops[["calitp_itp_id", "stop_id",
               "stop_name", "geometry"]].drop_duplicates(),
        stop_times_for_trip,
        on=["calitp_itp_id", "stop_id"],
        how="inner"
    ).to_crs(shared_utils.geography_utils.WGS84)

    stop_times_with_geom2 = (
        stop_times_with_geom
        .drop(columns=["calitp_extracted_at", "calitp_deleted_at"])
        .sort_values(["calitp_itp_id", "route_id", "trip_id", "stop_sequence"])
        .reset_index(drop=True)
    ).compute()

    return stop_times_with_geom2
if __name__ == "__main__":

    # Cached trips for the analysis date drive everything downstream.
    trips = dd.read_parquet(
        f"{COMPILED_CACHED}trips_{ANALYSIS_DATE}.parquet")

    valid_trip_keys = trips.trip_key.unique().compute().tolist()
    # One-time warehouse query; its parquet is already stashed in GCS,
    # so the call is normally left commented out.
    #grab_service_hours(ANALYSIS_DATE, valid_trip_keys)

    df = merge_trips_with_service_hours(ANALYSIS_DATE)

    one_trip = select_one_trip(df)

    trips_with_stops = grab_stops_for_trip_selected(one_trip, ANALYSIS_DATE)

    shared_utils.utils.geoparquet_gcs_export(
        trips_with_stops,
        utils.GCS_FILE_PATH,
        f"trips_with_stops_{ANALYSIS_DATE}"
    )