33# Copyright 2021, New York University and the TUF contributors
44# SPDX-License-Identifier: MIT OR Apache-2.0
55
6- """Test updating delegated targets roles with various
7- delegation hierarchies """
6+ """Test updating delegated targets roles and searching for
7+ target files with various delegation graphs """
88
99import os
1010import sys
2121 DelegatedRole ,
2222 Targets ,
2323)
24+ from tuf .exceptions import UnsignedMetadataError
2425from tuf .ngclient import Updater
2526
2627
@@ -35,18 +36,32 @@ class TestDelegation:
3536 path_hash_prefixes : Optional [List [str ]] = None
3637
3738
39+ @dataclass
40+ class TestTarget :
41+ rolename : str
42+ content : bytes
43+ targetpath : str
44+
45+
3846@dataclass
3947class DelegationsTestCase :
40- """Describes a delegations graph as a list of delegations
41- and the expected order of traversal as 'visited_order' ."""
48+ """A delegations graph as lists of delegations and target files
49+ and the expected order of traversal as a list of role names ."""
4250
4351 delegations : List [TestDelegation ]
44- visited_order : List [str ]
52+ target_files : List [TestTarget ] = field (default_factory = list )
53+ visited_order : List [str ] = field (default_factory = list )
4554
4655
47- class TestDelegationsGraphs (unittest .TestCase ):
48- """Test creating delegations graphs with different complexity
49- and successfully updating the delegated roles metadata"""
56+ @dataclass
57+ class TargetTestCase :
58+ targetpath : str
59+ found : bool
60+ visited_order : List [str ] = field (default_factory = list )
61+
62+
63+ class TestDelegations (unittest .TestCase ):
64+ """Base class for delegation tests"""
5065
5166 # set dump_dir to trigger repository state dumps
5267 dump_dir : Optional [str ] = None
@@ -59,70 +74,73 @@ def setUp(self) -> None:
5974 self .targets_dir = os .path .join (self .temp_dir .name , "targets" )
6075 os .mkdir (self .metadata_dir )
6176 os .mkdir (self .targets_dir )
77+ self .sim : RepositorySimulator
6278
6379 def tearDown (self ) -> None :
6480 self .temp_dir .cleanup ()
6581
66- def setup_subtest (
67- self , delegations : List [TestDelegation ]
68- ) -> RepositorySimulator :
69- sim = self ._init_repo (delegations )
70-
82+ def setup_subtest (self ) -> None :
7183 self .subtest_count += 1
7284 if self .dump_dir is not None :
7385 # create subtest dumpdir
7486 name = f"{ self .id ().split ('.' )[- 1 ]} -{ self .subtest_count } "
75- sim .dump_dir = os .path .join (self .dump_dir , name )
76- os .mkdir (sim .dump_dir )
87+ self . sim .dump_dir = os .path .join (self .dump_dir , name )
88+ os .mkdir (self . sim .dump_dir )
7789 # dump the repo simulator metadata
78- sim .write ()
79-
80- return sim
90+ self .sim .write ()
8191
8292 def teardown_subtest (self ) -> None :
83- # clean up after each subtest
8493 utils .cleanup_dir (self .metadata_dir )
8594
86- def _init_updater (self , sim : RepositorySimulator ) -> Updater :
87- """Create a new Updater instance"""
88- return Updater (
89- self .metadata_dir ,
90- "https://example.com/metadata/" ,
91- self .targets_dir ,
92- "https://example.com/targets/" ,
93- sim ,
94- )
95+ def _init_repo (self , test_case : DelegationsTestCase ) -> None :
96+ """Create a new RepositorySimulator instance and
97+ populate it with delegations and target files"""
9598
96- def _init_repo (
97- self , delegations : List [TestDelegation ]
98- ) -> RepositorySimulator :
99- """Create a new RepositorySimulator instance with 'delegations'"""
100- sim = RepositorySimulator ()
99+ self .sim = RepositorySimulator ()
101100 spec_version = "." .join (SPECIFICATION_VERSION )
102-
103- for d in delegations :
104- if d .rolename in sim .md_delegates :
105- targets = sim .md_delegates [d .rolename ].signed
101+ for d in test_case .delegations :
102+ if d .rolename in self .sim .md_delegates :
103+ targets = self .sim .md_delegates [d .rolename ].signed
106104 else :
107- targets = Targets (1 , spec_version , sim .safe_expiry , {}, None )
108-
105+ targets = Targets (
106+ 1 , spec_version , self .sim .safe_expiry , {}, None
107+ )
109108 # unpack 'd' but skip "delegator"
110109 role = DelegatedRole (* astuple (d )[1 :])
111- sim .add_delegation (d .delegator , role , targets )
112- sim .update_snapshot ()
110+ self .sim .add_delegation (d .delegator , role , targets )
111+
112+ for target in test_case .target_files :
113+ self .sim .add_target (* astuple (target ))
114+
115+ if test_case .target_files :
116+ self .sim .targets .version += 1
117+ self .sim .update_snapshot ()
113118
119+ def _init_updater (self ) -> Updater :
120+ """Create a new Updater instance"""
114121 # Init trusted root for Updater
115122 with open (os .path .join (self .metadata_dir , "root.json" ), "bw" ) as f :
116- f .write (sim .signed_roots [0 ])
123+ f .write (self . sim .signed_roots [0 ])
117124
118- return sim
125+ return Updater (
126+ self .metadata_dir ,
127+ "https://example.com/metadata/" ,
128+ self .targets_dir ,
129+ "https://example.com/targets/" ,
130+ self .sim ,
131+ )
119132
120133 def _assert_files_exist (self , roles : Iterable [str ]) -> None :
121134 """Assert that local metadata files exist for 'roles'"""
122135 expected_files = sorted ([f"{ role } .json" for role in roles ])
123136 local_metadata_files = sorted (os .listdir (self .metadata_dir ))
124137 self .assertListEqual (local_metadata_files , expected_files )
125138
139+
140+ class TestDelegationsGraphs (TestDelegations ):
141+ """Test creating delegations graphs with different complexity
142+ and successfully updating the delegated roles metadata"""
143+
126144 graphs : utils .DataSet = {
127145 "basic delegation" : DelegationsTestCase (
128146 delegations = [TestDelegation ("targets" , "A" )],
@@ -226,6 +244,17 @@ def _assert_files_exist(self, roles: Iterable[str]) -> None:
226244 # 'C' is reached through 'B' since 'A' does not delegate a matching pattern"
227245 visited_order = ["A" , "B" , "C" ],
228246 ),
247+ "max number of delegations" : DelegationsTestCase (
248+ delegations = [
249+ TestDelegation ("targets" , "A" ),
250+ TestDelegation ("targets" , "B" ),
251+ TestDelegation ("targets" , "C" ),
252+ TestDelegation ("C" , "D" ),
253+ TestDelegation ("C" , "E" ),
254+ ],
255+ # "E" is skipped, max_delegations is 4
256+ visited_order = ["A" , "B" , "C" , "D" ],
257+ ),
229258 }
230259
231260 @utils .run_sub_tests_with_dataset (graphs )
@@ -237,11 +266,15 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None:
237266 exp_files = [* TOP_LEVEL_ROLE_NAMES , * test_data .visited_order ]
238267 exp_calls = [(role , 1 ) for role in test_data .visited_order ]
239268
240- sim = self .setup_subtest (test_data .delegations )
241- updater = self ._init_updater (sim )
269+ self ._init_repo (test_data )
270+ self .setup_subtest ()
271+
272+ updater = self ._init_updater ()
273+ # restrict the max number of delegations to simplify the test
274+ updater .config .max_delegations = 4
242275 # Call explicitly refresh to simplify the expected_calls list
243276 updater .refresh ()
244- sim .fetch_tracker .metadata .clear ()
277+ self . sim .fetch_tracker .metadata .clear ()
245278 # Check that metadata dir contains only top-level roles
246279 self ._assert_files_exist (TOP_LEVEL_ROLE_NAMES )
247280
@@ -251,16 +284,132 @@ def test_graph_traversal(self, test_data: DelegationsTestCase) -> None:
251284 self .assertIsNone (targetfile )
252285 # Check that the delegated roles were visited in the expected
253286 # order and the corresponding metadata files were persisted
254- self .assertListEqual (sim .fetch_tracker .metadata , exp_calls )
287+ self .assertListEqual (self .sim .fetch_tracker .metadata , exp_calls )
288+ self ._assert_files_exist (exp_files )
289+ finally :
290+ self .teardown_subtest ()
291+
292+ invalid_metadata : utils .DataSet = {
293+ "unsigned delegated role" : DelegationsTestCase (
294+ delegations = [
295+ TestDelegation ("targets" , "invalid" ),
296+ TestDelegation ("targets" , "B" ),
297+ TestDelegation ("invalid" , "C" ),
298+ ],
299+ # The traversal stops after visiting an invalid role
300+ visited_order = ["invalid" ],
301+ )
302+ }
303+
304+ @utils .run_sub_tests_with_dataset (invalid_metadata )
305+ def test_invalid_metadata (self , test_data : DelegationsTestCase ) -> None :
306+ try :
307+ self ._init_repo (test_data )
308+ # The invalid role is the last visited
309+ invalid_role = test_data .visited_order [- 1 ]
310+ self .sim .signers [invalid_role ].clear ()
311+
312+ self .setup_subtest ()
313+ # The invalid role metadata must not be persisted
314+ exp_files = [* TOP_LEVEL_ROLE_NAMES , * test_data .visited_order [:- 1 ]]
315+ exp_calls = [(role , 1 ) for role in test_data .visited_order ]
316+
317+ updater = self ._init_updater ()
318+ # Call explicitly refresh to simplify the expected_calls list
319+ updater .refresh ()
320+ self .sim .fetch_tracker .metadata .clear ()
321+
322+ with self .assertRaises (UnsignedMetadataError ):
323+ updater .get_targetinfo ("missingpath" )
324+ # Check that there were no visited roles after the invalid one
325+ # and only the valid metadata files were persisted
326+ self .assertListEqual (self .sim .fetch_tracker .metadata , exp_calls )
327+ self ._assert_files_exist (exp_files )
328+ finally :
329+ self .teardown_subtest ()
330+
331+
332+ class TestTargetFileSearch (TestDelegations ):
333+ r"""
334+ Create a single repository with the following delegations:
335+
336+ targets
337+ *.doc, *md / \ release/*/*
338+ A B
339+ release/x/* / \ release/y/*.zip
340+ C D
341+
342+ Test that Updater successfully finds the target files metadata,
343+ traversing the delegations as expected.
344+ """
345+
346+ delegations_tree = DelegationsTestCase (
347+ delegations = [
348+ TestDelegation ("targets" , "A" , paths = ["*.doc" , "*.md" ]),
349+ TestDelegation ("targets" , "B" , paths = ["releases/*/*" ]),
350+ TestDelegation ("B" , "C" , paths = ["releases/x/*" ]),
351+ TestDelegation ("B" , "D" , paths = ["releases/y/*.zip" ]),
352+ ],
353+ target_files = [
354+ TestTarget ("targets" , b"targetfile content" , "targetfile" ),
355+ TestTarget ("A" , b"README by A" , "README.md" ),
356+ TestTarget ("C" , b"x release by C" , "releases/x/x_v1" ),
357+ TestTarget ("D" , b"y release by D" , "releases/y/y_v1.zip" ),
358+ TestTarget ("D" , b"z release by D" , "releases/z/z_v1.zip" ),
359+ ],
360+ )
361+
362+ def setUp (self ) -> None :
363+ super ().setUp ()
364+ self ._init_repo (self .delegations_tree )
365+
366+ # fmt: off
367+ targets : utils .DataSet = {
368+ "no delegations" :
369+ TargetTestCase ("targetfile" , True , []),
370+ "targetpath matches wildcard" :
371+ TargetTestCase ("README.md" , True , ["A" ]),
372+ "targetpath with separators x" :
373+ TargetTestCase ("releases/x/x_v1" , True , ["B" , "C" ]),
374+ "targetpath with separators y" :
375+ TargetTestCase ("releases/y/y_v1.zip" , True , ["B" , "D" ]),
376+ "targetpath is not delegated by all roles in the chain" :
377+ TargetTestCase ("releases/z/z_v1.zip" , False , ["B" ]),
378+ }
379+ # fmt: on
380+
381+ @utils .run_sub_tests_with_dataset (targets )
382+ def test_targetfile_search (self , test_data : TargetTestCase ) -> None :
383+ try :
384+ self .setup_subtest ()
385+ # targetpath, found, visited_order = test_data
386+ exp_files = [* TOP_LEVEL_ROLE_NAMES , * test_data .visited_order ]
387+ exp_calls = [(role , 1 ) for role in test_data .visited_order ]
388+ exp_target = self .sim .target_files [test_data .targetpath ].target_file
389+
390+ updater = self ._init_updater ()
391+ # Call explicitly refresh to simplify the expected_calls list
392+ updater .refresh ()
393+ self .sim .fetch_tracker .metadata .clear ()
394+ target = updater .get_targetinfo (test_data .targetpath )
395+ if target is not None :
396+ # Confirm that the expected TargetFile is found
397+ self .assertTrue (test_data .found )
398+ self .assertDictEqual (target .to_dict (), exp_target .to_dict ())
399+ else :
400+ self .assertFalse (test_data .found )
401+ # Check that the delegated roles were visited in the expected
402+ # order and the corresponding metadata files were persisted
403+ self .assertListEqual (self .sim .fetch_tracker .metadata , exp_calls )
255404 self ._assert_files_exist (exp_files )
256405 finally :
257406 self .teardown_subtest ()
258407
259408
260409if __name__ == "__main__" :
261410 if "--dump" in sys .argv :
262- TestDelegationsGraphs .dump_dir = tempfile .mkdtemp ()
263- print (f"Repository Simulator dumps in { TestDelegationsGraphs .dump_dir } " )
411+ TestDelegations .dump_dir = tempfile .mkdtemp ()
412+ print (f"Repository Simulator dumps in { TestDelegations .dump_dir } " )
264413 sys .argv .remove ("--dump" )
265414
266415 utils .configure_test_logging (sys .argv )
0 commit comments