From 2faa891ee41dd3afef10eedb49bbe29f39a4671c Mon Sep 17 00:00:00 2001 From: Terje Kvernes Date: Fri, 14 Jun 2024 23:05:33 +0200 Subject: [PATCH] Initial commit of S3 support for Stratum1. - The tests.py tests against the EESSI sync server and works as expected. - However, tests are not completed, so coverage is lacking. --- README.md | 6 ++ cvmfsscraper/__init__.py | 7 +++ cvmfsscraper/exceptions.py | 6 +- cvmfsscraper/http_get_models.py | 5 +- cvmfsscraper/repository.py | 30 +++++++--- cvmfsscraper/server.py | 84 ++++++++++++++++++++++----- cvmfsscraper/tests/base.py | 6 +- cvmfsscraper/tests/test_002_models.py | 12 ++-- test.py | 23 ++++---- 9 files changed, 136 insertions(+), 43 deletions(-) mode change 100644 => 100755 test.py diff --git a/README.md b/README.md index c2d9945..397b442 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,12 @@ for repo in servers[0].repositories: print("Last snapshot: " + str(repo.last_snapshot)) ```` +Note that if you are using a Stratum1 server with S3 as its backend, you need to set repos explicitly. +This is because the S3 backend does not have a `cvmfs/info/v1/repositories.json` file. Also, the GeoAPI +status will be `NOT_FOUND` for these servers. + +````python + # Data structure ## Server diff --git a/cvmfsscraper/__init__.py b/cvmfsscraper/__init__.py index 5ff5d85..e7472f0 100644 --- a/cvmfsscraper/__init__.py +++ b/cvmfsscraper/__init__.py @@ -14,6 +14,13 @@ structlog.processors.add_log_level, structlog.processors.StackInfoRenderer(), structlog.dev.set_exc_info, + structlog.processors.CallsiteParameterAdder( + [ + structlog.processors.CallsiteParameter.FILENAME, + structlog.processors.CallsiteParameter.FUNC_NAME, + structlog.processors.CallsiteParameter.LINENO, + ], + ), structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False), structlog.processors.JSONRenderer(), ], diff --git a/cvmfsscraper/exceptions.py b/cvmfsscraper/exceptions.py index d30a001..b4e3210 100644 --- a/cvmfsscraper/exceptions.py +++ b/cvmfsscraper/exceptions.py @@ -1,6 +1,6 @@ """Exceptions for cvmfsscraper.""" -from typing import Any +from typing import Any, Union import structlog @@ -10,7 +10,9 @@ class CVMFSScraperBaseException(Exception): """Base exception for cvmfsscraper.""" - def __init__(self, message: str, original_excption: Exception = None, *args: Any) -> None: + def __init__( + self, message: str, original_excption: Union[Exception, None] = None, *args: Any + ) -> None: """Initialize the exception.""" self.message = message self.original_exception = original_excption diff --git a/cvmfsscraper/http_get_models.py b/cvmfsscraper/http_get_models.py index 952cdcf..5ef4a54 100644 --- a/cvmfsscraper/http_get_models.py +++ b/cvmfsscraper/http_get_models.py @@ -309,5 +309,8 @@ def __init__(self, path: str, model_class: Type[BaseModel]) -> None: # Dynamically creating this list based on the Endpoints enum values # is not supported by mypy et al, so we have to do it manually. EndpointClassesType = Union[ - GetCVMFSPublished, GetCVMFSRepositoriesJSON, GetCVMFSStatusJSON, GetGeoAPI + GetCVMFSPublished, + GetCVMFSRepositoriesJSON, + GetCVMFSStatusJSON, + GetGeoAPI, ] diff --git a/cvmfsscraper/repository.py b/cvmfsscraper/repository.py index 1581465..2f33b84 100644 --- a/cvmfsscraper/repository.py +++ b/cvmfsscraper/repository.py @@ -1,16 +1,16 @@ """A CVMFS repository.""" -from typing import Dict +from typing import TYPE_CHECKING, Dict, cast import structlog -from cvmfsscraper.http_get_models import ( - Endpoints, - GetCVMFSPublished, - GetCVMFSStatusJSON, -) +from cvmfsscraper.exceptions import CVMFSFetchError +from cvmfsscraper.http_get_models import Endpoints, GetCVMFSPublished, GetCVMFSStatusJSON log = structlog.getLogger(__name__) +if TYPE_CHECKING: # pragma: no cover + from cvmfsscraper.server import CVMFSServer + class Repository: """A CVMFS repository. @@ -37,7 +37,7 @@ class Repository: "L": "micro_catalogues", } - def __init__(self, server: object, name: str, url: str): + def __init__(self, server: "CVMFSServer", name: str, url: str): """Initialize the repository. :param server: The server object this repository belongs to. @@ -51,6 +51,10 @@ def __init__(self, server: object, name: str, url: str): self.last_gc = None self.last_snapshot = None + self.root_size = 0 + self.revision = 0 + self.revision_timestamp = 0 + self._repo_status_loaded = 0 self._cvmfspublished_loaded = 0 @@ -152,7 +156,11 @@ def fetch_cvmfspublished(self) -> GetCVMFSPublished: :returns: A GetCVMFSPublished object. """ - return self.server.fetch_endpoint(Endpoints.CVMFS_PUBLISHED, self.name) + cvmfspublished = self.server.fetch_endpoint(Endpoints.CVMFS_PUBLISHED, self.name) + if not cvmfspublished: + raise CVMFSFetchError("Failed to fetch .cvmfspublished") + + return cast(GetCVMFSPublished, cvmfspublished) def fetch_repository(self) -> GetCVMFSStatusJSON: """Fetch a repository by name. @@ -162,4 +170,8 @@ def fetch_repository(self) -> GetCVMFSStatusJSON: :returns: GetCVMFSStatusJSON object. """ - return self.server.fetch_endpoint(Endpoints.CVMFS_STATUS_JSON, self.name) + cvmfsstatus = self.server.fetch_endpoint(Endpoints.CVMFS_STATUS_JSON, self.name) + if not cvmfsstatus: + raise CVMFSFetchError("Failed to fetch .cvmfs_status.json") + + return cast(GetCVMFSStatusJSON, cvmfsstatus) diff --git a/cvmfsscraper/server.py b/cvmfsscraper/server.py index eee4231..c9f34a5 100644 --- a/cvmfsscraper/server.py +++ b/cvmfsscraper/server.py @@ -1,14 +1,13 @@ """Server class for cvmfs-server-metadata.""" import json -from typing import Dict, List +from typing import TYPE_CHECKING, Dict, List, Union, cast from urllib import error, request import structlog from cvmfsscraper.constants import GeoAPIStatus from cvmfsscraper.http_get_models import ( - EndpointClassesType, Endpoints, GetCVMFSPublished, GetCVMFSRepositoriesJSON, @@ -20,6 +19,9 @@ log = structlog.getLogger(__name__) +if TYPE_CHECKING: # pragma: no cover + from cvmfsscraper.http_get_models import BaseModel + class CVMFSServer: """Base class for CVMFS servers.""" @@ -125,11 +127,12 @@ def populate_repositories(self) -> None: repodata = self.fetch_repositories_json() if repodata: + # This should populate self.repositories. self.process_repositories_json(repodata) if self.fetch_errors: # pragma: no cover self._is_down = True - return [] + return None self._is_down = False except Exception as e: # pragma: no cover @@ -140,7 +143,12 @@ def populate_repositories(self) -> None: ) self.fetch_errors.append({"path": self.name, "error": e}) - def process_repositories_json(self, repodata: GetCVMFSRepositoriesJSON) -> List[Repository]: + if self.is_stratum1(): + for repo in self.forced_repositories: + if repo not in [r.name for r in self.repositories]: + self.repositories.append(Repository(self, repo, "/cvmfs/" + repo)) + + def process_repositories_json(self, repodata: GetCVMFSRepositoriesJSON) -> None: """Process the repositories.json file. Sets self.repos and self.metadata. @@ -153,7 +161,7 @@ def process_repositories_json(self, repodata: GetCVMFSRepositoriesJSON) -> List[ if repodata.replicas: self.server_type = 1 repos_on_server = repodata.replicas - else: + elif repodata.repositories: self.server_type = 0 repos_on_server = repodata.repositories @@ -197,6 +205,9 @@ def check_geoapi_status(self) -> GeoAPIStatus: try: geoapi_obj = self.fetch_geoapi(self.repositories[0]) + if not geoapi_obj: + return GeoAPIStatus.NO_RESPONSE + if geoapi_obj.has_order(self.geoapi_order): return GeoAPIStatus.OK else: @@ -209,32 +220,46 @@ def check_geoapi_status(self) -> GeoAPIStatus: ) return GeoAPIStatus.NO_RESPONSE - def fetch_repositories_json(self) -> GetCVMFSRepositoriesJSON: + def fetch_repositories_json(self) -> Union[GetCVMFSRepositoriesJSON, None]: """Fetch the repositories JSON file. + Note: This function will return None if the server is a stratum1 and uses S3 as + its backend. In this case, the endpoint is not available. + raises: urlllib.error.URLError (or a subclass thereof) for URL errors. pydantic.ValidationError if the object creation fails. - returns: A GetCVMFSRepositoriesJSON object. + returns: A GetCVMFSRepositoriesJSON object or None """ - return self.fetch_endpoint(Endpoints.REPOSITORIES_JSON) + repos = self.fetch_endpoint(Endpoints.REPOSITORIES_JSON) + if not repos: + return None + + return cast(GetCVMFSRepositoriesJSON, repos) - def fetch_geoapi(self, repo: Repository) -> GetGeoAPI: + def fetch_geoapi(self, repo: Repository) -> Union[GetGeoAPI, None]: """Fetch the GeoAPI host ordering. + Note: This function will return None if the server is a stratum1 and uses S3 as + its backend. In this case, the endpoint is not available. + raises: urlllib.error.URLError (or a subclass thereof) for URL errors. pydantic.ValidationError if the object creation fails. - :returns: A GetGeoAPI object. + :returns: A GetGeoAPI object or None """ - return self.fetch_endpoint(Endpoints.GEOAPI, repo=repo.name) + geoapi = self.fetch_endpoint(Endpoints.GEOAPI, repo=repo.name) + if not geoapi: + return None + + return cast(GetGeoAPI, geoapi) def fetch_endpoint( self, endpoint: Endpoints, repo: str = "data", - geoapi_servers: str = GEOAPI_SERVERS, - ) -> EndpointClassesType: + geoapi_servers: List[str] = GEOAPI_SERVERS, + ) -> Union["BaseModel", None]: """Fetch and process a specified URL endpoint. This function reads the content of a specified URL and ether returns a validated @@ -270,11 +295,12 @@ def fetch_endpoint( geoapi_str = ",".join(geoapi_servers) formatted_path = endpoint.path.format(repo=repo, geoapi_str=geoapi_str) url = f"{self.url()}/cvmfs/{formatted_path}" - timeout_seconds = 5 try: log.info("Fetching url", url=url) - content = request.urlopen(url, timeout=timeout_seconds) + req = request.Request(url) + req.add_header("User-Agent", "Mozilla/5.0") + content = request.urlopen(req, timeout=timeout_seconds) if endpoint in [Endpoints.REPOSITORIES_JSON, Endpoints.CVMFS_STATUS_JSON]: log.debug( @@ -307,6 +333,34 @@ def fetch_endpoint( return endpoint.model_class(**content) + except error.HTTPError as e: + # If we get a 403 from a stratum1 on the repositories.json endpoint, we are + # probably dealing with a server that uses S3 as its backend. In this case + # this endpoint is not available, and we should just ignore it. + if ( + e + and (endpoint == Endpoints.REPOSITORIES_JSON or endpoint == Endpoints.GEOAPI) + and self.server_type == 1 + and e.code == 404 + ): + log.debug( + "Assuming S3 backend for stratum1", + server=self.name, + endpoint=endpoint.name, + repo=repo, + url=url, + ) + return None + log.error( + "Fetch endpoint failure", + exc=e, + name=self.name, + endpoint=endpoint.name, + repo=repo, + url=url, + ) + raise e from e + except error.URLError as e: log.error( "Fetch endpoint failure", diff --git a/cvmfsscraper/tests/base.py b/cvmfsscraper/tests/base.py index fc46e53..7faa872 100644 --- a/cvmfsscraper/tests/base.py +++ b/cvmfsscraper/tests/base.py @@ -45,7 +45,9 @@ def validate_and_load(data_dir: str) -> Dict[str, Union[str, bytes]]: ENDPOINTS["http://example.com/timeout"] = urllib.error.URLError("timeout") -def mock_urlopen(url: str, timeout: Union[int, float, None] = None) -> Union[Mock, Exception]: +def mock_urlopen( + url: Union[str, urllib.request.Request], timeout: Union[int, float, None] = None +) -> Union[Mock, Exception]: """Mock urllib.request.urlopen based on a predefined URL mapping. :param url: The URL to fetch. @@ -56,6 +58,8 @@ def mock_urlopen(url: str, timeout: Union[int, float, None] = None) -> Union[Moc :returns: Mocked HTTPResponse object with read() method. """ + url = url.full_url if isinstance(url, urllib.request.Request) else url + if url not in ENDPOINTS: raise HTTPError(url, 404, "Not Found", {}, None) diff --git a/cvmfsscraper/tests/test_002_models.py b/cvmfsscraper/tests/test_002_models.py index ca65177..3ad87cf 100644 --- a/cvmfsscraper/tests/test_002_models.py +++ b/cvmfsscraper/tests/test_002_models.py @@ -1,5 +1,7 @@ """Test the pydantic models in cvmfsscraper/models.py.""" +from __future__ import annotations + import json import os from copy import deepcopy @@ -76,7 +78,7 @@ class BaseCVMFSModelTestCase(TestCase): """Base model for testing CVMFS models.""" def verify_date_field( - self, cls: CVMFSBaseModel, input_data: Dict[str, Any], field: str + self, cls: type[CVMFSBaseModel], input_data: Dict[str, Any], field: str ) -> None: """Verify that a given field in the dataset is validated as a CVMFS date.""" data = deepcopy(input_data) @@ -102,11 +104,11 @@ def verify_date_field( def verify_str_field( self, - cls: CVMFSBaseModel, + cls: type[CVMFSBaseModel], input_data: Dict[str, Any], field: str, - min_length: int = None, - max_length: int = None, + min_length: int = 0, + max_length: int = 0, is_hex: bool = False, ) -> None: """Verify that a given field in the dataset is validated as a string.""" @@ -142,7 +144,7 @@ def verify_str_field( def verify_int_field( self, - cls: CVMFSBaseModel, + cls: type[CVMFSBaseModel], input_data: Dict[str, Any], field: str, require_positive: bool = False, diff --git a/test.py b/test.py old mode 100644 new mode 100755 index 8790a93..cf3711d --- a/test.py +++ b/test.py @@ -1,21 +1,24 @@ #!/usr/bin/env python3 """Proof of concept EESSI test script.""" -from cvmfsscraper import scrape +import logging + +from cvmfsscraper import scrape, set_log_level + +set_log_level(logging.WARNING) # server = scrape_server("aws-eu-west1.stratum1.cvmfs.eessi-infra.org") servers = scrape( - stratum0_servers=[ - "rug-nl.stratum0.cvmfs.eessi-infra.org", - ], + stratum0_servers=["rug-nl-s0.eessi.science"], stratum1_servers=[ - "aws-eu-west1.stratum1.cvmfs.eessi-infra.org", - "azure-us-east1.stratum1.cvmfs.eessi-infra.org", - "bgo-no.stratum1.cvmfs.eessi-infra.org", - "rug-nl.stratum1.cvmfs.eessi-infra.org", + "aws-eu-central-s1.eessi.science", + # "azure-us-east-s1.eessi.science", + "aws-eu-west-s1-sync.eessi.science", ], - repos=[], + # We need to force repos due to the sync server using S3 as its backend. + # S3 does not support reporting the list of repositories. + repos=["software.eessi.io", "riscv.eessi.io", "dev.eessi.io"], ignore_repos=[ "bla.eessi-hpc.org", "bob.eessi-hpc.org", @@ -32,7 +35,7 @@ print(" - " + key + ": " + value) print(" Repositories: ") for repo in server.repositories: - print(" - " + repo.name) + print(" - " + repo.name + " (" + repo.path + ")") print(f" : Root size: {repo.root_size}") print(f" : Revision: {repo.revision}") print(f" : Revision timestamp: {repo.revision_timestamp}")