Coverage for gws-app/gws/plugin/auth_provider/ldap/__init__.py: 0%

155 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""LDAP authorization provider. 

2 

3Accepts an LDAP URL in the following form:: 

4 

5 ldap://host:port/baseDN?searchAttribute 

6 

7which is a subset of the rfc2255 schema. 

8 

9Optionally, a bind dn and a password can be provided. This dn must have search permissions for the directory. 

10 

11The authorization workflow with the (login, password) credentials is as follows: 

12 

13- connect to the LDAP server, using the bind dn if provided 

14- search for the dn matching ``searchAttribute = credentials.login`` 

15- attempt to login with that dn and ``credentials.password`` 

16- iterate the ``users`` configs to determine roles for the user 

17 

18 

19References: 

20 https://datatracker.ietf.org/doc/html/rfc2255 

21 

22""" 

23 

24from typing import Optional 

25 

26import contextlib 

27 

28import ldap 

29import ldap.filter 

30 

31import gws 

32import gws.base.auth 

33import gws.lib.net 

34 

35 

36gws.ext.new.authProvider('ldap') 

37 

38 

39class UserSpec(gws.Data): 

40 """Map LDAP filters to authorization roles""" 

41 

42 roles: list[str] 

43 """GWS role names""" 

44 matches: Optional[str] 

45 """LDAP filter the account has to match""" 

46 memberOf: Optional[str] 

47 """LDAP group the account has to be a member of""" 

48 

49 

50class Config(gws.base.auth.provider.Config): 

51 """LDAP authorization provider""" 

52 

53 activeDirectory: bool = True 

54 """True if the LDAP server is ActiveDirectory.""" 

55 bindDN: Optional[str] 

56 """Bind DN.""" 

57 bindPassword: Optional[str] 

58 """Bind password.""" 

59 displayNameFormat: Optional[gws.FormatStr] 

60 """Format for user's display name.""" 

61 users: list[UserSpec] 

62 """Map LDAP filters to gws roles.""" 

63 timeout: gws.Duration = 30 

64 """LDAP server timeout.""" 

65 url: str 

66 """LDAP server url.""" 

67 

68 

69class Object(gws.base.auth.provider.Object): 

70 serverUrl: str 

71 baseDN: str 

72 loginAttribute: str 

73 timeout: int 

74 

75 def configure(self): 

76 p = gws.lib.net.parse_url(self.cfg('url')) 

77 

78 self.serverUrl = 'ldap://' + p.netloc 

79 self.baseDN = p.path.strip('/') 

80 self.loginAttribute = p.query 

81 

82 self.timeout = self.cfg('timeout', default=30) 

83 

84 try: 

85 with self._connection(): 

86 gws.log.debug(f'LDAP connection {self.uid!r} ok') 

87 except Exception as e: 

88 raise gws.Error(f'LDAP connection error: {e.__class__.__name__}', *e.args) 

89 

90 def authenticate(self, method, credentials): 

91 username = credentials.get('username') 

92 password = credentials.get('password') 

93 if not username or not password: 

94 return 

95 

96 with self._connection() as conn: 

97 users = self._find(conn, _make_filter({self.loginAttribute: username})) 

98 

99 if len(users) == 0: 

100 return 

101 if len(users) > 1: 

102 raise gws.ForbiddenError(f'multiple entries for {username!r}') 

103 

104 rec = users[0] 

105 

106 # check for AD disabled accounts 

107 uac = str(rec.get('userAccountControl', '')) 

108 if uac and uac.isdigit(): 

109 if int(uac) & _MS_ACCOUNTDISABLE: 

110 raise gws.ForbiddenError('ACCOUNTDISABLE flag set') 

111 

112 try: 

113 conn.simple_bind_s(rec['dn'], password) 

114 except ldap.INVALID_CREDENTIALS: 

115 raise gws.ForbiddenError(f'wrong password for {username!r}') 

116 except ldap.LDAPError as exc: 

117 gws.log.exception() 

118 raise gws.ForbiddenError(f'LDAP error {exc!r}') 

119 

120 return self._make_user(conn, rec) 

121 

122 def get_user(self, local_uid): 

123 with self._connection() as conn: 

124 users = self._find(conn, _make_filter({self.loginAttribute: local_uid})) 

125 if len(users) == 1: 

126 return self._make_user(conn, users[0]) 

127 

128 ## 

129 

130 def _make_user(self, conn, rec): 

131 user_rec = dict(rec) 

132 user_rec['roles'] = self._roles_for_user(conn, rec) 

133 

