Coverage for gws-app/gws/plugin/account/helper.py: 0%

209 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1from typing import Optional, cast 

2 

3import gws 

4import gws.base.edit.helper 

5import gws.config.util 

6import gws.plugin.email_helper 

7import gws.lib.image 

8import gws.lib.net 

9import gws.lib.otp 

10 

11from . import core 

12 

13gws.ext.new.helper('account') 

14 

15 

16class MfaConfig: 

17 mfaUid: str 

18 title: str 

19 

20 

21class Config(gws.Config): 

22 """Account helper. (added in 8.1)""" 

23 

24 adminModel: gws.ext.config.model 

25 """Edit model for account administration.""" 

26 userModel: Optional[gws.ext.config.model] 

27 """Edit model for end-users accounts.""" 

28 templates: list[gws.ext.config.template] 

29 """Templates""" 

30 

31 usernameColumn: str = 'email' 

32 """Column used as 'login'.""" 

33 

34 passwordCreateSql: Optional[str] 

35 """SQL expression for computing password hashes.""" 

36 passwordVerifySql: Optional[str] 

37 """SQL expression for verifying password hashes.""" 

38 

39 tcLifeTime: gws.Duration = 3600 

40 """Life time for temporary codes.""" 

41 

42 mfa: Optional[list[MfaConfig]] 

43 """Multi-factor authentication methods the user can choose from.""" 

44 mfaIssuer: str = '' 

45 """Issuer name for Multi-factor key uris (qr codes).""" 

46 

47 onboardingUrl: str 

48 """URL for email onboarding.""" 

49 onboardingCompletionUrl: str = '' 

50 """URL to redirect after onboarding.""" 

51 

52 

53## 

54 

55 

56class Error(gws.Error): 

57 """Account-related error.""" 

58 pass 

59 

60 

61## 

62 

63 

64class MfaOption(gws.Data): 

65 index: int 

66 title: str 

67 adapter: Optional[gws.AuthMultiFactorAdapter] 

68 

69 

70_DEFAULT_PASSWORD_CREATE_SQL = "crypt( {password}, gen_salt('bf') )" 

71_DEFAULT_PASSWORD_VERIFY_SQL = "crypt( {password}, {passwordColumn} )" 

72 

73 

74class Object(gws.base.edit.helper.Object): 

75 adminModel: gws.DatabaseModel 

76 userModel: gws.DatabaseModel 

77 templates: list[gws.Template] 

78 

79 mfaIssuer: str 

80 mfaOptions: list[MfaOption] 

81 onboardingUrl: str 

82 onboardingCompletionUrl: str 

83 passwordCreateSql: str 

84 passwordVerifySql: str 

85 tcLifeTime: int 

86 usernameColumn: str 

87 

88 def configure(self): 

89 self.configure_templates() 

90 

91 self.adminModel = cast(gws.DatabaseModel, self.create_child(gws.ext.object.model, self.cfg('adminModel'))) 

92 

93 self.mfaIssuer = self.cfg('mfaIssuer') 

94 self.mfaOptions = [] 

95 

96 self.onboardingUrl = self.cfg('onboardingUrl') 

97 self.onboardingCompletionUrl = self.cfg('onboardingCompletionUrl') or self.onboardingUrl 

98 

99 self.passwordCreateSql = self.cfg('passwordCreateSql', default=_DEFAULT_PASSWORD_CREATE_SQL) 

100 self.passwordVerifySql = self.cfg('passwordVerifySql', default=_DEFAULT_PASSWORD_VERIFY_SQL) 

101 

102 self.tcLifeTime = self.cfg('tcLifeTime', default=3600) 

103 

104 self.usernameColumn = self.cfg('usernameColumn', default=core.Columns.email) 

105 

106 def configure_templates(self): 

107 return gws.config.util.configure_templates_for(self) 

108 

109 def post_configure(self): 

110 for n, c in enumerate(self.cfg('mfa', default=[]), 1): 

111 opt = MfaOption(index=n, title=c.title) 

112 if c.mfaUid: 

113 opt.adapter = self.root.get(c.mfaUid) 

114 if not opt.adapter: 

115 raise gws.ConfigurationError(f'MFA Adapter not found {c.mfaUid=}') 

116 self.mfaOptions.append(opt) 

117 

118 ## 

119 

120 def get_models(self, req, p): 

121 return [self.adminModel] 

122 

123 def write_feature(self, req, p): 

124 is_new = p.feature.isNew 

125 f = super().write_feature(req, p) 

126 

127 if f and not f.errors and is_new: 

128 account = self.get_account_by_id(f.uid()) 

129 self.reset(account) 

130 

131 return f 

132 

133 ## 

134 

135 def get_account_by_id(self, uid: str) -> Optional[dict]: 

136 sql = f''' 

137 SELECT * FROM {self.adminModel.tableName} 

138 WHERE {self.adminModel.uidName}=:uid 

139 ''' 

