Coverage for gws-app/gws/base/auth/mfa.py: 43%
75 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
"""Generic multi-factor authentication adapter.

Multi-factor authentication (handled in ``gws.plugin.auth_method.web.core``)
is used for ``User`` objects that provide the attribute ``mfaUid``,
which is supposed to be an ID of a configured MFA Adapter.

Specific MFA Adapters can require other attributes.

Multi-factor authentication starts by creating a `gws.AuthMultiFactorTransaction` object,
kept in a session until it is verified or expires.

Some Adapters can be restarted (e.g. by resending a verification email).
"""
15from typing import Optional
17import gws
18import gws.lib.otp
19import gws.lib.net
class OtpConfig:
    """OTP generation options.

    Every field is optional. ``Object.configure`` merges whatever is
    configured here with ``gws.lib.otp.DEFAULTS``.
    """

    # NOTE(review): the meaning of each field is defined by ``gws.lib.otp``,
    # which is not visible from this module. Presumably these are the usual
    # TOTP parameters (epoch start, time step, code length, accepted drift,
    # hash algorithm) -- confirm against ``gws.lib.otp.Options``.
    start: Optional[int]
    step: Optional[int]
    length: Optional[int]
    tolerance: Optional[int]
    algo: Optional[str]
class Config(gws.Config):
    """Multi-factor authentication configuration."""

    # The literal defaults below are repeated in ``Object.configure``;
    # keep both places in sync when changing one of them.

    message: str = ''
    """Message to display in the client."""

    lifeTime: Optional[gws.Duration] = 120
    """How long to wait for the MFA to complete."""

    maxVerifyAttempts: int = 3
    """Max verify attempts."""

    maxRestarts: int = 0
    """Max code regeneration attempts."""

    otp: Optional[OtpConfig]
    """OTP generation options"""
class Object(gws.AuthMultiFactorAdapter):
    """Generic multi-factor adapter.

    Provides the bookkeeping shared by concrete adapters: creating a
    transaction, enforcing its life time and attempt limits, restarting it,
    and generating / checking time-based one-time passwords.
    """

    otpOptions: gws.lib.otp.Options

    def configure(self):
        """Load adapter settings from the configuration, falling back to the defaults."""
        self.message = self.cfg('message', default='')
        self.lifeTime = self.cfg('lifeTime', default=120)
        self.maxVerifyAttempts = self.cfg('maxVerifyAttempts', default=3)
        self.maxRestarts = self.cfg('maxRestarts', default=0)
        self.otpOptions = gws.u.merge(gws.lib.otp.DEFAULTS, self.cfg('otp'))

    def start(self, user):
        """Create a new, open transaction for ``user``.

        Args:
            user: The user that is being authenticated.

        Returns:
            A transaction in the ``open`` state with zeroed counters,
            an empty secret and the current timestamp as start time.
        """
        return gws.AuthMultiFactorTransaction(
            state=gws.AuthMultiFactorState.open,
            restartCount=0,
            verifyCount=0,
            secret='',
            startTime=self.current_timestamp(),
            generateTime=0,
            message=self.message,
            adapter=self,
            user=user,
        )

    def check_state(self, mfa: gws.AuthMultiFactorTransaction) -> bool:
        """Check whether the transaction can still be used.

        Marks the transaction as ``failed`` when its life time has elapsed
        or when more verify attempts were made than allowed.

        Args:
            mfa: The transaction to check; its ``state`` may be updated.

        Returns:
            ``False`` if the transaction has expired, exceeded the attempt
            limit or already failed, ``True`` otherwise.
        """
        ts = self.current_timestamp()
        if ts - mfa.startTime >= self.lifeTime:
            mfa.state = gws.AuthMultiFactorState.failed
            return False
        # Strictly greater: ``verify_attempt`` increments the counter *before*
        # calling this method, so the last allowed attempt arrives here with
        # ``verifyCount == maxVerifyAttempts`` and must still be accepted.
        if mfa.verifyCount > self.maxVerifyAttempts:
            mfa.state = gws.AuthMultiFactorState.failed
            return False
        if mfa.state == gws.AuthMultiFactorState.failed:
            return False
        return True

    def check_restart(self, mfa: gws.AuthMultiFactorTransaction) -> bool:
        """Return ``True`` if the transaction has restarts left."""
        return mfa.restartCount < self.maxRestarts

    def restart(self, mfa: gws.AuthMultiFactorTransaction) -> Optional[gws.AuthMultiFactorTransaction]:
        """Replace the transaction with a fresh one for the same user.

        The new transaction is created via ``start``, so its start time and
        verify counter are reset; only the restart counter is carried over
        (incremented by one).

        Args:
            mfa: The transaction to restart.

        Returns:
            The new transaction, or ``None`` if the restart limit is
            exhausted or ``start`` did not produce a transaction.
        """
        rc = mfa.restartCount + 1
        if rc > self.maxRestarts:
            return

        mfa = self.start(mfa.user)
        if not mfa:
            return

        mfa.restartCount = rc
        return mfa

    ##

    def verify_attempt(self, mfa: gws.AuthMultiFactorTransaction, payload_valid: bool):
        """Record one verification attempt and update the transaction state.

        Args:
            mfa: The transaction being verified; updated in place.
            payload_valid: Whether the payload submitted by the user was correct.

        Returns:
            The same transaction, whose state is now ``ok`` (valid payload),
            ``retry`` (invalid payload, attempts left) or ``failed``
            (expired, already failed, or no attempts left).
        """
        mfa.verifyCount += 1

        if not self.check_state(mfa):
            return mfa

        if payload_valid:
            mfa.state = gws.AuthMultiFactorState.ok
            return mfa

        # An invalid payload on the last allowed attempt ends the transaction.
        if mfa.verifyCount >= self.maxVerifyAttempts:
            mfa.state = gws.AuthMultiFactorState.failed
            return mfa

        mfa.state = gws.AuthMultiFactorState.retry
        return mfa

    def generate_totp(self, mfa: gws.AuthMultiFactorTransaction) -> str:
        """Generate a one-time password for the transaction.

        The password is derived from ``mfa.secret``, the current timestamp
        and the configured OTP options. The generation time is stored in
        ``mfa.generateTime``.

        Args:
            mfa: The transaction to generate a password for; updated in place.

        Returns:
            The generated one-time password.
        """
        ts = self.current_timestamp()
        totp = gws.lib.otp.new_totp(mfa.secret, ts, self.otpOptions)
        mfa.generateTime = ts
        # The code itself is deliberately kept out of the log: it is a live
        # credential, and anyone able to read the log could complete the
        # second factor with it.
        gws.log.debug(f'generate_totp {ts=} {mfa.generateTime=}')
        return totp

    def check_totp(self, mfa: gws.AuthMultiFactorTransaction, input: str) -> bool:
        """Check a user-supplied one-time password against the transaction.

        Args:
            mfa: The transaction whose secret is used for the check.
            input: The password as entered by the user; a falsy value is
                treated as an empty string.

        Returns:
            The result of ``gws.lib.otp.check_totp`` for the current timestamp.
        """
        return gws.lib.otp.check_totp(
            str(input or ''),
            mfa.secret,
            self.current_timestamp(),
            self.otpOptions,
        )

    def current_timestamp(self):
        """Return the current timestamp as given by ``gws.u.stime()``.

        Kept as a separate method so that all time-dependent logic in this
        class goes through a single place.
        """
        return gws.u.stime()