From bc90bde8f808039fa05366cb023f5c682c3d0bbc Mon Sep 17 00:00:00 2001 From: Josue-T Date: Thu, 21 Mar 2024 19:10:28 +0100 Subject: [PATCH] Add support of ldap filter with anonymous user (#186) --- ldap_auth_provider.py | 50 ++++++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/ldap_auth_provider.py b/ldap_auth_provider.py index 3646948..25dd81b 100644 --- a/ldap_auth_provider.py +++ b/ldap_auth_provider.py @@ -375,7 +375,9 @@ def parse_config(config) -> "_LdapConfig": ldap_config = _LdapConfig( enabled=config.get("enabled", False), - mode=LDAPMode.SIMPLE, + mode=LDAPMode.SEARCH + if config.get("mode", "simple") == "search" + else LDAPMode.SIMPLE, uri=config["uri"], start_tls=config.get("start_tls", False), tls_options=config.get("tls_options"), @@ -403,6 +405,8 @@ def parse_config(config) -> "_LdapConfig": raise ValueError( "Either bind_password or bind_password_file must be set!" ) + + if ldap_config.mode == LDAPMode.SEARCH: ldap_config.filter = config.get("filter", None) # verify attribute lookup @@ -461,13 +465,16 @@ async def _fetch_root_domain(self) -> str: server = self._get_server(get_info=ldap3.DSA) if self.ldap_bind_dn is None or self.ldap_bind_password is None: - raise ValueError("Missing bind DN or bind password") - - result, conn = await self._ldap_simple_bind( - server=server, - bind_dn=self.ldap_bind_dn, - password=self.ldap_bind_password, - ) + result, conn = await self._ldap_simple_bind( + server=server, + auth_type=ldap3.ANONYMOUS, + ) + else: + result, conn = await self._ldap_simple_bind( + server=server, + bind_dn=self.ldap_bind_dn, + password=self.ldap_bind_password, + ) if not result: logger.warning("Unable to get root domain due to failed LDAP bind") @@ -503,7 +510,11 @@ async def _fetch_root_domain(self) -> str: return self.ldap_root_domain async def _ldap_simple_bind( - self, server: ldap3.ServerPool, bind_dn: str, password: str + self, + server: ldap3.ServerPool, + bind_dn: Optional[str] = None, + password: Optional[str] = None, + auth_type: str = ldap3.SIMPLE, ) -> Tuple[bool, Optional[ldap3.Connection]]: """Attempt a simple bind with the credentials given by the user against the LDAP server. @@ -513,6 +524,8 @@ async def _ldap_simple_bind( Returns False, None if an error occured """ + if (bind_dn is None or password is None) and auth_type == ldap3.SIMPLE: + raise ValueError("Missing bind DN or bind password") try: # bind with the the local user's ldap credentials @@ -521,7 +534,7 @@ async def _ldap_simple_bind( server, bind_dn, password, - authentication=ldap3.SIMPLE, + authentication=auth_type, read_only=True, ) logger.debug("Established LDAP connection in simple bind mode: %s", conn) @@ -578,13 +591,16 @@ async def _ldap_authenticated_search( try: if self.ldap_bind_dn is None or self.ldap_bind_password is None: - raise ValueError("Missing bind DN or bind password") - - result, conn = await self._ldap_simple_bind( - server=server, - bind_dn=self.ldap_bind_dn, - password=self.ldap_bind_password, - ) + result, conn = await self._ldap_simple_bind( + server=server, + auth_type=ldap3.ANONYMOUS, + ) + else: + result, conn = await self._ldap_simple_bind( + server=server, + bind_dn=self.ldap_bind_dn, + password=self.ldap_bind_password, + ) if not result: return (False, None, None)