15
15
import tuf .unittest_toolbox as unittest_toolbox
16
16
17
17
from tests import utils
18
+ from tuf .api .metadata import Metadata
18
19
from tuf import ngclient
20
+ from securesystemslib .signer import SSlibSigner
21
+ from securesystemslib .interface import import_rsa_privatekey_from_file
19
22
20
23
logger = logging .getLogger (__name__ )
21
24
@@ -94,13 +97,13 @@ def setUp(self):
94
97
url_prefix = 'http://' + utils .TEST_HOST_ADDRESS + ':' \
95
98
+ str (self .server_process_handler .port ) + repository_basepath
96
99
97
- metadata_url = f"{ url_prefix } /metadata/"
98
- targets_url = f"{ url_prefix } /targets/"
100
+ self . metadata_url = f"{ url_prefix } /metadata/"
101
+ self . targets_url = f"{ url_prefix } /targets/"
99
102
# Creating a repository instance. The test cases will use this client
100
103
# updater to refresh metadata, fetch target files, etc.
101
104
self .repository_updater = ngclient .Updater (self .client_directory ,
102
- metadata_url ,
103
- targets_url )
105
+ self . metadata_url ,
106
+ self . targets_url )
104
107
105
108
def tearDown (self ):
106
109
# We are inheriting from custom class.
@@ -109,14 +112,91 @@ def tearDown(self):
109
112
# Logs stdout and stderr from the sever subprocess.
110
113
self .server_process_handler .flush_log ()
111
114
115
+ def _create_consistent_target (self , targetname : str , target_hash :str ) -> None :
116
+ """Create consistent targets copies of their non-consistent counterparts
117
+ inside the repository directory.
118
+
119
+ Args:
120
+ targetname: A string denoting the name of the target file.
121
+ target_hash: A string denoting the hash of the target.
122
+
123
+ """
124
+ consistent_target_name = f"{ target_hash } .{ targetname } "
125
+ source_path = os .path .join (self .repository_directory , "targets" , targetname )
126
+ destination_path = os .path .join (
127
+ self .repository_directory , "targets" , consistent_target_name
128
+ )
129
+ shutil .copy (source_path , destination_path )
130
+
131
+
132
+ def _make_root_file_with_consistent_snapshot_true (self ) -> None :
133
+ """Swap the existing root file inside the client directory with a new root
134
+ file where the consistent_snapshot is set to true."""
135
+ root_path = os .path .join (self .client_directory , "root.json" )
136
+ root = Metadata .from_file (root_path )
137
+ root .signed .consistent_snapshot = True
138
+ root_key_path = os .path .join (self .keystore_directory , "root_key" )
139
+ root_key_dict = import_rsa_privatekey_from_file (
140
+ root_key_path , password = "password"
141
+ )
142
+ root_signer = SSlibSigner (root_key_dict )
143
+ root .sign (root_signer )
144
+ # Remove the old root file and replace it with the newer root file.
145
+ os .remove (root_path )
146
+ root .to_file (root_path )
147
+
148
+
149
+ def test_refresh_on_consistent_targets (self ):
150
+ # Generate a new root file where consistent_snapshot is set to true and
151
+ # replace the old root metadata file with it.
152
+ self ._make_root_file_with_consistent_snapshot_true ()
153
+ self .repository_updater = ngclient .Updater (self .client_directory ,
154
+ self .metadata_url ,
155
+ self .targets_url )
156
+ # All metadata is in local directory already
157
+ self .repository_updater .refresh ()
158
+
159
+ # Get targetinfo for "file1.txt" listed in targets
160
+ targetinfo1 = self .repository_updater .get_one_valid_targetinfo ("file1.txt" )
161
+ # Get targetinfo for "file3.txt" listed in the delegated role1
162
+ targetinfo3 = self .repository_updater .get_one_valid_targetinfo ("file3.txt" )
163
+
164
+ # Create consistent targets with file path HASH.FILENAME.EXT
165
+ target1_hash = list (targetinfo1 ["fileinfo" ].hashes .values ())[0 ]
166
+ target3_hash = list (targetinfo3 ["fileinfo" ].hashes .values ())[0 ]
167
+ self ._create_consistent_target ("file1.txt" , target1_hash )
168
+ self ._create_consistent_target ("file3.txt" , target3_hash )
169
+
170
+ destination_directory = self .make_temp_directory ()
171
+ updated_targets = self .repository_updater .updated_targets (
172
+ [targetinfo1 , targetinfo3 ], destination_directory
173
+ )
174
+
175
+ self .assertListEqual (updated_targets , [targetinfo1 , targetinfo3 ])
176
+ self .repository_updater .download_target (targetinfo1 , destination_directory )
177
+ updated_targets = self .repository_updater .updated_targets (
178
+ updated_targets , destination_directory
179
+ )
180
+
181
+ self .assertListEqual (updated_targets , [targetinfo3 ])
182
+
183
+ self .repository_updater .download_target (targetinfo3 , destination_directory )
184
+ updated_targets = self .repository_updater .updated_targets (
185
+ updated_targets , destination_directory
186
+ )
187
+
188
+ self .assertListEqual (updated_targets , [])
189
+
112
190
def test_refresh (self ):
191
+ # Test refresh without consistent targets - targets without hash prefixes.
192
+
113
193
# All metadata is in local directory already
114
194
self .repository_updater .refresh ()
115
195
116
196
# Get targetinfo for 'file1.txt' listed in targets
117
- targetinfo1 = self .repository_updater .get_one_valid_targetinfo (' file1.txt' )
197
+ targetinfo1 = self .repository_updater .get_one_valid_targetinfo (" file1.txt" )
118
198
# Get targetinfo for 'file3.txt' listed in the delegated role1
119
- targetinfo3 = self .repository_updater .get_one_valid_targetinfo (' file3.txt' )
199
+ targetinfo3 = self .repository_updater .get_one_valid_targetinfo (" file3.txt" )
120
200
121
201
destination_directory = self .make_temp_directory ()
122
202
updated_targets = self .repository_updater .updated_targets ([targetinfo1 , targetinfo3 ],
@@ -146,7 +226,7 @@ def test_refresh_with_only_local_root(self):
146
226
self .repository_updater .refresh ()
147
227
148
228
# Get targetinfo for 'file3.txt' listed in the delegated role1
149
- targetinfo3 = self .repository_updater .get_one_valid_targetinfo ('file3.txt' )
229
+ targetinfo3 = self .repository_updater .get_one_valid_targetinfo ('file3.txt' )
150
230
151
231
if __name__ == '__main__' :
152
232
utils .configure_test_logging (sys .argv )
0 commit comments