Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial commit of S3 support for Stratum1. #14

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ for repo in servers[0].repositories:
print("Last snapshot: " + str(repo.last_snapshot))
````

Note that if you are using a Stratum1 server with S3 as its backend, you need to set repos explicitly.
This is because the S3 backend does not have a `cvmfs/info/v1/repositories.json` file. Also, the GeoAPI
status will be `NOT_FOUND` for these servers.

````python

# Data structure

## Server
Expand Down
7 changes: 7 additions & 0 deletions cvmfsscraper/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.CallsiteParameterAdder(
[
structlog.processors.CallsiteParameter.FILENAME,
structlog.processors.CallsiteParameter.FUNC_NAME,
structlog.processors.CallsiteParameter.LINENO,
],
),
structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S", utc=False),
structlog.processors.JSONRenderer(),
],
Expand Down
6 changes: 4 additions & 2 deletions cvmfsscraper/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Exceptions for cvmfsscraper."""

from typing import Any
from typing import Any, Union

import structlog

Expand All @@ -10,7 +10,9 @@
class CVMFSScraperBaseException(Exception):
"""Base exception for cvmfsscraper."""

def __init__(self, message: str, original_excption: Exception = None, *args: Any) -> None:
def __init__(
self, message: str, original_excption: Union[Exception, None] = None, *args: Any
) -> None:
"""Initialize the exception."""
self.message = message
self.original_exception = original_excption
Expand Down
5 changes: 4 additions & 1 deletion cvmfsscraper/http_get_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,5 +309,8 @@ def __init__(self, path: str, model_class: Type[BaseModel]) -> None:
# Dynamically creating this list based on the Endpoints enum values
# is not supported by mypy et al, so we have to do it manually.
EndpointClassesType = Union[
GetCVMFSPublished, GetCVMFSRepositoriesJSON, GetCVMFSStatusJSON, GetGeoAPI
GetCVMFSPublished,
GetCVMFSRepositoriesJSON,
GetCVMFSStatusJSON,
GetGeoAPI,
]
30 changes: 21 additions & 9 deletions cvmfsscraper/repository.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
"""A CVMFS repository."""
from typing import Dict
from typing import TYPE_CHECKING, Dict, cast

import structlog

from cvmfsscraper.http_get_models import (
Endpoints,
GetCVMFSPublished,
GetCVMFSStatusJSON,
)
from cvmfsscraper.exceptions import CVMFSFetchError
from cvmfsscraper.http_get_models import Endpoints, GetCVMFSPublished, GetCVMFSStatusJSON

log = structlog.getLogger(__name__)

if TYPE_CHECKING: # pragma: no cover
from cvmfsscraper.server import CVMFSServer


class Repository:
"""A CVMFS repository.
Expand All @@ -37,7 +37,7 @@ class Repository:
"L": "micro_catalogues",
}

def __init__(self, server: object, name: str, url: str):
def __init__(self, server: "CVMFSServer", name: str, url: str):
"""Initialize the repository.

:param server: The server object this repository belongs to.
Expand All @@ -51,6 +51,10 @@ def __init__(self, server: object, name: str, url: str):
self.last_gc = None
self.last_snapshot = None

self.root_size = 0
self.revision = 0
self.revision_timestamp = 0

self._repo_status_loaded = 0
self._cvmfspublished_loaded = 0

Expand Down Expand Up @@ -152,7 +156,11 @@ def fetch_cvmfspublished(self) -> GetCVMFSPublished:

:returns: A GetCVMFSPublished object.
"""
return self.server.fetch_endpoint(Endpoints.CVMFS_PUBLISHED, self.name)
cvmfspublished = self.server.fetch_endpoint(Endpoints.CVMFS_PUBLISHED, self.name)
if not cvmfspublished:
raise CVMFSFetchError("Failed to fetch .cvmfspublished")

return cast(GetCVMFSPublished, cvmfspublished)

def fetch_repository(self) -> GetCVMFSStatusJSON:
"""Fetch a repository by name.
Expand All @@ -162,4 +170,8 @@ def fetch_repository(self) -> GetCVMFSStatusJSON:

:returns: GetCVMFSStatusJSON object.
"""
return self.server.fetch_endpoint(Endpoints.CVMFS_STATUS_JSON, self.name)
cvmfsstatus = self.server.fetch_endpoint(Endpoints.CVMFS_STATUS_JSON, self.name)
if not cvmfsstatus:
raise CVMFSFetchError("Failed to fetch .cvmfs_status.json")

