
Commit c01b104

Merge pull request #1455 from cal-itp/vp-path-speeds
vp modeled path to speeds
2 parents 54d4441 + 973d44a commit c01b104

2 files changed: +308 -0
@@ -0,0 +1,198 @@
# Rough speeds along the modeled vp path: at equal meter intervals or stop-to-stop.
import datetime

import numpy as np
import pandas as pd

from segment_speed_utils.project_vars import SEGMENT_GCS
from shared_utils import rt_dates
from shared_utils.rt_utils import MPH_PER_MPS

analysis_date = rt_dates.DATES["oct2024"]


def grab_arrays_by_trip(df: pd.DataFrame, meters_interval: int) -> pd.DataFrame:
    intervaled_cutoffs = []
    speed_series = []

    for row in df.itertuples():
        one_trip_distance_arr = getattr(row, "interpolated_distances")
        one_trip_timestamp_arr = getattr(row, "resampled_timestamps")

        start_dist = int(np.floor(one_trip_distance_arr).min())
        end_dist = int(np.ceil(one_trip_distance_arr).max())

        intervaled_distance_cutoffs = np.array(
            range(start_dist, end_dist, meters_interval))

        speeds_for_trip = get_speeds_every_interval(
            one_trip_distance_arr,
            one_trip_timestamp_arr,
            intervaled_distance_cutoffs
        )

        intervaled_cutoffs.append(intervaled_distance_cutoffs)
        speed_series.append(speeds_for_trip)

    df2 = df.assign(
        intervaled_meters = intervaled_cutoffs,
        speeds = speed_series
    )[["trip_instance_key", "intervaled_meters", "speeds"]]

    return df2

def get_speeds_every_interval(
    one_trip_distance_arr,
    one_trip_timestamp_arr,
    intervaled_distance_cutoffs,
):
    one_trip_speed_series = []

    for i in range(0, len(intervaled_distance_cutoffs) - 1):
        cut1 = intervaled_distance_cutoffs[i]
        cut2 = intervaled_distance_cutoffs[i + 1]
        subset_indices = np.where(
            (one_trip_distance_arr >= cut1) & (one_trip_distance_arr <= cut2))

        subset_distances = one_trip_distance_arr[subset_indices]
        subset_times = one_trip_timestamp_arr[subset_indices]

        # should deltas be returned?
        if len(subset_distances) > 0:
            # meters / seconds, converted to mph
            one_speed = (
                (subset_distances.max() - subset_distances.min()) /
                (subset_times.max() - subset_times.min())
                * MPH_PER_MPS
            )
            one_trip_speed_series.append(one_speed)
        else:
            one_trip_speed_series.append(np.nan)

    return one_trip_speed_series


def grab_arrays_by_trip2(
    df: pd.DataFrame,
    distance_type: str = "",
    intervaled_distance_column_or_meters = ""
) -> pd.DataFrame:
    intervaled_cutoffs = []
    speed_series = []

    for row in df.itertuples():
        one_trip_distance_arr = getattr(row, "interpolated_distances")
        one_trip_timestamp_arr = getattr(row, "resampled_timestamps")
        should_calculate = np.array(getattr(row, "stop_meters_increasing"))

        start_dist = int(np.floor(one_trip_distance_arr).min())
        end_dist = int(np.ceil(one_trip_distance_arr).max())

        if distance_type == "equal_intervals":
            intervaled_distance_cutoffs = np.array(
                range(start_dist, end_dist, intervaled_distance_column_or_meters))

        elif distance_type == "stop_to_stop":
            intervaled_distance_cutoffs = getattr(
                row, intervaled_distance_column_or_meters)
            #do_not_calculate_indices = np.where(should_calculate == False)[0]

        speeds_for_trip = get_speeds_every_interval(
            one_trip_distance_arr,
            one_trip_timestamp_arr,
            intervaled_distance_cutoffs,
        )

        #if len(do_not_calculate_indices) > 0:
        #    speeds_for_trip[do_not_calculate_indices] = np.nan

        if distance_type == "equal_intervals":
            intervaled_cutoffs.append(intervaled_distance_cutoffs)
            keep_cols = ["intervaled_meters", "speeds"]
        elif distance_type == "stop_to_stop":
            keep_cols = ["speeds", "stop_sequence"]

        speed_series.append(speeds_for_trip)

    if distance_type == "equal_intervals":
        df2 = df.assign(
            intervaled_meters = intervaled_cutoffs,
            speeds = speed_series
        )

    elif distance_type == "stop_to_stop":
        df2 = df.assign(
            speeds = speed_series
        )

    return df2[["trip_instance_key"] + keep_cols]


