Skip to content

Commit 617e87e

Browse files
committed
Annotations and use extensively modify_metadata
Signed-off-by: Martin Vrachev <[email protected]>
1 parent 11531ca commit 617e87e

File tree

1 file changed

+88
-100
lines changed

1 file changed

+88
-100
lines changed

tests/test_trusted_metadata_set.py

Lines changed: 88 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,15 @@
66
from datetime import datetime
77

88
from tuf import exceptions
9-
from tuf.api.metadata import Metadata, Signed, Timestamp, Snapshot, MetaFile
9+
from tuf.api.metadata import (
10+
Metadata,
11+
Signed,
12+
Root,
13+
Timestamp,
14+
Snapshot,
15+
MetaFile,
16+
Targets
17+
)
1018
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
1119

1220
from securesystemslib.signer import SSlibSigner
@@ -21,6 +29,22 @@
2129

2230
class TestTrustedMetadataSet(unittest.TestCase):
2331

32+
def modify_metadata(
33+
self, rolename: str, modification_func: Callable[["Signed"], None]
34+
) -> bytes:
35+
"""Instantiate metadata from rolename type, call modification_func and
36+
sign it again with self.keystore[rolename] signer.
37+
38+
Attributes:
39+
rolename: A denoting the name of the metadata which will be modified.
40+
modification_func: Function that will be called to modify the signed
41+
portion of metadata bytes.
42+
"""
43+
metadata = Metadata.from_bytes(self.metadata[rolename])
44+
modification_func(metadata.signed)
45+
metadata.sign(self.keystore[rolename])
46+
return metadata.to_bytes()
47+
2448
@classmethod
2549
def setUpClass(cls):
2650
cls.repo_dir = os.path.join(
@@ -45,12 +69,20 @@ def setUpClass(cls):
4569
)
4670
cls.keystore[role] = SSlibSigner(key_dict)
4771

72+
def hashes_length_modifier(timestamp: Timestamp) -> None:
73+
timestamp.meta["snapshot.json"].hashes = None
74+
timestamp.meta["snapshot.json"].length = None
75+
76+
cls.metadata["timestamp"] = cls.modify_metadata(
77+
cls, "timestamp", hashes_length_modifier
78+
)
79+
4880
def setUp(self) -> None:
4981
self.trusted_set = TrustedMetadataSet(self.metadata["root"])
5082

