Commit 43afcce

NTD: ingest 2022 agency and create external table, expand dynamic url scrape, clean up previous code (#3604)
* expand dynamic NTD URL scrape to cover agency data for multiple years
* new DAG for 2022 agency scrape
* add new external table for 2022 agency info, fix bugs in previous external table creation
* clean up XCom fetch
1 parent f81d512 commit 43afcce

6 files changed: +45 -187 lines changed
airflow/dags/create_external_tables/ntd_data_products/2022__annual_database_agency_information.yml

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
+operator: operators.ExternalTable
+bucket: gs://calitp-ntd-xlsx-products-clean
+prefix_bucket: false
+post_hook: |
+  SELECT *
+  FROM `{{ get_project_id() }}`.external_ntd__annual_reporting.2022__annual_database_agency_information
+  LIMIT 1;
+source_objects:
+  - "annual_database_agency_information/2022/_2022_agency_information/*.jsonl.gz"
+destination_project_dataset_table: "external_ntd__annual_reporting.2022__annual_database_agency_information"
+source_format: NEWLINE_DELIMITED_JSON
+use_bq_client: true
+hive_options:
+  mode: AUTO
+  require_partition_filter: false
+  source_uri_prefix: "annual_database_agency_information/2022/_2022_agency_information/"
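
For context, a minimal sketch of what this config amounts to when use_bq_client: true, assuming the operators.ExternalTable task roughly wraps the google-cloud-bigquery client like this; the bucket prefix on source_uri_prefix and the "my-project" project id are assumptions, and the real operator may differ:

from google.cloud import bigquery

client = bigquery.Client()

# External data configuration over the cleaned JSONL files in GCS
external_config = bigquery.ExternalConfig("NEWLINE_DELIMITED_JSON")
external_config.source_uris = [
    "gs://calitp-ntd-xlsx-products-clean/annual_database_agency_information/2022/_2022_agency_information/*.jsonl.gz"
]
external_config.autodetect = True  # assumption: schema inferred from the JSONL

# Hive-style partition discovery, mirroring hive_options above
hive = bigquery.HivePartitioningOptions()
hive.mode = "AUTO"
hive.require_partition_filter = False
hive.source_uri_prefix = (
    "gs://calitp-ntd-xlsx-products-clean/"
    "annual_database_agency_information/2022/_2022_agency_information/"
)
external_config.hive_partitioning = hive

# "my-project" stands in for the value of get_project_id()
table = bigquery.Table(
    "my-project.external_ntd__annual_reporting.2022__annual_database_agency_information"
)
table.external_data_configuration = external_config
client.create_table(table, exists_ok=True)

# Post-hook-style sanity check, like the SELECT * ... LIMIT 1 above
client.query(
    "SELECT * FROM `my-project.external_ntd__annual_reporting."
    "2022__annual_database_agency_information` LIMIT 1"
).result()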

airflow/dags/create_external_tables/ntd_data_products/2023__annual_database_agency_information.yml

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 operator: operators.ExternalTable
-bucket: gs://test-calitp-ntd-xlsx-products-clean
+bucket: gs://calitp-ntd-xlsx-products-clean
 prefix_bucket: false
 post_hook: |
   SELECT *

airflow/dags/create_external_tables/ntd_data_products/2023__annual_database_contractual_relationships.yml

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 operator: operators.ExternalTable
-bucket: gs://test-calitp-ntd-xlsx-products-clean
+bucket: gs://calitp-ntd-xlsx-products-clean
 prefix_bucket: false
 post_hook: |
   SELECT *
airflow/dags/sync_ntd_data_xlsx/ (new task config for the 2022 agency information scrape)

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+operator: operators.NtdDataProductXLSXOperator
+
+product: 'annual_database_agency_information'
+xlsx_file_url: 'https://www.transit.dot.gov/ntd/data-product/2022-annual-database-agency-information' # placeholder for scraped url from scrape_ntd_xlsx_urls task
+year: '2022' # one of: 'historical' (long history), 'multi-year' (select history), or a specific year (ex: 2022)
+dependencies:
+  - scrape_ntd_xlsx_urls
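
For illustration, a hand-written rough equivalent of the task this YAML defines, assuming the DAG factory forwards these fields to NtdDataProductXLSXOperator (whose constructor takes product, xlsx_file_url, and year, per airflow/plugins/operators/scrape_ntd_xlsx.py); the import path and task_id below are illustrative, not taken from the repo:

from operators.scrape_ntd_xlsx import NtdDataProductXLSXOperator  # illustrative import path

agency_2022_xlsx = NtdDataProductXLSXOperator(
    task_id="2022_annual_database_agency_information",  # illustrative task id
    product="annual_database_agency_information",
    # placeholder URL; swapped at runtime for the scraped link pushed to XCom
    # under the "2022_agency_url" key by the scrape_ntd_xlsx_urls task
    xlsx_file_url=(
        "https://www.transit.dot.gov/ntd/data-product/"
        "2022-annual-database-agency-information"
    ),
    year="2022",
)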

airflow/dags/sync_ntd_data_xlsx/scrape_ntd_xlsx_urls.py

Lines changed: 2 additions & 34 deletions
@@ -10,7 +10,8 @@
 
 xlsx_urls = {
     "ridership_url": "https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release",
-    "agency_url": "https://www.transit.dot.gov/ntd/data-product/2023-annual-database-agency-information",
+    "2022_agency_url": "https://www.transit.dot.gov/ntd/data-product/2022-annual-database-agency-information",
+    "2023_agency_url": "https://www.transit.dot.gov/ntd/data-product/2023-annual-database-agency-information",
     "contractual_relationship_url": "https://www.transit.dot.gov/ntd/data-product/2023-annual-database-contractual-relationship",
 }
 
@@ -46,36 +47,3 @@ def scrape_ntd_xlsx_urls(**context):
         logging.info(f"Validated URL: {validated_url}.")
 
         push_url_to_xcom(key=key, scraped_url=validated_url, context=context)
-
-
-# # pushes the scraped URL value to XCom
-# def push_url_to_xcom(scraped_url, context):
-#     task_instance = context["ti"]
-#     task_instance.xcom_push(key="current_url", value=scraped_url)
-
-
-# # Look for an anchor tag where the href ends with '.xlsx' and starts with '/sites/fta.dot.gov/files/'
-# def href_matcher(href):
-#     return (
-#         href and href.startswith("/sites/fta.dot.gov/files/") and href.endswith(".xlsx")
-#     )
-
-
-# def scrape_ntd_xlsx_urls(**context):
-#     # page to find download URL
-#     url = "https://www.transit.dot.gov/ntd/data-product/monthly-module-raw-data-release"
-#     req = requests.get(url)
-#     soup = BeautifulSoup(req.text, "html.parser")
-
-#     link = soup.find("a", href=href_matcher)
-
-#     # Extract the href if the link is found
-#     file_link = link["href"] if link else None
-
-#     updated_url = f"https://www.transit.dot.gov{file_link}"
-
-#     validated_url = parse_obj_as(HttpUrl, updated_url)
-
-#     logging.info(f"Validated URL: {validated_url}.")
-
-#     push_url_to_xcom(scraped_url=validated_url, context=context)
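
For reference, a minimal sketch of the per-key scrape-and-push flow this change implies: each entry in xlsx_urls is scraped for its current .xlsx link and the validated URL is pushed to XCom under that entry's key. href_matcher and the parsing mirror the commented-out code removed above; the real task body may differ in details:

import logging

import requests
from bs4 import BeautifulSoup
from pydantic import HttpUrl, parse_obj_as

# Trimmed copy of the module-level dict from the diff above
xlsx_urls = {
    "2022_agency_url": "https://www.transit.dot.gov/ntd/data-product/2022-annual-database-agency-information",
    "2023_agency_url": "https://www.transit.dot.gov/ntd/data-product/2023-annual-database-agency-information",
}


def href_matcher(href):
    # anchor tags whose href starts with /sites/fta.dot.gov/files/ and ends with .xlsx
    return (
        href and href.startswith("/sites/fta.dot.gov/files/") and href.endswith(".xlsx")
    )


def scrape_ntd_xlsx_urls(**context):
    for key, url in xlsx_urls.items():
        soup = BeautifulSoup(requests.get(url).text, "html.parser")
        link = soup.find("a", href=href_matcher)
        file_link = link["href"] if link else None

        validated_url = parse_obj_as(HttpUrl, f"https://www.transit.dot.gov{file_link}")
        logging.info(f"Validated URL: {validated_url}.")

        # push each scraped URL under its own XCom key, e.g. "2022_agency_url"
        context["ti"].xcom_push(key=key, value=str(validated_url))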

airflow/plugins/operators/scrape_ntd_xlsx.py

Lines changed: 18 additions & 151 deletions
@@ -19,15 +19,25 @@
 RAW_XLSX_BUCKET = os.environ["CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__RAW"]
 CLEAN_XLSX_BUCKET = os.environ["CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__CLEAN"]
 
+# Map product and year combinations to their xcom keys for dynamic url scraping
+xcom_keys = {
+    (
+        "complete_monthly_ridership_with_adjustments_and_estimates",
+        "historical",
+    ): "ridership_url",
+    ("annual_database_agency_information", "2022"): "2022_agency_url",
+    ("annual_database_agency_information", "2023"): "2023_agency_url",
+    (
+        "annual_database_contractual_relationship",
+        "2023",
+    ): "contractual_relationship_url",
+}
+
 
 # pulls the URL from XCom
 def pull_url_from_xcom(key, context):
     task_instance = context["ti"]
-    pulled_value = task_instance.xcom_pull(
-        task_ids="scrape_ntd_xlsx_urls",
-        key=key
-        # task_ids="scrape_ntd_xlsx_urls", key="current_url"
-    )
+    pulled_value = task_instance.xcom_pull(task_ids="scrape_ntd_xlsx_urls", key=key)
     print(f"Pulled value from XCom: {pulled_value}")
     return pulled_value
 
@@ -120,16 +130,10 @@ def __init__(
     def execute(self, context, *args, **kwargs):
         download_url = self.raw_excel_extract.file_url
 
-        if self.product == "complete_monthly_ridership_with_adjustments_and_estimates":
-            download_url = pull_url_from_xcom(key="ridership_url", context=context)
-
-        if self.product == "annual_database_agency_information":
-            download_url = pull_url_from_xcom(key="agency_url", context=context)
+        key = (self.product, self.year)
 
-        if self.product == "annual_database_contractual_relationship":
-            download_url = pull_url_from_xcom(
-                key="contractual_relationship_url", context=context
-            )
+        if key in xcom_keys:
+            download_url = pull_url_from_xcom(key=xcom_keys[key], context=context)
 
         # see what is returned
         logging.info(f"reading {self.product} url as {download_url}")
@@ -164,140 +168,3 @@ def execute(self, context, *args, **kwargs):
             self.clean_excel_extract.save_content(
                 fs=get_fs(), content=self.clean_gzipped_content
             )
-
-
-# # pulls the URL from XCom
-# def pull_url_from_xcom(context):
-#     task_instance = context["ti"]
-#     pulled_value = task_instance.xcom_pull(
-#         task_ids="scrape_ntd_xlsx_urls", key="current_url"
-#     )
-#     print(f"Pulled value from XCom: {pulled_value}")
-#     return pulled_value
-
-
-# class NtdDataProductXLSXExtract(PartitionedGCSArtifact):
-#     bucket: ClassVar[str]
-#     year: str
-#     product: str
-#     execution_ts: pendulum.DateTime = pendulum.now()
-#     dt: pendulum.Date = execution_ts.date()
-#     file_url: HttpUrl = None
-#     partition_names: ClassVar[List[str]] = ["dt", "execution_ts"]
-
-#     @property
-#     def table(self) -> str:
-#         return self.product
-
-#     @property
-#     def filename(self) -> str:
-#         return self.table
-
-#     class Config:
-#         arbitrary_types_allowed = True
-
-#     def fetch_from_ntd_xlsx(self, file_url):
-#         # As of now, the only file that we are downloading is for complete_monthly_ridership_with_adjustments_and_estimates
-#         # and the download link changes every time they update the date, so we have special handling for that here, which is dependent
-#         # another dag task called scrape_ntd_xlsx_urls.py. if we look to download other xlsx files from the DOT portal and they
-#         # also change the file name every time they publish, they we will have to add the same handling for all of these files and make it programmatic
-
-#         validated_url = parse_obj_as(HttpUrl, file_url)
-
-#         logging.info(f"reading file from url {validated_url}")
-
-#         try:
-#             excel_content = requests.get(validated_url).content
-
-#             if excel_content is None or len(excel_content) == 0:
-#                 logging.info(
-#                     f"There is no data to download for {self.year} / {self.product}. Ending pipeline."
-#                 )
-
-#                 pass
-
-#             else:
-#                 logging.info(
-#                     f"Downloaded {self.product} data for {self.year} with {len(excel_content)} rows!"
-#                 )
-
-#                 return excel_content
-
-#         except requests.exceptions.RequestException as e:
-#             logging.info(f"An error occurred: {e}")
-
-#             raise
-
-
-# class RawExtract(NtdDataProductXLSXExtract):
-#     bucket = RAW_XLSX_BUCKET
-
-
-# class CleanExtract(NtdDataProductXLSXExtract):
-#     bucket = CLEAN_XLSX_BUCKET
-
-
-# class NtdDataProductXLSXOperator(BaseOperator):
-#     template_fields = ("year", "product", "xlsx_file_url")
-
-#     def __init__(
-#         self,
-#         product: str,
-#         xlsx_file_url,
-#         year: int,
-#         *args,
-#         **kwargs,
-#     ):
-#         self.year = year
-#         self.product = product
-#         self.xlsx_file_url = xlsx_file_url
-
-#         # Save initial excel files to the raw bucket
-#         self.raw_excel_extract = RawExtract(
-#             year=self.year,
-#             product=self.product + "_raw" + "/" + self.year,
-#             file_url=self.xlsx_file_url,
-#             filename=f"{self.year}__{self.product}_raw.xlsx",
-#         )
-
-#         super().__init__(*args, **kwargs)
-
-#     def execute(self, context, *args, **kwargs):
-#         download_url = self.raw_excel_extract.file_url
-
-#         if self.product == "complete_monthly_ridership_with_adjustments_and_estimates":
-#             download_url = pull_url_from_xcom(context=context)
-
-#         # see what is returned
-#         logging.info(f"reading ridership url as {download_url}")
-
-#         excel_content = self.raw_excel_extract.fetch_from_ntd_xlsx(download_url)
-
-#         self.raw_excel_extract.save_content(fs=get_fs(), content=excel_content)
-
-#         excel_data = BytesIO(excel_content)
-#         df_dict = pd.read_excel(excel_data, sheet_name=None, engine="openpyxl")
-
-#         for key, df in df_dict.items():
-#             df = df.rename(make_name_bq_safe, axis="columns")
-
-#             logging.info(f"read {df.shape[0]} rows and {df.shape[1]} columns")
-
-#             self.clean_gzipped_content = gzip.compress(
-#                 df.to_json(orient="records", lines=True).encode()
-#             )
-
-#             tab_name = ""
-
-#             tab_name = make_name_bq_safe(key)
-
-#             # Save clean gzipped jsonl files to the clean bucket
-#             self.clean_excel_extract = CleanExtract(
-#                 year=self.year,
-#                 product=self.product + "/" + self.year + "/" + tab_name,
-#                 filename=f"{self.year}__{self.product}__{tab_name}.jsonl.gz",
-#             )
-
-#             self.clean_excel_extract.save_content(
-#                 fs=get_fs(), content=self.clean_gzipped_content
-#             )
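
To make the new lookup concrete, a small standalone illustration of how execute() now resolves the download URL from the (product, year) pair; resolve_download_url is a hypothetical helper written here to mirror the diff above, not a function in the repo:

# Same mapping as the one added to scrape_ntd_xlsx.py above
xcom_keys = {
    ("complete_monthly_ridership_with_adjustments_and_estimates", "historical"): "ridership_url",
    ("annual_database_agency_information", "2022"): "2022_agency_url",
    ("annual_database_agency_information", "2023"): "2023_agency_url",
    ("annual_database_contractual_relationship", "2023"): "contractual_relationship_url",
}


def resolve_download_url(product, year, default_url, pull_url_from_xcom, context):
    # Hypothetical helper mirroring execute(): use the scraped URL when the
    # product/year pair is in xcom_keys, otherwise keep the configured file_url.
    key = (product, year)
    if key in xcom_keys:
        return pull_url_from_xcom(key=xcom_keys[key], context=context)
    return default_url


# e.g. the 2022 agency product no longer shares a single "agency_url" key with 2023:
assert xcom_keys[("annual_database_agency_information", "2022")] == "2022_agency_url"
assert xcom_keys[("annual_database_agency_information", "2023")] == "2023_agency_url"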

0 commit comments
