Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add option to check allowed groups using the ldap connection of the search user #183 #185

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 61 additions & 26 deletions ldapauthenticator/ldapauthenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,19 +229,29 @@ def _server_port_default(self):
""",
)

def resolve_username(self, username_supplied_by_user):
search_dn = self.lookup_dn_search_user
if self.escape_userdn:
search_dn = escape_filter_chars(search_dn)
conn = self.get_connection(
userdn=search_dn, password=self.lookup_dn_search_password
)
is_bound = conn.bind()
if not is_bound:
msg = "Failed to connect to LDAP server with search user '{search_dn}'"
self.log.warning(msg.format(search_dn=search_dn))
return (None, None)
use_search_user_to_check_groups = Bool(
False,
config=True,
help="""If set to true uses the ldap connection of the configured search user to check the `allowed_groups` for the
user to be authenticated.

By default the ldap connection of the user to be authenticated is used. This can be useful in ldap environments where
the users itself don't have the permission to access the ldap groups. The configured search user needs the
permission to access ldap groups though.
"""
)


def resolve_username(self, connection, username_supplied_by_user):
"""Resolves the user name and user dn of the user being authenticated.

Args:
connection: the ldap connection to use
username_supplied_by_user: the username supplied by the user

Returns:
tuple: value of ldap attribute `lookup_dn_user_dn_attribute` on user entry and the user dn
"""
search_filter = self.lookup_dn_search_filter.format(
login_attr=self.user_attribute, login=username_supplied_by_user
)
Expand All @@ -260,13 +270,13 @@ def resolve_username(self, username_supplied_by_user):
attributes=self.user_attribute,
)
)
conn.search(
connection.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=search_filter,
attributes=[self.lookup_dn_user_dn_attribute],
)
response = conn.response
response = connection.response
if len(response) == 0 or "attributes" not in response[0].keys():
msg = (
"No entry found for user '{username}' "
Expand Down Expand Up @@ -357,8 +367,21 @@ def authenticate(self, handler, data):
)
return None

# connect to ldap with search user
search_dn = self.lookup_dn_search_user
if self.escape_userdn:
search_dn = escape_filter_chars(search_dn)
connection_search = self.get_connection(
userdn=search_dn, password=self.lookup_dn_search_password
)
if not connection_search.bind():
msg = "Failed to connect to LDAP server with search user '{search_dn}'"
self.log.warning(msg.format(search_dn=search_dn))
return None

# lookup dn of user to be authenticated
if self.lookup_dn:
username, resolved_dn = self.resolve_username(username)
username, resolved_dn = self.resolve_username(connection_search, username)
if not username:
return None
if str(self.lookup_dn_user_dn_attribute).upper() == "CN":
Expand All @@ -367,6 +390,7 @@ def authenticate(self, handler, data):
if not bind_dn_template:
bind_dn_template = [resolved_dn]

# try to authenticate user
is_bound = False
for dn in bind_dn_template:
if not dn:
Expand All @@ -379,15 +403,15 @@ def authenticate(self, handler, data):
self.log.debug(msg.format(username=username, userdn=userdn))
msg = "Status of user bind {username} with {userdn} : {is_bound}"
try:
conn = self.get_connection(userdn, password)
connection_user = self.get_connection(userdn, password)
except ldap3.core.exceptions.LDAPBindError as exc:
is_bound = False
msg += "\n{exc_type}: {exc_msg}".format(
exc_type=exc.__class__.__name__,
exc_msg=exc.args[0] if exc.args else "",
)
else:
is_bound = True if conn.bound else conn.bind()
is_bound = True if connection_user.bound else connection_user.bind()
msg = msg.format(username=username, userdn=userdn, is_bound=is_bound)
self.log.debug(msg)
if is_bound:
Expand All @@ -398,17 +422,18 @@ def authenticate(self, handler, data):
self.log.warning(msg.format(username=username))
return None

# validate user access by applying the search filter
if self.search_filter:
search_filter = self.search_filter.format(
userattr=self.user_attribute, username=username
)
conn.search(
connection_user.search(
search_base=self.user_search_base,
search_scope=ldap3.SUBTREE,
search_filter=search_filter,
attributes=self.attributes,
)
n_users = len(conn.response)
n_users = len(connection_user.response)
if n_users == 0:
msg = "User with '{userattr}={username}' not found in directory"
self.log.warning(
Expand All @@ -427,6 +452,7 @@ def authenticate(self, handler, data):
)
return None

# check if user is member in any of the allowed ldap groups
if self.allowed_groups:
self.log.debug("username:%s Using dn %s", username, userdn)
found = False
Expand All @@ -440,12 +466,21 @@ def authenticate(self, handler, data):
)
group_filter = group_filter.format(userdn=userdn, uid=username)
group_attributes = ["member", "uniqueMember", "memberUid"]
found = conn.search(
group,
search_scope=ldap3.BASE,
search_filter=group_filter,
attributes=group_attributes,
)
# check which ldap connection to use: user (default) or search user
if self.use_search_user_to_check_groups is True:
found = connection_search.search(
group,
search_scope=ldap3.BASE,
search_filter=group_filter,
attributes=group_attributes,
)
else:
found = connection_user.search(
group,
search_scope=ldap3.BASE,
search_filter=group_filter,
attributes=group_attributes,
)
if found:
break
if not found:
Expand All @@ -457,7 +492,7 @@ def authenticate(self, handler, data):
if not self.use_lookup_dn_username:
username = data["username"]

user_info = self.get_user_attributes(conn, userdn)
user_info = self.get_user_attributes(connection_user, userdn)
if user_info:
self.log.debug("username:%s attributes:%s", username, user_info)
return {"name": username, "auth_state": user_info}
Expand Down
126 changes: 126 additions & 0 deletions ldapauthenticator/tests/test_ldapconnection_usage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
"""Tests the usage of both ldap connections of the ldap authenticator.
"""
import os
from unittest.mock import MagicMock, call, ANY
import pytest

from ..ldapauthenticator import LDAPAuthenticator


@pytest.fixture
def authenticator_setup():
"""Provides a configured and mocked authenticator as well as a mocked search and user connection.

