Skip to content

Cache all root metadata versions #2767

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 20, 2025
Merged
33 changes: 25 additions & 8 deletions examples/client/client
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import sys
import traceback
from hashlib import sha256
from pathlib import Path
from urllib import request

import urllib3

from tuf.api.exceptions import DownloadError, RepositoryError
from tuf.ngclient import Updater
Expand All @@ -29,19 +30,30 @@ def build_metadata_dir(base_url: str) -> str:

def init_tofu(base_url: str) -> bool:
"""Initialize local trusted metadata (Trust-On-First-Use) and create a
directory for downloads"""
directory for downloads

NOTE: This is unsafe and for demonstration only: the bootstrap root
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we log a warning to the user here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that would be helpful:

  • warnings are useful if they allow the user to react somehow. This is not the case here
  • the note is there to prevent someone from copy-pasting example code to a real client that should be shipping an embedded root instead

should be deployed alongside your updater application
"""

metadata_dir = build_metadata_dir(base_url)

if not os.path.isdir(metadata_dir):
os.makedirs(metadata_dir)

root_url = f"{base_url}/metadata/1.root.json"
try:
request.urlretrieve(root_url, f"{metadata_dir}/root.json")
except OSError:
print(f"Failed to download initial root from {root_url}")
response = urllib3.request("GET", f"{base_url}/metadata/1.root.json")
if response.status != 200:
print(f"Failed to download initial root {base_url}/metadata/1.root.json")
return False

Updater(
metadata_dir=metadata_dir,
metadata_base_url=f"{base_url}/metadata/",
target_base_url=f"{base_url}/targets/",
target_dir=DOWNLOAD_DIR,
bootstrap=response.data,
)

print(f"Trust-on-First-Use: Initialized new root in {metadata_dir}")
return True

Expand Down Expand Up @@ -73,6 +85,9 @@ def download(base_url: str, target: str) -> bool:
os.mkdir(DOWNLOAD_DIR)

try:
# NOTE: initial root should be provided with ``bootstrap`` argument:
# This example uses unsafe Trust-On-First-Use initialization so it is
# not possible here.
updater = Updater(
metadata_dir=metadata_dir,
metadata_base_url=f"{base_url}/metadata/",
Expand Down Expand Up @@ -104,7 +119,7 @@ def download(base_url: str, target: str) -> bool:
return True


def main() -> None:
def main() -> str | None:
"""Main TUF Client Example function"""

client_args = argparse.ArgumentParser(description="TUF Client Example")
Expand Down Expand Up @@ -169,6 +184,8 @@ def main() -> None:
else:
client_args.print_help()

return None


if __name__ == "__main__":
sys.exit(main())
2 changes: 1 addition & 1 deletion tests/test_updater_consistent_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def teardown_subtest(self) -> None:
if self.dump_dir is not None:
self.sim.write()

utils.cleanup_dir(self.metadata_dir)
utils.cleanup_metadata_dir(self.metadata_dir)

def _init_repo(
self, consistent_snapshot: bool, prefix_targets: bool = True
Expand Down
2 changes: 1 addition & 1 deletion tests/test_updater_delegation_graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def setup_subtest(self) -> None:
self.sim.write()

def teardown_subtest(self) -> None:
    # Remove all cached metadata files (including the root_history
    # subdirectory) so the next subtest starts from an empty local
    # metadata directory.
    utils.cleanup_metadata_dir(self.metadata_dir)

def _init_repo(self, test_case: DelegationsTestCase) -> None:
"""Create a new RepositorySimulator instance and
Expand Down
1 change: 1 addition & 0 deletions tests/test_updater_ng.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import sys
import tempfile
import unittest
from collections.abc import Iterable
from typing import TYPE_CHECKING, Callable, ClassVar
from unittest.mock import MagicMock, patch

Expand Down
147 changes: 107 additions & 40 deletions tests/test_updater_top_level_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@ def setUp(self) -> None:

self.sim = RepositorySimulator()

# bootstrap client with initial root metadata
with open(os.path.join(self.metadata_dir, "root.json"), "bw") as f:
f.write(self.sim.signed_roots[0])

if self.dump_dir is not None:
# create test specific dump directory
name = self.id().split(".")[-1]
Expand All @@ -75,22 +71,13 @@ def setUp(self) -> None:
def tearDown(self) -> None:
self.temp_dir.cleanup()

