Coverage for gws-app/gws/base/auth/mfa.py: 43%

75 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Generic multi-factor authentication adapter. 

2 

3Multi-factor authentication (handled in ``gws.plugin.auth_method.web.core``) 

4is used for ``User`` objects that provide the attribute ``mfaUid``, 

5which is supposed to be an ID of a configured MFA Adapter. 

6 

7Specific MFA Adapters can require other attributes. 

8 

9Multi-factor authentication starts by creating a `gws.AuthMultiFactorTransaction` object, 

10kept in a session until it is verified or expires. 

11 

12Some Adapters can be restarted (e.g. by resending a verification email). 

13""" 

14 

15from typing import Optional 

16 

17import gws 

18import gws.lib.otp 

19import gws.lib.net 

20 

21 

class OtpConfig:
    """OTP generation options accepted in the ``otp`` section of the MFA ``Config``.

    ``Object.configure`` merges these values with ``gws.lib.otp.DEFAULTS``.
    """

    # NOTE(review): the meanings below are inferred from the field names only;
    # confirm them against ``gws.lib.otp.Options``.

    # presumably the start of the time counter
    start: Optional[int]
    # presumably the time step
    step: Optional[int]
    # presumably the length of the generated code
    length: Optional[int]
    # presumably the accepted deviation when checking a code
    tolerance: Optional[int]
    # presumably the name of the hash algorithm
    algo: Optional[str]

28 

29 

class Config(gws.Config):
    """Multi-factor authentication configuration."""

    # The defaults declared here are repeated as fallbacks in ``Object.configure``.

    message: str = ''
    """Message to display in the client."""

    lifeTime: Optional[gws.Duration] = 120
    """How long to wait for the MFA to complete."""

    maxVerifyAttempts: int = 3
    """Max verify attempts."""

    maxRestarts: int = 0
    """Max code regeneration attempts."""

    otp: Optional[OtpConfig]
    """OTP generation options"""

43 

44 

class Object(gws.AuthMultiFactorAdapter):
    """Generic multi-factor adapter: transaction lifecycle plus TOTP helpers.

    Moves a ``gws.AuthMultiFactorTransaction`` between the ``open``, ``retry``,
    ``ok`` and ``failed`` states while enforcing the configured life time,
    the verification-attempt limit and the restart limit.
    """

    # ``gws.lib.otp.DEFAULTS`` merged with the ``otp`` configuration section.
    otpOptions: gws.lib.otp.Options

    def configure(self):
        """Read the adapter settings from the configuration.

        The fallback values repeat the defaults declared in ``Config``.
        """
        cfg = self.cfg
        self.message = cfg('message', default='')
        self.lifeTime = cfg('lifeTime', default=120)
        self.maxVerifyAttempts = cfg('maxVerifyAttempts', default=3)
        self.maxRestarts = cfg('maxRestarts', default=0)

        # presumably the configured values override the library defaults -
        # confirm against ``gws.u.merge``
        otp_overrides = cfg('otp')
        self.otpOptions = gws.u.merge(gws.lib.otp.DEFAULTS, otp_overrides)

    def start(self, user):
        """Create a new transaction for ``user``.

        The transaction starts in the ``open`` state with zeroed counters,
        an empty secret and the current timestamp as its start time.
        """
        now = self.current_timestamp()
        return gws.AuthMultiFactorTransaction(
            state=gws.AuthMultiFactorState.open,
            restartCount=0,
            verifyCount=0,
            secret='',
            startTime=now,
            generateTime=0,
            message=self.message,
            adapter=self,
            user=user,
        )

    def check_state(self, mfa):
        """Tell whether the transaction ``mfa`` can still be verified.

        Marks the transaction as ``failed`` and returns ``False`` when its
        life time has elapsed or the attempt limit has been exceeded.
        Also returns ``False`` for a transaction that is already ``failed``.
        """
        failed = gws.AuthMultiFactorState.failed

        age = self.current_timestamp() - mfa.startTime
        if age >= self.lifeTime or mfa.verifyCount > self.maxVerifyAttempts:
            mfa.state = failed
            return False

        return not (mfa.state == failed)

    def check_restart(self, mfa):
        """Tell whether the transaction ``mfa`` has restarts left."""
        restarts_used = mfa.restartCount
        return restarts_used < self.maxRestarts

    def restart(self, mfa):
        """Replace ``mfa`` with a newly started transaction for the same user.

        Returns the new transaction, carrying the incremented restart count,
        or ``None`` when the restart limit is reached or ``start`` yields nothing.
        The new transaction comes from ``start``, so its start time and
        verification counter begin anew.
        """
        next_count = mfa.restartCount + 1
        if next_count > self.maxRestarts:
            return None

        fresh = self.start(mfa.user)
        if not fresh:
            return None

        fresh.restartCount = next_count
        return fresh

    ##

    def verify_attempt(self, mfa, payload_valid: bool):
        """Record one verification attempt and update the state of ``mfa``.

        The attempt is counted before the state check, so it is counted even
        if the transaction turns out to be expired or already failed.
        Returns the same ``mfa`` object with its state set to ``ok``,
        ``retry`` or ``failed``.
        """
        mfa.verifyCount += 1

        if not self.check_state(mfa):
            # check_state has already marked the transaction where needed
            return mfa

        if payload_valid:
            mfa.state = gws.AuthMultiFactorState.ok
        elif mfa.verifyCount >= self.maxVerifyAttempts:
            # that was the last permitted attempt
            mfa.state = gws.AuthMultiFactorState.failed
        else:
            mfa.state = gws.AuthMultiFactorState.retry

        return mfa

    def generate_totp(self, mfa: gws.AuthMultiFactorTransaction) -> str:
        """Generate a TOTP from ``mfa.secret`` and store the generation time in ``mfa``."""
        ts = self.current_timestamp()
        totp = gws.lib.otp.new_totp(mfa.secret, ts, self.otpOptions)
        mfa.generateTime = ts
        # NOTE(review): this writes the generated code to the debug log -
        # confirm this is acceptable for production logging.
        # The local names ``ts`` and ``totp`` are part of the message text.
        gws.log.debug(f'generate_totp {ts=} {totp=} {mfa.generateTime=}')
        return totp

    def check_totp(self, mfa: gws.AuthMultiFactorTransaction, input: str) -> bool:
        """Check the user-supplied code ``input`` against ``mfa.secret`` at the current time.

        A falsy ``input`` is checked as an empty string.
        """
        candidate = str(input or '')
        return gws.lib.otp.check_totp(candidate, mfa.secret, self.current_timestamp(), self.otpOptions)

    def current_timestamp(self):
        """Return the current time as given by ``gws.u.stime`` (presumably seconds - confirm)."""
        now = gws.u.stime()
        return now