140 rs = self.adminModel.db.select_text(sql, uid=uid) 

141 return rs[0] if rs else None 

142 

143 def get_account_by_credentials(self, credentials: gws.Data, expected_status: Optional[core.Status] = None) -> Optional[dict]: 

144 expr = self.passwordVerifySql 

145 expr = expr.replace('{password}', ':password') 

146 expr = expr.replace('{passwordColumn}', core.Columns.password) 

147 

148 username = credentials.get('username') 

149 password = credentials.get('password') 

150 

151 sql = f''' 

152 SELECT 

153 {self.adminModel.uidName}, 

154 ( {core.Columns.password} = {expr} ) AS validpassword, 

155 {core.Columns.status} 

156 FROM 

157 {self.adminModel.tableName} 

158 WHERE 

159 {self.usernameColumn} = :username 

160 ''' 

161 rs = self.adminModel.db.select_text(sql, username=username, password=password) 

162 

163 if not rs: 

164 gws.log.warning(f'get_account_by_credentials: {username=} not found') 

165 return 

166 

167 if len(rs) > 1: 

168 raise Error(f'get_account_by_credentials: multiple entries for {username=}') 

169 

170 r = rs[0] 

171 

172 if not r.get('validpassword'): 

173 raise Error(f'get_account_by_credentials: {username=} wrong password') 

174 

175 if expected_status: 

176 status = r.get(core.Columns.status) 

177 if status != expected_status: 

178 raise Error(f'get_account_by_credentials: {username=} wrong {status=} {expected_status=}') 

179 

180 return self.get_account_by_id(self.get_uid(r)) 

181 

182 def get_account_by_tc(self, tc: str, category: str, expected_status: Optional[core.Status] = None) -> Optional[dict]: 

183 sql = f''' 

184 SELECT 

185 {self.adminModel.uidName}, 

186 {core.Columns.tcTime}, 

187 {core.Columns.tcCategory}, 

188 {core.Columns.status} 

189 FROM 

190 {self.adminModel.tableName} 

191 WHERE 

192 {core.Columns.tc} = :tc 

193 ''' 

194 rs = self.adminModel.db.select_text(sql, tc=tc) 

195 

196 if not rs: 

197 gws.log.warning(f'get_account_by_tc: {tc=} not found') 

198 return 

199 

200 self.invalidate_tc(tc) 

201 

202 if len(rs) > 1: 

203 raise Error(f'get_account_by_tc: {tc=} multiple entries') 

204 

205 r = rs[0] 

206 

207 if r.get(core.Columns.tcCategory) != category: 

208 gws.log.warning(f'get_account_by_tc: {category=} {tc=} wrong category') 

209 return 

210 

211 if gws.u.stime() - r.get(core.Columns.tcTime, 0) > self.tcLifeTime: 

212 gws.log.warning(f'get_account_by_tc: {category=} {tc=} expired') 

213 return 

214 

215 if expected_status: 

216 status = r.get(core.Columns.status) 

217 if status != expected_status: 

218 gws.log.warning(f'get_account_by_tc: {category=} {tc=} wrong {status=} {expected_status=}') 

219 return 

220 

221 return self.get_account_by_id(self.get_uid(r)) 

222 

223 ## 

224 

225 def set_password(self, account: dict, password): 

226 expr = self.passwordCreateSql 

227 expr = expr.replace('{password}', ':password') 

228 expr = expr.replace('{passwordColumn}', core.Columns.password) 

229 

230 sql = f''' 

231 UPDATE {self.adminModel.tableName} 

232 SET 

233 {core.Columns.password} = {expr} 

234 WHERE 

235 {self.adminModel.uidName} = :uid 

236 ''' 

237 self.adminModel.db.execute_text(sql, password=password, uid=self.get_uid(account)) 

238 

239 def validate_password(self, password: str) -> bool: 

240 if len(password.strip()) == 0: 

241 return False 

242 # @TODO password complexity validation 

243 return True 

244 

245 ## 

246 

247 def set_mfa(self, account: dict, mfa_option_index: int): 

248 mfa_uid = None 

249 

250 for mo in self.mfa_options(account): 

251 if mo.index == mfa_option_index: 

252 mfa_uid = mo.adapter.uid if mo.adapter else '' 

253 break 

254 

255 if mfa_uid is None: 

256 raise Error(f'{mfa_option_index=} not found') 

257 

258 sql = f''' 

259 UPDATE {self.adminModel.tableName} 

260 SET 

261 {core.Columns.mfaUid} = :mfa_uid 

262 WHERE 

263 {self.adminModel.uidName} = :uid 

264 ''' 

265 self.adminModel.db.execute_text(sql, mfa_uid=mfa_uid, uid=self.get_uid(account)) 

266 

267 def mfa_options(self, account: dict) -> list[MfaOption]: 

268 # @TODO different options per account 

269 return self.mfaOptions 

270 