def _run_refresh(self) -> Updater:
def _run_refresh(self, skip_bootstrap: bool = False) -> Updater:
"""Create a new Updater instance and refresh"""
if self.dump_dir is not None:
self.sim.write()

updater = Updater(
self.metadata_dir,
"https://example.com/metadata/",
self.targets_dir,
"https://example.com/targets/",
self.sim,
)
updater = self._init_updater(skip_bootstrap)
updater.refresh()
return updater

def _init_updater(self) -> Updater:
def _init_updater(self, skip_bootstrap: bool = False) -> Updater:
"""Create a new Updater instance"""
if self.dump_dir is not None:
self.sim.write()
Expand All @@ -101,6 +88,7 @@ def _init_updater(self) -> Updater:
self.targets_dir,
"https://example.com/targets/",
self.sim,
bootstrap=None if skip_bootstrap else self.sim.signed_roots[0],
)

def _assert_files_exist(self, roles: Iterable[str]) -> None:
Expand All @@ -126,9 +114,6 @@ def _assert_version_equals(self, role: str, expected_version: int) -> None:
self.assertEqual(md.signed.version, expected_version)

def test_first_time_refresh(self) -> None:
# Metadata dir contains only the mandatory initial root.json
self._assert_files_exist([Root.type])

# Add one more root version to repository so that
# refresh() updates from local trusted root (v1) to
# remote root (v2)
Expand All @@ -142,10 +127,11 @@ def test_first_time_refresh(self) -> None:
version = 2 if role == Root.type else None
self._assert_content_equals(role, version)

def test_trusted_root_missing(self) -> None:
os.remove(os.path.join(self.metadata_dir, "root.json"))
def test_cached_root_missing_without_bootstrap(self) -> None:
    # Without a bootstrap root and with an empty cache there is no
    # trusted root at all: refresh must fail
    self.assertRaises(OSError, self._run_refresh, skip_bootstrap=True)

    # The failed update must not leave anything in the metadata dir
    self.assertEqual(os.listdir(self.metadata_dir), [])
Expand Down Expand Up @@ -178,15 +164,15 @@ def test_trusted_root_expired(self) -> None:
self._assert_files_exist(TOP_LEVEL_ROLE_NAMES)
self._assert_content_equals(Root.type, 3)

def test_trusted_root_unsigned(self) -> None:
# Local trusted root is not signed
def test_trusted_root_unsigned_without_bootstrap(self) -> None:
    # Store an unsigned root in the cache; no bootstrap root is provided
    root_path = os.path.join(self.metadata_dir, "root.json")
    unsigned = Metadata.from_bytes(self.sim.signed_roots[0])
    unsigned.signatures.clear()
    unsigned.to_file(root_path)

    # Loading the unsigned cached trusted root must fail
    with self.assertRaises(UnsignedMetadataError):
        self._run_refresh(skip_bootstrap=True)

    # The update failed: the cached root file is still the only metadata
    self._assert_files_exist([Root.type])
Expand All @@ -204,10 +190,7 @@ def test_max_root_rotations(self) -> None:
self.sim.root.version += 1
self.sim.publish_root()

md_root = Metadata.from_file(
os.path.join(self.metadata_dir, "root.json")
)
initial_root_version = md_root.signed.version
initial_root_version = 1

updater.refresh()

Expand Down Expand Up @@ -712,26 +695,20 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None:
updater = self._run_refresh()
updater.get_targetinfo("non_existent_target")

# Clean up calls to open during refresh()
# Clear statistics for open() calls and metadata requests
wrapped_open.reset_mock()
# Clean up fetch tracker metadata
self.sim.fetch_tracker.metadata.clear()

# Create a new updater and perform a second update while
# the metadata is already stored in cache (metadata dir)
updater = Updater(
self.metadata_dir,
"https://example.com/metadata/",
self.targets_dir,
"https://example.com/targets/",
self.sim,
)
updater = self._init_updater()
updater.get_targetinfo("non_existent_target")