if __name__ == "__main__":
    '''
    for b in ["batch0", "batch1"]:
        start = datetime.datetime.now()

        meters_interval = 250
        df = pd.read_parquet(
            f"{SEGMENT_GCS}vp_condensed/vp_resampled_{b}_{analysis_date}.parquet",
        )

        results = grab_arrays_by_trip(df, meters_interval)
        results.to_parquet(
            f"{SEGMENT_GCS}rough_speeds_{meters_interval}m_{b}_{analysis_date}.parquet"
        )

        end = datetime.datetime.now()
        print(f"{b} speeds every {meters_interval}m: {end - start}")

    #batch0 speeds every 100m: 0:03:00.469936
    #batch1 speeds every 100m: 0:02:50.197037
    #batch0 speeds every 250m: 0:01:32.080767
    #batch1 speeds every 250m: 0:01:38.365538
    #batch0 speeds every stop: 0:01:05.459700
    #batch1 speeds every stop: 0:00:46.450538
    '''

    for b in ["batch0", "batch1"]:
        start = datetime.datetime.now()

        df = pd.read_parquet(
            f"{SEGMENT_GCS}vp_condensed/vp_resampled_{b}_{analysis_date}.parquet",
        )

        subset_trips = df.trip_instance_key.unique().tolist()

        stop_time_cutoffs = pd.read_parquet(
            f"{SEGMENT_GCS}stop_times_projected_{analysis_date}.parquet",
            filters = [[("trip_instance_key", "in", subset_trips)]],
            columns = ["trip_instance_key", "stop_sequence",
                       "stop_meters", "stop_meters_increasing"]
        )

        gdf = pd.merge(
            df,
            stop_time_cutoffs,
            on = "trip_instance_key",
            how = "inner"
        )

        results = grab_arrays_by_trip2(
            gdf,
            distance_type = "stop_to_stop",
            intervaled_distance_column_or_meters = "stop_meters",
        )

        results.to_parquet(
            f"{SEGMENT_GCS}rough_speeds_stop_to_stop_{b}_{analysis_date}.parquet"
        )

        end = datetime.datetime.now()
        print(f"{b} speeds every stop: {end - start}")
@@ -0,0 +1,110 @@
# Project scheduled stop positions onto each trip's vp path to get stop_meters cutoffs.
import dask.dataframe as dd
import datetime
import numpy as np
import geopandas as gpd

from segment_speed_utils import helpers
from segment_speed_utils.project_vars import SEGMENT_GCS

from shared_utils import rt_dates
from resample import project_point_onto_shape

analysis_date = rt_dates.DATES["oct2024"]


def is_monotonically_increasing(my_array: list) -> np.ndarray:
    """
    Flag whether each stop's position, projected onto the vp path,
    increases relative to the prior stop.
    Pairs that are True can have speeds calculated; pairs that are
    False should not.
    These results are better than stop_meters calculated off of the shape.
    """
    my_array2 = np.array(my_array)
    boolean_results = np.diff(my_array2) > 0
    # should the first observation be prepended as True, since the first
    # distance is compared against 0? i.e. np.array([True] + list(boolean_results))
    return np.array(boolean_results)


def merge_stop_times_and_vp(analysis_date: str):
    stop_times = helpers.import_scheduled_stop_times(
        analysis_date,
        columns = ["trip_instance_key", "stop_sequence", "geometry"],
        with_direction = True,
        get_pandas = False,
    )

    vp_path = gpd.read_parquet(
        f"{SEGMENT_GCS}vp_condensed/vp_projected_{analysis_date}.parquet",
        columns = ["trip_instance_key", "vp_geometry"]
    ).drop_duplicates()

    stop_times_vp_geom = dd.merge(
        stop_times,
        vp_path,
        on = "trip_instance_key",
        how = "inner"
    ).sort_values(
        ["trip_instance_key", "stop_sequence"]
    ).reset_index(drop=True)

    stop_times_vp_geom = stop_times_vp_geom.repartition(npartitions=20)

    return stop_times_vp_geom

def project_stop_onto_vp_geom_and_condense(gddf):
    orig_dtypes = gddf.dtypes.to_dict()

    gdf2 = gddf.map_partitions(
        project_point_onto_shape,
        line_geom = "vp_geometry",
        point_geom = "geometry",
        meta = {
            **orig_dtypes,
            "projected_meters": "float"
        },
        align_dataframes = True
    )

    gdf2 = gdf2.rename(
        columns = {"projected_meters": "stop_meters"}
    ).drop(
        columns = ["vp_geometry", "geometry"]
    ).persist()

    return gdf2


if __name__ == "__main__":

    start = datetime.datetime.now()

    stop_times_vp_geom = merge_stop_times_and_vp(analysis_date)

    gdf = project_stop_onto_vp_geom_and_condense(stop_times_vp_geom).compute()

    results = (gdf
               .groupby("trip_instance_key", group_keys=False)
               .agg({
                   "stop_sequence": lambda x: list(x),
                   "stop_meters": lambda x: list(x)
               })
               .reset_index()
              )

    results = results.assign(
        stop_meters_increasing = results.apply(
            lambda x: is_monotonically_increasing(x.stop_meters), axis=1
        )
    )

    results.to_parquet(
        f"{SEGMENT_GCS}stop_times_projected_{analysis_date}.parquet"
    )

    end = datetime.datetime.now()
    print(f"stop times prep: {end - start}")