return cast(GetCVMFSStatusJSON, cvmfsstatus)
84 changes: 69 additions & 15 deletions cvmfsscraper/server.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
"""Server class for cvmfs-server-metadata."""

import json
from typing import Dict, List
from typing import TYPE_CHECKING, Dict, List, Union, cast
from urllib import error, request

import structlog

from cvmfsscraper.constants import GeoAPIStatus
from cvmfsscraper.http_get_models import (
EndpointClassesType,
Endpoints,
GetCVMFSPublished,
GetCVMFSRepositoriesJSON,
Expand All @@ -20,6 +19,9 @@

log = structlog.getLogger(__name__)

if TYPE_CHECKING: # pragma: no cover
from cvmfsscraper.http_get_models import BaseModel


class CVMFSServer:
"""Base class for CVMFS servers."""
Expand Down Expand Up @@ -125,11 +127,12 @@ def populate_repositories(self) -> None:
repodata = self.fetch_repositories_json()

if repodata:
# This should populate self.repositories.
self.process_repositories_json(repodata)

if self.fetch_errors: # pragma: no cover
self._is_down = True
return []
return None

self._is_down = False
except Exception as e: # pragma: no cover
Expand All @@ -140,7 +143,12 @@ def populate_repositories(self) -> None:
)
self.fetch_errors.append({"path": self.name, "error": e})

def process_repositories_json(self, repodata: GetCVMFSRepositoriesJSON) -> List[Repository]:
if self.is_stratum1():
for repo in self.forced_repositories:
if repo not in [r.name for r in self.repositories]:
self.repositories.append(Repository(self, repo, "/cvmfs/" + repo))

def process_repositories_json(self, repodata: GetCVMFSRepositoriesJSON) -> None:
"""Process the repositories.json file.

Sets self.repos and self.metadata.
Expand All @@ -153,7 +161,7 @@ def process_repositories_json(self, repodata: GetCVMFSRepositoriesJSON) -> List[
if repodata.replicas:
self.server_type = 1
repos_on_server = repodata.replicas
else:
elif repodata.repositories:
self.server_type = 0
repos_on_server = repodata.repositories

Expand Down Expand Up @@ -197,6 +205,9 @@ def check_geoapi_status(self) -> GeoAPIStatus:

try:
geoapi_obj = self.fetch_geoapi(self.repositories[0])
if not geoapi_obj:
return GeoAPIStatus.NO_RESPONSE

if geoapi_obj.has_order(self.geoapi_order):
return GeoAPIStatus.OK
else:
Expand All @@ -209,32 +220,46 @@ def check_geoapi_status(self) -> GeoAPIStatus:
)
return GeoAPIStatus.NO_RESPONSE

def fetch_repositories_json(self) -> GetCVMFSRepositoriesJSON:
def fetch_repositories_json(self) -> Union[GetCVMFSRepositoriesJSON, None]:
"""Fetch the repositories JSON file.

Note: This function will return None if the server is a stratum1 and uses S3 as
its backend. In this case, the endpoint is not available.

raises: urllib.error.URLError (or a subclass thereof) for URL errors.
pydantic.ValidationError if the object creation fails.

returns: A GetCVMFSRepositoriesJSON object.
returns: A GetCVMFSRepositoriesJSON object or None
"""
return self.fetch_endpoint(Endpoints.REPOSITORIES_JSON)
repos = self.fetch_endpoint(Endpoints.REPOSITORIES_JSON)
if not repos:
return None

return cast(GetCVMFSRepositoriesJSON, repos)

def fetch_geoapi(self, repo: Repository) -> GetGeoAPI:
def fetch_geoapi(self, repo: Repository) -> Union[GetGeoAPI, None]:
"""Fetch the GeoAPI host ordering.

Note: This function will return None if the server is a stratum1 and uses S3 as
its backend. In this case, the endpoint is not available.

raises: urllib.error.URLError (or a subclass thereof) for URL errors.
pydantic.ValidationError if the object creation fails.

:returns: A GetGeoAPI object.
:returns: A GetGeoAPI object or None
"""
return self.fetch_endpoint(Endpoints.GEOAPI, repo=repo.name)
geoapi = self.fetch_endpoint(Endpoints.GEOAPI, repo=repo.name)
if not geoapi:
return None

return cast(GetGeoAPI, geoapi)

def fetch_endpoint(
self,
endpoint: Endpoints,
repo: str = "data",
geoapi_servers: str = GEOAPI_SERVERS,
) -> EndpointClassesType:
geoapi_servers: List[str] = GEOAPI_SERVERS,
) -> Union["BaseModel", None]:
"""Fetch and process a specified URL endpoint.

