diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 02a918f..5991727 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -4,7 +4,7 @@ repos: hooks: - id: reorder-python-imports - repo: https://github.com/ambv/black - rev: 19.10b0 + rev: 22.3.0 hooks: - id: black - repo: https://github.com/pre-commit/pre-commit-hooks diff --git a/README.md b/README.md index 626b633..b51e245 100644 --- a/README.md +++ b/README.md @@ -218,6 +218,11 @@ If set to True (the default) the username used to build the DN string is returne When authenticating on a Linux machine against an AD server this might return something different from the supplied UNIX username. In this case setting this option to False might be a solution. +#### `LDAPAuthenticator.enable_refresh` #### +If set to True it then periodically checks if a user is still in one the allowed groups. +This requires `lookup_dn_search_user` and `lookup_dn_search_user` to be set if anonymous login is not allowed. +The refresh interval can be set with `c.Authenticator.auth_refresh_age`. + ## Compatibility ## This has been tested against an OpenLDAP server, with the client diff --git a/dev-requirements.txt b/dev-requirements.txt index f225f7a..3c0ce9a 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -1,5 +1,5 @@ beautifulsoup4 -codecov +codecov==2.1.13 coverage cryptography html5lib # needed for beautifulsoup diff --git a/ldapauthenticator/ldapauthenticator.py b/ldapauthenticator/ldapauthenticator.py index 4e3a809..b6f7da0 100644 --- a/ldapauthenticator/ldapauthenticator.py +++ b/ldapauthenticator/ldapauthenticator.py @@ -229,6 +229,17 @@ def _server_port_default(self): """, ) + enable_refresh = Bool( + False, + config=True, + help=""" + If set to true periodically checks if a user is still in one the allowed groups. + + This requires `lookup_dn_search_user` and `lookup_dn_search_user` to be set if anonymous login is not allowed. + The refresh interval can be set with `c.Authenticator.auth_refresh_age`. + """, + ) + def resolve_username(self, username_supplied_by_user): search_dn = self.lookup_dn_search_user if self.escape_userdn: @@ -463,6 +474,62 @@ def authenticate(self, handler, data): return {"name": username, "auth_state": user_info} return username + async def refresh_user(self, user, handler=None): + username = user.name + if self.enable_refresh and self.allowed_groups: + bind_dn_template = self.bind_dn_template + if isinstance(bind_dn_template, str): + bind_dn_template = [bind_dn_template] + + if self.lookup_dn: + username, resolved_dn = self.resolve_username(username) + if not username: + return None + if str(self.lookup_dn_user_dn_attribute).upper() == "CN": + # Only escape commas if the lookup attribute is CN + username = re.subn(r"([^\\]),", r"\1\,", username)[0] + if not bind_dn_template: + bind_dn_template = [resolved_dn] + + conn = self.get_connection( + userdn=self.lookup_dn_search_user, + password=self.lookup_dn_search_password, + ) + found = False + for dn in bind_dn_template: + if not dn: + self.log.warning("Ignoring blank 'bind_dn_template' entry!") + continue + userdn = dn.format(username=username) + self.log.debug("username:%s Using dn %s", username, userdn) + + for group in self.allowed_groups: + group_filter = ( + "(|" + "(member={userdn})" + "(uniqueMember={userdn})" + "(memberUid={uid})" + ")" + ) + group_filter = group_filter.format(userdn=userdn, uid=username) + group_attributes = ["member", "uniqueMember", "memberUid"] + found = conn.search( + group, + search_scope=ldap3.BASE, + search_filter=group_filter, + attributes=group_attributes, + ) + if found: + return True + if not found: + # If we reach here, then none of the groups matched + msg = "user:{userdn} User not in any of the allowed groups" + self.log.warning(msg.format(userdn=userdn)) + return False + return False + + return True + if __name__ == "__main__": import getpass diff --git a/ldapauthenticator/tests/test_ldapauthenticator.py b/ldapauthenticator/tests/test_ldapauthenticator.py index 6471213..e54941a 100644 --- a/ldapauthenticator/tests/test_ldapauthenticator.py +++ b/ldapauthenticator/tests/test_ldapauthenticator.py @@ -1,4 +1,5 @@ # Inspired by https://github.com/jupyterhub/jupyterhub/blob/master/jupyterhub/tests/test_auth.py +from unittest.mock import Mock async def test_ldap_auth_allowed(authenticator): @@ -100,3 +101,45 @@ async def test_ldap_auth_state_attributes(authenticator): ) assert authorized["name"] == "fry" assert authorized["auth_state"] == {"employeeType": ["Delivery boy"]} + + +async def test_ldap_refresh_user(authenticator): + authenticator.allowed_groups = [ + "cn=admin_staff,ou=people,dc=planetexpress,dc=com", + "cn=ship_crew,ou=people,dc=planetexpress,dc=com", + ] + + authenticator.enable_refresh = True + mock = Mock() + attrs = {"name": "zoidberg"} + mock.configure_mock(**attrs) + + is_valid = await authenticator.refresh_user(mock, None) + assert is_valid == False + + attrs = {"name": "leela"} + mock.configure_mock(**attrs) + + is_valid = await authenticator.refresh_user(mock, None) + assert is_valid == True + + +async def test_ldap_refresh_user_disabled(authenticator): + authenticator.allowed_groups = [ + "cn=admin_staff,ou=people,dc=planetexpress,dc=com", + "cn=ship_crew,ou=people,dc=planetexpress,dc=com", + ] + + authenticator.enable_refresh = False + mock = Mock() + attrs = {"name": "zoidberg"} + mock.configure_mock(**attrs) + + is_valid = await authenticator.refresh_user(mock, None) + assert is_valid == True + + attrs = {"name": "leela"} + mock.configure_mock(**attrs) + + is_valid = await authenticator.refresh_user(mock, None) + assert is_valid == True