5183
def _root_updated_and_update_timestamp(
5284
self, timestamp_bytes: Optional[bytes] = None
53-
):
85+
) -> None:
5486
"""Finsh root update and update timestamp with passed timestamp_bytes.
5587
5688
Args:
@@ -84,15 +116,6 @@ def _update_all_besides_targets(
84116
snapshot_bytes = snapshot_bytes or self.metadata["snapshot"]
85117
self.trusted_set.update_snapshot(snapshot_bytes)
86118

87-
def modify_metadata(
88-
self, rolename: str, modification_func: Callable[["Signed"], None]
89-
):
90-
"""Instantiate metadata from rolename type, call modification_func and
91-
sign it again with self.keystore[rolename] signer."""
92-
metadata = Metadata.from_bytes(self.metadata[rolename])
93-
modification_func(metadata.signed)
94-
metadata.sign(self.keystore[rolename])
95-
return metadata.to_bytes()
96119

97120
def test_update(self):
98121
self.trusted_set.root_update_finished()
@@ -177,10 +200,6 @@ def test_update_with_invalid_json(self):
177200
]
178201
for metadata, update_func in top_level_md:
179202
md = Metadata.from_bytes(metadata)
180-
if md.signed.type == "snapshot":
181-
# timestamp hashes and length intervene when testing snapshot
182-
self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None
183-
self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None
184203
# metadata is not json
185204
with self.assertRaises(exceptions.RepositoryError):
186205
update_func(b"")
@@ -210,26 +229,28 @@ def test_update_root_new_root_ver_same_as_trusted_root_ver(self):
210229

211230

212231
def test_root_update_finished_expired(self):
213-
root = Metadata.from_bytes(self.metadata["root"])
214-
root.signed.expires = datetime(1970, 1, 1)
215-
root.sign(self.keystore["root"])
216-
tmp_trusted_set = TrustedMetadataSet(root.to_bytes())
232+
def root_expired_modifier(root: Root) -> None:
233+
root.expires = datetime(1970, 1, 1)
234+
235+
root = self.modify_metadata("root", root_expired_modifier)
236+
tmp_trusted_set = TrustedMetadataSet(root)
217237
# call root_update_finished when trusted root has expired
218238
with self.assertRaises(exceptions.ExpiredMetadataError):
219239
tmp_trusted_set.root_update_finished()
220240

221241

222242
def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self):
223243
# new_timestamp.version < trusted_timestamp.version
224-
timestamp = Metadata.from_bytes(self.metadata["timestamp"])
225-
timestamp.signed.version = 3
226-
timestamp.sign(self.keystore["timestamp"])
227-
self._root_updated_and_update_timestamp(timestamp.to_bytes())
244+
def version_modifier(timestamp: Timestamp) -> None:
245+
timestamp.version = 3
246+
247+
timestamp = self.modify_metadata("timestamp", version_modifier)
248+
self._root_updated_and_update_timestamp(timestamp)
228249
with self.assertRaises(exceptions.ReplayedMetadataError):
229250
self.trusted_set.update_timestamp(self.metadata["timestamp"])
230251

231252
def test_update_timestamp_snapshot_ver_below_trusted_snapshot_ver(self):
232-
def version_modifier(timestamp: Timestamp):
253+
def version_modifier(timestamp: Timestamp) -> None:
233254
timestamp.version = 3
234255

235256
modified_timestamp = self.modify_metadata("timestamp", version_modifier)
@@ -241,142 +262,109 @@ def version_modifier(timestamp: Timestamp):
241262
def test_update_timestamp_expired(self):
242263
self.trusted_set.root_update_finished()
243264
# new_timestamp has expired
244-
timestamp = Metadata.from_bytes(self.metadata["timestamp"])
245-
timestamp.signed.expires = datetime(1970, 1, 1)
246-
timestamp.sign(self.keystore["timestamp"])
265+
def timestamp_expired_modifier(timestamp: Timestamp) -> None:
266+
timestamp.expires = datetime(1970, 1, 1)
267+
268+
timestamp = self.modify_metadata("timestamp", timestamp_expired_modifier)
247269
with self.assertRaises(exceptions.ExpiredMetadataError):
248-
self.trusted_set.update_timestamp(timestamp.to_bytes())
270+
self.trusted_set.update_timestamp(timestamp)
249271

250272

251273
def test_update_snapshot_cannot_verify_snapshot_with_threshold(self):
252-
def hashes_length_modifier(timestamp: Timestamp):
253-
timestamp.meta["snapshot.json"].hashes = None
254-
timestamp.meta["snapshot.json"].length = None
255-
256-
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
257-
self._root_updated_and_update_timestamp(timestamp)
274+
self._root_updated_and_update_timestamp(self.metadata["timestamp"])
258275
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
259276
snapshot.signatures.clear()
260277
with self.assertRaises(exceptions.UnsignedMetadataError):
261278
self.trusted_set.update_snapshot(snapshot.to_bytes())
262279

263280
def test_update_snapshot_version_different_timestamp_snapshot_version(self):
264-
def hashes_length_version_modifier(timestamp: Timestamp):
265-
timestamp.meta["snapshot.json"].hashes = None
266-
timestamp.meta["snapshot.json"].length = None
281+
def timestamp_version_modifier(timestamp: Timestamp) -> None:
267282
timestamp.meta["snapshot.json"].version = 2
268283

269-
timestamp = self.modify_metadata(
270-
"timestamp", hashes_length_version_modifier
271-
)
284+
timestamp = self.modify_metadata("timestamp", timestamp_version_modifier)
272285
self._root_updated_and_update_timestamp(timestamp)
273286
# new_snapshot.version != trusted timestamp.meta["snapshot"].version
274-
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
275-
snapshot.signed.version = 3
276-
snapshot.sign(self.keystore["snapshot"])
287+
def snapshot_version_modifier(snapshot: Snapshot) -> None:
288+
snapshot.version = 3
289+
290+
snapshot = self.modify_metadata("snapshot", snapshot_version_modifier)
277291
with self.assertRaises(exceptions.BadVersionNumberError):
278-
self.trusted_set.update_snapshot(snapshot.to_bytes())
292+
self.trusted_set.update_snapshot(snapshot)
279293

280294
def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self):
281-
def hashes_length_modifier(timestamp: Timestamp):
282-
timestamp.meta["snapshot.json"].hashes = None
283-
timestamp.meta["snapshot.json"].length = None
284-
285-
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
286-
self._update_all_besides_targets(timestamp)
295+
self._update_all_besides_targets(self.metadata["timestamp"])
287296
# Test removing a meta_file in new_snapshot compared to the old snapshot
288-
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
289-
snapshot.signed.meta = {}
290-
snapshot.sign(self.keystore["snapshot"])
297+
def no_meta_modifier(snapshot: Snapshot) -> None:
298+
snapshot.meta = {}
299+
300+
snapshot = self.modify_metadata("snapshot", no_meta_modifier)
291301
with self.assertRaises(exceptions.RepositoryError):
292-
self.trusted_set.update_snapshot(snapshot.to_bytes())
302+
self.trusted_set.update_snapshot(snapshot)
293303

294304
def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_different(self):
295-
def hashes_length_modifier(timestamp: Timestamp):
296-
timestamp.meta["snapshot.json"].hashes = None
297-
timestamp.meta["snapshot.json"].length = None
298-
299-
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
300-
self._root_updated_and_update_timestamp(timestamp)
305+
self._root_updated_and_update_timestamp(self.metadata["timestamp"])
301306
# snapshot.meta["project1"].version != new_snapshot.meta["project1"].version
302-
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
303-
for metafile_path in snapshot.signed.meta.keys():
304-
snapshot.signed.meta[metafile_path].version += 1
305-
snapshot.sign(self.keystore["snapshot"])
306-
self.trusted_set.update_snapshot(snapshot.to_bytes())
307+
def version_meta_modifier(snapshot: Snapshot) -> None:
308+
for metafile_path in snapshot.meta.keys():
309+
snapshot.meta[metafile_path].version += 1
310+
311+
snapshot = self.modify_metadata("snapshot", version_meta_modifier)
312+
self.trusted_set.update_snapshot(snapshot)
307313
with self.assertRaises(exceptions.BadVersionNumberError):
308314
self.trusted_set.update_snapshot(self.metadata["snapshot"])
309315

310316
def test_update_snapshot_expired_new_snapshot(self):
311-
def hashes_length_modifier(timestamp: Timestamp):
312-
timestamp.meta["snapshot.json"].hashes = None
313-
timestamp.meta["snapshot.json"].length = None
314-
315-
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
316-
self._root_updated_and_update_timestamp(timestamp)
317+
self._root_updated_and_update_timestamp(self.metadata["timestamp"])
317318
# new_snapshot has expired
318-
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
319-
snapshot.signed.expires = datetime(1970, 1, 1)
320-
snapshot.sign(self.keystore["snapshot"])
319+
def snapshot_expired_modifier(snapshot: Snapshot) -> None:
320+
snapshot.expires = datetime(1970, 1, 1)
321+
322+
snapshot = self.modify_metadata("snapshot", snapshot_expired_modifier)
321323
with self.assertRaises(exceptions.ExpiredMetadataError):
322-
self.trusted_set.update_snapshot(snapshot.to_bytes())
324+
self.trusted_set.update_snapshot(snapshot)
323325

324326

325327
def test_update_targets_no_meta_in_snapshot(self):
326-
def hashes_length_modifier(timestamp: Timestamp):
327-
timestamp.meta["snapshot.json"].hashes = None
328-
timestamp.meta["snapshot.json"].length = None
329-
330-
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
331-
def no_meta_modifier(snapshot: Snapshot):
328+
def no_meta_modifier(snapshot: Snapshot) -> None:
332329
snapshot.meta = {}
333330

334331
snapshot = self.modify_metadata("snapshot", no_meta_modifier)
335-
self._update_all_besides_targets(timestamp, snapshot)
332+
self._update_all_besides_targets(self.metadata["timestamp"], snapshot)
336333
# remove meta information with information about targets from snapshot
337334
with self.assertRaises(exceptions.RepositoryError):
338335
self.trusted_set.update_targets(self.metadata["targets"])
339336

340337
def test_update_targets_hash_different_than_snapshot_meta_hash(self):
341-
def hashes_length_modifier(timestamp: Timestamp):
342-
timestamp.meta["snapshot.json"].hashes = None
343-
timestamp.meta["snapshot.json"].length = None
344-
345-
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
346-
def meta_length_modifier(snapshot: Snapshot):
338+
def meta_length_modifier(snapshot: Snapshot) -> None:
347339
for metafile_path in snapshot.meta:
348340
snapshot.meta[metafile_path] = MetaFile(version=1, length=1)
349341

350342
snapshot = self.modify_metadata("snapshot", meta_length_modifier)
351-
self._update_all_besides_targets(timestamp, snapshot)
343+
self._update_all_besides_targets(self.metadata["timestamp"], snapshot)
352344
# observed_hash != stored hash in snapshot meta for targets
353345
with self.assertRaises(exceptions.RepositoryError):
354346
self.trusted_set.update_targets(self.metadata["targets"])
355347

356348
def test_update_targets_version_different_snapshot_meta_version(self):
357-
def hashes_length_modifier(timestamp: Timestamp):
358-
timestamp.meta["snapshot.json"].hashes = None
359-
timestamp.meta["snapshot.json"].length = None
360-
361-
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
362-
def meta_modifier(snapshot: Snapshot):
349+
def meta_modifier(snapshot: Snapshot) -> None:
363350
for metafile_path in snapshot.meta:
364351
snapshot.meta[metafile_path] = MetaFile(version=2)
365352

366353
snapshot = self.modify_metadata("snapshot", meta_modifier)
367-
self._update_all_besides_targets(timestamp, snapshot)
354+
self._update_all_besides_targets(self.metadata["timestamp"], snapshot)
368355
# new_delegate.signed.version != meta.version stored in snapshot
369356
with self.assertRaises(exceptions.BadVersionNumberError):
370357
self.trusted_set.update_targets(self.metadata["targets"])
371358

372359
def test_update_targets_expired_new_target(self):
373360
self._update_all_besides_targets()
374361
# new_delegated_target has expired
375-
targets = Metadata.from_bytes(self.metadata["targets"])
376-
targets.signed.expires = datetime(1970, 1, 1)
377-
targets.sign(self.keystore["targets"])
362+
def target_expired_modifier(target: Targets) -> None:
363+
target.expires = datetime(1970, 1, 1)
364+
365+
targets = self.modify_metadata("targets", target_expired_modifier)
378366
with self.assertRaises(exceptions.ExpiredMetadataError):
379-
self.trusted_set.update_targets(targets.to_bytes())
367+
self.trusted_set.update_targets(targets)
380368

381369
# TODO test updating over initial metadata (new keys, newer timestamp, etc)
382370

0 commit comments

Comments
 (0)