
Commit 6bcc337

conveyal update gets old feeds if current feed only has future service defined
1 parent b54b6e6 commit 6bcc337

File tree

4 files changed: +120 -19 lines changed

conveyal_update/conveyal_vars.py

Lines changed: 2 additions & 0 deletions
@@ -3,7 +3,9 @@
 
 GCS_PATH = 'gs://calitp-analytics-data/data-analyses/conveyal_update/'
 TARGET_DATE = rt_dates.DATES['mar2025']
+LOOKBACK_TIME = dt.timedelta(days=60)
 OSM_FILE = 'us-west-latest.osm.pbf'
+PUBLISHED_FEEDS_YML_PATH = "../gtfs_funnel/published_operators.yml"
 # http://download.geofabrik.de/north-america/us-west-latest.osm.pbf
 # first download with wget...
 
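For orientation, a minimal sketch (the date string is a placeholder; the real value comes from rt_dates) of how these two new constants bound the old-feed lookback window that evaluate_feeds.py uses:

    import datetime as dt

    TARGET_DATE = '2025-03-12'  # placeholder ISO date standing in for rt_dates.DATES['mar2025']
    LOOKBACK_TIME = dt.timedelta(days=60)

    target_date = dt.date.fromisoformat(TARGET_DATE)
    max_lookback_date = target_date - LOOKBACK_TIME
    print(max_lookback_date)  # 2025-01-11: earliest feed expiry date the search will accept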

conveyal_update/download_data.py

Lines changed: 3 additions & 2 deletions
@@ -14,7 +14,6 @@
 regions = conveyal_vars.conveyal_regions
 TARGET_DATE = conveyal_vars.TARGET_DATE
 
-regions_and_feeds = pd.read_parquet(f'{conveyal_vars.GCS_PATH}regions_feeds_{TARGET_DATE}.parquet')
 
 def download_feed(row):
     # need wildcard for file too -- not all are gtfs.zip!
@@ -29,7 +28,8 @@ def download_region(feeds_df, region: str):
 
     assert region in regions.keys()
     path = f'./feeds_{feeds_df.date.iloc[0].strftime("%Y-%m-%d")}/{region}'
-    if not os.path.exists(path): os.makedirs(path)
+    if not os.path.exists(path):
+        os.makedirs(path)
     region = (feeds_df >> filter(_.region == region)).copy()
     region['path'] = path
     region.progress_apply(download_feed, axis = 1)
@@ -46,6 +46,7 @@ def generate_script(regions):
         f.write('\n'.join(cmds))
 
 if __name__ == '__main__':
+    regions_and_feeds = pd.read_parquet(f'{conveyal_vars.GCS_PATH}regions_feeds_{TARGET_DATE}.parquet')
 
     for region in tqdm(regions.keys()):
         download_region(regions_and_feeds, region)
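Moving the read_parquet call under the __main__ guard means importing download_data no longer triggers a read from GCS. A minimal sketch of the pattern, using a hypothetical module and local path rather than the repo's code:

    import pandas as pd

    def summarize(df: pd.DataFrame) -> pd.DataFrame:
        # pure logic: importable and testable with no I/O side effects
        return df.describe()

    if __name__ == '__main__':
        # the read happens only when the file runs as a script, never on import
        df = pd.read_parquet('example.parquet')  # hypothetical path
        print(summarize(df))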

conveyal_update/evaluate_feeds.py

Lines changed: 103 additions & 14 deletions
@@ -3,12 +3,16 @@
 from shared_utils import gtfs_utils_v2
 
 from calitp_data_analysis.tables import tbls
+from calitp_data_analysis.sql import query_sql
 from siuba import *
 import pandas as pd
 import datetime as dt
 
 import conveyal_vars
 
+TARGET_DATE = conveyal_vars.TARGET_DATE
+REGIONAL_SUBFEED_NAME = "Regional Subfeed"
+
 def check_defined_elsewhere(row, df):
     '''
     for feeds without service defined, check if the same service is captured in another feed that does include service
@@ -17,11 +21,6 @@ def check_defined_elsewhere(row, df):
     row['service_any_feed'] = is_defined
     return row
 
-
-
-TARGET_DATE = conveyal_vars.TARGET_DATE
-REGIONAL_SUBFEED_NAME = "Regional Subfeed"
-
 def get_feeds_check_service():
     feeds_on_target = gtfs_utils_v2.schedule_daily_feed_to_gtfs_dataset_name(selected_date=TARGET_DATE)
     feeds_on_target = feeds_on_target.rename(columns={'name':'gtfs_dataset_name'})
@@ -62,25 +61,115 @@ def attach_transit_services(feeds_on_target: pd.DataFrame):
     ].copy()
     return feeds_services_filtered
 
-def report_undefined(feeds_on_target: pd.DataFrame):
-    fname = 'no_apparent_service.csv'
+def get_undefined_feeds(feeds_on_target: pd.DataFrame) -> pd.DataFrame:
     undefined = feeds_on_target.apply(check_defined_elsewhere, axis=1, args=[feeds_on_target]) >> filter(-_.service_any_feed)
+    return undefined
+
+INT_TO_GTFS_WEEKDAY = {
+    0: "monday",
+    1: "tuesday",
+    2: "wednesday",
+    3: "thursday",
+    4: "friday",
+    5: "saturday",
+    6: "sunday"
+}
+
+def report_unavailable_feeds(feeds: pd.DataFrame, fname: str):
+    undefined = feeds.loc[
+        feeds["valid_date_other_than_service_date"] | ~feeds["usable_schedule_feed_exists"]
+    ].copy()
     if undefined.empty:
         print('no undefined service feeds')
     else:
-        print(undefined.columns)
         print('these feeds have no service defined on target date, nor are their services captured in other feeds:')
-        # gtfs_dataset_name no longer present, this whole script should probably be updated/replaced
-        print(undefined >> select(_.gtfs_dataset_name, _.service_any_feed))
+        print(undefined.loc[~undefined["usable_schedule_feed_exists"], "gtfs_dataset_name"].drop_duplicates())
+        print('these feeds have defined service, but only in a feed defined on a prior day:')
+        print(undefined.loc[undefined["valid_date_other_than_service_date"], "gtfs_dataset_name"].drop_duplicates())
         print(f'saving detailed csv to {fname}')
-        undefined.to_csv(fname)
-    return
+        undefined.to_csv(fname, index=False)
+
+ISO_DATE_ONLY_FORMAT = "%Y-%m-%d"
+
+def get_old_feeds(undefined_feeds_base64_urls: pd.Series, target_date: dt.date | dt.datetime, max_lookback_timedelta: dt.timedelta) -> pd.DataFrame:
+    base_64_urls_str = "('" + "', '".join(undefined_feeds_base64_urls) + "')"
+    day_of_the_week = INT_TO_GTFS_WEEKDAY[target_date.weekday()]
+    max_lookback_date = target_date - max_lookback_timedelta
+    target_date_iso = target_date.strftime(ISO_DATE_ONLY_FORMAT)
+
+    query = f"""
+        SELECT
+            `mart_gtfs.dim_schedule_feeds`.base64_url AS base64_url,
+            `mart_gtfs.dim_schedule_feeds`.key AS feed_key,
+            `mart_gtfs.dim_calendar`.{day_of_the_week} AS target_day_of_the_week,
+            MAX(`mart_gtfs.dim_schedule_feeds`._valid_to) AS valid_feed_date
+        FROM `mart_gtfs.dim_schedule_feeds`
+        LEFT JOIN `mart_gtfs.dim_calendar`
+            ON `mart_gtfs.dim_schedule_feeds`.key = `mart_gtfs.dim_calendar`.feed_key
+        WHERE `mart_gtfs.dim_schedule_feeds`.base64_url IN {base_64_urls_str}
+            AND `mart_gtfs.dim_schedule_feeds`._valid_to <= '{target_date}'
+            AND `mart_gtfs.dim_schedule_feeds`._valid_to >= '{max_lookback_date}'
+            AND `mart_gtfs.dim_calendar`.start_date <= '{target_date}'
+            AND `mart_gtfs.dim_calendar`.end_date >= '{target_date}'
+        GROUP BY
+            `mart_gtfs.dim_schedule_feeds`.base64_url,
+            `mart_gtfs.dim_schedule_feeds`.key,
+            `mart_gtfs.dim_calendar`.{day_of_the_week}
+        ORDER BY target_day_of_the_week DESC
+        LIMIT 1000
+    """
+    response = query_sql(query)
+    response_grouped = response.groupby("base64_url")
+    feed_info_by_url = response_grouped[["valid_feed_date", "feed_key"]].first()
+    # _valid_to is the first day the feed is no longer valid, so the most recent day the feed *was* valid is the day before
+    feed_info_by_url["valid_feed_date"] = feed_info_by_url["valid_feed_date"].dt.date - dt.timedelta(days=1)
+    feed_info_by_url["no_operations_on_target_day_of_the_week"] = ~(response_grouped["target_day_of_the_week"].any())
+    return feed_info_by_url
+
+def merge_old_feeds(df_all_feeds: pd.DataFrame, df_undefined_feeds: pd.DataFrame, target_date: dt.date, max_lookback_timedelta: dt.timedelta) -> pd.DataFrame:
+    feed_search_result = get_old_feeds(
+        df_undefined_feeds["base64_url"],
+        target_date,
+        max_lookback_timedelta
+    )
+    feeds_merged = df_all_feeds.merge(
+        feed_search_result,
+        how="left",
+        left_on="base64_url",
+        right_index=True,
+        validate="many_to_one"
+    )
+    feeds_merged["feed_key"] = feeds_merged["feed_key_y"].fillna(feeds_merged["feed_key_x"])
+    feeds_merged["no_schedule_feed_found"] = (
+        feeds_merged["base64_url"].isin(df_undefined_feeds["base64_url"])
+        & ~feeds_merged["base64_url"].isin(feed_search_result.index)
+    )
+    feeds_merged["no_operations_on_target_date_but_valid_feed_exists"] = (
+        feeds_merged["no_operations_on_target_day_of_the_week"].fillna(False)
+    )
+    feeds_merged["usable_schedule_feed_exists"] = ~(
+        feeds_merged["no_schedule_feed_found"] | feeds_merged["no_operations_on_target_date_but_valid_feed_exists"]
+    )
+    feeds_merged["date"] = feeds_merged.loc[
+        ~feeds_merged["no_operations_on_target_date_but_valid_feed_exists"], "valid_feed_date"
+    ]
+    feeds_merged["date"] = feeds_merged["date"].fillna(target_date)
+    feeds_merged["valid_date_other_than_service_date"] = feeds_merged["date"] != target_date
+
+    return feeds_merged.drop(
+        ["valid_feed_date", "no_operations_on_target_day_of_the_week", "feed_key_x", "feed_key_y"], axis=1
+    )
 
 if __name__ == '__main__':
 
     feeds_on_target = get_feeds_check_service()
     feeds_on_target = attach_transit_services(feeds_on_target)
     print(f'feeds on target date shape: {feeds_on_target.shape}')
-    report_undefined(feeds_on_target)
-    feeds_on_target.to_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
+    undefined_feeds = get_undefined_feeds(feeds_on_target)
+    feeds_merged = merge_old_feeds(
+        feeds_on_target, undefined_feeds, dt.date.fromisoformat(TARGET_DATE), conveyal_vars.LOOKBACK_TIME
+    )
+    report_unavailable_feeds(feeds_merged, 'no_apparent_service.csv')
+    feeds_merged.to_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
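A minimal, self-contained sketch (toy values, hypothetical feed keys) of the fallback logic merge_old_feeds implements: left-merge the lookback results onto all feeds, prefer the older feed's key where one was found, and flag feeds with no usable schedule at all:

    import datetime as dt
    import pandas as pd

    # three feeds: "a" has service on the target date; "b" and "c" do not
    all_feeds = pd.DataFrame({
        "base64_url": ["a", "b", "c"],
        "feed_key": ["k1", "k2", "k3"],
    })
    undefined = all_feeds[all_feeds["base64_url"].isin(["b", "c"])]

    # pretend the warehouse lookback found an older usable feed only for "b"
    search_result = pd.DataFrame(
        {
            "feed_key": ["k2-old"],
            "valid_feed_date": [dt.date(2025, 2, 20)],
            "no_operations_on_target_day_of_the_week": [False],
        },
        index=pd.Index(["b"], name="base64_url"),
    )

    merged = all_feeds.merge(search_result, how="left", left_on="base64_url",
                             right_index=True, suffixes=("_x", "_y"))
    # prefer the older feed's key where the search found one
    merged["feed_key"] = merged["feed_key_y"].fillna(merged["feed_key_x"])
    # flagged only if undefined on the target date AND no older feed was found
    merged["no_schedule_feed_found"] = (
        merged["base64_url"].isin(undefined["base64_url"])
        & ~merged["base64_url"].isin(search_result.index)
    )
    print(merged[["base64_url", "feed_key", "no_schedule_feed_found"]])
    # "a" keeps k1, "b" falls back to k2-old, "c" is flagged as having no usable feed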

conveyal_update/match_feeds_regions.py

Lines changed: 12 additions & 3 deletions
@@ -13,7 +13,6 @@
 
 regions = conveyal_vars.conveyal_regions
 TARGET_DATE = conveyal_vars.TARGET_DATE
-feeds_on_target = pd.read_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
 
 def create_region_gdf():
     # https://shapely.readthedocs.io/en/stable/reference/shapely.box.html#shapely.box
@@ -26,14 +25,24 @@ def create_region_gdf():
     region_gdf = gpd.GeoDataFrame(df, crs=geography_utils.WGS84).to_crs(geography_utils.CA_NAD83Albers_m)
     return region_gdf
 
+def get_stops_dates(feeds_on_target: pd.DataFrame, feed_key_column_name: str = "feed_key", date_column_name: str = "date"):
+    all_stops = feeds_on_target.groupby(date_column_name)[feed_key_column_name].apply(
+        lambda feed_key_column: gtfs_utils_v2.get_stops(
+            selected_date=feed_key_column.name,
+            operator_feeds=feed_key_column
+        )
+    )
+    return all_stops
+
 def join_stops_regions(region_gdf: gpd.GeoDataFrame, feeds_on_target: pd.DataFrame):
-    all_stops = gtfs_utils_v2.get_stops(selected_date=TARGET_DATE, operator_feeds=feeds_on_target.feed_key).to_crs(geography_utils.CA_NAD83Albers_m)
+    # all_stops = gtfs_utils_v2.get_stops(selected_date=TARGET_DATE, operator_feeds=feeds_on_target.feed_key)
+    all_stops = get_stops_dates(feeds_on_target).to_crs(geography_utils.CA_NAD83Albers_m)
     region_join = gpd.sjoin(region_gdf, all_stops)
     regions_and_feeds = region_join >> distinct(_.region, _.feed_key)
     return regions_and_feeds
 
 if __name__ == '__main__':
-
+    feeds_on_target = pd.read_parquet(f'{conveyal_vars.GCS_PATH}feeds_{TARGET_DATE}.parquet')
     region_gdf = create_region_gdf()
     regions_and_feeds = join_stops_regions(region_gdf, feeds_on_target)
     regions_and_feeds = regions_and_feeds >> inner_join(_, feeds_on_target >> select(_.feed_key, _.gtfs_dataset_name, _.base64_url,
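The new get_stops_dates batches the stop lookup per service date: grouping by the date column makes each group's name the date, so one get_stops call is issued per distinct date (needed now that fallback feeds can carry dates other than TARGET_DATE). A minimal sketch of that groupby-apply shape, with a stub standing in for gtfs_utils_v2.get_stops and hypothetical data:

    import pandas as pd

    def fake_get_stops(selected_date, operator_feeds):
        # stand-in for gtfs_utils_v2.get_stops: one row per requested feed
        return pd.DataFrame({"feed_key": list(operator_feeds), "date": selected_date})

    feeds = pd.DataFrame({
        "feed_key": ["k1", "k2", "k3"],
        "date": ["2025-03-12", "2025-03-12", "2025-02-20"],  # k3 fell back to an older feed
    })

    # the group's name (feed_key_column.name) is its date, so each distinct
    # date triggers exactly one stops query covering the feeds valid that day
    all_stops = feeds.groupby("date")["feed_key"].apply(
        lambda feed_key_column: fake_get_stops(
            selected_date=feed_key_column.name,
            operator_feeds=feed_key_column,
        )
    )
    print(all_stops)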
