Coverage for gws-app/gws/base/auth/user.py: 27%

142 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1from typing import Optional, cast 

2 

3import gws 

4import gws.lib.jsonx 

5 

6 

7class Props(gws.Props): 

8 displayName: str 

9 attributes: dict 

10 

11 

12_FIELDS = { 

13 'authToken', 

14 'displayName', 

15 'email', 

16 'localUid', 

17 'loginName', 

18 'mfaSecret', 

19 'mfaUid', 

20} 

21 

22 

23class User(gws.User): 

24 isGuest = False 

25 

26 def __init__(self, provider, roles): 

27 super().__init__() 

28 

29 self.authProvider = provider 

30 

31 self.attributes = {} 

32 self.data = {} 

33 self.roles = roles 

34 self.uid = '' 

35 

36 for f in _FIELDS: 

37 setattr(self, f, '') 

38 

39 def props(self, user): 

40 return Props(displayName=self.displayName, attributes=self.attributes) 

41 

42 def has_role(self, role): 

43 return role in self.roles 

44 

45 def can_use(self, obj, *context): 

46 return self.can(gws.Access.read, obj, *context) 

47 

48 def can_read(self, obj, *context): 

49 return self.can(gws.Access.read, obj, *context) 

50 

51 def can_write(self, obj, *context): 

52 return self.can(gws.Access.write, obj, *context) 

53 

54 def can_create(self, obj, *context): 

55 return self.can(gws.Access.create, obj, *context) 

56 

57 def can_edit(self, obj, *context): 

58 return ( 

59 self.can(gws.Access.write, obj, *context) 

60 or self.can(gws.Access.create, obj, *context) 

61 or self.can(gws.Access.delete, obj, *context) 

62 ) 

63 

64 def can_delete(self, obj, *context): 

65 return self.can(gws.Access.delete, obj, *context) 

66 

67 def can(self, access, obj, *context): 

68 ci = 0 

69 clen = len(context) 

70 

71 while obj: 

72 bit = self.acl_bit(access, obj) 

73 if bit is not None: 

74 return bit == gws.c.ALLOW 

75 obj = context[ci] if ci < clen else getattr(obj, 'parent', None) 

76 ci += 1 

77 

78 return False 

79 

80 def acl_bit(self, access, obj): 

81 if obj is self and access == gws.Access.read: 

82 return gws.c.ALLOW 

83 acl = obj.permissions.get(access) 

84 if acl: 

85 for bit, role in acl: 

86 if role in self.roles: 

87 return bit 

88 

89 def require(self, uid=None, classref=None, access=None): 

90 access = access or gws.Access.read 

91 obj = self.authProvider.root.get(uid, classref) 

92 if not obj: 

93 raise gws.NotFoundError(f'required object {classref} {uid} not found') 

94 if not self.can(access, obj): 

95 raise gws.ForbiddenError(f'required object {classref} {uid} forbidden') 

96 return obj 

97 

98 def acquire(self, uid=None, classref=None, access=None): 

99 access = access or gws.Access.read 

100 obj = self.authProvider.root.get(uid, classref) 

101 if obj and self.can(access, obj): 

102 return obj 

103 

104 def require_project(self, uid=None): 

105 return cast(gws.Project, self.require(uid, gws.ext.object.project)) 

106 

107 def require_layer(self, uid=None): 

108 return cast(gws.Layer, self.require(uid, gws.ext.object.layer)) 

109 

110 

111class GuestUser(User): 

112 isGuest = True 

113 

114 

115class SystemUser(User): 

116 def acl_bit(self, access, obj): 

117 return gws.c.ALLOW 

118 

119 

120class NobodyUser(User): 

121 def acl_bit(self, access, obj): 

122 return gws.c.DENY 

123 

124 

125class AuthorizedUser(User): 

126 pass 

127 

128 

129class AdminUser(User): 

130 def acl_bit(self, access, obj): 

131 return gws.c.ALLOW 

132 

133 

134## 

135 

