Skip to content

Commit 85dcbc8

Browse files
committed
Add test_updater_deleagation_graphs.py
Add tests creating delegations graphs with different complexity and successfully updating the delegated roles metadata. Signed-off-by: Teodora Sechkova <[email protected]>
1 parent b5e83a1 commit 85dcbc8

File tree

1 file changed

+218
-0
lines changed

1 file changed

+218
-0
lines changed
+218
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
#!/usr/bin/env python
2+
3+
# Copyright 2021, New York University and the TUF contributors
4+
# SPDX-License-Identifier: MIT OR Apache-2.0
5+
6+
"""Test updating delegated targets roles with various
7+
delegation hierarchies"""
8+
9+
import os
10+
import sys
11+
import tempfile
12+
import unittest
13+
from dataclasses import dataclass
14+
from typing import Any, Dict, Iterable, List, Optional
15+
from unittest.mock import call, patch
16+
17+
from tests import utils
18+
from tests.repository_simulator import RepositorySimulator
19+
from tuf.api.metadata import (
20+
SPECIFICATION_VERSION,
21+
TOP_LEVEL_ROLE_NAMES,
22+
DelegatedRole,
23+
Targets,
24+
)
25+
from tuf.ngclient import Updater
26+
27+
28+
@dataclass
29+
class Delegation:
30+
def __init__(
31+
self,
32+
delegator: str,
33+
rolename: str,
34+
keyids: Optional[List[str]] = None,
35+
threshold: int = 1,
36+
terminating: bool = False,
37+
paths: Optional[List[str]] = ["*"],
38+
path_hash_prefixes: Optional[List[str]] = None,
39+
):
40+
self.delegator = delegator
41+
keyids = keyids or []
42+
self.role = DelegatedRole(
43+
rolename, keyids, threshold, terminating, paths, path_hash_prefixes
44+
)
45+
46+
47+
class TestDelegationsGraphs(unittest.TestCase):
48+
"""Test creating delegations graphs with different complexity
49+
and successfully updating the delegated roles metadata"""
50+
51+
def setUp(self) -> None:
52+
self.temp_dir = tempfile.TemporaryDirectory()
53+
self.metadata_dir = os.path.join(self.temp_dir.name, "metadata")
54+
self.targets_dir = os.path.join(self.temp_dir.name, "targets")
55+
os.mkdir(self.metadata_dir)
56+
os.mkdir(self.targets_dir)
57+
58+
def tearDown(self) -> None:
59+
self.temp_dir.cleanup()
60+
61+
def _init_updater(self, sim: RepositorySimulator) -> Updater:
62+
"""Create a new Updater instance"""
63+
return Updater(
64+
self.metadata_dir,
65+
"https://example.com/metadata/",
66+
self.targets_dir,
67+
"https://example.com/targets/",
68+
sim,
69+
)
70+
71+
def _init_repo(self, delegations: List[Delegation]) -> RepositorySimulator:
72+
"""Create a new RepositorySimulator instance with 'delegations'"""
73+
sim = RepositorySimulator()
74+
spec_version = ".".join(SPECIFICATION_VERSION)
75+
76+
for delegation in delegations:
77+
if delegation.role.name in sim.md_delegates:
78+
targets = sim.md_delegates[delegation.role.name].signed
79+
else:
80+
targets = Targets(1, spec_version, sim.safe_expiry, {}, None)
81+
sim.add_delegation(delegation.delegator, delegation.role, targets)
82+
sim.update_snapshot()
83+
84+
# Init trusted root for Updater
85+
with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f:
86+
root = sim.download_bytes(
87+
"https://example.com/metadata/1.root.json", 100000
88+
)
89+
f.write(root)
90+
91+
return sim
92+
93+
def _assert_files_exist(self, roles: Iterable[str]) -> None:
94+
"""Assert that local metadata files exist for 'roles'"""
95+
expected_files = sorted([f"{role}.json" for role in roles])
96+
local_metadata_files = sorted(os.listdir(self.metadata_dir))
97+
self.assertListEqual(local_metadata_files, expected_files)
98+
99+
graphs: utils.DataSet = {
100+
"single delegation": {
101+
"delegations": [Delegation("targets", "A")],
102+
"visited roles": ["A"],
103+
},
104+
"1-level tree": {
105+
"delegations": [
106+
Delegation("targets", "A"),
107+
Delegation("targets", "B"),
108+
],
109+
"visited roles": ["A", "B"],
110+
},
111+
"2-level tree-1": {
112+
"delegations": [
113+
Delegation("targets", "A"),
114+
Delegation("targets", "B"),
115+
Delegation("B", "C"),
116+
],
117+
"visited roles": ["A", "B", "C"],
118+
},
119+
"2-level tree-2": {
120+
"delegations": [
121+
Delegation("targets", "A"),
122+
Delegation("targets", "B"),
123+
Delegation("A", "C"),
124+
Delegation("A", "D"),
125+
],
126+
"visited roles": ["A", "C", "D", "B"],
127+
},
128+
"2-level tree terminating": {
129+
"delegations": [
130+
Delegation("targets", "A"),
131+
Delegation("targets", "B"),
132+
Delegation("A", "C", terminating=True),
133+
Delegation("A", "D"),
134+
],
135+
"visited roles": ["A", "C"],
136+
},
137+
"3-level tree": {
138+
"delegations": [
139+
Delegation("targets", "A"),
140+
Delegation("targets", "B"),
141+
Delegation("A", "C"),
142+
Delegation("C", "D"),
143+
],
144+
"visited roles": ["A", "C", "D", "B"],
145+
},
146+
"3-level tree terminating": {
147+
"delegations": [
148+
Delegation("targets", "A"),
149+
Delegation("targets", "B"),
150+
Delegation("A", "C", terminating=True),
151+
Delegation("C", "D"),
152+
],
153+
"visited roles": ["A", "C", "D"],
154+
},
155+
"cyclic graph": {
156+
"delegations": [
157+
Delegation("targets", "A"),
158+
Delegation("targets", "B"),
159+
Delegation("B", "C"),
160+
Delegation("C", "D"),
161+
Delegation("D", "B"),
162+
],
163+
"visited roles": ["A", "B", "C", "D"],
164+
},
165+
"2-level tree pathpattern": {
166+
"delegations": [
167+
Delegation("targets", "A", paths=["*.py"]),
168+
Delegation("targets", "B"),
169+
Delegation("A", "C"),
170+
Delegation("A", "D"),
171+
],
172+
"visited roles": ["B"],
173+
},
174+
"3-level tree pathpattern": {
175+
"delegations": [
176+
Delegation("targets", "A"),
177+
Delegation("targets", "B"),
178+
Delegation("A", "C", paths=["*.py"]),
179+
Delegation("C", "D"),
180+
],
181+
"visited roles": ["A", "B"],
182+
},
183+
}
184+
185+
@utils.run_sub_tests_with_dataset(graphs)
186+
def test_graph_traversal(
187+
self, test_case_data: Dict[str, List[Any]]
188+
) -> None:
189+
delegations: List[Delegation] = test_case_data["delegations"]
190+
visited_roles: List[str] = test_case_data["visited roles"]
191+
expected_files = [*TOP_LEVEL_ROLE_NAMES, *visited_roles]
192+
expected_calls = [call(role, 1) for role in visited_roles]
193+
194+
sim = self._init_repo(delegations)
195+
updater = self._init_updater(sim)
196+
# Call explicitly refresh to simplify the expected_calls list
197+
updater.refresh()
198+
# Check that metadata dir contains only top-level roles
199+
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
200+
201+
with patch.object(
202+
sim, "_fetch_metadata", wraps=sim._fetch_metadata
203+
) as wrapped_fetch:
204+
# Looking for a non-existing targetpath forces updater
205+
# to visit all possible delegated roles
206+
targetfile = updater.get_targetinfo("missingpath")
207+
208+
self.assertIsNone(targetfile)
209+
self.assertListEqual(wrapped_fetch.call_args_list, expected_calls)
210+
self._assert_files_exist(expected_files)
211+
212+
utils.cleanup_dir(self.metadata_dir)
213+
214+
215+
if __name__ == "__main__":
216+
217+
utils.configure_test_logging(sys.argv)
218+
unittest.main()

0 commit comments

Comments
 (0)