Skip to content

Commit

Permalink
Merge pull request #56 from livestreamx/arogov
Browse files Browse the repository at this point in the history
mark dai galku (ldap -> ldap3)
  • Loading branch information
artamaney authored Mar 6, 2025
2 parents 9136680 + 82bcadf commit dd232b7
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 98 deletions.
2 changes: 1 addition & 1 deletion makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ SPHINXAPIDOC_OPTS = -f -d 3 --ext-autodoc
COV_BADGE_SVG = $(DOCS_IMAGES_DIR)/coverage.svg
MYPY_CACHE_DIR = .mypy_cache

MIN_COVERAGE = 89.1
MIN_COVERAGE = 88.9
PYTHON_VERSION ?= 3.12

JOBS ?= 4
Expand Down
86 changes: 55 additions & 31 deletions overhave/transport/ldap/authenticator.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,78 @@
import logging
import re
import typing

import ldap
from ldap.ldapobject import LDAPObject
import ldap3
from ldap3.core.exceptions import LDAPExceptionError
from pydantic import SecretStr

from overhave.transport.ldap.settings import OverhaveLdapClientSettings

logger = logging.getLogger(__name__)


class LdapUserSearchAttributes(typing.TypedDict):
"""LDAP user search attributes."""

memberOf: list[str] # noqa: N815


class LDAPAuthenticator:
"""Class-client for LDAP authentication."""
"""Overhave LDAP client."""

def __init__(self, settings: OverhaveLdapClientSettings) -> None:
self._server = ldap3.Server(
settings.url,
use_ssl=True,
connect_timeout=settings.timeout.seconds,
)
self._settings = settings
self._ldap_connection: LDAPObject | None = None

def _connect(self, login: str, password: SecretStr) -> None:
ldap_connection = ldap.initialize(self._settings.url)
ldap_connection.set_option(ldap.OPT_REFERRALS, 0)
ldap_connection.set_option(ldap.OPT_NETWORK_TIMEOUT, self._settings.timeout.seconds)
if self._settings.tls_enabled:
ldap_connection.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_HARD)
ldap_connection.start_tls_s()
ldap_connection.simple_bind_s(f"{self._settings.domain}{login}", password.get_secret_value())
self._ldap_connection = ldap_connection

def _ask_ad_usergroups(self, login: str) -> list[str]:
result = self._ldap_connection.search_st( # type: ignore
base=self._settings.dn,
scope=ldap.SCOPE_SUBTREE,
filterstr=f"(sAMAccountName={login})",
attrlist=["memberOf"],
timeout=self._settings.timeout.seconds,

def _get_connection(self, login: str, password: SecretStr) -> ldap3.Connection:
return ldap3.Connection(
self._server,
user=f"{self._settings.domain}{login}",
password=password.get_secret_value(),
receive_timeout=self._settings.timeout.seconds,
read_only=True,
auto_referrals=False,
auto_bind=True,
)

def _ask_ad_usergroups(self, connection: ldap3.Connection, login: str) -> list[str]:
# Выполнение поискового запроса
connection.search(
search_base=self._settings.dn,
search_filter=f"(sAMAccountName={login})",
search_scope=ldap3.SUBTREE,
attributes=["memberOf"],
time_limit=self._settings.timeout.seconds,
)
member_of = result[0][1]["memberOf"]
member_of = [m.decode("utf8") for m in member_of]
if connection.result.get("result", 1) != 0:
return []
if connection.response is None: # pragma: no cover
# should not happen - result was 0
raise ValueError

members_of: list[LdapUserSearchAttributes] = [
el["attributes"].get("memberOf", []) for el in connection.response if "attributes" in el
]

if len(members_of) != 1:
raise ValueError("Multiple or zero users with the same login")

member_of = members_of[0]

p = re.compile("CN=(.*?),", re.IGNORECASE)
return [
p.match(x).group(1) # type: ignore
p.match(x).group(1) # type: ignore[union-attr]
for x in list(filter(lambda x: "OU=Security Groups" in x or "OU=Mail Groups" in x, member_of))
]

def get_user_groups(self, login: str, password: SecretStr) -> list[str] | None:
try:
self._connect(login, password)
except ldap.INVALID_CREDENTIALS:
logger.info("Failed LDAP auth_managers for user: %s", login)
return None

return self._ask_ad_usergroups(login)
with self._get_connection(login, password) as conn:
return self._ask_ad_usergroups(conn, login)
except LDAPExceptionError:
logger.debug("Failed LDAP authorization for user: %s", login)
return None
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "overhave"
version = "5.2.1"
version = "5.2.2"
description = "Overhave - web-framework for BDD"
readme = "README.rst"
authors = [
Expand Down
51 changes: 0 additions & 51 deletions tests/unit/authorization/conftest.py

This file was deleted.

14 changes: 0 additions & 14 deletions tests/unit/authorization/test_ldap.py

This file was deleted.

0 comments on commit dd232b7

Please sign in to comment.