Coverage for gws-app/gws/lib/otp/__init__.py: 36%

73 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Generate HOTP and TOTP tokens. 

2 

3References: 

4 https://datatracker.ietf.org/doc/html/rfc4226 

5 https://datatracker.ietf.org/doc/html/rfc6238 

6""" 

7 

8from typing import Optional 

9 

10import base64 

11import hashlib 

12import hmac 

13import random 

14 

15import gws 

16import gws.lib.net 

17 

18 

19class Options(gws.Data): 

20 start: int 

21 step: int 

22 length: int 

23 tolerance: int 

24 algo: str 

25 

26 

27DEFAULTS = Options( 

28 start=0, 

29 step=30, 

30 length=6, 

31 tolerance=1, 

32 algo='sha1', 

33) 

34 

35 

36def new_hotp(secret: str | bytes, counter: int, options: Optional[Options] = None) -> str: 

37 """Generate a new HOTP value as per rfc4226 section 5.3.""" 

38 

39 return _raw_otp(_to_bytes(secret), counter, gws.u.merge(DEFAULTS, options)) 

40 

41 

42def new_totp(secret: str | bytes, timestamp: int, options: Optional[Options] = None) -> str: 

43 """Generate a new TOTP value as per rfc6238 section 4.2.""" 

44 

45 options = gws.u.merge(DEFAULTS, options) 

46 counter = (timestamp - options.start) // options.step 

47 return _raw_otp(_to_bytes(secret), counter, options) 

48 

49 

50def check_totp(input: str, secret: str, timestamp: int, options: Optional[Options] = None) -> bool: 

51 """Check if the input TOTP is valid. 

52 

53 Compares the input against several TOTPs within the tolerance window 

54 ``(timestamp-step*tolerance...timestamp+step*tolerance)``. 

55 """ 

56 

57 options = gws.u.merge(DEFAULTS, options) 

58 

59 if len(input) != options.length: 

60 return False 

61 

62 for window in range(-options.tolerance, options.tolerance + 1): 

63 ts = timestamp + options.step * window 

64 counter = (ts - options.start) // options.step 

65 totp = _raw_otp(_to_bytes(secret), counter, options) 

66 gws.log.debug(f'check_totp {timestamp=} {totp=} {input=} {window=}') 

67 if input == totp: 

68 return True 

69 

70 return False 

71 

72 

73def totp_key_uri( 

74 secret: str | bytes, 

75 issuer_name: str, 

76 account_name: str, 

77 options: Optional[Options] = None 

78) -> str: 

79 return _key_uri('totp', secret, issuer_name, account_name, None, options) 

80 

81 

82def hotp_key_uri( 

83 secret: str | bytes, 

84 issuer_name: str, 

85 account_name: str, 

86 counter: Optional[int] = None, 

87 options: Optional[Options] = None 

88) -> str: 

89 return _key_uri('totp', secret, issuer_name, account_name, counter, options) 

90 

91 

92def _key_uri( 

93 method: str, 

94 secret: str | bytes, 

95 issuer_name: str, 

96 account_name: str, 

97 counter: Optional[int] = None, 

98 options: Optional[Options] = None 

99) -> str: 

100 """Create a key uri for auth apps. 

101 

102 Reference: 

103 https://github.com/google/google-authenticator/wiki/Key-Uri-Format 

104 """ 

105 

106 params = { 

107 'secret': base32_encode(secret), 

108 'issuer': issuer_name, 

109 } 

110 

111 options = gws.u.merge(DEFAULTS, options) 

112 

113 if options.algo != DEFAULTS.algo: 

114 params['algorithm'] = options.algo 

115 if options.length != DEFAULTS.length: 

116 params['digits'] = options.length 

117 if options.step != DEFAULTS.step: 

118 params['period'] = options.step 

119 if counter is not None: 

120 params['counter'] = counter 

121 

122 return 'otpauth://{}/{}:{}?{}'.format( 

123 method, 

124 gws.lib.net.quote_param(issuer_name), 

125 gws.lib.net.quote_param(account_name), 

126 gws.lib.net.make_qs(params) 

127 ) 

128 

129 

130def base32_decode(s: str) -> bytes: 

131 return base64.b32decode(s) 

132 

133 

134def base32_encode(s: str | bytes) -> str: 

135 return base64.b32encode(_to_bytes(s)).decode('ascii') 

136 

137 

138def random_secret(base32_length: int = 32) -> str: 

139 """Generate a random printable secret that fits into base32_length.""" 

140 

141 if (base32_length & 7) != 0: 

142 raise ValueError('invalid length') 

143 

144 size = (base32_length >> 3) * 5 

145 r = random.SystemRandom() 

146 return ''.join(chr(r.randint(0x21, 0x7f)) for _ in range(size)) 

147 

148 

149## 

150 

151def _raw_otp(key: bytes, counter: int, options: Options) -> str: 

152 # https://www.rfc-editor.org/rfc/rfc4226#section-5.3 

153 # 

154 # Step 1: Generate an HMAC-SHA-1 value 

155 # Let HS = HMAC-SHA-1(K,C) // HS is a 20-byte string 

156 # 

157 # Step 2: Generate a 4-byte string (Dynamic Truncation) 

158 # Let Sbits = DT(HS) // DT, defined below, returns a 31-bit string 

159 # 

160 # Let OffsetBits be the low-order 4 bits of String[19] 

161 # Offset = StToNum(OffsetBits) // 0 <= OffSet <= 15 

162 # Let P = String[OffSet]...String[OffSet+3] 

163 # Return the Last 31 bits of P 

164 # 

165 # Let Snum = StToNum(Sbits) // Convert S to a number in 0...2^{31}-1 

166 # 

167 # Step 3: Compute an HOTP value 

168 # Return D = Snum mod 10^Digit // D is a number in the range 0...10^{Digit}-1 

169 

170 c = counter.to_bytes(8, byteorder='big') 

171 

172 digestmod = getattr(hashlib, options.algo.lower()) 

173 hs = hmac.new(key, c, digestmod).digest() 

174 

175 offset = hs[-1] & 0xf 

176 p = hs[offset:offset + 4] 

177 snum = int.from_bytes(p, byteorder='big', signed=False) & 0x7fffffff 

178 

179 d = snum % (10 ** options.length) 

180 

181 return f'{d:0{options.length}d}' 

182 

183 

184def _to_bytes(s): 

185 return s.encode('utf8') if isinstance(s, str) else s 

186 

187 

188def _option(options, key, default): 

189 if not options: 

190 return default 

191 return getattr(options, key, default)