Coverage for gws-app/gws/lib/otp/__init__.py: 36%
73 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
"""Generate HOTP and TOTP tokens.

References:
    https://datatracker.ietf.org/doc/html/rfc4226
    https://datatracker.ietf.org/doc/html/rfc6238
"""
from typing import Optional

import base64
import hashlib
import hmac
import random

import gws
import gws.lib.net
class Options(gws.Data):
    """Parameters controlling how one-time passwords are generated and checked."""

    # Value subtracted from the timestamp before it is divided into time steps.
    start: int
    # Size of one time step, in the same unit as the timestamp;
    # exported as ``period`` in key URIs.
    step: int
    # Number of decimal digits in a generated token.
    length: int
    # Number of time steps before and after the given timestamp
    # that ``check_totp`` accepts.
    tolerance: int
    # Name of the ``hashlib`` digest used for the HMAC (lower-cased before lookup).
    algo: str
# Baseline options; the public functions combine these with the
# caller-supplied options through ``gws.u.merge``.
DEFAULTS = Options(
    start=0,
    step=30,
    length=6,
    tolerance=1,
    algo='sha1',
)
def new_hotp(secret: str | bytes, counter: int, options: Optional[Options] = None) -> str:
    """Generate a new HOTP value as per rfc4226 section 5.3.

    Args:
        secret: Shared secret; a ``str`` is encoded as UTF-8.
        counter: The HOTP counter (moving factor).
        options: Options to combine with ``DEFAULTS``.

    Returns:
        The token as a zero-padded decimal string.
    """

    return _raw_otp(_to_bytes(secret), counter, gws.u.merge(DEFAULTS, options))
def new_totp(secret: str | bytes, timestamp: int, options: Optional[Options] = None) -> str:
    """Generate a new TOTP value as per rfc6238 section 4.2.

    Args:
        secret: Shared secret; a ``str`` is encoded as UTF-8.
        timestamp: Point in time for which the token is generated.
        options: Options to combine with ``DEFAULTS``.

    Returns:
        The token as a zero-padded decimal string.
    """

    options = gws.u.merge(DEFAULTS, options)
    # The TOTP counter is the number of whole time steps elapsed since ``start``.
    counter = (timestamp - options.start) // options.step
    return _raw_otp(_to_bytes(secret), counter, options)
def check_totp(input: str, secret: str, timestamp: int, options: Optional[Options] = None) -> bool:
    """Check if the input TOTP is valid.

    Compares the input against several TOTPs within the tolerance window
    ``(timestamp-step*tolerance...timestamp+step*tolerance)``.

    Args:
        input: Token entered by the user.
        secret: Shared secret; a ``str`` is encoded as UTF-8.
        timestamp: Point in time at which the token is being checked.
        options: Options to combine with ``DEFAULTS``.

    Returns:
        ``True`` if the input equals the TOTP of any step in the window.
    """

    options = gws.u.merge(DEFAULTS, options)

    # Generated tokens consist of ASCII digits only, so anything of the wrong
    # length or containing non-ASCII characters can never match. Rejecting it
    # here also keeps ``hmac.compare_digest`` from raising on non-ASCII text.
    if len(input) != options.length or not input.isascii():
        return False

    # The key is the same for every window, so encode it once.
    key = _to_bytes(secret)

    for window in range(-options.tolerance, options.tolerance + 1):
        ts = timestamp + options.step * window
        counter = (ts - options.start) // options.step
        totp = _raw_otp(key, counter, options)
        # Valid tokens and the user's guess are deliberately kept out of the log.
        gws.log.debug(f'check_totp {timestamp=} {window=}')
        # Constant-time comparison: response timing must not reveal how many
        # leading digits of the guess were correct.
        if hmac.compare_digest(input, totp):
            return True

    return False
def totp_key_uri(
        secret: str | bytes,
        issuer_name: str,
        account_name: str,
        options: Optional[Options] = None
) -> str:
    """Create an ``otpauth://totp/...`` key uri for auth apps.

    Args:
        secret: Shared secret; it is base32-encoded in the uri.
        issuer_name: Name of the issuing service.
        account_name: Name of the user account.
        options: Options to combine with ``DEFAULTS``.

    Returns:
        The key uri.
    """

    return _key_uri('totp', secret, issuer_name, account_name, None, options)
def hotp_key_uri(
        secret: str | bytes,
        issuer_name: str,
        account_name: str,
        counter: Optional[int] = None,
        options: Optional[Options] = None
) -> str:
    """Create an ``otpauth://hotp/...`` key uri for auth apps.

    Args:
        secret: Shared secret; it is base32-encoded in the uri.
        issuer_name: Name of the issuing service.
        account_name: Name of the user account.
        counter: Initial counter value; omitted from the uri when ``None``.
        options: Options to combine with ``DEFAULTS``.

    Returns:
        The key uri.
    """

    # The uri type must be 'hotp' here; 'totp' would make auth apps
    # provision a time-based key instead of a counter-based one.
    return _key_uri('hotp', secret, issuer_name, account_name, counter, options)