134 if not user_rec.get('displayName') and self.cfg('displayNameFormat'): 

135 user_rec['displayName'] = gws.u.format_map(self.cfg('displayNameFormat'), rec) 

136 

137 login = user_rec.pop(self.loginAttribute, '') 

138 user_rec['localUid'] = user_rec['loginName'] = login 

139 

140 return gws.base.auth.user.from_record(self, user_rec) 

141 

142 def _roles_for_user(self, conn, rec): 

143 user_dn = rec['dn'] 

144 roles = set() 

145 

146 for u in self.cfg('users'): 

147 

148 if u.get('matches'): 

149 for dct in self._find(conn, u.matches): 

150 if dct['dn'] == user_dn: 

151 roles.update(u.roles) 

152 

153 elif u.get('memberOf'): 

154 for dct in self._find(conn, u.memberOf): 

155 if _is_member_of(dct, user_dn): 

156 roles.update(u.roles) 

157 

158 return sorted(roles) 

159 

160 def _find(self, conn, flt): 

161 res = conn.search_s(self.baseDN, ldap.SCOPE_SUBTREE, flt) 

162 dcts = [] 

163 

164 for dn, data in res: 

165 if dn: 

166 d = _as_dict(data) 

167 d['dn'] = dn 

168 dcts.append(d) 

169 

170 return dcts 

171 

172 @contextlib.contextmanager 

173 def _connection(self): 

174 conn = ldap.initialize(self.serverUrl) 

175 conn.set_option(ldap.OPT_NETWORK_TIMEOUT, self.timeout) 

176 

177 if self.cfg('activeDirectory'): 

178 # see https://www.python-ldap.org/faq.html#usage 

179 conn.set_option(ldap.OPT_REFERRALS, 0) 

180 

181 if self.cfg('bindDN'): 

182 conn.simple_bind_s(self.cfg('bindDN'), self.cfg('bindPassword')) 

183 

184 try: 

185 yield conn 

186 finally: 

187 conn.unbind_s() 

188 

189 

190def _as_dict(data): 

191 d = {} 

192 

193 for k, v in data.items(): 

194 if not v: 

195 continue 

196 if not isinstance(v, list): 

197 v = [v] 

198 v = [gws.u.to_str(s) for s in v] 

199 d[k] = v[0] if len(v) == 1 else v 

200 

201 return d 

202 

203 

204def _make_filter(filter_dict): 

205 conds = ''.join( 

206 '({}={})'.format( 

207 ldap.filter.escape_filter_chars(k, 1), 

208 ldap.filter.escape_filter_chars(v, 1) 

209 ) 

210 for k, v in filter_dict.items() 

211 ) 

212 return '(&' + conds + ')' 

213 

214 

215def _is_member_of(group_dict, user_dn): 

216 for key in 'member', 'members', 'uniqueMember': 

217 if key in group_dict and user_dn in group_dict[key]: 

218 return True 

219 

220 

221# https://support.microsoft.com/en-us/help/305144 

222 

223_MS_SCRIPT = 0x0001 

224_MS_ACCOUNTDISABLE = 0x0002 

225_MS_HOMEDIR_REQUIRED = 0x0008 

226_MS_LOCKOUT = 0x0010 

227_MS_PASSWD_NOTREQD = 0x0020 

228_MS_PASSWD_CANT_CHANGE = 0x0040 

229_MS_ENCRYPTED_TEXT_PWD_ALLOWED = 0x0080 

230_MS_TEMP_DUPLICATE_ACCOUNT = 0x0100 

231_MS_NORMAL_ACCOUNT = 0x0200 

232_MS_INTERDOMAIN_TRUST_ACCOUNT = 0x0800 

233_MS_WORKSTATION_TRUST_ACCOUNT = 0x1000 

234_MS_SERVER_TRUST_ACCOUNT = 0x2000 

235_MS_DONT_EXPIRE_PASSWORD = 0x10000 

236_MS_MNS_LOGON_ACCOUNT = 0x20000 

237_MS_SMARTCARD_REQUIRED = 0x40000 

238_MS_TRUSTED_FOR_DELEGATION = 0x80000 

239_MS_NOT_DELEGATED = 0x100000 

240_MS_USE_DES_KEY_ONLY = 0x200000 

241_MS_DONT_REQ_PREAUTH = 0x400000 

242_MS_PASSWORD_EXPIRED = 0x800000 

243_MS_TRUSTED_TO_AUTH_FOR_DELEGATION = 0x1000000 

244_MS_PARTIAL_SECRETS_ACCOUNT = 0x04000000