This function reads the content of a specified URL and either returns a validated
Expand Down Expand Up @@ -270,11 +295,12 @@ def fetch_endpoint(
geoapi_str = ",".join(geoapi_servers)
formatted_path = endpoint.path.format(repo=repo, geoapi_str=geoapi_str)
url = f"{self.url()}/cvmfs/{formatted_path}"

timeout_seconds = 5
try:
log.info("Fetching url", url=url)
content = request.urlopen(url, timeout=timeout_seconds)
req = request.Request(url)
req.add_header("User-Agent", "Mozilla/5.0")
content = request.urlopen(req, timeout=timeout_seconds)

if endpoint in [Endpoints.REPOSITORIES_JSON, Endpoints.CVMFS_STATUS_JSON]:
log.debug(
Expand Down Expand Up @@ -307,6 +333,34 @@ def fetch_endpoint(

return endpoint.model_class(**content)

except error.HTTPError as e:
# If we get a 404 from a stratum1 on the repositories.json or GeoAPI endpoint, we
# are probably dealing with a server that uses S3 as its backend. In this case
# these endpoints are not available, and we should just ignore the error.
if (
e
and (endpoint == Endpoints.REPOSITORIES_JSON or endpoint == Endpoints.GEOAPI)
and self.server_type == 1
and e.code == 404
):
log.debug(
"Assuming S3 backend for stratum1",
server=self.name,
endpoint=endpoint.name,
repo=repo,
url=url,
)
return None
log.error(
"Fetch endpoint failure",
exc=e,
name=self.name,
endpoint=endpoint.name,
repo=repo,
url=url,
)
raise e from e

except error.URLError as e:
log.error(
"Fetch endpoint failure",
Expand Down
6 changes: 5 additions & 1 deletion cvmfsscraper/tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ def validate_and_load(data_dir: str) -> Dict[str, Union[str, bytes]]:
ENDPOINTS["http://example.com/timeout"] = urllib.error.URLError("timeout")


def mock_urlopen(url: str, timeout: Union[int, float, None] = None) -> Union[Mock, Exception]:
def mock_urlopen(
url: Union[str, urllib.request.Request], timeout: Union[int, float, None] = None
) -> Union[Mock, Exception]:
"""Mock urllib.request.urlopen based on a predefined URL mapping.

:param url: The URL to fetch.
Expand All @@ -56,6 +58,8 @@ def mock_urlopen(url: str, timeout: Union[int, float, None] = None) -> Union[Moc

:returns: Mocked HTTPResponse object with read() method.
"""
url = url.full_url if isinstance(url, urllib.request.Request) else url

if url not in ENDPOINTS:
raise HTTPError(url, 404, "Not Found", {}, None)

Expand Down
12 changes: 7 additions & 5 deletions cvmfsscraper/tests/test_002_models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Test the pydantic models in cvmfsscraper/models.py."""

from __future__ import annotations

import json
import os
from copy import deepcopy
Expand Down Expand Up @@ -76,7 +78,7 @@ class BaseCVMFSModelTestCase(TestCase):
"""Base model for testing CVMFS models."""

def verify_date_field(
self, cls: CVMFSBaseModel, input_data: Dict[str, Any], field: str
self, cls: type[CVMFSBaseModel], input_data: Dict[str, Any], field: str
) -> None:
"""Verify that a given field in the dataset is validated as a CVMFS date."""
data = deepcopy(input_data)
Expand All @@ -102,11 +104,11 @@ def verify_date_field(

def verify_str_field(
self,
cls: CVMFSBaseModel,
cls: type[CVMFSBaseModel],
input_data: Dict[str, Any],
field: str,
min_length: int = None,
max_length: int = None,
min_length: int = 0,
max_length: int = 0,
is_hex: bool = False,
) -> None:
"""Verify that a given field in the dataset is validated as a string."""
Expand Down Expand Up @@ -142,7 +144,7 @@ def verify_str_field(

def verify_int_field(
self,
cls: CVMFSBaseModel,
cls: type[CVMFSBaseModel],
input_data: Dict[str, Any],
field: str,
require_positive: bool = False,
Expand Down
Loading
Loading