10
10
import fnmatch
11
11
import logging
12
12
import os
13
- from typing import BinaryIO , Dict , Optional , TextIO
13
+ from typing import Dict , Optional
14
14
15
15
from securesystemslib import exceptions as sslib_exceptions
16
16
from securesystemslib import hash as sslib_hash
@@ -158,9 +158,9 @@ def download_target(self, target: Dict, destination_directory: str):
158
158
temp_obj = download .download_file (
159
159
file_mirror , target ["fileinfo" ]["length" ], self ._fetcher
160
160
)
161
-
161
+ _check_file_length ( temp_obj , target [ "fileinfo" ][ "length" ])
162
162
temp_obj .seek (0 )
163
- self . _verify_target_file (temp_obj , target )
163
+ _check_hashes_obj (temp_obj , target [ "fileinfo" ][ "hashes" ] )
164
164
break
165
165
166
166
except Exception as exception :
@@ -297,7 +297,7 @@ def _root_mirrors_download(self, root_mirrors: Dict) -> "RootWrapper":
297
297
)
298
298
299
299
temp_obj .seek (0 )
300
- intermediate_root = self ._verify_root (temp_obj )
300
+ intermediate_root = self ._verify_root (temp_obj . read () )
301
301
# When we reach this point, a root file has been successfully
302
302
# downloaded and verified so we can exit the loop.
303
303
break
@@ -344,7 +344,7 @@ def _load_timestamp(self) -> None:
344
344
)
345
345
346
346
temp_obj .seek (0 )
347
- verified_timestamp = self ._verify_timestamp (temp_obj )
347
+ verified_timestamp = self ._verify_timestamp (temp_obj . read () )
348
348
break
349
349
350
350
except Exception as exception : # pylint: disable=broad-except
@@ -397,7 +397,7 @@ def _load_snapshot(self) -> None:
397
397
)
398
398
399
399
temp_obj .seek (0 )
400
- verified_snapshot = self ._verify_snapshot (temp_obj )
400
+ verified_snapshot = self ._verify_snapshot (temp_obj . read () )
401
401
break
402
402
403
403
except Exception as exception : # pylint: disable=broad-except
@@ -451,7 +451,7 @@ def _load_targets(self, targets_role: str, parent_role: str) -> None:
451
451
452
452
temp_obj .seek (0 )
453
453
verified_targets = self ._verify_targets (
454
- temp_obj , targets_role , parent_role
454
+ temp_obj . read () , targets_role , parent_role
455
455
)
456
456
break
457
457
@@ -472,12 +472,12 @@ def _load_targets(self, targets_role: str, parent_role: str) -> None:
472
472
self ._get_full_meta_name (targets_role , extension = ".json" )
473
473
)
474
474
475
- def _verify_root (self , temp_obj : TextIO ) -> RootWrapper :
475
+ def _verify_root (self , file_content : bytes ) -> RootWrapper :
476
476
"""
477
477
TODO
478
478
"""
479
479
480
- intermediate_root = RootWrapper .from_json_object (temp_obj )
480
+ intermediate_root = RootWrapper .from_json_object (file_content )
481
481
482
482
# Check for an arbitrary software attack
483
483
trusted_root = self ._metadata ["root" ]
@@ -490,7 +490,6 @@ def _verify_root(self, temp_obj: TextIO) -> RootWrapper:
490
490
491
491
# Check for a rollback attack.
492
492
if intermediate_root .version < trusted_root .version :
493
- temp_obj .close ()
494
493
raise exceptions .ReplayedMetadataError (
495
494
"root" , intermediate_root .version (), trusted_root .version ()
496
495
)
@@ -499,11 +498,11 @@ def _verify_root(self, temp_obj: TextIO) -> RootWrapper:
499
498
500
499
return intermediate_root
501
500
502
- def _verify_timestamp (self , temp_obj : TextIO ) -> TimestampWrapper :
501
+ def _verify_timestamp (self , file_content : bytes ) -> TimestampWrapper :
503
502
"""
504
503
TODO
505
504
"""
506
- intermediate_timestamp = TimestampWrapper .from_json_object (temp_obj )
505
+ intermediate_timestamp = TimestampWrapper .from_json_object (file_content )
507
506
508
507
# Check for an arbitrary software attack
509
508
trusted_root = self ._metadata ["root" ]
@@ -517,7 +516,6 @@ def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper:
517
516
intermediate_timestamp .signed .version
518
517
<= self ._metadata ["timestamp" ].version
519
518
):
520
- temp_obj .close ()
521
519
raise exceptions .ReplayedMetadataError (
522
520
"root" ,
523
521
intermediate_timestamp .version (),
@@ -529,7 +527,6 @@ def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper:
529
527
intermediate_timestamp .snapshot .version
530
528
<= self ._metadata ["timestamp" ].snapshot ["version" ]
531
529
):
532
- temp_obj .close ()
533
530
raise exceptions .ReplayedMetadataError (
534
531
"root" ,
535
532
intermediate_timestamp .snapshot .version (),
@@ -540,24 +537,23 @@ def _verify_timestamp(self, temp_obj: TextIO) -> TimestampWrapper:
540
537
541
538
return intermediate_timestamp
542
539
543
- def _verify_snapshot (self , temp_obj : TextIO ) -> SnapshotWrapper :
540
+ def _verify_snapshot (self , file_content : bytes ) -> SnapshotWrapper :
544
541
"""
545
542
TODO
546
543
"""
547
544
548
545
# Check against timestamp metadata
549
546
if self ._metadata ["timestamp" ].snapshot .get ("hash" ):
550
547
_check_hashes (
551
- temp_obj , self ._metadata ["timestamp" ].snapshot .get ("hash" )
548
+ file_content , self ._metadata ["timestamp" ].snapshot .get ("hash" )
552
549
)
553
550
554
- intermediate_snapshot = SnapshotWrapper .from_json_object (temp_obj )
551
+ intermediate_snapshot = SnapshotWrapper .from_json_object (file_content )
555
552
556
553
if (
557
554
intermediate_snapshot .version
558
555
!= self ._metadata ["timestamp" ].snapshot ["version" ]
559
556
):
560
- temp_obj .close ()
561
557
raise exceptions .BadVersionNumberError
562
558
563
559
# Check for an arbitrary software attack
@@ -573,15 +569,14 @@ def _verify_snapshot(self, temp_obj: TextIO) -> SnapshotWrapper:
573
569
target_role ["version" ]
574
570
!= self ._metadata ["snapshot" ].meta [target_role ]["version" ]
575
571
):
576
- temp_obj .close ()
577
572
raise exceptions .BadVersionNumberError
578
573
579
574
intermediate_snapshot .expires ()
580
575
581
576
return intermediate_snapshot
582
577
583
578
def _verify_targets (
584
- self , temp_obj : TextIO , filename : str , parent_role : str
579
+ self , file_content : bytes , filename : str , parent_role : str
585
580
) -> TargetsWrapper :
586
581
"""
587
582
TODO
@@ -590,15 +585,14 @@ def _verify_targets(
590
585
# Check against timestamp metadata
591
586
if self ._metadata ["snapshot" ].role (filename ).get ("hash" ):
592
587
_check_hashes (
593
- temp_obj , self ._metadata ["snapshot" ].targets .get ("hash" )
588
+ file_content , self ._metadata ["snapshot" ].targets .get ("hash" )
594
589
)
595
590
596
- intermediate_targets = TargetsWrapper .from_json_object (temp_obj )
591
+ intermediate_targets = TargetsWrapper .from_json_object (file_content )
597
592
if (
598
593
intermediate_targets .version
599
594
!= self ._metadata ["snapshot" ].role (filename )["version" ]
600
595
):
601
- temp_obj .close ()
602
596
raise exceptions .BadVersionNumberError
603
597
604
598
# Check for an arbitrary software attack
@@ -612,15 +606,6 @@ def _verify_targets(
612
606
613
607
return intermediate_targets
614
608
615
- @staticmethod
616
- def _verify_target_file (temp_obj : BinaryIO , targetinfo : Dict ) -> None :
617
- """
618
- TODO
619
- """
620
-
621
- _check_file_length (temp_obj , targetinfo ["fileinfo" ]["length" ])
622
- _check_hashes (temp_obj , targetinfo ["fileinfo" ]["hashes" ])
623
-
624
609
def _preorder_depth_first_walk (self , target_filepath ) -> Dict :
625
610
"""
626
611
TODO
@@ -849,19 +834,34 @@ def _check_file_length(file_object, trusted_file_length):
849
834
)
850
835
851
836
852
- def _check_hashes (file_object , trusted_hashes ):
837
+ def _check_hashes_obj (file_object , trusted_hashes ):
838
+ """
839
+ TODO
840
+ """
841
+ for algorithm , trusted_hash in trusted_hashes .items ():
842
+ digest_object = sslib_hash .digest_fileobject (file_object , algorithm )
843
+
844
+ computed_hash = digest_object .hexdigest ()
845
+
846
+ # Raise an exception if any of the hashes are incorrect.
847
+ if trusted_hash != computed_hash :
848
+ raise sslib_exceptions .BadHashError (trusted_hash , computed_hash )
849
+
850
+ logger .info (
851
+ "The file's " + algorithm + " hash is" " correct: " + trusted_hash
852
+ )
853
+
854
+
855
+ def _check_hashes (file_content , trusted_hashes ):
853
856
"""
854
857
TODO
855
858
"""
856
859
# Verify each trusted hash of 'trusted_hashes'. If all are valid, simply
857
860
# return.
858
861
for algorithm , trusted_hash in trusted_hashes .items ():
859
862
digest_object = sslib_hash .digest (algorithm )
860
- # Ensure we read from the beginning of the file object
861
- # TODO: should we store file position (before the loop) and reset
862
- # after we seek about?
863
- file_object .seek (0 )
864
- digest_object .update (file_object .read ())
863
+
864
+ digest_object .update (file_content )
865
865
computed_hash = digest_object .hexdigest ()
866
866
867
867
# Raise an exception if any of the hashes are incorrect.
0 commit comments