Skip to content

Commit

Permalink
Merge pull request theupdateframework#1687 from MVrachev/address-mypy…
Browse files Browse the repository at this point in the history
…-warnings

Tests on the new implementation: address mypy warnings
  • Loading branch information
Jussi Kukkonen authored Nov 25, 2021
2 parents a24c4e9 + e2deff3 commit 600eb86
Show file tree
Hide file tree
Showing 13 changed files with 231 additions and 183 deletions.
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ warn_unreachable = True
strict_equality = True
disallow_untyped_defs = True
disallow_untyped_calls = True
show_error_codes = True
files =
tuf/api/,
tuf/ngclient,
Expand Down
38 changes: 17 additions & 21 deletions tests/repository_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,7 @@ class RepositorySimulator(FetcherInterface):
"""Simulates a repository that can be used for testing."""

# pylint: disable=too-many-instance-attributes
def __init__(self):
self.md_root: Metadata[Root] = None
self.md_timestamp: Metadata[Timestamp] = None
self.md_snapshot: Metadata[Snapshot] = None
self.md_targets: Metadata[Targets] = None
def __init__(self) -> None:
self.md_delegates: Dict[str, Metadata[Targets]] = {}

# other metadata is signed on-demand (when fetched) but roots must be
Expand All @@ -117,7 +113,7 @@ def __init__(self):
# Enable hash-prefixed target file names
self.prefix_targets_with_hash = True

self.dump_dir = None
self.dump_dir: Optional[str] = None
self.dump_version = 0

now = datetime.utcnow()
Expand Down Expand Up @@ -152,12 +148,12 @@ def create_key() -> Tuple[Key, SSlibSigner]:
sslib_key = generate_ed25519_key()
return Key.from_securesystemslib_key(sslib_key), SSlibSigner(sslib_key)

def add_signer(self, role: str, signer: SSlibSigner):
def add_signer(self, role: str, signer: SSlibSigner) -> None:
if role not in self.signers:
self.signers[role] = {}
self.signers[role][signer.key_dict["keyid"]] = signer

def _initialize(self):
def _initialize(self) -> None:
"""Setup a minimal valid repository."""

targets = Targets(1, SPEC_VER, self.safe_expiry, {}, None)
Expand All @@ -182,7 +178,7 @@ def _initialize(self):
self.md_root = Metadata(root, OrderedDict())
self.publish_root()

def publish_root(self):
def publish_root(self) -> None:
"""Sign and store a new serialized version of root."""
self.md_root.signatures.clear()
for signer in self.signers["root"].values():
Expand All @@ -199,12 +195,12 @@ def fetch(self, url: str) -> Iterator[bytes]:
if path.startswith("/metadata/") and path.endswith(".json"):
# figure out rolename and version
ver_and_name = path[len("/metadata/") :][: -len(".json")]
version, _, role = ver_and_name.partition(".")
version_str, _, role = ver_and_name.partition(".")
# root is always version-prefixed while timestamp is always NOT
if role == "root" or (
self.root.consistent_snapshot and ver_and_name != "timestamp"
):
version = int(version)
version: Optional[int] = int(version_str)
else:
# the file is not version-prefixed
role = ver_and_name
Expand All @@ -216,11 +212,10 @@ def fetch(self, url: str) -> Iterator[bytes]:
target_path = path[len("/targets/") :]
dir_parts, sep, prefixed_filename = target_path.rpartition("/")
# extract the hash prefix, if any
prefix: Optional[str] = None
filename = prefixed_filename
if self.root.consistent_snapshot and self.prefix_targets_with_hash:
prefix, _, filename = prefixed_filename.partition(".")
else:
filename = prefixed_filename
prefix = None
target_path = f"{dir_parts}{sep}{filename}"

