From 9c2bf6e784914ee11dc82e3f42aa00369bb57f4a Mon Sep 17 00:00:00 2001 From: Ivana Atanasova Date: Tue, 16 Nov 2021 17:24:21 +0200 Subject: [PATCH] Update ngclient to return loaded metadata This changes `TrustedMetadataSet` to return new trusted Metadata on successful calls of the `update_` functions and also changes `Updater._load_targets` to return loaded metadata as well Signed-off-by: Ivana Atanasova --- tests/test_trusted_metadata_set.py | 16 +++++++++ .../_internal/trusted_metadata_set.py | 35 +++++++++++++++---- tuf/ngclient/updater.py | 24 ++++++++----- 3 files changed, 60 insertions(+), 15 deletions(-) diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index bdc2c78cc3..9452c9bff6 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -129,6 +129,22 @@ def test_update(self): self.assertTrue(count, 6) + def test_update_metadata_output(self): + timestamp = self.trusted_set.update_timestamp(self.metadata["timestamp"]) + snapshot = self.trusted_set.update_snapshot(self.metadata["snapshot"]) + targets = self.trusted_set.update_targets(self.metadata["targets"]) + delegeted_targets_1 = self.trusted_set.update_delegated_targets( + self.metadata["role1"], "role1", "targets" + ) + delegeted_targets_2 = self.trusted_set.update_delegated_targets( + self.metadata["role2"], "role2", "role1" + ) + self.assertIsInstance(timestamp.signed, Timestamp) + self.assertIsInstance(snapshot.signed, Snapshot) + self.assertIsInstance(targets.signed, Targets) + self.assertIsInstance(delegeted_targets_1.signed, Targets) + self.assertIsInstance(delegeted_targets_2.signed, Targets) + def test_out_of_order_ops(self): # Update snapshot before timestamp with self.assertRaises(RuntimeError): diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index 59fe32a8f6..b7c831158c 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -141,7 +141,7 @@ def targets(self) -> Optional[Metadata[Targets]]: return self._trusted_set.get("targets") # Methods for updating metadata - def update_root(self, data: bytes) -> None: + def update_root(self, data: bytes) -> Metadata[Root]: """Verifies and loads 'data' as new root metadata. Note that an expired intermediate root is considered valid: expiry is @@ -153,6 +153,9 @@ def update_root(self, data: bytes) -> None: Raises: RepositoryError: Metadata failed to load or verify. The actual error type and content will contain more details. + + Returns: + Deserialized and verified root Metadata object """ if self.timestamp is not None: raise RuntimeError("Cannot update root after timestamp") @@ -182,7 +185,9 @@ def update_root(self, data: bytes) -> None: self._trusted_set["root"] = new_root logger.info("Updated root v%d", new_root.signed.version) - def update_timestamp(self, data: bytes) -> None: + return new_root + + def update_timestamp(self, data: bytes) -> Metadata[Timestamp]: """Verifies and loads 'data' as new timestamp metadata. Note that an intermediate timestamp is allowed to be expired: @@ -199,6 +204,9 @@ def update_timestamp(self, data: bytes) -> None: RepositoryError: Metadata failed to load or verify as final timestamp. The actual error type and content will contain more details. + + Returns: + Deserialized and verified timestamp Metadata object """ if self.snapshot is not None: raise RuntimeError("Cannot update timestamp after snapshot") @@ -251,6 +259,8 @@ def update_timestamp(self, data: bytes) -> None: # timestamp is loaded: raise if it is not valid _final_ timestamp self._check_final_timestamp() + return new_timestamp + def _check_final_timestamp(self) -> None: """Raise if timestamp is expired""" @@ -260,7 +270,7 @@ def _check_final_timestamp(self) -> None: def update_snapshot( self, data: bytes, trusted: Optional[bool] = False - ) -> None: + ) -> Metadata[Snapshot]: """Verifies and loads 'data' as new snapshot metadata. Note that an intermediate snapshot is allowed to be expired and version @@ -282,6 +292,9 @@ def update_snapshot( Raises: RepositoryError: data failed to load or verify as final snapshot. The actual error type and content will contain more details. + + Returns: + Deserialized and verified snapshot Metadata object """ if self.timestamp is None: @@ -347,6 +360,8 @@ def update_snapshot( # snapshot is loaded, but we raise if it's not valid _final_ snapshot self._check_final_snapshot() + return new_snapshot + def _check_final_snapshot(self) -> None: """Raise if snapshot is expired or meta version does not match""" @@ -361,7 +376,7 @@ def _check_final_snapshot(self) -> None: f"got {self.snapshot.signed.version}" ) - def update_targets(self, data: bytes) -> None: + def update_targets(self, data: bytes) -> Metadata[Targets]: """Verifies and loads 'data' as new top-level targets metadata. Args: @@ -370,12 +385,15 @@ def update_targets(self, data: bytes) -> None: Raises: RepositoryError: Metadata failed to load or verify. The actual error type and content will contain more details. + + Returns: + Deserialized and verified targets Metadata object """ - self.update_delegated_targets(data, "targets", "root") + return self.update_delegated_targets(data, "targets", "root") def update_delegated_targets( self, data: bytes, role_name: str, delegator_name: str - ) -> None: + ) -> Metadata[Targets]: """Verifies and loads 'data' as new metadata for target 'role_name'. Args: @@ -386,6 +404,9 @@ def update_delegated_targets( Raises: RepositoryError: Metadata failed to load or verify. The actual error type and content will contain more details. + + Returns: + Deserialized and verified targets Metadata object """ if self.snapshot is None: raise RuntimeError("Cannot load targets before snapshot") @@ -438,6 +459,8 @@ def update_delegated_targets( self._trusted_set[role_name] = new_delegate logger.info("Updated %s v%d", role_name, version) + return new_delegate + def _load_trusted_root(self, data: bytes) -> None: """Verifies and loads 'data' as trusted root metadata. diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index fb651a2ad5..87269a7d82 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -68,7 +68,7 @@ from securesystemslib import util as sslib_util from tuf import exceptions -from tuf.api.metadata import TargetFile, Targets +from tuf.api.metadata import Metadata, TargetFile, Targets from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set from tuf.ngclient.config import UpdaterConfig from tuf.ngclient.fetcher import FetcherInterface @@ -368,12 +368,15 @@ def _load_snapshot(self) -> None: self._trusted_set.update_snapshot(data) self._persist_metadata("snapshot", data) - def _load_targets(self, role: str, parent_role: str) -> None: + def _load_targets(self, role: str, parent_role: str) -> Metadata[Targets]: """Load local (and if needed remote) metadata for 'role'.""" try: data = self._load_local_metadata(role) - self._trusted_set.update_delegated_targets(data, role, parent_role) + delegated_targets = self._trusted_set.update_delegated_targets( + data, role, parent_role + ) logger.debug("Local %s is valid: not downloading new one", role) + return delegated_targets except (OSError, exceptions.RepositoryError) as e: # Local 'role' does not exist or is invalid: update from remote logger.debug("Failed to load local %s: %s", role, e) @@ -386,9 +389,13 @@ def _load_targets(self, role: str, parent_role: str) -> None: version = metainfo.version data = self._download_metadata(role, length, version) - self._trusted_set.update_delegated_targets(data, role, parent_role) + delegated_targets = self._trusted_set.update_delegated_targets( + data, role, parent_role + ) self._persist_metadata(role, data) + return delegated_targets + def _preorder_depth_first_walk( self, target_filepath: str ) -> Optional[TargetFile]: @@ -417,10 +424,9 @@ def _preorder_depth_first_walk( # The metadata for 'role_name' must be downloaded/updated before # its targets, delegations, and child roles can be inspected. - self._load_targets(role_name, parent_role) + targets = self._load_targets(role_name, parent_role).signed - role_metadata: Targets = self._trusted_set[role_name].signed - target = role_metadata.targets.get(target_filepath) + target = targets.targets.get(target_filepath) if target is not None: logger.debug("Found target in current role %s", role_name) @@ -432,11 +438,11 @@ def _preorder_depth_first_walk( # And also decrement number of visited roles. number_of_delegations -= 1 - if role_metadata.delegations is not None: + if targets.delegations is not None: child_roles_to_visit = [] # NOTE: This may be a slow operation if there are many # delegated roles. - for child_role in role_metadata.delegations.roles.values(): + for child_role in targets.delegations.roles.values(): if child_role.is_delegated_path(target_filepath): logger.debug("Adding child role %s", child_role.name)