Coverage for gws-app/gws/plugin/account/helper.py: 0%
209 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1from typing import Optional, cast
3import gws
4import gws.base.edit.helper
5import gws.config.util
6import gws.plugin.email_helper
7import gws.lib.image
8import gws.lib.net
9import gws.lib.otp
11from . import core
13gws.ext.new.helper('account')
16class MfaConfig:
17 mfaUid: str
18 title: str
21class Config(gws.Config):
22 """Account helper. (added in 8.1)"""
24 adminModel: gws.ext.config.model
25 """Edit model for account administration."""
26 userModel: Optional[gws.ext.config.model]
27 """Edit model for end-users accounts."""
28 templates: list[gws.ext.config.template]
29 """Templates"""
31 usernameColumn: str = 'email'
32 """Column used as 'login'."""
34 passwordCreateSql: Optional[str]
35 """SQL expression for computing password hashes."""
36 passwordVerifySql: Optional[str]
37 """SQL expression for verifying password hashes."""
39 tcLifeTime: gws.Duration = 3600
40 """Life time for temporary codes."""
42 mfa: Optional[list[MfaConfig]]
43 """Multi-factor authentication methods the user can choose from."""
44 mfaIssuer: str = ''
45 """Issuer name for Multi-factor key uris (qr codes)."""
47 onboardingUrl: str
48 """URL for email onboarding."""
49 onboardingCompletionUrl: str = ''
50 """URL to redirect after onboarding."""
53##
56class Error(gws.Error):
57 """Account-related error."""
58 pass
61##
64class MfaOption(gws.Data):
65 index: int
66 title: str
67 adapter: Optional[gws.AuthMultiFactorAdapter]
70_DEFAULT_PASSWORD_CREATE_SQL = "crypt( {password}, gen_salt('bf') )"
71_DEFAULT_PASSWORD_VERIFY_SQL = "crypt( {password}, {passwordColumn} )"
74class Object(gws.base.edit.helper.Object):
75 adminModel: gws.DatabaseModel
76 userModel: gws.DatabaseModel
77 templates: list[gws.Template]
79 mfaIssuer: str
80 mfaOptions: list[MfaOption]
81 onboardingUrl: str
82 onboardingCompletionUrl: str
83 passwordCreateSql: str
84 passwordVerifySql: str
85 tcLifeTime: int
86 usernameColumn: str
88 def configure(self):
89 self.configure_templates()
91 self.adminModel = cast(gws.DatabaseModel, self.create_child(gws.ext.object.model, self.cfg('adminModel')))
93 self.mfaIssuer = self.cfg('mfaIssuer')
94 self.mfaOptions = []
96 self.onboardingUrl = self.cfg('onboardingUrl')
97 self.onboardingCompletionUrl = self.cfg('onboardingCompletionUrl') or self.onboardingUrl
99 self.passwordCreateSql = self.cfg('passwordCreateSql', default=_DEFAULT_PASSWORD_CREATE_SQL)
100 self.passwordVerifySql = self.cfg('passwordVerifySql', default=_DEFAULT_PASSWORD_VERIFY_SQL)
102 self.tcLifeTime = self.cfg('tcLifeTime', default=3600)
104 self.usernameColumn = self.cfg('usernameColumn', default=core.Columns.email)
106 def configure_templates(self):
107 return gws.config.util.configure_templates_for(self)
109 def post_configure(self):
110 for n, c in enumerate(self.cfg('mfa', default=[]), 1):
111 opt = MfaOption(index=n, title=c.title)
112 if c.mfaUid:
113 opt.adapter = self.root.get(c.mfaUid)
114 if not opt.adapter:
115 raise gws.ConfigurationError(f'MFA Adapter not found {c.mfaUid=}')
116 self.mfaOptions.append(opt)
118 ##
120 def get_models(self, req, p):
121 return [self.adminModel]
123 def write_feature(self, req, p):
124 is_new = p.feature.isNew
125 f = super().write_feature(req, p)
127 if f and not f.errors and is_new:
128 account = self.get_account_by_id(f.uid())
129 self.reset(account)
131 return f
133 ##
135 def get_account_by_id(self, uid: str) -> Optional[dict]:
136 sql = f'''
137 SELECT * FROM {self.adminModel.tableName}
138 WHERE {self.adminModel.uidName}=:uid
139 '''
140 rs = self.adminModel.db.select_text(sql, uid=uid)
141 return rs[0] if rs else None
143 def get_account_by_credentials(self, credentials: gws.Data, expected_status: Optional[core.Status] = None) -> Optional[dict]:
144 expr = self.passwordVerifySql
145 expr = expr.replace('{password}', ':password')
146 expr = expr.replace('{passwordColumn}', core.Columns.password)
148 username = credentials.get('username')
149 password = credentials.get('password')
151 sql = f'''
152 SELECT
153 {self.adminModel.uidName},
154 ( {core.Columns.password} = {expr} ) AS validpassword,
155 {core.Columns.status}
156 FROM
157 {self.adminModel.tableName}
158 WHERE
159 {self.usernameColumn} = :username
160 '''
161 rs = self.adminModel.db.select_text(sql, username=username, password=password)
163 if not rs:
164 gws.log.warning(f'get_account_by_credentials: {username=} not found')
165 return
167 if len(rs) > 1:
168 raise Error(f'get_account_by_credentials: multiple entries for {username=}')
170 r = rs[0]
172 if not r.get('validpassword'):
173 raise Error(f'get_account_by_credentials: {username=} wrong password')
175 if expected_status:
176 status = r.get(core.Columns.status)
177 if status != expected_status:
178 raise Error(f'get_account_by_credentials: {username=} wrong {status=} {expected_status=}')
180 return self.get_account_by_id(self.get_uid(r))
182 def get_account_by_tc(self, tc: str, category: str, expected_status: Optional[core.Status] = None) -> Optional[dict]:
183 sql = f'''
184 SELECT
185 {self.adminModel.uidName},
186 {core.Columns.tcTime},
187 {core.Columns.tcCategory},
188 {core.Columns.status}
189 FROM
190 {self.adminModel.tableName}
191 WHERE
192 {core.Columns.tc} = :tc
193 '''
194 rs = self.adminModel.db.select_text(sql, tc=tc)
196 if not rs:
197 gws.log.warning(f'get_account_by_tc: {tc=} not found')
198 return
200 self.invalidate_tc(tc)
202 if len(rs) > 1:
203 raise Error(f'get_account_by_tc: {tc=} multiple entries')
205 r = rs[0]
207 if r.get(core.Columns.tcCategory) != category:
208 gws.log.warning(f'get_account_by_tc: {category=} {tc=} wrong category')
209 return
211 if gws.u.stime() - r.get(core.Columns.tcTime, 0) > self.tcLifeTime:
212 gws.log.warning(f'get_account_by_tc: {category=} {tc=} expired')
213 return
215 if expected_status:
216 status = r.get(core.Columns.status)
217 if status != expected_status:
218 gws.log.warning(f'get_account_by_tc: {category=} {tc=} wrong {status=} {expected_status=}')
219 return
221 return self.get_account_by_id(self.get_uid(r))
223 ##
225 def set_password(self, account: dict, password):
226 expr = self.passwordCreateSql
227 expr = expr.replace('{password}', ':password')
228 expr = expr.replace('{passwordColumn}', core.Columns.password)
230 sql = f'''
231 UPDATE {self.adminModel.tableName}
232 SET
233 {core.Columns.password} = {expr}
234 WHERE
235 {self.adminModel.uidName} = :uid
236 '''
237 self.adminModel.db.execute_text(sql, password=password, uid=self.get_uid(account))
239 def validate_password(self, password: str) -> bool:
240 if len(password.strip()) == 0:
241 return False
242 # @TODO password complexity validation
243 return True
245 ##
247 def set_mfa(self, account: dict, mfa_option_index: int):
248 mfa_uid = None
250 for mo in self.mfa_options(account):
251 if mo.index == mfa_option_index:
252 mfa_uid = mo.adapter.uid if mo.adapter else ''
253 break
255 if mfa_uid is None:
256 raise Error(f'{mfa_option_index=} not found')
258 sql = f'''
259 UPDATE {self.adminModel.tableName}
260 SET
261 {core.Columns.mfaUid} = :mfa_uid
262 WHERE
263 {self.adminModel.uidName} = :uid
264 '''
265 self.adminModel.db.execute_text(sql, mfa_uid=mfa_uid, uid=self.get_uid(account))
267 def mfa_options(self, account: dict) -> list[MfaOption]:
268 # @TODO different options per account
269 return self.mfaOptions
271 def generate_mfa_secret(self, account: dict) -> str:
272 secret = gws.lib.otp.random_secret()
274 sql = f'''
275 UPDATE {self.adminModel.tableName}
276 SET
277 {core.Columns.mfaSecret} = :secret
278 WHERE
279 {self.adminModel.uidName} = :uid
280 '''
281 self.adminModel.db.execute_text(sql, secret=secret, uid=self.get_uid(account))
283 return secret
285 def qr_code_for_mfa(self, account: dict, mo: MfaOption, secret: str) -> str:
286 if not mo.adapter:
287 return ''
288 url = mo.adapter.key_uri(secret, self.mfaIssuer, account.get(self.usernameColumn))
289 if not url:
290 return ''
291 return gws.lib.image.qr_code(url).to_data_url()
293 ##
295 def set_status(self, account: dict, status: core.Status):
296 sql = f'''
297 UPDATE {self.adminModel.tableName}
298 SET
299 {core.Columns.status} = :status
300 WHERE
301 {self.adminModel.uidName} = :uid
302 '''
303 self.adminModel.db.execute_text(sql, status=status, uid=self.get_uid(account))
305 def reset(self, account: dict):
306 sql = f'''
307 UPDATE {self.adminModel.tableName}
308 SET
309 {core.Columns.status} = :status,
310 {core.Columns.password} = '',
311 {core.Columns.mfaSecret} = ''
312 WHERE
313 {self.adminModel.uidName} = :uid
314 '''
315 self.adminModel.db.execute_text(sql, status=core.Status.new, uid=self.get_uid(account))
317 if self.onboardingUrl:
318 self.send_onboarding_email(account)
320 ##
322 def send_onboarding_email(self, account: dict):
323 tc = self.generate_tc(account, core.Category.onboarding)
324 url = gws.lib.net.add_params(self.onboardingUrl, onboarding=tc)
325 self.send_mail(account, core.Category.onboarding, {'url': url})
327 def generate_tc(self, account: dict, category: str) -> str:
328 tc = self.make_tc()
330 sql = f'''
331 UPDATE {self.adminModel.tableName}
332 SET
333 {core.Columns.tc} = :tc,
334 {core.Columns.tcTime} = :time,
335 {core.Columns.tcCategory} = :category
336 WHERE
337 {self.adminModel.uidName} = :uid
338 '''
339 self.adminModel.db.execute_text(sql, tc=tc, time=gws.u.stime(), category=category, uid=self.get_uid(account))
341 return tc
343 def clear_tc(self, account: dict):
344 sql = f'''
345 UPDATE {self.adminModel.tableName}
346 SET
347 {core.Columns.tc} = '',
348 {core.Columns.tcTime} = 0,
349 {core.Columns.tcCategory} = ''
350 WHERE
351 {self.adminModel.uidName} = :uid
352 '''
353 self.adminModel.db.execute_text(sql, uid=self.get_uid(account))
355 def invalidate_tc(self, tc: str):
356 sql = f'''
357 UPDATE {self.adminModel.tableName}
358 SET
359 {core.Columns.tc} = '',
360 {core.Columns.tcTime} = 0,
361 {core.Columns.tcCategory} = ''
362 WHERE
363 {core.Columns.tc} = :tc
364 '''
365 self.adminModel.db.execute_text(sql, tc=tc)
367 ##
369 def get_uid(self, account: dict) -> str:
370 return account.get(self.adminModel.uidName)
372 def make_tc(self):
373 return gws.u.random_string(32)
375 def send_mail(self, account: dict, category: str, args: Optional[dict] = None):
376 email = account.get(core.Columns.email)
377 if not email:
378 raise Error(f'account {self.get_uid(account)}: no email')
380 args = args or {}
381 args['account'] = account
383 message = gws.plugin.email_helper.Message(
384 subject=self.render_template(f'{category}.emailSubject', args),
385 mailTo=email,
386 text=self.render_template(f'{category}.emailBody', args, mime='text/plain'),
387 html=self.render_template(f'{category}.emailBody', args, mime='text/html'),
388 )
390 email_helper = cast(gws.plugin.email_helper, self.root.app.helper('email'))
391 email_helper.send_mail(message)
393 def render_template(self, subject, args, mime=None):
394 tpl = self.root.app.templateMgr.find_template(subject, where=[self], mime=mime)
395 if tpl:
396 res = tpl.render(gws.TemplateRenderInput(args=args))
397 return res.content
398 return ''