def _key_uri(
        method: str,
        secret: str | bytes,
        issuer_name: str,
        account_name: str,
        counter: Optional[int] = None,
        options: Optional[Options] = None
) -> str:
    """Create a key uri for auth apps.

    Reference:
        https://github.com/google/google-authenticator/wiki/Key-Uri-Format

    Args:
        method: Uri type, placed directly after ``otpauth://``.
        secret: Shared secret; it is base32-encoded in the uri.
        issuer_name: Name of the issuing service.
        account_name: Name of the user account.
        counter: Counter value; added as a parameter only when not ``None``.
        options: Options to combine with ``DEFAULTS``.

    Returns:
        The key uri.
    """

    params = {
        'secret': base32_encode(secret),
        'issuer': issuer_name,
    }

    options = gws.u.merge(DEFAULTS, options)

    # Optional parameters are emitted only when they differ from the defaults.
    if options.algo != DEFAULTS.algo:
        params['algorithm'] = options.algo
    if options.length != DEFAULTS.length:
        params['digits'] = options.length
    if options.step != DEFAULTS.step:
        params['period'] = options.step
    if counter is not None:
        params['counter'] = counter

    return 'otpauth://{}/{}:{}?{}'.format(
        method,
        gws.lib.net.quote_param(issuer_name),
        gws.lib.net.quote_param(account_name),
        gws.lib.net.make_qs(params)
    )
def base32_decode(s: str) -> bytes:
    """Decode a base32 string into bytes."""

    return base64.b32decode(s)
def base32_encode(s: str | bytes) -> str:
    """Encode a string (as UTF-8) or bytes into a base32 ASCII string."""

    return base64.b32encode(_to_bytes(s)).decode('ascii')
def random_secret(base32_length: int = 32) -> str:
    """Generate a random printable secret that fits into base32_length.

    Args:
        base32_length: Length of the base32 encoding of the secret;
            must be a multiple of 8.

    Returns:
        A string of printable, non-space ASCII characters.

    Raises:
        ValueError: If ``base32_length`` is not a multiple of 8.
    """

    if (base32_length & 7) != 0:
        raise ValueError('invalid length')

    # Every 5 bytes encode to exactly 8 base32 characters.
    size = (base32_length >> 3) * 5
    r = random.SystemRandom()
    # ``randint`` includes both ends. 0x21..0x7e are the printable non-space
    # ASCII characters; 0x7f (DEL) is a control character and is excluded.
    return ''.join(chr(r.randint(0x21, 0x7e)) for _ in range(size))
149##
151def _raw_otp(key: bytes, counter: int, options: Options) -> str:
152 # https://www.rfc-editor.org/rfc/rfc4226#section-5.3
153 #
154 # Step 1: Generate an HMAC-SHA-1 value
155 # Let HS = HMAC-SHA-1(K,C) // HS is a 20-byte string
156 #
157 # Step 2: Generate a 4-byte string (Dynamic Truncation)
158 # Let Sbits = DT(HS) // DT, defined below, returns a 31-bit string
159 #
160 # Let OffsetBits be the low-order 4 bits of String[19]
161 # Offset = StToNum(OffsetBits) // 0 <= OffSet <= 15
162 # Let P = String[OffSet]...String[OffSet+3]
163 # Return the Last 31 bits of P
164 #
165 # Let Snum = StToNum(Sbits) // Convert S to a number in 0...2^{31}-1
166 #
167 # Step 3: Compute an HOTP value
168 # Return D = Snum mod 10^Digit // D is a number in the range 0...10^{Digit}-1
170 c = counter.to_bytes(8, byteorder='big')
172 digestmod = getattr(hashlib, options.algo.lower())
173 hs = hmac.new(key, c, digestmod).digest()
175 offset = hs[-1] & 0xf
176 p = hs[offset:offset + 4]
177 snum = int.from_bytes(p, byteorder='big', signed=False) & 0x7fffffff
179 d = snum % (10 ** options.length)
181 return f'{d:0{options.length}d}'
184def _to_bytes(s):
185 return s.encode('utf8') if isinstance(s, str) else s
def _option(options, key, default):
    """Return attribute ``key`` of ``options``, or ``default``.

    ``default`` is returned when ``options`` is falsy (e.g. ``None``)
    or has no attribute named ``key``.
    """

    if not options:
        return default
    return getattr(options, key, default)