Coverage for gws-app/gws/base/auth/sql_provider.py: 0%

69 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Base provider for the sql-based authorization. 

2 

3SQL-based authentication works by executing SELECT queries against a SQL provider. 

4 

5The "authorization" query receives the parameters "username", "password", and/or "token" from 

6an authentication method. If the query doesn't return any rows, the next authentication 

7provider is attempted. Otherwise, exactly one row should be returned with 

8at least the following columns: 

9 

10- ``validuser`` (bool) - mandatory, should be "true" if the user is allowed to log in 

11- ``validpassword`` (bool) - mandatory, should be "true" if the password is valid 

12- ``uid`` (str) - user id 

13- ``roles`` (str) - comma-separated list of roles 

14 

15Column names are case-insensitive. 

16 

17Other columns, if given, are converted to respective `gws.User` properties. 

18 

19The "getUser" query receives user ID as a parameter and should return a record for this user. 

20 

21Example configuration (assuming Postgres with ``pgcrypto``):: 

22 

23 auth.providers+ { 

24 type "sql" 

25 

26 authorizationSql ''' 

27 SELECT 

28 user.id 

29 AS uid, 

30 user.first_name || ' ' || user.last_name 

31 AS displayname, 

32 user.login 

33 AS login, 

34 user.is_enabled 

35 AS validuser, 

36 ( passwd = crypt({{password}}, passwd) ) 

37 AS validpassword 

38 FROM 

39 public.user 

40 WHERE 

41 user.login = {{username}} 

42 ''' 

43 

44 getUserSql ''' 

45 SELECT 

46 user.id 

47 AS uid, 

48 user.first_name || ' ' || user.last_name 

49 AS displayname, 

50 user.login 

51 AS login 

52 FROM 

53 public.user 

54 WHERE 

55 user.id = {{uid}} 

56 ''' 

57 } 

58 

