
Commit aaba50c

Merge pull request #1363 from cal-itp/vp-hackathon
Create dataset package as zipped folder for hackathon
2 parents 3eb825f + db5aab5 commit aaba50c

File tree

2 files changed: +267 -3 lines changed
Lines changed: 262 additions & 0 deletions
@@ -0,0 +1,262 @@

"""
Grab a set of Cal-ITP tables for the Posit hackathon in Feb 2025.

Grab GTFS schedule and vp tables from the GCS bucket,
subset to the operators, and concatenate across dates.

Our GTFS analytics data catalog:
https://github.com/cal-itp/data-analyses/blob/main/_shared_utils/shared_utils/gtfs_analytics_data.yml
"""
import glob
import os
import shutil

import geopandas as gpd
import pandas as pd

from segment_speed_utils import time_series_utils
from update_vars import GTFS_DATA_DICT
from shared_utils import rt_dates

OUTPUT_FOLDER = "./hackathon/"
CRS = "EPSG:4326"


def merge_crosswalk(
    df: pd.DataFrame,
    crosswalk: pd.DataFrame
) -> pd.DataFrame:
    """
    Drop feed_key after merging in schedule_gtfs_dataset_key.
    Use this identifier when merging in vp.
    """
    df2 = pd.merge(
        df,
        crosswalk,
        on = "feed_key",
        how = "inner"
    ).drop(columns = "feed_key")

    return df2


def export_gtfs_schedule_tables(
    operator_list: list,
    date_list: list
):
    """
    Export a subset of downloaded schedule tables (minimally processed in our warehouse),
    by inputting a list of operator names.
    """
    schedule_tables = GTFS_DATA_DICT.schedule_downloads

    trips = time_series_utils.concatenate_datasets_across_dates(
        gcs_bucket = schedule_tables.dir,
        dataset_name = schedule_tables.trips,
        date_list = date_list,
        data_type = "df",
        get_pandas = True,
        filters = [[("name", "in", operator_list)]],
        columns = ["feed_key", "gtfs_dataset_key", "name",
                   "service_date", "trip_id", "trip_instance_key",
                   "route_key", "route_id", "route_type",
                   "route_short_name", "route_long_name", "route_desc",
                   "direction_id",
                   "shape_array_key", "shape_id",
                   "trip_first_departure_datetime_pacific",
                   "trip_last_arrival_datetime_pacific",
                   "service_hours"
                  ]
    ).rename(
        columns = {
            "gtfs_dataset_key": "schedule_gtfs_dataset_key",
        })

    subset_feeds = trips.feed_key.unique().tolist()
    crosswalk = trips[["feed_key", "schedule_gtfs_dataset_key"]].drop_duplicates()

    shapes = time_series_utils.concatenate_datasets_across_dates(
        gcs_bucket = schedule_tables.dir,
        dataset_name = schedule_tables.shapes,
        date_list = date_list,
        data_type = "gdf",
        get_pandas = True,
        filters = [[("feed_key", "in", subset_feeds)]],
        columns = ["feed_key",
                   "shape_array_key", "shape_id",
                   "n_trips", "geometry"
                  ]
    ).pipe(
        merge_crosswalk, crosswalk
    ).to_crs(CRS)

    stops = time_series_utils.concatenate_datasets_across_dates(
        gcs_bucket = schedule_tables.dir,
        dataset_name = schedule_tables.stops,
        date_list = date_list,
        data_type = "gdf",
        get_pandas = True,
        filters = [[("feed_key", "in", subset_feeds)]],
        columns = [
            "feed_key", "service_date",
            "stop_id", "stop_key", "stop_name",
            "geometry"
        ]
    ).pipe(
        merge_crosswalk, crosswalk
    ).to_crs(CRS)

    stop_times = time_series_utils.concatenate_datasets_across_dates(
        gcs_bucket = schedule_tables.dir,
        dataset_name = schedule_tables.stop_times,
        date_list = date_list,
        data_type = "df",
        get_pandas = True,
        filters = [[("feed_key", "in", subset_feeds)]],
        columns = ["feed_key",
                   "trip_id", "stop_id",
                   "stop_sequence", "arrival_sec",
                   "timepoint"
                  ]
    ).pipe(merge_crosswalk, crosswalk)

    trips.to_parquet(f"{OUTPUT_FOLDER}trips.parquet")
    shapes.to_parquet(f"{OUTPUT_FOLDER}shapes.parquet")
    stops.to_parquet(f"{OUTPUT_FOLDER}stops.parquet")
    stop_times.to_parquet(f"{OUTPUT_FOLDER}stop_times.parquet")

    return


def export_gtfs_vp_table(
    schedule_gtfs_dataset_key_list: list,
    date_list: list
):
    """
    Export a subset of downloaded vp (deduped in our warehouse),
    by inputting a list of schedule_gtfs_dataset_keys.
    """
    vp_tables = GTFS_DATA_DICT.speeds_tables

    vp = time_series_utils.concatenate_datasets_across_dates(
        gcs_bucket = vp_tables.dir,
        dataset_name = vp_tables.raw_vp,
        date_list = date_list,
        data_type = "gdf",
        get_pandas = True,
        filters = [[("schedule_gtfs_dataset_key", "in", schedule_gtfs_dataset_key_list)]],
        columns = [
            "gtfs_dataset_name", "gtfs_dataset_key", "schedule_gtfs_dataset_key",
            "trip_id", "trip_instance_key",
            "location_timestamp_local",
            "geometry"]
    ).rename(
        columns = {"gtfs_dataset_key": "vp_gtfs_dataset_key"}
    ).to_crs(CRS)

    vp.to_parquet(f"{OUTPUT_FOLDER}vp.parquet")

    return


def export_segments_table(
    schedule_gtfs_dataset_key_list: list,
    date_list: list
):
    """
    Export a subset of cut segments from gtfs_segments.create_segments,
    by inputting a list of schedule_gtfs_dataset_keys.
    """
    vp_tables = GTFS_DATA_DICT.rt_stop_times

    segments = time_series_utils.concatenate_datasets_across_dates(
        gcs_bucket = vp_tables.dir,
        dataset_name = vp_tables.segments_file,
        date_list = date_list,
        data_type = "gdf",
        get_pandas = True,
        filters = [[("schedule_gtfs_dataset_key", "in", schedule_gtfs_dataset_key_list)]],
        columns = ["trip_instance_key",
                   "schedule_gtfs_dataset_key", "route_id", "direction_id",
                   "stop_pair", "stop_id1", "stop_id2",
                   "stop_sequence", "geometry"
                  ]
    ).to_crs(CRS)

    segments.to_parquet(f"{OUTPUT_FOLDER}segments.parquet")

    return


def export_roads_for_district(district: int = 7):
    """
    Export a subset of roads in District 7, which is where
    all the operators are.
    """
    SHARED_GCS = GTFS_DATA_DICT.gcs_paths.SHARED_GCS

    roads = gpd.read_parquet(
        f"{SHARED_GCS}all_roads_2020_state06.parquet"
    ).to_crs(CRS)

    caltrans_districts = gpd.read_parquet(
        f"{SHARED_GCS}caltrans_districts.parquet",
        filters = [[("DISTRICT", "==", district)]]
    ).to_crs(CRS)

    roads_in_district = gpd.sjoin(
        roads,
        caltrans_districts,
        how = "inner",
        predicate = "within"
    ).drop(columns = ["DISTRICT", "index_right"])

    roads_in_district.to_parquet(f"{OUTPUT_FOLDER}district7_roads.parquet")

    return


if __name__ == "__main__":

    analysis_date_list = [
        rt_dates.DATES[f"{m}2024"] for m in ["oct", "nov"]
    ]

    operators = [
        "Big Blue Bus Schedule",
        "Culver City Schedule",
        "BruinBus Schedule",
        "Santa Clarita Schedule",
        "LA DOT Schedule",
    ]

    # Schedule tables - trips, shapes, stops, stop_times
    export_gtfs_schedule_tables(
        operators, analysis_date_list
    )

    schedule_keys = pd.read_parquet(
        f"{OUTPUT_FOLDER}trips.parquet"
    ).schedule_gtfs_dataset_key.unique().tolist()

    # vehicle positions table
    export_gtfs_vp_table(
        schedule_keys, analysis_date_list
    )

    # segments for mapping
    export_segments_table(
        schedule_keys, analysis_date_list
    )

    # roads for D7, which is where these operators are
    export_roads_for_district(district = 7)

    # Zip up the folder of parquets within OUTPUT_FOLDER -- download this and send;
    # do not check into GitHub
    shutil.make_archive("gtfs_assembled_hackathon", "zip", OUTPUT_FOLDER)

    # Delete local parquets, just keep the zipped folder to download and send
    for f in glob.glob(f"{OUTPUT_FOLDER}*.parquet"):
        os.remove(f)
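For anyone receiving the gtfs_assembled_hackathon.zip package this script produces, a minimal sketch of reading it back is below. It assumes the archive has been downloaded and extracted locally; the parquet names match what the script writes, but ZIP_PATH and EXTRACT_DIR are placeholders, not paths from this commit.

import zipfile

import geopandas as gpd
import pandas as pd

ZIP_PATH = "gtfs_assembled_hackathon.zip"   # wherever the zip was downloaded
EXTRACT_DIR = "./hackathon_data/"           # placeholder extraction folder

# Unpack the archive of parquets the script produced.
with zipfile.ZipFile(ZIP_PATH) as z:
    z.extractall(EXTRACT_DIR)

# Tabular tables read with pandas.
trips = pd.read_parquet(f"{EXTRACT_DIR}trips.parquet")
stop_times = pd.read_parquet(f"{EXTRACT_DIR}stop_times.parquet")

# Spatial tables (exported in EPSG:4326) read with geopandas.
shapes = gpd.read_parquet(f"{EXTRACT_DIR}shapes.parquet")
stops = gpd.read_parquet(f"{EXTRACT_DIR}stops.parquet")
vp = gpd.read_parquet(f"{EXTRACT_DIR}vp.parquet")

# schedule_gtfs_dataset_key is the shared identifier for joining schedule tables to vp.
print(trips.schedule_gtfs_dataset_key.nunique(), "operators in the package")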

rt_segment_speeds/segment_speed_utils/time_series_utils.py

Lines changed: 5 additions & 3 deletions
@@ -18,6 +18,7 @@
 import gcsfs
 fs = gcsfs.GCSFileSystem()
 
+'''
 def concatenate_datasets_across_dates(
     gcs_bucket: str,
     dataset_name: str,
@@ -56,8 +57,8 @@ def concatenate_datasets_across_dates(
         df = compute(df)[0]
 
     return df
+'''
 
-"""
 def concatenate_datasets_across_dates(
     gcs_bucket: str,
     dataset_name: str,
@@ -75,11 +76,12 @@ def concatenate_datasets_across_dates(
         date_list,
         data_type = data_type,
         get_pandas=get_pandas,
-        add_date = True
+        add_date = True,
+        **kwargs
     )
 
     return df
-"""
+
 
 def clean_standardized_route_names(
     df: pd.DataFrame,
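The practical effect of this change is that extra keyword arguments passed to concatenate_datasets_across_dates are now forwarded via **kwargs, which is what lets the hackathon script above subset by operator with filters and columns. A minimal sketch of such a call follows; the bucket and dataset names are placeholders for illustration, not real catalog entries.

from segment_speed_utils import time_series_utils
from shared_utils import rt_dates

analysis_date_list = [rt_dates.DATES[f"{m}2024"] for m in ["oct", "nov"]]

# "gs://some-bucket/" and "trips" are placeholder names.
df = time_series_utils.concatenate_datasets_across_dates(
    gcs_bucket = "gs://some-bucket/",
    dataset_name = "trips",
    date_list = analysis_date_list,
    data_type = "df",
    get_pandas = True,
    # These two keyword arguments now ride along via **kwargs:
    filters = [[("name", "in", ["Big Blue Bus Schedule"])]],
    columns = ["feed_key", "name", "trip_id", "service_date"],
)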