271 def generate_mfa_secret(self, account: dict) -> str: 

272 secret = gws.lib.otp.random_secret() 

273 

274 sql = f''' 

275 UPDATE {self.adminModel.tableName} 

276 SET 

277 {core.Columns.mfaSecret} = :secret 

278 WHERE 

279 {self.adminModel.uidName} = :uid 

280 ''' 

281 self.adminModel.db.execute_text(sql, secret=secret, uid=self.get_uid(account)) 

282 

283 return secret 

284 

285 def qr_code_for_mfa(self, account: dict, mo: MfaOption, secret: str) -> str: 

286 if not mo.adapter: 

287 return '' 

288 url = mo.adapter.key_uri(secret, self.mfaIssuer, account.get(self.usernameColumn)) 

289 if not url: 

290 return '' 

291 return gws.lib.image.qr_code(url).to_data_url() 

292 

293 ## 

294 

295 def set_status(self, account: dict, status: core.Status): 

296 sql = f''' 

297 UPDATE {self.adminModel.tableName} 

298 SET 

299 {core.Columns.status} = :status 

300 WHERE 

301 {self.adminModel.uidName} = :uid 

302 ''' 

303 self.adminModel.db.execute_text(sql, status=status, uid=self.get_uid(account)) 

304 

305 def reset(self, account: dict): 

306 sql = f''' 

307 UPDATE {self.adminModel.tableName} 

308 SET 

309 {core.Columns.status} = :status, 

310 {core.Columns.password} = '', 

311 {core.Columns.mfaSecret} = '' 

312 WHERE 

313 {self.adminModel.uidName} = :uid 

314 ''' 

315 self.adminModel.db.execute_text(sql, status=core.Status.new, uid=self.get_uid(account)) 

316 

317 if self.onboardingUrl: 

318 self.send_onboarding_email(account) 

319 

320 ## 

321 

322 def send_onboarding_email(self, account: dict): 

323 tc = self.generate_tc(account, core.Category.onboarding) 

324 url = gws.lib.net.add_params(self.onboardingUrl, onboarding=tc) 

325 self.send_mail(account, core.Category.onboarding, {'url': url}) 

326 

327 def generate_tc(self, account: dict, category: str) -> str: 

328 tc = self.make_tc() 

329 

330 sql = f''' 

331 UPDATE {self.adminModel.tableName} 

332 SET 

333 {core.Columns.tc} = :tc, 

334 {core.Columns.tcTime} = :time, 

335 {core.Columns.tcCategory} = :category 

336 WHERE 

337 {self.adminModel.uidName} = :uid 

338 ''' 

339 self.adminModel.db.execute_text(sql, tc=tc, time=gws.u.stime(), category=category, uid=self.get_uid(account)) 

340 

341 return tc 

342 

343 def clear_tc(self, account: dict): 

344 sql = f''' 

345 UPDATE {self.adminModel.tableName} 

346 SET 

347 {core.Columns.tc} = '', 

348 {core.Columns.tcTime} = 0, 

349 {core.Columns.tcCategory} = '' 

350 WHERE 

351 {self.adminModel.uidName} = :uid 

352 ''' 

353 self.adminModel.db.execute_text(sql, uid=self.get_uid(account)) 

354 

355 def invalidate_tc(self, tc: str): 

356 sql = f''' 

357 UPDATE {self.adminModel.tableName} 

358 SET 

359 {core.Columns.tc} = '', 

360 {core.Columns.tcTime} = 0, 

361 {core.Columns.tcCategory} = '' 

362 WHERE 

363 {core.Columns.tc} = :tc 

364 ''' 

365 self.adminModel.db.execute_text(sql, tc=tc) 

366 

367 ## 

368 

369 def get_uid(self, account: dict) -> str: 

370 return account.get(self.adminModel.uidName) 

371 

372 def make_tc(self): 

373 return gws.u.random_string(32) 

374 

375 def send_mail(self, account: dict, category: str, args: Optional[dict] = None): 

376 email = account.get(core.Columns.email) 

377 if not email: 

378 raise Error(f'account {self.get_uid(account)}: no email') 

379 

380 args = args or {} 

381 args['account'] = account 

382 

383 message = gws.plugin.email_helper.Message( 

384 subject=self.render_template(f'{category}.emailSubject', args), 

385 mailTo=email, 

386 text=self.render_template(f'{category}.emailBody', args, mime='text/plain'), 

387 html=self.render_template(f'{category}.emailBody', args, mime='text/html'), 

388 ) 

389 

390 email_helper = cast(gws.plugin.email_helper, self.root.app.helper('email')) 

391 email_helper.send_mail(message) 

392 

393 def render_template(self, subject, args, mime=None): 

394 tpl = self.root.app.templateMgr.find_template(subject, where=[self], mime=mime) 

395 if tpl: 

396 res = tpl.render(gws.TemplateRenderInput(args=args)) 

397 return res.content 

398 return ''