1
1
import os
2
2
os .environ ["CALITP_BQ_MAX_BYTES" ] = str (800_000_000_000 )
3
- from shared_utils import gtfs_utils_v2
4
3
4
+ from shared_utils import gtfs_utils_v2
5
5
from calitp_data_analysis .tables import tbls
6
6
from calitp_data_analysis .sql import query_sql
7
7
from siuba import *
8
8
import pandas as pd
9
9
import datetime as dt
10
-
11
10
import conveyal_vars
12
11
13
12
TARGET_DATE = conveyal_vars .TARGET_DATE
14
13
REGIONAL_SUBFEED_NAME = "Regional Subfeed"
14
# Map datetime.weekday() integers (Monday == 0) to the GTFS calendar.txt
# day-of-week column names, used to pick the service column for a target date.
INT_TO_GTFS_WEEKDAY = dict(
    enumerate(
        ("monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday")
    )
)
15
23
16
24
def check_defined_elsewhere (row , df ):
17
25
'''
@@ -40,7 +48,7 @@ def get_feeds_check_service():
40
48
return feeds_on_target
41
49
42
50
def attach_transit_services (feeds_on_target : pd .DataFrame ):
43
-
51
+ """Associate each feed in feeds_on_target.gtfs_dataset_key with a transit service"""
44
52
target_dt = dt .datetime .combine (dt .date .fromisoformat (TARGET_DATE ), dt .time (0 ))
45
53
46
54
services = (tbls .mart_transit_database .dim_gtfs_service_data ()
@@ -62,20 +70,12 @@ def attach_transit_services(feeds_on_target: pd.DataFrame):
62
70
return feeds_services_filtered
63
71
64
72
def get_undefined_feeds(feeds_on_target: pd.DataFrame) -> pd.DataFrame:
    """
    Return feeds in feeds_on_target that do not have service and where service
    is not defined in another feed.

    Parameters:
        feeds_on_target: feeds table passed row-wise to check_defined_elsewhere.
            Assumes the result carries a boolean service_any_feed column — TODO
            confirm against check_defined_elsewhere (not visible in this chunk).

    Returns:
        The subset of rows where service_any_feed is falsy.
    """
    # Row-wise check of each feed against the full table, then a siuba pipe
    # keeping only rows without service in any feed (`-_.service_any_feed`
    # negates the boolean flag).
    undefined = feeds_on_target.apply(check_defined_elsewhere, axis=1, args=[feeds_on_target]) >> filter(-_.service_any_feed)
    return undefined
67
-
68
- INT_TO_GTFS_WEEKDAY = {
69
- 0 : "monday" ,
70
- 1 : "tuesday" ,
71
- 2 : "wednesday" ,
72
- 3 : "thursday" ,
73
- 4 : "friday" ,
74
- 5 : "saturday" ,
75
- 6 : "sunday"
76
- }
77
76
78
- def report_unavailable_feeds (feeds , fname ):
77
+ def report_unavailable_feeds (feeds : pd .DataFrame , fname : str ) -> None :
78
+ """Create a csv report of unavailable or backdated feeds at the paths specified in fname"""
79
79
undefined = feeds .loc [
80
80
feeds ["valid_date_other_than_service_date" ] | feeds ["no_schedule_feed_found" ]
81
81
].copy ()
@@ -92,12 +92,31 @@ def report_unavailable_feeds(feeds, fname):
92
92
# NOTE(review): despite the name, "%y" formats a TWO-digit year — ISO 8601 date
# format is "%Y-%m-%d". This is used to render target_date for the warehouse
# query; confirm whether the dates it is compared against really use two-digit
# years before renaming or changing to "%Y".
ISO_DATE_ONLY_FORMAT = "%y-%m-%d"
93
93
94
94
def get_old_feeds (undefined_feeds_base64_urls : pd .Series , target_date : dt .date | dt .datetime , max_lookback_timedelta : dt .timedelta ) -> pd .Series :
95
+ """
96
+ Search the warehouse for feeds downloaded within the time before target_date
97
+ defined by max_lookback_timedelta that have service as defined in calendar.txt
98
+ on target_date. These feeds will not be valid on target_date, but will be accepted by Conveyal.
99
+ This should not be used if the feeds are valid on the target_date, since this will provide needlessly
100
+ invalid feeds. Note that this does not check calendar_dates.txt at present
101
+
102
+ Parameters:
103
+ undefined_feeds_base64_urls: a Pandas series containing base64 urls to feeds in the warehouse
104
+ target_date: a date or datetime where the feeds should be valid based on calendar.txt
105
+ max_lookback_timedelta: a timedelta defining the amount of time before target_date that a feed must have been available for
95
106
107
+ Returns:
108
+ A DataFrame with the following index and columns:
109
+ index: The base64 url of the feed, will match entries in undefined_feeds_base64_urls
110
+ feed_key: A key to dim_schedule_feeds matching the feed on the date it was last valid in the warehouse
111
+ date_processed: A datetime date matching the date on which the feed was last valid in the warehosue
112
+ """
96
113
base_64_urls_str = "('" + "', '" .join (undefined_feeds_base64_urls ) + "')"
97
114
day_of_the_week = INT_TO_GTFS_WEEKDAY [target_date .weekday ()]
98
115
max_lookback_date = target_date - max_lookback_timedelta
99
116
target_date_iso = target_date .strftime (ISO_DATE_ONLY_FORMAT )
100
-
117
+ # Query feeds for the newest feed where service is defined on the target_date,
118
+ # that have service on the day of the week of the target date, and
119
+ # that are valid before (inclusive) the target date and after (inclusive) the max look back date,
101
120
query = f"""
102
121
SELECT
103
122
`mart_gtfs.dim_schedule_feeds`.base64_url AS base64_url,
@@ -127,20 +146,35 @@ def get_old_feeds(undefined_feeds_base64_urls: pd.Series, target_date: dt.date |
127
146
return feed_info_by_url .drop ("valid_feed_date" , axis = 1 )
128
147
129
148
def merge_old_feeds (df_all_feeds : pd .DataFrame , df_undefined_feeds : pd .DataFrame , target_date : dt .date , max_lookback_timedelta : dt .timedelta ) -> pd .DataFrame :
149
+ """
150
+ Merge feeds from df_all_feeds with old feeds found as a result of calling get_old_feeds with df_undefined_feeds.base64_url
151
+
152
+ Params:
153
+ df_all_feeds: A DataFrame of feeds, must have feed_key, date, and base64_url as columns and must include the base64_urls in df_undefined_feeds
154
+ df_undefined_feeds: A DataFrame of feeds that are not valid on target_date, where an old feed should be searched for.
155
+ Must have base64_url as a column
156
+ target_date: a date or datetime where the feed should be valid based on its target date
157
+ max_lookback_timedelta: a timedelta defining the amount of time before target_date that a feed must have been available for
158
+
159
+ Returns:
160
+ A DataFrame identical to df_all_feeds except with the following columns changed or added:
161
+ feed_key: Updated for the found feeds
162
+ date: Updated for the found feeds:
163
+ no_schedule_feed_found: True if a schedule feed was present in df_undefined_feeds but was not associated with an older feed, otherwise false
164
+ valid_date_other_than_service_date: True if a new feed was found, otherwise false
165
+ """
130
166
feed_search_result = get_old_feeds (
131
167
df_undefined_feeds ["base64_url" ],
132
168
target_date ,
133
169
max_lookback_timedelta
134
170
)
135
- print (feed_search_result )
136
171
feeds_merged = df_all_feeds .merge (
137
172
feed_search_result ,
138
173
how = "left" ,
139
174
left_on = "base64_url" ,
140
175
right_index = True ,
141
176
validate = "many_to_one"
142
177
)
143
- print (list (feeds_merged .columns ))
144
178
feeds_merged ["feed_key" ] = feeds_merged ["feed_key_y" ].fillna (feeds_merged ["feed_key_x" ])
145
179
feeds_merged ["no_schedule_feed_found" ] = (
146
180
(feeds_merged ["base64_url" ].isin (df_undefined_feeds ["base64_url" ])) & (~ feeds_merged ["base64_url" ].isin (feed_search_result .index ))
0 commit comments