Coverage for gws-app/gws/base/auth/sql_provider.py: 0%
69 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Base provider for the sql-based authorization.
3SQL-based authentication works by executing SELECT queries against a SQL provider.
5The "authorization" query receives the parameters "username", "password", and/or "token" from
6an authentication method. If the query doesn't return any rows, the next authentication
7provider is attempted. Otherwise, exactly one row should be returned with
8at least the following columns:
10- ``validuser`` (bool) - mandatory, should be "true" if the user is allowed to log in
11- ``validpassword`` (bool) - mandatory, should be "true" if the password is valid
12- ``uid`` (str) - user id
13- ``roles``(str) - comma-separated list of roles
15Column names are case-insensitive.
17Other columns, if given, are converted to respective `gws.User` properties.
19The "getUser" query receives user ID as a parameter and should return a record for this user.
21Example configuration (assuming Postgres with ``pgcrypto``)::
23 auth.providers+ {
24 type "sql"
26 authorizationSql '''
27 SELECT
28 user.id
29 AS uid,
30 user.first_name || ' ' || user.last_name
31 AS displayname,
32 user.login
33 AS login,
34 user.is_enabled
35 AS validuser,
36 ( passwd = crypt({{password}}, passwd) )
37 AS validpassword
38 FROM
39 public.user
40 WHERE
41 user.login = {{username}}
42 '''
44 getUserSql '''
45 SELECT
46 user.id
47 AS uid,
48 user.first_name || ' ' || user.last_name
49 AS displayname,
50 user.login
51 AS login
52 FROM
53 public.user
54 WHERE
55 user.id = {{uid}}
56 '''
57 }
59"""
61from typing import Optional, cast
63import re
65import gws
66import gws.base.auth
67import gws.base.database.provider
68import gws.config.util
69import gws.lib.sa as sa
72class Config(gws.base.auth.provider.Config):
73 """SQL-based authorization provider"""
75 dbUid: Optional[str]
76 """Database provider uid"""
78 authorizationSql: str
79 """Authorization SQL statement"""
81 getUserSql: str
82 """User data SQL statement"""
85class Placeholders(gws.Enum):
86 username = 'username'
87 password = 'password'
88 token = 'token'
89 uid = 'uid'
92class Object(gws.base.auth.provider.Object):
93 db: gws.DatabaseProvider
94 authorizationSql: str
95 getUserSql: str
97 def configure(self):
98 self.configure_provider()
99 self.authorizationSql = self.cfg('authorizationSql')
100 self.getUserSql = self.cfg('getUserSql')
102 def configure_provider(self):
103 return gws.config.util.configure_database_provider_for(self)
105 def authenticate(self, method, credentials):
106 params = {
107 Placeholders.username: credentials.get('username'),
108 Placeholders.password: credentials.get('password'),
109 Placeholders.token: credentials.get('token'),
110 }
112 rs = self._get_records(self.authorizationSql, params)
114 if not rs:
115 return
116 if len(rs) > 1:
117 raise gws.ForbiddenError(f'multiple records found')
119 return self._make_user(rs[0], validate=True)
121 def get_user(self, local_uid):
122 params = {
123 'uid': local_uid,
124 }
126 rs = self._get_records(self.getUserSql, params)
128 if not rs:
129 return
130 if len(rs) > 1:
131 return
133 return self._make_user(rs[0], validate=False)
135 def _get_records(self, sql: str, params: dict) -> list[dict]:
136 sql = re.sub(r'{(\w+)}', r':\1', sql)
137 with self.db.connect() as conn:
138 return [gws.u.to_dict(r) for r in conn.execute(sa.text(sql), params)]
140 def _make_user(self, rec: dict, validate: bool) -> gws.User:
141 user_rec = {}
143 valid_user = False
144 valid_password = False
146 for k, v in rec.items():
147 lk = k.lower()
149 if lk == 'validuser':
150 valid_user = bool(v)
151 elif lk == 'validpassword':
152 valid_password = bool(v)
153 elif lk == 'uid':
154 user_rec['localUid'] = str(v)
155 else:
156 user_rec[k] = v
158 if 'localUid' not in user_rec:
159 raise gws.ForbiddenError('no uid returned')
161 if validate and not valid_user:
162 raise gws.ForbiddenError(f'invalid user')
164 if validate and not valid_password:
165 raise gws.ForbiddenError(f'invalid password')
167 return gws.base.auth.user.from_record(self, user_rec)