59""" 

60 

61from typing import Optional, cast 

62 

63import re 

64 

65import gws 

66import gws.base.auth 

67import gws.base.database.provider 

68import gws.config.util 

69import gws.lib.sa as sa 

70 

71 

class Config(gws.base.auth.provider.Config):
    """SQL-based authorization provider"""

    # Uid of the database provider to use; may be omitted.
    # NOTE(review): presumably consumed by
    # ``gws.config.util.configure_database_provider_for`` - confirm.
    dbUid: Optional[str]
    """Database provider uid"""

    # Statement run by ``Object.authenticate``. It is executed with the
    # parameters "username", "password" and "token" taken from the credentials.
    authorizationSql: str
    """Authorization SQL statement"""

    # Statement run by ``Object.get_user``. It is executed with the single
    # parameter "uid".
    getUserSql: str
    """User data SQL statement"""

83 

84 

class Placeholders(gws.Enum):
    """Names of the parameters passed to the configured SQL statements.

    ``username``, ``password`` and ``token`` are used as parameter keys for
    the authorization statement. ``uid`` names the parameter of the
    "get user" statement.
    """

    username = 'username'
    password = 'password'
    token = 'token'
    uid = 'uid'

90 

91 

class Object(gws.base.auth.provider.Object):
    """Authorization provider that resolves users through configured SQL statements.

    Two statements are read from the configuration: ``authorizationSql``,
    executed when credentials have to be checked, and ``getUserSql``,
    executed when a user has to be loaded by its local uid.
    """

    # NOTE(review): presumably assigned by ``configure_provider`` - confirm.
    db: gws.DatabaseProvider
    authorizationSql: str
    getUserSql: str

    def configure(self):
        """Set up the database provider and read both SQL statements from the config."""
        self.configure_provider()
        self.authorizationSql = self.cfg('authorizationSql')
        self.getUserSql = self.cfg('getUserSql')

    def configure_provider(self):
        """Delegate the database provider setup to ``gws.config.util``.

        Returns:
            Whatever ``configure_database_provider_for`` returns.
        """
        return gws.config.util.configure_database_provider_for(self)

    def authenticate(self, method, credentials):
        """Check credentials by running the authorization statement.

        Args:
            method: The calling authentication method (not used here).
            credentials: Object whose ``get`` yields the "username",
                "password" and "token" values; missing ones are passed
                to the statement as ``None``.

        Returns:
            The user built from the single returned row, or ``None`` if the
            statement returned no rows.

        Raises:
            gws.ForbiddenError: If more than one row is returned, or if the
                row fails the checks in ``_make_user``.
        """
        params = {
            Placeholders.username: credentials.get('username'),
            Placeholders.password: credentials.get('password'),
            Placeholders.token: credentials.get('token'),
        }

        rs = self._get_records(self.authorizationSql, params)

        if not rs:
            # No match here: return nothing instead of raising.
            return
        if len(rs) > 1:
            # An ambiguous match must never log anybody in.
            raise gws.ForbiddenError('multiple records found')

        return self._make_user(rs[0], validate=True)

    def get_user(self, local_uid):
        """Load a user by its local uid using the "get user" statement.

        Args:
            local_uid: Value bound to the "uid" parameter of the statement.

        Returns:
            The user built from the returned row, or ``None`` if the statement
            returned no rows or more than one row.

        Raises:
            gws.ForbiddenError: If the returned row has no usable uid.
        """
        params = {
            'uid': local_uid,
        }

        rs = self._get_records(self.getUserSql, params)

        if not rs:
            return
        if len(rs) > 1:
            # Unlike ``authenticate``, an ambiguous lookup is silently ignored.
            return

        return self._make_user(rs[0], validate=False)

    def _get_records(self, sql: str, params: dict) -> list[dict]:
        """Execute a statement and return all rows as dicts.

        Args:
            sql: Statement text with ``{name}`` placeholders.
            params: Values for the placeholders, keyed by name.

        Returns:
            One dict per returned row.
        """
        # Rewrite ``{name}`` into the ``:name`` bind-parameter form before the
        # text is handed to ``sa.text``; values are bound, never interpolated.
        # NOTE(review): the module docs show ``{{name}}``; presumably the
        # config loader reduces that to ``{name}`` before it gets here - confirm.
        sql = re.sub(r'{(\w+)}', r':\1', sql)
        with self.db.connect() as conn:
            return [gws.u.to_dict(r) for r in conn.execute(sa.text(sql), params)]

    def _make_user(self, rec: dict, validate: bool) -> gws.User:
        """Convert a result row into a user object.

        Column names are matched case-insensitively. ``validuser`` and
        ``validpassword`` are consumed as flags, ``uid`` becomes ``localUid``,
        and every other column is passed through under its original name.

        Args:
            rec: The row, as a column-name to value mapping.
            validate: If true, both ``validuser`` and ``validpassword`` must
                be truthy; a missing column counts as false.

        Returns:
            The user created by ``gws.base.auth.user.from_record``.

        Raises:
            gws.ForbiddenError: If the row has no uid (absent or NULL), or if
                ``validate`` is set and one of the flags is not truthy.
        """
        user_rec = {}

        valid_user = False
        valid_password = False

        for k, v in rec.items():
            lk = k.lower()

            if lk == 'validuser':
                valid_user = bool(v)
            elif lk == 'validpassword':
                valid_password = bool(v)
            elif lk == 'uid':
                # A NULL uid must count as "no uid"; ``str(None)`` would
                # otherwise produce a user with the literal id 'None'.
                if v is not None:
                    user_rec['localUid'] = str(v)
            else:
                user_rec[k] = v

        if 'localUid' not in user_rec:
            raise gws.ForbiddenError('no uid returned')

        if validate and not valid_user:
            raise gws.ForbiddenError('invalid user')

        if validate and not valid_password:
            raise gws.ForbiddenError('invalid password')

        return gws.base.auth.user.from_record(self, user_rec)