Note: don't be confused with the connection settings here, they just serve as valid configuration and are not used.
"""
# configure authenticator
authenticator = LDAPAuthenticator()
authenticator.server_address = "localhost"
authenticator.lookup_dn = False
authenticator.bind_dn_template = "cn={username},ou=people,dc=planetexpress,dc=com"
authenticator.user_search_base = "ou=people,dc=planetexpress,dc=com"
authenticator.user_attribute = "uid"
authenticator.lookup_dn_user_dn_attribute = "cn"
authenticator.escape_userdn = True
authenticator.attributes = ["uid", "cn", "mail", "ou"]
authenticator.use_lookup_dn_username = False
# leela is being authenticated, she's member of that group
authenticator.allowed_groups = [
"cn=ship_crew,ou=people,dc=planetexpress,dc=com",
]
# search user is 'hermes'
authenticator.lookup_dn_search_user = 'hermes'
authenticator.lookup_dn_search_password = 'hermes'

# mock ldap connections: return either the one for the search user or for the user to be authenticated
connection_search_mock = MagicMock()
connection_user_mock = MagicMock()
def connection_mock(*args, **kwargs):
if 'userdn' in kwargs and kwargs['userdn'] == 'hermes':
return connection_search_mock
else:
return connection_user_mock
authenticator.get_connection = MagicMock( side_effect = connection_mock )

# 1) search: bind method should return True
connection_search_mock.bind = MagicMock( return_value = True )

# 2) search: lookup dn of user to be authenticated » deactivated

# 3) user: bound should be False so that bind method is called returning True
connection_user_mock.bound = False
connection_user_mock.bind = MagicMock( return_value = True)

# 4) user: search filter are empty » deactivated

# 5) search or user: allowed groups » configured in test methods

# 6) user: get_user_attributes(connection, userdn) » returns dict with entry attributes
authenticator.get_user_attributes = MagicMock( return_value = { 'uid': 'leela', 'cn': 'Turanga Leela' } )

return authenticator, connection_search_mock, connection_user_mock


async def test_allowed_groups_check_with_user_connection(authenticator_setup):
"""Checks the method calls on both ldap connections when the `allowed_groups` are check with the
connection of the user being authenticated (default).
"""
# unpack + assert object setup
authenticator, connection_search_mock, connection_user_mock = authenticator_setup
assert authenticator is not None and connection_search_mock is not None and connection_user_mock is not None
assert authenticator.lookup_dn is False
assert not authenticator.search_filter

# assert default value
assert authenticator.use_search_user_to_check_groups is False

# 5) user: allowed groups » simply return True for the one group
connection_user_mock.search = MagicMock( return_value = True )

# authenticate leela
result = await authenticator.authenticate( None, { 'username': 'leela', 'password': 'leela' } )
assert 'name' in result
assert result['name'] == 'leela'

# assert method calls on mocks
expected_search_mock_calls = [
call.bind(),
]
assert connection_search_mock.mock_calls == expected_search_mock_calls
expected_user_mock_calls = [
call.bind(),
call.search('cn=ship_crew,ou=people,dc=planetexpress,dc=com', search_scope = ANY, search_filter = ANY, attributes = ANY)
]
assert connection_user_mock.mock_calls == expected_user_mock_calls

async def test_allowed_groups_check_with_search_connection(authenticator_setup):
"""Checks the method calls on both ldap connections when the `allowed_groups` are check with the
connection of the configured search user.
"""
# unpack + assert object setup
authenticator, connection_search_mock, connection_user_mock = authenticator_setup
assert authenticator is not None and connection_search_mock is not None and connection_user_mock is not None
assert authenticator.lookup_dn is False
assert not authenticator.search_filter

# enable allowed groups check using the search user connection
authenticator.use_search_user_to_check_groups = True

# 5) search: allowed groups » simply return True for the one group
connection_search_mock.search = MagicMock( return_value = True )

# authenticate leela
result = await authenticator.authenticate( None, { 'username': 'leela', 'password': 'leela' } )
assert 'name' in result
assert result['name'] == 'leela'

# assert method calls on mocks
expected_search_mock_calls = [
call.bind(),
call.search('cn=ship_crew,ou=people,dc=planetexpress,dc=com', search_scope = ANY, search_filter = ANY, attributes = ANY)
]
assert connection_search_mock.mock_calls == expected_search_mock_calls
expected_user_mock_calls = [
call.bind(),
]
assert connection_user_mock.mock_calls == expected_user_mock_calls