diff --git a/connexion/security.py b/connexion/security.py index ddfd14b16..e618f7ca9 100644 --- a/connexion/security.py +++ b/connexion/security.py @@ -67,6 +67,7 @@ class AbstractSecurityHandler: required_scopes_kw = "required_scopes" + request_kw = "request" client = None security_definition_key: str """The key which contains the value for the function name to resolve.""" @@ -106,12 +107,12 @@ def _get_function( return default def _generic_check(self, func, exception_msg): - need_to_add_required_scopes = self._need_to_add_scopes(func) - async def wrapper(request, *args, required_scopes=None): kwargs = {} - if need_to_add_required_scopes: + if self._accepts_kwarg(func, self.required_scopes_kw): kwargs[self.required_scopes_kw] = required_scopes + if self._accepts_kwarg(func, self.request_kw): + kwargs[self.request_kw] = request token_info = func(*args, **kwargs) while asyncio.iscoroutine(token_info): token_info = await token_info @@ -140,10 +141,11 @@ def get_auth_header_value(request): raise OAuthProblem(detail="Invalid authorization header") return auth_type.lower(), value - def _need_to_add_scopes(self, func): + @staticmethod + def _accepts_kwarg(func: t.Callable, keyword: str) -> bool: + """Check if the function accepts the provided keyword argument.""" arguments, has_kwargs = inspect_function_arguments(func) - need_required_scopes = has_kwargs or self.required_scopes_kw in arguments - return need_required_scopes + return has_kwargs or keyword in arguments def _resolve_func(self, security_scheme): """ diff --git a/docs/security.rst b/docs/security.rst index fac723a1d..df45c1007 100644 --- a/docs/security.rst +++ b/docs/security.rst @@ -70,6 +70,7 @@ The function should accept the following arguments: - username - password - required_scopes (optional) +- request (optional) You can find a `minimal Basic Auth example application`_ in Connexion's "examples" folder. @@ -85,6 +86,7 @@ The function should accept the following arguments: - token - required_scopes (optional) +- request (optional) You can find a `minimal Bearer example application`_ in Connexion's "examples" folder. @@ -100,6 +102,7 @@ The function should accept the following arguments: - apikey - required_scopes (optional) +- request (optional) You can find a `minimal API Key example application`_ in Connexion's "examples" folder. @@ -115,6 +118,7 @@ The function should accept the following arguments: - token - required_scopes (optional) +- request (optional) As alternative to an ``x-tokenInfoFunc`` definition, you can set an ``x-tokenInfoUrl`` definition or ``TOKENINFO_URL`` environment variable, and connexion will call the url instead of a local @@ -132,6 +136,7 @@ The function should accept the following arguments: - required_scopes - token_scopes +- request (optional) and return a boolean indicating if the validation was successful. diff --git a/tests/decorators/test_security.py b/tests/decorators/test_security.py index abb88eb01..2e1a99e7e 100644 --- a/tests/decorators/test_security.py +++ b/tests/decorators/test_security.py @@ -328,3 +328,52 @@ def test_raise_most_specific(errors, most_specific): security_handler_factory = SecurityHandlerFactory() with pytest.raises(most_specific): security_handler_factory._raise_most_specific(errors) + + +async def test_optional_kwargs_injected(): + """Test that optional keyword arguments 'required_scopes' and 'request' are injected when + defined as arguments in the user security function. This test uses the ApiKeySecurityHandler, + but the tested behavior is generic across handlers.""" + security_handler_factory = ApiKeySecurityHandler() + + request = ConnexionRequest( + scope={"type": "http", "headers": [[b"x-auth", b"foobar"]]} + ) + + def apikey_info_no_kwargs(key): + """Will fail if additional keywords are injected.""" + return {"sub": "no_kwargs"} + + wrapped_func_no_kwargs = security_handler_factory._get_verify_func( + apikey_info_no_kwargs, "header", "X-Auth" + ) + assert await wrapped_func_no_kwargs(request) == {"sub": "no_kwargs"} + + def apikey_info_request(key, request): + """Will fail if request is not injected.""" + return {"sub": "request"} + + wrapped_func_request = security_handler_factory._get_verify_func( + apikey_info_request, "header", "X-Auth" + ) + assert await wrapped_func_request(request) == {"sub": "request"} + + def apikey_info_scopes(key, required_scopes): + """Will fail if required_scopes is not injected.""" + return {"sub": "scopes"} + + wrapped_func_scopes = security_handler_factory._get_verify_func( + apikey_info_scopes, "header", "X-Auth" + ) + assert await wrapped_func_scopes(request) == {"sub": "scopes"} + + def apikey_info_kwargs(key, **kwargs): + """Will fail if request and required_scopes are not injected.""" + assert "request" in kwargs + assert "required_scopes" in kwargs + return {"sub": "kwargs"} + + wrapped_func_kwargs = security_handler_factory._get_verify_func( + apikey_info_kwargs, "header", "X-Auth" + ) + assert await wrapped_func_kwargs(request) == {"sub": "kwargs"}