@@ -61,6 +61,18 @@ class InvalidMetadata(Exception):
6161 pass
6262
6363
64+ class NoScheduleDataSpecified (Exception ):
65+ pass
66+
67+
68+ class ScheduleDataNotFound (Exception ):
69+ pass
70+
71+
72+ class NoValidatorResults (Exception ):
73+ pass
74+
75+
6476class RTProcessingStep (str , Enum ):
6577 parse = "parse"
6678 validate = "validate"
@@ -77,14 +89,6 @@ class RTValidationMetadata(BaseModel):
7789 gtfs_validator_version : str
7890
7991
80- class NoScheduleDataSpecified (Exception ):
81- pass
82-
83-
84- class ScheduleDataNotFound (Exception ):
85- pass
86-
87-
8892class RTHourlyAggregation (PartitionedGCSArtifact ):
8993 partition_names : ClassVar [List [str ]] = ["dt" , "hour" , "base64_url" ]
9094 step : RTProcessingStep
@@ -277,7 +281,7 @@ def download(self, date: datetime.datetime) -> Optional[str]:
277281 .get_url_schedule (self .base64_validation_url )
278282 )
279283 except KeyError :
280- print (
284+ typer . secho (
281285 f"no schedule data found for { self .base64_validation_url } on day { day } "
282286 )
283287 continue
@@ -287,7 +291,7 @@ def download(self, date: datetime.datetime) -> Optional[str]:
287291 self .fs .get (schedule_extract .path , gtfs_zip )
288292 return gtfs_zip
289293 except FileNotFoundError :
290- print (
294+ typer . secho (
291295 f"no schedule file found for { self .base64_validation_url } on day { day } "
292296 )
293297 continue
@@ -346,17 +350,17 @@ def get_local_paths(self) -> Dict[str, GTFSRTFeedExtract]:
346350 def get_results_paths (self ) -> Dict [str , GTFSRTFeedExtract ]:
347351 return {e .get_results_path (): e .extract for e in self .get_extracts ()}
348352
349- def get_hashed_results (self ):
353+ def get_hashed_results (self ) -> Dict [ str , Any ] :
350354 hashed = {}
351355 for e in self .get_extracts ():
352356 if e .has_results ():
353- hashed [e .hash ()] = e .get_results ()
357+ hashed [e .hash (). hex () ] = e .get_results ()
354358 return hashed
355359
356- def get_hashes (self ) -> Dict [bytes , List [GTFSRTFeedExtract ]]:
357- hashed : Dict [bytes , List [GTFSRTFeedExtract ]] = defaultdict (list )
360+ def get_hashes (self ) -> Dict [str , List [GTFSRTFeedExtract ]]:
361+ hashed : Dict [str , List [GTFSRTFeedExtract ]] = defaultdict (list )
358362 for e in self .get_extracts ():
359- hashed [e .hash ()].append (e .extract )
363+ hashed [e .hash (). hex () ].append (e .extract )
360364 return hashed
361365
362366 def download (self ):
@@ -507,7 +511,7 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]:
507511 e = ScheduleDataNotFound (
508512 f"no recent schedule data found for { self .aggregation .extracts [0 ].path } "
509513 )
510- print (e )
514+ typer . secho (e )
511515
512516 scope .fingerprint = [
513517 type (e ),
@@ -571,11 +575,11 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]:
571575 for hash , extracts in aggregation_extracts .get_hashes ().items ():
572576 try :
573577 records = hashed_results [hash ]
574- except KeyError as e :
578+ except KeyError :
575579 if self .verbose :
576580 paths = ", " .join (e .path for e in extracts )
577581 typer .secho (
578- f"WARNING: no results found for { paths } " ,
582+ f"WARNING: validator did not produce results for { paths } " ,
579583 fg = typer .colors .YELLOW ,
580584 )
581585
@@ -584,7 +588,7 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]:
584588 RTFileProcessingOutcome (
585589 step = self .aggregation .step ,
586590 success = False ,
587- exception = e ,
591+ exception = NoValidatorResults ( "No validator output" ) ,
588592 extract = extract ,
589593 )
590594 )
@@ -680,7 +684,7 @@ def process(self, tmp_dir: str, scope) -> List[RTFileProcessingOutcome]:
680684 except DecodeError as e :
681685 if self .verbose :
682686 typer .secho (
683- f"WARNING: DecodeError for { str (extract .path )} " ,
687+ f'DecodeError: " { str ( e ) } " thrown when decoding { str (extract .path )} ' ,
684688 fg = typer .colors .YELLOW ,
685689 )
686690 outcomes .append (
@@ -918,13 +922,9 @@ def main(
918922 # TODO: I dislike having to exclude the records here
919923 # I need to figure out the best way to have a single type represent the "metadata" of
920924 # the content as well as the content itself
921- result .save_content (
922- fs = get_fs (),
923- content = "\n " .join (
924- (json .dumps (make_pydantic_model_bq_safe (o )) for o in result .outcomes )
925- ).encode (),
926- exclude = {"outcomes" },
927- )
925+ raw = [json .dumps (make_pydantic_model_bq_safe (o )) for o in result .outcomes ]
926+ content = "\n " .join (raw ).encode ("utf-8" )
927+ result .save_content (fs = get_fs (), content = content , exclude = {"outcomes" })
928928
929929 assert (
930930 len (outcomes )
0 commit comments