10
10
import fnmatch
11
11
import logging
12
12
import os
13
- from typing import BinaryIO , Dict , Optional , TextIO
13
+ from typing import Dict , Optional
14
14
15
15
from securesystemslib import exceptions as sslib_exceptions
16
16
from securesystemslib import hash as sslib_hash
@@ -159,9 +159,9 @@ def download_target(self, target: Dict, destination_directory: str):
159
159
temp_obj = download .download_file (
160
160
file_mirror , target ["fileinfo" ]["length" ], self ._fetcher
161
161
)
162
-
162
+ _check_file_length ( temp_obj , target [ "fileinfo" ][ "length" ])
163
163
temp_obj .seek (0 )
164
- self . _verify_target_file (temp_obj , target )
164
+ _check_hashes_obj (temp_obj , target [ "fileinfo" ][ "hashes" ] )
165
165
break
166
166
167
167
except Exception as exception : # pylint: disable=broad-except
@@ -308,7 +308,7 @@ def _root_mirrors_download(self, root_mirrors: Dict) -> "RootWrapper":
308
308
)
309
309
310
310
temp_obj .seek (0 )
311
- intermediate_root = self ._verify_root (temp_obj )
311
+ intermediate_root = self ._verify_root (temp_obj . read () )
312
312
# When we reach this point, a root file has been successfully
313
313
# downloaded and verified so we can exit the loop.
314
314
break
@@ -356,7 +356,7 @@ def _load_timestamp(self) -> None:
356
356
)
357
357
358
358
temp_obj .seek (0 )
359
- verified_timestamp = self ._verify_timestamp (temp_obj )
359
+ verified_timestamp = self ._verify_timestamp (temp_obj . read () )
360
360
break
361
361
362
362
except Exception as exception : # pylint: disable=broad-except
@@ -410,7 +410,7 @@ def _load_snapshot(self) -> None:
410
410
)
411
411
412
412
temp_obj .seek (0 )
413
- verified_snapshot = self ._verify_snapshot (temp_obj )
413
+ verified_snapshot = self ._verify_snapshot (temp_obj . read () )
414
414
break
415
415
416
416
except Exception as exception : # pylint: disable=broad-except
@@ -465,7 +465,7 @@ def _load_targets(self, targets_role: str, parent_role: str) -> None:
465
465
466
466
temp_obj .seek (0 )
467
467
verified_targets = self ._verify_targets (
468
- temp_obj , targets_role , parent_role
468
+ temp_obj . read () , targets_role , parent_role
469
469
)
470
470
break
471
471
@@ -487,12 +487,12 @@ def _load_targets(self, targets_role: str, parent_role: str) -> None:
487
487
self ._get_full_meta_name (targets_role , extension = ".json" )
488
488
)
489
489
490
- def _verify_root (self , temp_obj : TextIO ) -> RootWrapper :
490
+ def _verify_root (self , file_content : bytes ) -> RootWrapper :
491
491
"""
492
492
TODO
493
493
"""
494
494
495
- intermediate_root = RootWrapper .from_json_object (temp_obj )
495
+ intermediate_root = RootWrapper .from_json_object (file_content )
496
496
497
497
# Check for an arbitrary software attack
498
498
trusted_root = self ._metadata ["root" ]
@@ -505,7 +505,6 @@ def _verify_root(self, temp_obj: TextIO) -> RootWrapper:
505
505
506
506
# Check for a rollback attack.
507
507
if intermediate_root .version < trusted_root .version :
508
- temp_obj .close ()
509
508
raise exceptions .ReplayedMetadataError (
510
509
"root" , intermediate_root .version (), trusted_root .version ()
511
510
)
@@ -514,11 +513,11 @@ def _verify_root(self, temp_obj: TextIO) -> RootWrapper:
514
513
515
514
return intermediate_root
516
515
517
- def _verify_timestamp (self , temp_obj : TextIO ) -> TimestampWrapper :
516
+ def _verify_timestamp (self , file_content : bytes ) -> TimestampWrapper :
518
517
"""
519
518
TODO
520
519
"""
521
- intermediate_timestamp = TimestampWrapper .from_json_object (temp_obj )
520
+ intermediate_timestamp = TimestampWrapper .from_json_object (file_content )
522
521
523
522
# Check for an arbitrary software attack
524
523
trusted_root = self ._metadata ["root" ]
@@ -532,7 +531,6 @@ def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper:
532
531
intermediate_timestamp .signed .version
533
532
<= self ._metadata ["timestamp" ].version
534
533
):
535
- temp_obj .close ()
536
534
raise exceptions .ReplayedMetadataError (
537
535
"root" ,
538
536
intermediate_timestamp .version (),
@@ -544,7 +542,6 @@ def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper:
544
542
intermediate_timestamp .snapshot .version
545
543
<= self ._metadata ["timestamp" ].snapshot ["version" ]
546
544
):
547
- temp_obj .close ()
548
545
raise exceptions .ReplayedMetadataError (
549
546
"root" ,
550
547
intermediate_timestamp .snapshot .version (),
@@ -555,24 +552,23 @@ def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper:
555
552
556
553
return intermediate_timestamp
557
554
558
- def _verify_snapshot (self , temp_obj : TextIO ) -> SnapshotWrapper :
555
+ def _verify_snapshot (self , file_content : bytes ) -> SnapshotWrapper :
559
556
"""
560
557
TODO
561
558
"""
562
559
563
560
# Check against timestamp metadata
564
561
if self ._metadata ["timestamp" ].snapshot .get ("hash" ):
565
562
_check_hashes (
566
- temp_obj , self ._metadata ["timestamp" ].snapshot .get ("hash" )
563
+ file_content , self ._metadata ["timestamp" ].snapshot .get ("hash" )
567
564
)
568
565
569
- intermediate_snapshot = SnapshotWrapper .from_json_object (temp_obj )
566
+ intermediate_snapshot = SnapshotWrapper .from_json_object (file_content )
570
567
571
568
if (
572
569
intermediate_snapshot .version
573
570
!= self ._metadata ["timestamp" ].snapshot ["version" ]
574
571
):
575
- temp_obj .close ()
576
572
raise exceptions .BadVersionNumberError
577
573
578
574
# Check for an arbitrary software attack
@@ -588,15 +584,14 @@ def _verify_snapshot(self, temp_obj: TextIO) -> SnapshotWrapper:
588
584
target_role ["version" ]
589
585
!= self ._metadata ["snapshot" ].meta [target_role ]["version" ]
590
586
):
591
- temp_obj .close ()
592
587
raise exceptions .BadVersionNumberError
593
588
594
589
intermediate_snapshot .expires ()
595
590
596
591
return intermediate_snapshot
597
592
598
593
def _verify_targets (
599
- self , temp_obj : TextIO , filename : str , parent_role : str
594
+ self , file_content : bytes , filename : str , parent_role : str
600
595
) -> TargetsWrapper :
601
596
"""
602
597
TODO
@@ -605,15 +600,14 @@ def _verify_targets(
605
600
# Check against timestamp metadata
606
601
if self ._metadata ["snapshot" ].role (filename ).get ("hash" ):
607
602
_check_hashes (
608
- temp_obj , self ._metadata ["snapshot" ].targets .get ("hash" )
603
+ file_content , self ._metadata ["snapshot" ].targets .get ("hash" )
609
604
)
610
605
611
- intermediate_targets = TargetsWrapper .from_json_object (temp_obj )
606
+ intermediate_targets = TargetsWrapper .from_json_object (file_content )
612
607
if (
613
608
intermediate_targets .version
614
609
!= self ._metadata ["snapshot" ].role (filename )["version" ]
615
610
):
616
- temp_obj .close ()
617
611
raise exceptions .BadVersionNumberError
618
612
619
613
# Check for an arbitrary software attack
@@ -627,15 +621,6 @@ def _verify_targets(
627
621
628
622
return intermediate_targets
629
623
630
- @staticmethod
631
- def _verify_target_file (temp_obj : BinaryIO , targetinfo : Dict ) -> None :
632
- """
633
- TODO
634
- """
635
-
636
- _check_file_length (temp_obj , targetinfo ["fileinfo" ]["length" ])
637
- _check_hashes (temp_obj , targetinfo ["fileinfo" ]["hashes" ])
638
-
639
624
def _preorder_depth_first_walk (self , target_filepath ) -> Dict :
640
625
"""
641
626
TODO
@@ -864,19 +849,34 @@ def _check_file_length(file_object, trusted_file_length):
864
849
)
865
850
866
851
867
- def _check_hashes (file_object , trusted_hashes ):
852
+ def _check_hashes_obj (file_object , trusted_hashes ):
853
+ """
854
+ TODO
855
+ """
856
+ for algorithm , trusted_hash in trusted_hashes .items ():
857
+ digest_object = sslib_hash .digest_fileobject (file_object , algorithm )
858
+
859
+ computed_hash = digest_object .hexdigest ()
860
+
861
+ # Raise an exception if any of the hashes are incorrect.
862
+ if trusted_hash != computed_hash :
863
+ raise sslib_exceptions .BadHashError (trusted_hash , computed_hash )
864
+
865
+ logger .info (
866
+ "The file's " + algorithm + " hash is" " correct: " + trusted_hash
867
+ )
868
+
869
+
870
+ def _check_hashes (file_content , trusted_hashes ):
868
871
"""
869
872
TODO
870
873
"""
871
874
# Verify each trusted hash of 'trusted_hashes'. If all are valid, simply
872
875
# return.
873
876
for algorithm , trusted_hash in trusted_hashes .items ():
874
877
digest_object = sslib_hash .digest (algorithm )
875
- # Ensure we read from the beginning of the file object
876
- # TODO: should we store file position (before the loop) and reset
877
- # after we seek about?
878
- file_object .seek (0 )
879
- digest_object .update (file_object .read ())
878
+
879
+ digest_object .update (file_content )
880
880
computed_hash = digest_object .hexdigest ()
881
881
882
882
# Raise an exception if any of the hashes are incorrect.
0 commit comments