# Test that metadata is loaded from cache and not downloaded
root_dir = os.path.join(self.metadata_dir, "root_history")
wrapped_open.assert_has_calls(
[
call(os.path.join(self.metadata_dir, "root.json"), "rb"),
call(os.path.join(root_dir, "2.root.json"), "rb"),
call(os.path.join(self.metadata_dir, "timestamp.json"), "rb"),
call(os.path.join(self.metadata_dir, "snapshot.json"), "rb"),
call(os.path.join(self.metadata_dir, "targets.json"), "rb"),
Expand All @@ -742,6 +719,96 @@ def test_load_metadata_from_cache(self, wrapped_open: MagicMock) -> None:
expected_calls = [("root", 2), ("timestamp", None)]
self.assertListEqual(self.sim.fetch_tracker.metadata, expected_calls)

@patch.object(builtins, "open", wraps=builtins.open)
def test_intermediate_root_cache(self, wrapped_open: MagicMock) -> None:
    """Test that refresh uses the intermediate roots from cache"""
    # Publish root versions 2 and 3 on the repository side
    for _ in range(2):
        self.sim.root.version += 1
        self.sim.publish_root()

    # First refresh: metadata is fetched from remote and stored in cache
    self._run_refresh()

    # The cache lookups the updater is expected to attempt on refresh:
    # intermediate roots in root_history/, then top-level metadata
    root_dir = os.path.join(self.metadata_dir, "root_history")
    cache_reads = [
        call(os.path.join(root_dir, f"{version}.root.json"), "rb")
        for version in (2, 3, 4)
    ] + [
        call(os.path.join(self.metadata_dir, f"{role}.json"), "rb")
        for role in ("timestamp", "snapshot", "targets")
    ]

    # Cache lookups happened, but all data was downloaded from remote
    wrapped_open.assert_has_calls(cache_reads)
    self.assertListEqual(
        self.sim.fetch_tracker.metadata,
        [
            ("root", 2),
            ("root", 3),
            ("root", 4),
            ("timestamp", None),
            ("snapshot", 1),
            ("targets", 1),
        ],
    )

    # Clear statistics for open() calls and metadata requests
    wrapped_open.reset_mock()
    self.sim.fetch_tracker.metadata.clear()

    # Second refresh: metadata (including the intermediate roots) now
    # comes from cache; only the next root and timestamp are requested
    self._run_refresh()
    wrapped_open.assert_has_calls(cache_reads)
    self.assertListEqual(
        self.sim.fetch_tracker.metadata,
        [("root", 4), ("timestamp", None)],
    )

def test_intermediate_root_cache_poisoning(self) -> None:
    """Test that refresh works as expected when intermediate roots in cache are poisoned"""
    # Publish root versions 2 and 3 on the repository side
    for _ in range(2):
        self.sim.root.version += 1
        self.sim.publish_root()

    # A successful refresh stores all metadata, including the
    # intermediate roots, in the local cache
    self._run_refresh()

    # Corrupt cached intermediate root v2 by stripping its signatures
    root_path = os.path.join(
        self.metadata_dir, "root_history", "2.root.json"
    )
    poisoned = Metadata.from_file(root_path)
    poisoned.signatures.clear()
    poisoned.to_file(root_path)

    # Clear statistics for metadata requests
    self.sim.fetch_tracker.metadata.clear()

    # The unsigned cached v2 is rejected and downloaded again from remote
    self._run_refresh()
    self.assertListEqual(
        self.sim.fetch_tracker.metadata,
        [("root", 2), ("root", 4), ("timestamp", None)],
    )

    # Clear statistics for metadata requests
    self.sim.fetch_tracker.metadata.clear()

    # The re-downloaded v2 is valid in cache now: no v2 fetch this time
    self._run_refresh()
    self.assertListEqual(
        self.sim.fetch_tracker.metadata,
        [("root", 4), ("timestamp", None)],
    )

def test_expired_metadata(self) -> None:
"""Verifies that expired local timestamp/snapshot can be used for
updating from remote.
Expand Down
16 changes: 10 additions & 6 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,16 @@ def configure_test_logging(argv: list[str]) -> None:
logging.basicConfig(level=loglevel)


def cleanup_dir(path: str) -> None:
"""Delete all files inside a directory"""
for filepath in [
os.path.join(path, filename) for filename in os.listdir(path)
]:
os.remove(filepath)
def cleanup_metadata_dir(path: str) -> None:
    """Remove all local metadata files below *path*.

    Every ``*.json`` file directly inside the directory is deleted and
    the ``root_history`` subdirectory is cleaned recursively; any other
    entry is unexpected and raises ``ValueError``. The directories
    themselves are left in place.
    """
    with os.scandir(path) as entries:
        for item in entries:
            if item.name == "root_history":
                # recurse into the cached intermediate-root directory
                cleanup_metadata_dir(item.path)
                continue
            if not item.name.endswith(".json"):
                raise ValueError(f"Unexpected local metadata file {item.path}")
            os.remove(item.path)


class TestServerProcess:
Expand Down
Loading