diff --git a/src/mobu/autostart.py b/src/mobu/autostart.py deleted file mode 100644 index 704a6615..00000000 --- a/src/mobu/autostart.py +++ /dev/null @@ -1,26 +0,0 @@ -"""Support for automatically starting flocks.""" - -from __future__ import annotations - -import yaml - -from .config import config -from .dependencies.manager import monkey_business_manager -from .models.flock import FlockConfig - - -async def autostart() -> None: - """Automatically start configured flocks. - - This function should be called from the startup hook of the FastAPI - application. - """ - if not config.autostart: - return - - with open(config.autostart, "r") as f: - autostart = yaml.safe_load(f) - flock_configs = [FlockConfig.parse_obj(flock) for flock in autostart] - - for flock_config in flock_configs: - await monkey_business_manager.start_flock(flock_config) diff --git a/src/mobu/dependencies/context.py b/src/mobu/dependencies/context.py new file mode 100644 index 00000000..90d9b376 --- /dev/null +++ b/src/mobu/dependencies/context.py @@ -0,0 +1,86 @@ +"""Request context dependency for FastAPI. + +This dependency gathers a variety of information into a single object for the +convenience of writing request handlers. It also provides a place to store a +`structlog.BoundLogger` that can gather additional context during processing, +including from dependencies. +""" + +from dataclasses import dataclass +from typing import Optional + +from fastapi import Depends, Request +from safir.dependencies.gafaelfawr import auth_logger_dependency +from structlog.stdlib import BoundLogger + +from ..factory import ProcessContext +from ..services.manager import FlockManager + +__all__ = [ + "ContextDependency", + "RequestContext", + "context_dependency", +] + + +@dataclass(slots=True) +class RequestContext: + """Holds the incoming request and its surrounding context.""" + + request: Request + """Incoming request.""" + + logger: BoundLogger + """Request loger, rebound with discovered context.""" + + manager: FlockManager + """Global singleton flock manager.""" + + +class ContextDependency: + """Provide a per-request context as a FastAPI dependency. + + Each request gets a `RequestContext`. To save overhead, the portions of + the context that are shared by all requests are collected into the single + process-global `~mobu.factory.ProcessContext` and reused with each + request. + """ + + def __init__(self) -> None: + self._process_context: Optional[ProcessContext] = None + + async def __call__( + self, + request: Request, + logger: BoundLogger = Depends(auth_logger_dependency), + ) -> RequestContext: + """Create a per-request context.""" + if not self._process_context: + raise RuntimeError("ContextDependency not initialized") + return RequestContext( + request=request, + logger=logger, + manager=self._process_context.manager, + ) + + @property + def process_context(self) -> ProcessContext: + if not self._process_context: + raise RuntimeError("ContextDependency not initialized") + return self._process_context + + async def initialize(self) -> None: + """Initialize the process-wide shared context.""" + if self._process_context: + await self._process_context.aclose() + self._process_context = ProcessContext() + + async def aclose(self) -> None: + """Clean up the per-process configuration.""" + if self._process_context: + await self._process_context.aclose() + self._process_context = None + + +context_dependency = ContextDependency() +"""The dependency that will return the per-request context.""" diff --git a/src/mobu/dependencies/manager.py b/src/mobu/dependencies/manager.py deleted file mode 100644 index 31a17adb..00000000 --- a/src/mobu/dependencies/manager.py +++ /dev/null @@ -1,81 +0,0 @@ -"""Provide a global MonkeyBusinessManager used to manage monkeys.""" - -from __future__ import annotations - -import asyncio -from typing import Optional - -import structlog -from aiohttp import ClientSession -from aiojobs import Scheduler - -from ..exceptions import FlockNotFoundException -from ..models.flock import FlockConfig, FlockSummary -from ..services.flock import Flock - -__all__ = ["MonkeyBusinessManager", "monkey_business_manager"] - - -class MonkeyBusinessManager: - """Manages all of the running monkeys.""" - - def __init__(self) -> None: - self._flocks: dict[str, Flock] = {} - self._scheduler: Optional[Scheduler] = None - self._session: Optional[ClientSession] = None - self._logger = structlog.get_logger("mobu") - - async def __call__(self) -> MonkeyBusinessManager: - return self - - async def init(self) -> None: - self._scheduler = Scheduler(limit=1000, pending_limit=0) - self._session = ClientSession() - - async def cleanup(self) -> None: - awaits = [self.stop_flock(f) for f in self._flocks] - await asyncio.gather(*awaits) - if self._scheduler is not None: - await self._scheduler.close() - self._scheduler = None - if self._session: - await self._session.close() - self._session = None - - async def start_flock(self, flock_config: FlockConfig) -> Flock: - if self._scheduler is None or not self._session: - raise RuntimeError("MonkeyBusinessManager not initialized") - flock = Flock( - flock_config=flock_config, - scheduler=self._scheduler, - session=self._session, - logger=self._logger, - ) - if flock.name in self._flocks: - await self._flocks[flock.name].stop() - self._flocks[flock.name] = flock - await flock.start() - return flock - - def get_flock(self, name: str) -> Flock: - flock = self._flocks.get(name) - if flock is None: - raise FlockNotFoundException(name) - return flock - - def list_flocks(self) -> list[str]: - return sorted(self._flocks.keys()) - - def summarize_flocks(self) -> list[FlockSummary]: - return [f.summary() for _, f in sorted(self._flocks.items())] - - async def stop_flock(self, name: str) -> None: - flock = self._flocks.get(name) - if flock is None: - raise FlockNotFoundException(name) - del self._flocks[name] - await flock.stop() - - -monkey_business_manager = MonkeyBusinessManager() -"""Global manager for all running monkeys.""" diff --git a/src/mobu/factory.py b/src/mobu/factory.py new file mode 100644 index 00000000..43aa5340 --- /dev/null +++ b/src/mobu/factory.py @@ -0,0 +1,74 @@ +"""Component factory and process-wide status for mobu.""" + +from __future__ import annotations + +from typing import Optional + +import structlog +from aiohttp import ClientSession +from safir.slack.webhook import SlackWebhookClient +from structlog import BoundLogger + +from .config import config +from .services.manager import FlockManager + +__all__ = ["Factory", "ProcessContext"] + + +class ProcessContext: + """Per-process application context. + + This object caches all of the per-process singletons that can be reused + for every request. + + Attributes + ---------- + manager + Manager for all running flocks. + session + Shared HTTP client session. + """ + + def __init__(self) -> None: + self.session = ClientSession() + self.manager = FlockManager(self.session, structlog.get_logger("mobu")) + + async def aclose(self) -> None: + """Clean up a process context. + + Called before shutdown to free resources. + """ + await self.manager.aclose() + await self.session.close() + + +class Factory: + """Component factory for mobu. + + Uses the contents of a `ProcessContext` to construct the components of an + application on demand. + + Parameters + ---------- + context + Shared process context. + """ + + def __init__( + self, context: ProcessContext, logger: Optional[BoundLogger] = None + ) -> None: + self._context = context + self._logger = logger if logger else structlog.get_logger("mobu") + + def create_slack_webhook_client(self) -> SlackWebhookClient | None: + """Create a Slack webhook client if configured for Slack alerting. + + Returns + ------- + SlackWebhookClient or None + Newly-created Slack client, or `None` if Slack alerting is not + configured. + """ + if not config.alert_hook or config.alert_hook == "None": + return None + return SlackWebhookClient(config.alert_hook, "Mobu", self._logger) diff --git a/src/mobu/handlers/external.py b/src/mobu/handlers/external.py index a025b51b..02908b0e 100644 --- a/src/mobu/handlers/external.py +++ b/src/mobu/handlers/external.py @@ -2,21 +2,15 @@ import json from typing import Any -from urllib.parse import quote from fastapi import APIRouter, Depends, Response from fastapi.responses import FileResponse, JSONResponse from safir.datetime import current_datetime -from safir.dependencies.gafaelfawr import auth_logger_dependency from safir.metadata import get_metadata from safir.models import ErrorModel -from structlog.stdlib import BoundLogger from ..config import config -from ..dependencies.manager import ( - MonkeyBusinessManager, - monkey_business_manager, -) +from ..dependencies.context import RequestContext, context_dependency from ..models.flock import FlockConfig, FlockData, FlockSummary from ..models.index import Index from ..models.monkey import MonkeyData @@ -58,9 +52,9 @@ async def get_index() -> Index: "/flocks", response_model=list[str], summary="List running flocks" ) async def get_flocks( - manager: MonkeyBusinessManager = Depends(monkey_business_manager), + context: RequestContext = Depends(context_dependency), ) -> list[str]: - return manager.list_flocks() + return context.manager.list_flocks() @external_router.put( @@ -75,14 +69,16 @@ async def get_flocks( async def put_flock( flock_config: FlockConfig, response: Response, - manager: MonkeyBusinessManager = Depends(monkey_business_manager), - logger: BoundLogger = Depends(auth_logger_dependency), + context: RequestContext = Depends(context_dependency), ) -> FlockData: - logger.info( - "Creating flock", flock=flock_config.name, config=flock_config.dict() + context.logger.info( + "Creating flock", + flock=flock_config.name, + config=flock_config.dict(exclude_unset=True), ) - flock = await manager.start_flock(flock_config) - response.headers["Location"] = quote(f"/mobu/flocks/{flock.name}") + flock = await context.manager.start_flock(flock_config) + flock_url = context.request.url_for("get_flock", flock=flock.name) + response.headers["Location"] = str(flock_url) return flock.dump() @@ -97,9 +93,9 @@ async def put_flock( ) async def get_flock( flock: str, - manager: MonkeyBusinessManager = Depends(monkey_business_manager), + context: RequestContext = Depends(context_dependency), ) -> FlockData: - return manager.get_flock(flock).dump() + return context.manager.get_flock(flock).dump() @external_router.delete( @@ -110,11 +106,10 @@ async def get_flock( ) async def delete_flock( flock: str, - manager: MonkeyBusinessManager = Depends(monkey_business_manager), - logger: BoundLogger = Depends(auth_logger_dependency), + context: RequestContext = Depends(context_dependency), ) -> None: - logger.info("Deleting flock", flock=flock) - await manager.stop_flock(flock) + context.logger.info("Deleting flock", flock=flock) + await context.manager.stop_flock(flock) @external_router.get( @@ -126,9 +121,9 @@ async def delete_flock( ) async def get_monkeys( flock: str, - manager: MonkeyBusinessManager = Depends(monkey_business_manager), + context: RequestContext = Depends(context_dependency), ) -> list[str]: - return manager.get_flock(flock).list_monkeys() + return context.manager.get_flock(flock).list_monkeys() @external_router.get( @@ -145,9 +140,9 @@ async def get_monkeys( async def get_monkey( flock: str, monkey: str, - manager: MonkeyBusinessManager = Depends(monkey_business_manager), + context: RequestContext = Depends(context_dependency), ) -> MonkeyData: - return manager.get_flock(flock).get_monkey(monkey).dump() + return context.manager.get_flock(flock).get_monkey(monkey).dump() @external_router.get( @@ -162,10 +157,10 @@ async def get_monkey( async def get_monkey_log( flock: str, monkey: str, - manager: MonkeyBusinessManager = Depends(monkey_business_manager), + context: RequestContext = Depends(context_dependency), ) -> FileResponse: return FileResponse( - path=manager.get_flock(flock).get_monkey(monkey).logfile(), + path=context.manager.get_flock(flock).get_monkey(monkey).logfile(), media_type="text/plain", filename=f"{flock}-{monkey}-{current_datetime()}", ) @@ -180,9 +175,9 @@ async def get_monkey_log( ) async def get_flock_summary( flock: str, - manager: MonkeyBusinessManager = Depends(monkey_business_manager), + context: RequestContext = Depends(context_dependency), ) -> FlockSummary: - return manager.get_flock(flock).summary() + return context.manager.get_flock(flock).summary() @external_router.get( @@ -192,6 +187,6 @@ async def get_flock_summary( summary="Summary of statistics for all flocks", ) async def get_summary( - manager: MonkeyBusinessManager = Depends(monkey_business_manager), + context: RequestContext = Depends(context_dependency), ) -> list[FlockSummary]: - return manager.summarize_flocks() + return context.manager.summarize_flocks() diff --git a/src/mobu/main.py b/src/mobu/main.py index 2f84e21a..21a8316f 100644 --- a/src/mobu/main.py +++ b/src/mobu/main.py @@ -17,9 +17,8 @@ from safir.models import ErrorLocation from .asyncio import schedule_periodic -from .autostart import autostart from .config import config -from .dependencies.manager import monkey_business_manager +from .dependencies.context import context_dependency from .exceptions import FlockNotFoundException, MonkeyNotFoundException from .handlers.external import external_router from .handlers.internal import internal_router @@ -58,15 +57,14 @@ async def startup_event() -> None: raise RuntimeError("ENVIRONMENT_URL was not set") if not config.gafaelfawr_token: raise RuntimeError("GAFAELFAWR_TOKEN was not set") - await monkey_business_manager.init() - if config.autostart: - await autostart() + await context_dependency.initialize() + await context_dependency.process_context.manager.autostart() app.state.periodic_status = schedule_periodic(post_status, 60 * 60 * 24) @app.on_event("shutdown") async def shutdown_event() -> None: - await monkey_business_manager.cleanup() + await context_dependency.aclose() app.state.periodic_status.cancel() try: await app.state.periodic_status diff --git a/src/mobu/services/manager.py b/src/mobu/services/manager.py new file mode 100644 index 00000000..0cdb6cdd --- /dev/null +++ b/src/mobu/services/manager.py @@ -0,0 +1,141 @@ +"""Manager for all the running flocks.""" + +from __future__ import annotations + +import asyncio + +import yaml +from aiohttp import ClientSession +from aiojobs import Scheduler +from structlog.stdlib import BoundLogger + +from ..config import config +from ..exceptions import FlockNotFoundException +from ..models.flock import FlockConfig, FlockSummary +from .flock import Flock + +__all__ = ["FlockManager"] + + +class FlockManager: + """Manages all of the running flocks. + + This should be a process singleton. It is responsible for managing all of + the flocks running in the background, including shutting them down and + starting new ones. + + Parameters + ---------- + session + HTTP client session to use. + logger + Global logger to use for process-wide (not monkey) logging. + """ + + def __init__(self, session: ClientSession, logger: BoundLogger) -> None: + self._session = session + self._logger = logger + self._flocks: dict[str, Flock] = {} + self._scheduler = Scheduler(limit=1000, pending_limit=0) + + async def aclose(self) -> None: + """Stop all flocks and free all resources.""" + awaits = [self.stop_flock(f) for f in self._flocks] + await asyncio.gather(*awaits) + await self._scheduler.close() + + async def autostart(self) -> None: + """Automatically start configured flocks. + + This function should be called from the startup hook of the FastAPI + application. + """ + if not config.autostart: + return + with open(config.autostart, "r") as f: + autostart = yaml.safe_load(f) + flock_configs = [FlockConfig.parse_obj(flock) for flock in autostart] + for flock_config in flock_configs: + await self.start_flock(flock_config) + + async def start_flock(self, flock_config: FlockConfig) -> Flock: + """Create and start a new flock of monkeys. + + Parameters + ---------- + flock_config + Configuration for that flock. + + Returns + ------- + Flock + Newly-created flock. + """ + flock = Flock( + flock_config=flock_config, + scheduler=self._scheduler, + session=self._session, + logger=self._logger, + ) + if flock.name in self._flocks: + await self._flocks[flock.name].stop() + self._flocks[flock.name] = flock + await flock.start() + return flock + + def get_flock(self, name: str) -> Flock: + """Retrieve a flock by name. + + Parameters + ---------- + name + Name of the flock. + + Returns + ------- + Flock + Flock with that name. + + Raises + ------ + FlockNotFoundException + Raised if no flock was found with that name. + """ + flock = self._flocks.get(name) + if flock is None: + raise FlockNotFoundException(name) + return flock + + def list_flocks(self) -> list[str]: + """List all flocks. + + Returns + ------- + list of str + Names of all flocks in sorted order. + """ + return sorted(self._flocks.keys()) + + def summarize_flocks(self) -> list[FlockSummary]: + """Summarize the status of all flocks. + + Returns + ------- + list of FlockSumary + Flock summary data sorted by flock name. + """ + return [f.summary() for _, f in sorted(self._flocks.items())] + + async def stop_flock(self, name: str) -> None: + """Stop a flock. + + Parameters + ---------- + name + Name of flock to stop. + """ + flock = self._flocks.get(name) + if flock is None: + raise FlockNotFoundException(name) + del self._flocks[name] + await flock.stop() diff --git a/src/mobu/status.py b/src/mobu/status.py index bb84d1c3..aa6a00d5 100644 --- a/src/mobu/status.py +++ b/src/mobu/status.py @@ -2,12 +2,11 @@ from __future__ import annotations -import structlog from safir.slack.blockkit import SlackMessage -from safir.slack.webhook import SlackWebhookClient from .config import config -from .dependencies.manager import monkey_business_manager +from .dependencies.context import context_dependency +from .factory import Factory __all__ = ["post_status"] @@ -19,10 +18,13 @@ async def post_status() -> None: clear that mobu is alive, but a secondary benefit is to provide some summary statistics. """ - if not config.alert_hook or config.alert_hook == "None": + process_context = context_dependency.process_context + factory = Factory(process_context) + slack = factory.create_slack_webhook_client() + if not slack: return - summaries = monkey_business_manager.summarize_flocks() + summaries = process_context.manager.summarize_flocks() flock_count = len(summaries) flock_plural = "flock" if flock_count == 1 else "flocks" text = ( @@ -52,6 +54,4 @@ async def post_status() -> None: ) text += line - logger = structlog.get_logger("mobu") - slack = SlackWebhookClient(config.alert_hook, "Mobu", logger) await slack.post(SlackMessage(message=text)) diff --git a/tests/conftest.py b/tests/conftest.py index 4219cff3..71e7ecf8 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -22,6 +22,7 @@ from mobu.config import config from .support.cachemachine import MockCachemachine, mock_cachemachine +from .support.constants import TEST_BASE_URL from .support.gafaelfawr import make_gafaelfawr_token from .support.jupyter import ( MockJupyter, @@ -74,7 +75,7 @@ async def app( @pytest_asyncio.fixture async def client(app: FastAPI) -> AsyncIterator[AsyncClient]: """Return an ``httpx.AsyncClient`` configured to talk to the test app.""" - url = "https://example.com/" + url = TEST_BASE_URL headers = {"X-Auth-Request-User": "someuser"} async with AsyncClient(app=app, base_url=url, headers=headers) as client: yield client diff --git a/tests/handlers/flock_test.py b/tests/handlers/flock_test.py index a24cdc40..242d8457 100644 --- a/tests/handlers/flock_test.py +++ b/tests/handlers/flock_test.py @@ -9,6 +9,7 @@ from aioresponses import aioresponses from httpx import AsyncClient +from ..support.constants import TEST_BASE_URL from ..support.gafaelfawr import mock_gafaelfawr from ..support.util import wait_for_business @@ -56,7 +57,7 @@ async def test_start_stop( ], } assert r.json() == expected - assert r.headers["Location"] == "/mobu/flocks/test" + assert r.headers["Location"] == f"{TEST_BASE_URL}/mobu/flocks/test" await wait_for_business(client, "testuser1") r = await client.get("/mobu/flocks") @@ -192,7 +193,7 @@ async def test_user_list( ], } assert r.json() == expected - assert r.headers["Location"] == "/mobu/flocks/test" + assert r.headers["Location"] == f"{TEST_BASE_URL}/mobu/flocks/test" r = await client.get("/mobu/flocks/test/monkeys/testuser") assert r.status_code == 200 diff --git a/tests/status_test.py b/tests/status_test.py index ec948330..4eea463a 100644 --- a/tests/status_test.py +++ b/tests/status_test.py @@ -6,16 +6,19 @@ from unittest.mock import patch import pytest +from httpx import AsyncClient from safir.testing.slack import MockSlackWebhook -from mobu.dependencies.manager import monkey_business_manager from mobu.models.flock import FlockSummary +from mobu.services.manager import FlockManager from mobu.status import post_status @pytest.mark.asyncio -async def test_post_status(slack: MockSlackWebhook) -> None: - with patch.object(monkey_business_manager, "summarize_flocks") as mock: +async def test_post_status( + client: AsyncClient, slack: MockSlackWebhook +) -> None: + with patch.object(FlockManager, "summarize_flocks") as mock: mock.return_value = [ FlockSummary( name="notebook", diff --git a/tests/support/constants.py b/tests/support/constants.py new file mode 100644 index 00000000..69694516 --- /dev/null +++ b/tests/support/constants.py @@ -0,0 +1,4 @@ +"""Constants used in text fixtures and setup.""" + +TEST_BASE_URL = "https://example.com" +"""Base URL used for the test `httpx.AsyncClient`."""