Coverage for gws-app/gws/plugin/auth_provider/ldap/__init__.py: 0%
155 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""LDAP authorization provider.
3Accepts an LDAP URL in the following form::
5 ldap://host:port/baseDN?searchAttribute
7which is a subset of the rfc2255 schema.
9Optionally, a bind dn and a password can be provided. This dn must have search permissions for the directory.
11The authorization workflow with the (login, password) credentials is as follows:
13- connect to the LDAP server, using the bind dn if provided
14- search for the dn matching ``searchAttribute = credentials.login``
15- attempt to login with that dn and ``credentials.password``
16- iterate the ``users`` configs to determine roles for the user
19References:
20 https://datatracker.ietf.org/doc/html/rfc2255
22"""
24from typing import Optional
26import contextlib
28import ldap
29import ldap.filter
31import gws
32import gws.base.auth
33import gws.lib.net
36gws.ext.new.authProvider('ldap')
39class UserSpec(gws.Data):
40 """Map LDAP filters to authorization roles"""
42 roles: list[str]
43 """GWS role names"""
44 matches: Optional[str]
45 """LDAP filter the account has to match"""
46 memberOf: Optional[str]
47 """LDAP group the account has to be a member of"""
50class Config(gws.base.auth.provider.Config):
51 """LDAP authorization provider"""
53 activeDirectory: bool = True
54 """True if the LDAP server is ActiveDirectory."""
55 bindDN: Optional[str]
56 """Bind DN."""
57 bindPassword: Optional[str]
58 """Bind password."""
59 displayNameFormat: Optional[gws.FormatStr]
60 """Format for user's display name."""
61 users: list[UserSpec]
62 """Map LDAP filters to gws roles."""
63 timeout: gws.Duration = 30
64 """LDAP server timeout."""
65 url: str
66 """LDAP server url."""
69class Object(gws.base.auth.provider.Object):
70 serverUrl: str
71 baseDN: str
72 loginAttribute: str
73 timeout: int
75 def configure(self):
76 p = gws.lib.net.parse_url(self.cfg('url'))
78 self.serverUrl = 'ldap://' + p.netloc
79 self.baseDN = p.path.strip('/')
80 self.loginAttribute = p.query
82 self.timeout = self.cfg('timeout', default=30)
84 try:
85 with self._connection():
86 gws.log.debug(f'LDAP connection {self.uid!r} ok')
87 except Exception as e:
88 raise gws.Error(f'LDAP connection error: {e.__class__.__name__}', *e.args)
90 def authenticate(self, method, credentials):
91 username = credentials.get('username')
92 password = credentials.get('password')
93 if not username or not password:
94 return
96 with self._connection() as conn:
97 users = self._find(conn, _make_filter({self.loginAttribute: username}))
99 if len(users) == 0:
100 return
101 if len(users) > 1:
102 raise gws.ForbiddenError(f'multiple entries for {username!r}')
104 rec = users[0]
106 # check for AD disabled accounts
107 uac = str(rec.get('userAccountControl', ''))
108 if uac and uac.isdigit():
109 if int(uac) & _MS_ACCOUNTDISABLE:
110 raise gws.ForbiddenError('ACCOUNTDISABLE flag set')
112 try:
113 conn.simple_bind_s(rec['dn'], password)
114 except ldap.INVALID_CREDENTIALS:
115 raise gws.ForbiddenError(f'wrong password for {username!r}')
116 except ldap.LDAPError as exc:
117 gws.log.exception()
118 raise gws.ForbiddenError(f'LDAP error {exc!r}')
120 return self._make_user(conn, rec)
122 def get_user(self, local_uid):
123 with self._connection() as conn:
124 users = self._find(conn, _make_filter({self.loginAttribute: local_uid}))
125 if len(users) == 1:
126 return self._make_user(conn, users[0])
128 ##
130 def _make_user(self, conn, rec):
131 user_rec = dict(rec)
132 user_rec['roles'] = self._roles_for_user(conn, rec)
134 if not user_rec.get('displayName') and self.cfg('displayNameFormat'):
135 user_rec['displayName'] = gws.u.format_map(self.cfg('displayNameFormat'), rec)
137 login = user_rec.pop(self.loginAttribute, '')
138 user_rec['localUid'] = user_rec['loginName'] = login
140 return gws.base.auth.user.from_record(self, user_rec)
142 def _roles_for_user(self, conn, rec):
143 user_dn = rec['dn']
144 roles = set()
146 for u in self.cfg('users'):
148 if u.get('matches'):
149 for dct in self._find(conn, u.matches):
150 if dct['dn'] == user_dn:
151 roles.update(u.roles)
153 elif u.get('memberOf'):
154 for dct in self._find(conn, u.memberOf):
155 if _is_member_of(dct, user_dn):
156 roles.update(u.roles)
158 return sorted(roles)
160 def _find(self, conn, flt):
161 res = conn.search_s(self.baseDN, ldap.SCOPE_SUBTREE, flt)
162 dcts = []
164 for dn, data in res:
165 if dn:
166 d = _as_dict(data)
167 d['dn'] = dn
168 dcts.append(d)
170 return dcts
172 @contextlib.contextmanager
173 def _connection(self):
174 conn = ldap.initialize(self.serverUrl)
175 conn.set_option(ldap.OPT_NETWORK_TIMEOUT, self.timeout)
177 if self.cfg('activeDirectory'):
178 # see https://www.python-ldap.org/faq.html#usage
179 conn.set_option(ldap.OPT_REFERRALS, 0)
181 if self.cfg('bindDN'):
182 conn.simple_bind_s(self.cfg('bindDN'), self.cfg('bindPassword'))
184 try:
185 yield conn
186 finally:
187 conn.unbind_s()
190def _as_dict(data):
191 d = {}
193 for k, v in data.items():
194 if not v:
195 continue
196 if not isinstance(v, list):
197 v = [v]
198 v = [gws.u.to_str(s) for s in v]
199 d[k] = v[0] if len(v) == 1 else v
201 return d
204def _make_filter(filter_dict):
205 conds = ''.join(
206 '({}={})'.format(
207 ldap.filter.escape_filter_chars(k, 1),
208 ldap.filter.escape_filter_chars(v, 1)
209 )
210 for k, v in filter_dict.items()
211 )
212 return '(&' + conds + ')'
215def _is_member_of(group_dict, user_dn):
216 for key in 'member', 'members', 'uniqueMember':
217 if key in group_dict and user_dn in group_dict[key]:
218 return True
221# https://support.microsoft.com/en-us/help/305144
223_MS_SCRIPT = 0x0001
224_MS_ACCOUNTDISABLE = 0x0002
225_MS_HOMEDIR_REQUIRED = 0x0008
226_MS_LOCKOUT = 0x0010
227_MS_PASSWD_NOTREQD = 0x0020
228_MS_PASSWD_CANT_CHANGE = 0x0040
229_MS_ENCRYPTED_TEXT_PWD_ALLOWED = 0x0080
230_MS_TEMP_DUPLICATE_ACCOUNT = 0x0100
231_MS_NORMAL_ACCOUNT = 0x0200
232_MS_INTERDOMAIN_TRUST_ACCOUNT = 0x0800
233_MS_WORKSTATION_TRUST_ACCOUNT = 0x1000
234_MS_SERVER_TRUST_ACCOUNT = 0x2000
235_MS_DONT_EXPIRE_PASSWORD = 0x10000
236_MS_MNS_LOGON_ACCOUNT = 0x20000
237_MS_SMARTCARD_REQUIRED = 0x40000
238_MS_TRUSTED_FOR_DELEGATION = 0x80000
239_MS_NOT_DELEGATED = 0x100000
240_MS_USE_DES_KEY_ONLY = 0x200000
241_MS_DONT_REQ_PREAUTH = 0x400000
242_MS_PASSWORD_EXPIRED = 0x800000
243_MS_TRUSTED_TO_AUTH_FOR_DELEGATION = 0x1000000
244_MS_PARTIAL_SECRETS_ACCOUNT = 0x04000000