136 

137## 

138 

139def to_dict(usr) -> dict: 

140 d = {} 

141 

142 d['attributes'] = usr.attributes or {} 

143 d['data'] = usr.data or {} 

144 d['roles'] = list(usr.roles) 

145 d['uid'] = usr.uid 

146 

147 for f in _FIELDS: 

148 d[f] = getattr(usr, f, '') 

149 

150 return d 

151 

152 

153def from_dict(provider: gws.AuthProvider, d: dict) -> gws.User: 

154 roles = set(d.get('roles', [])) 

155 

156 if gws.c.ROLE_GUEST in roles: 

157 return provider.root.app.authMgr.guestUser 

158 

159 if gws.c.ROLE_ADMIN in roles: 

160 usr = AdminUser(provider, roles) 

161 else: 

162 usr = AuthorizedUser(provider, roles) 

163 

164 for f in _FIELDS: 

165 setattr(usr, f, d.get(f, '')) 

166 

167 usr.attributes = d.get('attributes', {}) 

168 usr.data = d.get('data', {}) 

169 usr.roles = roles 

170 usr.uid = gws.u.join_uid(provider.uid, usr.localUid) 

171 

172 return usr 

173 

174 

175def from_record(provider: gws.AuthProvider, user_rec: dict) -> gws.User: 

176 """Create a User from a raw record as returned from a provider. 

177 

178 A provider can return an arbitrary dict of values. Entries whose keys are 

179 in the `_FIELDS` list (case-insensitively), are copied to the newly 

180 created `User` object. 

181 

182 Entries ``roles`` and ``attributes`` are copied as well, 

183 other entries are stored in the user's ``data`` dict. 

184 """ 

185 

186 data = dict(user_rec) 

187 

188 roles = set(gws.u.to_list(data.pop('roles', []))) 

189 roles.add(gws.c.ROLE_ALL) 

190 

191 if gws.c.ROLE_GUEST in roles: 

192 return provider.root.app.authMgr.guestUser 

193 

194 if gws.c.ROLE_ADMIN in roles: 

195 usr = AdminUser(provider, roles) 

196 else: 

197 roles.add(gws.c.ROLE_USER) 

198 usr = AuthorizedUser(provider, roles) 

199 

200 for f in _FIELDS: 

201 if f in data: 

202 setattr(usr, f, data.pop(f)) 

203 continue 

204 if f.lower() in data: 

205 setattr(usr, f, data.pop(f.lower())) 

206 continue 

207 

208 usr.attributes = data.pop('attributes', {}) 

209 usr.data = _process_aliases(data) 

210 

211 if not usr.loginName and 'login' in usr.data: 

212 usr.loginName = usr.data['login'] 

213 

214 if not usr.email and 'email' in usr.data: 

215 usr.email = usr.data['email'] 

216 

217 usr.localUid = usr.localUid or usr.loginName 

218 if not usr.localUid: 

219 raise gws.Error(f'missing local uid for user') 

220 

221 usr.displayName = usr.displayName or usr.loginName 

222 

223 usr.uid = gws.u.join_uid(provider.uid, usr.localUid) 

224 

225 return usr 

226 

227 

228_ALIASES = [ 

229 # https://tools.ietf.org/html/rfc4519 

230 ('c', 'countryName'), 

231 ('cn', 'commonName'), 

232 ('dc', 'domainComponent'), 

233 ('l', 'localityName'), 

234 ('o', 'organizationName'), 

235 ('ou', 'organizationalUnitName'), 

236 ('sn', 'surname'), 

237 ('st', 'stateOrProvinceName'), 

238 ('street', 'streetAddress'), 

239 

240 # non-standard 

241 ('login', 'userPrincipalName'), 

242 ('mail', 'email'), 

243] 

244 

245 

246def _process_aliases(r): 

247 for a, b in _ALIASES: 

248 if a in r: 

249 r[b] = r[a] 

250 elif b in r: 

251 r[a] = r[b] 

252 return r