yield self._fetch_target(target_path, prefix)
Expand Down Expand Up @@ -261,8 +256,9 @@ def _fetch_metadata(
return self.signed_roots[version - 1]

# sign and serialize the requested metadata
md: Optional[Metadata]
if role == "timestamp":
md: Metadata = self.md_timestamp
md = self.md_timestamp
elif role == "snapshot":
md = self.md_snapshot
elif role == "targets":
Expand Down Expand Up @@ -294,7 +290,7 @@ def _compute_hashes_and_length(
hashes = {sslib_hash.DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()}
return hashes, len(data)

def update_timestamp(self):
def update_timestamp(self) -> None:
"""Update timestamp and assign snapshot version to snapshot_meta
version.
"""
Expand All @@ -307,7 +303,7 @@ def update_timestamp(self):

self.timestamp.version += 1

def update_snapshot(self):
def update_snapshot(self) -> None:
"""Update snapshot, assign targets versions and update timestamp."""
for role, delegate in self.all_targets():
hashes = None
Expand All @@ -322,7 +318,7 @@ def update_snapshot(self):
self.snapshot.version += 1
self.update_timestamp()

def add_target(self, role: str, data: bytes, path: str):
def add_target(self, role: str, data: bytes, path: str) -> None:
"""Create a target from data and add it to the target_files."""
if role == "targets":
targets = self.targets
Expand All @@ -341,7 +337,7 @@ def add_delegation(
terminating: bool,
paths: Optional[List[str]],
hash_prefixes: Optional[List[str]],
):
) -> None:
"""Add delegated target role to the repository."""
if delegator_name == "targets":
delegator = self.targets
Expand All @@ -351,7 +347,7 @@ def add_delegation(
# Create delegation
role = DelegatedRole(name, [], 1, terminating, paths, hash_prefixes)
if delegator.delegations is None:
delegator.delegations = Delegations({}, {})
delegator.delegations = Delegations({}, OrderedDict())
# put delegation last by default
delegator.delegations.roles[role.name] = role

Expand All @@ -363,7 +359,7 @@ def add_delegation(
# Add metadata for the role
self.md_delegates[role.name] = Metadata(targets, OrderedDict())

def write(self):
def write(self) -> None:
"""Dump current repository metadata to self.dump_dir
This is a debugging tool: dumping repository state before running
Expand Down
54 changes: 31 additions & 23 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import tempfile
import unittest
from datetime import datetime, timedelta
from typing import ClassVar, Dict

from dateutil.relativedelta import relativedelta
from securesystemslib import hash as sslib_hash
Expand All @@ -28,6 +29,7 @@
from tuf import exceptions
from tuf.api.metadata import (
DelegatedRole,
Delegations,
Key,
Metadata,
MetaFile,
Expand All @@ -47,8 +49,13 @@
class TestMetadata(unittest.TestCase):
"""Tests for public API of all classes in 'tuf/api/metadata.py'."""

temporary_directory: ClassVar[str]
repo_dir: ClassVar[str]
keystore_dir: ClassVar[str]
keystore: ClassVar[Dict[str, str]]

@classmethod
def setUpClass(cls):
def setUpClass(cls) -> None:
# Create a temporary directory to store the repository, metadata, and
# target files. 'temporary_directory' must be deleted in
# TearDownClass() so that temporary files are always removed, even when
Expand Down Expand Up @@ -78,12 +85,12 @@ def setUpClass(cls):
)

@classmethod
def tearDownClass(cls):
def tearDownClass(cls) -> None:
# Remove the temporary repository directory, which should contain all
# the metadata, targets, and key files generated for the test cases.
shutil.rmtree(cls.temporary_directory)

def test_generic_read(self):
def test_generic_read(self) -> None:
for metadata, inner_metadata_cls in [
("root", Root),
("snapshot", Snapshot),
Expand Down Expand Up @@ -120,15 +127,15 @@ def test_generic_read(self):

os.remove(bad_metadata_path)

def test_compact_json(self):
def test_compact_json(self) -> None:
path = os.path.join(self.repo_dir, "metadata", "targets.json")
md_obj = Metadata.from_file(path)
self.assertTrue(
len(JSONSerializer(compact=True).serialize(md_obj))
< len(JSONSerializer().serialize(md_obj))
)

def test_read_write_read_compare(self):
def test_read_write_read_compare(self) -> None:
for metadata in ["root", "snapshot", "timestamp", "targets"]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
md_obj = Metadata.from_file(path)
Expand All @@ -140,7 +147,7 @@ def test_read_write_read_compare(self):

os.remove(path_2)

def test_to_from_bytes(self):
def test_to_from_bytes(self) -> None:
for metadata in ["root", "snapshot", "timestamp", "targets"]:
path = os.path.join(self.repo_dir, "metadata", metadata + ".json")
with open(path, "rb") as f:
Expand All @@ -157,7 +164,7 @@ def test_to_from_bytes(self):
metadata_obj_2 = Metadata.from_bytes(obj_bytes)
self.assertEqual(metadata_obj_2.to_bytes(), obj_bytes)

def test_sign_verify(self):
def test_sign_verify(self) -> None:
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path).signed

Expand All @@ -183,7 +190,7 @@ def test_sign_verify(self):
# Test verifying with explicitly set serializer
targets_key.verify_signature(md_obj, CanonicalJSONSerializer())
with self.assertRaises(exceptions.UnsignedMetadataError):
targets_key.verify_signature(md_obj, JSONSerializer())
targets_key.verify_signature(md_obj, JSONSerializer()) # type: ignore[arg-type]

sslib_signer = SSlibSigner(self.keystore["snapshot"])
# Append a new signature with the unrelated key and assert that ...
Expand All @@ -206,7 +213,7 @@ def test_sign_verify(self):
with self.assertRaises(exceptions.UnsignedMetadataError):
targets_key.verify_signature(md_obj)

def test_verify_failures(self):
def test_verify_failures(self) -> None:
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path).signed

Expand Down Expand Up @@ -248,7 +255,7 @@ def test_verify_failures(self):
timestamp_key.verify_signature(md_obj)
sig.signature = correct_sig

def test_metadata_base(self):
def test_metadata_base(self) -> None:
# Use of Snapshot is arbitrary, we're just testing the base class
# features with real data
snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json")
Expand Down Expand Up @@ -290,7 +297,7 @@ def test_metadata_base(self):
with self.assertRaises(ValueError):
Metadata.from_dict(data)

def test_metadata_snapshot(self):
def test_metadata_snapshot(self) -> None:
snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json")
snapshot = Metadata[Snapshot].from_file(snapshot_path)

Expand All @@ -309,7 +316,7 @@ def test_metadata_snapshot(self):
snapshot.signed.meta["role1.json"].to_dict(), fileinfo.to_dict()
)

def test_metadata_timestamp(self):
def test_metadata_timestamp(self) -> None:
timestamp_path = os.path.join(
self.repo_dir, "metadata", "timestamp.json"
)
Expand Down Expand Up @@ -349,7 +356,7 @@ def test_metadata_timestamp(self):
timestamp.signed.snapshot_meta.to_dict(), fileinfo.to_dict()
)

def test_metadata_verify_delegate(self):
def test_metadata_verify_delegate(self) -> None:
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path)
snapshot_path = os.path.join(self.repo_dir, "metadata", "snapshot.json")
Expand Down Expand Up @@ -410,14 +417,14 @@ def test_metadata_verify_delegate(self):
snapshot.sign(SSlibSigner(self.keystore["timestamp"]), append=True)
root.verify_delegate("snapshot", snapshot)

def test_key_class(self):
def test_key_class(self) -> None:
# Test if from_securesystemslib_key removes the private key from keyval
# of a securesystemslib key dictionary.
sslib_key = generate_ed25519_key()
key = Key.from_securesystemslib_key(sslib_key)
self.assertFalse("private" in key.keyval.keys())

def test_root_add_key_and_remove_key(self):
def test_root_add_key_and_remove_key(self) -> None:
root_path = os.path.join(self.repo_dir, "metadata", "root.json")
root = Metadata[Root].from_file(root_path)

Expand Down Expand Up @@ -475,7 +482,7 @@ def test_root_add_key_and_remove_key(self):
with self.assertRaises(ValueError):
root.signed.remove_key("nosuchrole", keyid)

def test_is_target_in_pathpattern(self):
def test_is_target_in_pathpattern(self) -> None:
# pylint: disable=protected-access
supported_use_cases = [
("foo.tgz", "foo.tgz"),
Expand Down Expand Up @@ -507,7 +514,7 @@ def test_is_target_in_pathpattern(self):
DelegatedRole._is_target_in_pathpattern(targetpath, pathpattern)
)

def test_metadata_targets(self):
def test_metadata_targets(self) -> None:
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
targets = Metadata[Targets].from_file(targets_path)

Expand All @@ -531,7 +538,7 @@ def test_metadata_targets(self):
targets.signed.targets[filename].to_dict(), fileinfo.to_dict()
)

def test_targets_key_api(self):
def test_targets_key_api(self) -> None:
targets_path = os.path.join(self.repo_dir, "metadata", "targets.json")
targets: Targets = Metadata[Targets].from_file(targets_path).signed

Expand All @@ -545,6 +552,7 @@ def test_targets_key_api(self):
"threshold": 1,
}
)
assert isinstance(targets.delegations, Delegations)
targets.delegations.roles["role2"] = delegated_role

key_dict = {
Expand Down Expand Up @@ -608,7 +616,7 @@ def test_targets_key_api(self):
targets.remove_key("role1", key.keyid)
self.assertTrue(targets.delegations is None)

def test_length_and_hash_validation(self):
def test_length_and_hash_validation(self) -> None:

# Test metadata files' hash and length verification.
# Use timestamp to get a MetaFile object and snapshot
Expand Down Expand Up @@ -648,7 +656,7 @@ def test_length_and_hash_validation(self):

# Test wrong algorithm format (sslib.FormatError)
snapshot_metafile.hashes = {
256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab"
256: "8f88e2ba48b412c3843e9bb26e1b6f8fc9e98aceb0fbaa97ba37b4c98717d7ab" # type: ignore[dict-item]
}
with self.assertRaises(exceptions.LengthOrHashMismatchError):
snapshot_metafile.verify_length_and_hashes(data)
Expand Down Expand Up @@ -678,7 +686,7 @@ def test_length_and_hash_validation(self):
with self.assertRaises(exceptions.LengthOrHashMismatchError):
file1_targetfile.verify_length_and_hashes(file1)

def test_targetfile_from_file(self):
def test_targetfile_from_file(self) -> None:
# Test with an existing file and valid hash algorithm
file_path = os.path.join(self.repo_dir, "targets", "file1.txt")
targetfile_from_file = TargetFile.from_file(
Expand All @@ -700,7 +708,7 @@ def test_targetfile_from_file(self):
with self.assertRaises(exceptions.UnsupportedAlgorithmError):
TargetFile.from_file(file_path, file_path, ["123"])

def test_targetfile_from_data(self):
def test_targetfile_from_data(self) -> None:
data = b"Inline test content"
target_file_path = os.path.join(self.repo_dir, "targets", "file1.txt")

Expand All @@ -714,7 +722,7 @@ def test_targetfile_from_data(self):
targetfile_from_data = TargetFile.from_data(target_file_path, data)
targetfile_from_data.verify_length_and_hashes(data)

def test_is_delegated_role(self):
def test_is_delegated_role(self) -> None:
# test path matches
# see more extensive tests in test_is_target_in_pathpattern()
for paths in [
Expand Down
Loading

0 comments on commit 600eb86

Please sign in to comment.