Coverage for gws-app/gws/base/auth/manager.py: 31%
87 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Authorization and session manager."""
3from typing import Optional, cast
5import gws
6import gws.config
7import gws.lib.jsonx
9from . import session, system_provider
class Config(gws.Config):
    """Authentication and authorization options"""

    methods: Optional[list[gws.ext.config.authMethod]]
    """authorization methods"""
    providers: Optional[list[gws.ext.config.authProvider]]
    """authorization providers"""
    mfa: Optional[list[gws.ext.config.authMultiFactorAdapter]]
    """multi-factor authentication adapters"""
    session: Optional[gws.ext.config.authSessionManager]
    """session options"""
# Passed as ``type`` when creating the session manager child.
# NOTE(review): presumably only a fallback used when the 'session' config
# does not specify a type -- confirm against ``create_child``.
_DEFAULT_SESSION_TYPE = 'sqlite'


class Object(gws.AuthManager):
    """Authorization manager."""

    def configure(self):
        """Create the auth sub-objects and register the 'auth' middleware.

        Creates, in this order: the session manager, the configured providers
        plus the built-in system provider (appended last), the guest and
        system users, the auth methods, the multi-factor adapters and the
        shared guest session.
        """

        self.sessionMgr = self.create_child(gws.ext.object.authSessionManager, self.cfg('session'), type=_DEFAULT_SESSION_TYPE)

        self.providers = self.create_children(gws.ext.object.authProvider, self.cfg('providers'))

        # the system provider is always present and comes after configured ones
        sys_provider = self.create_child(system_provider.Object)
        self.providers.append(sys_provider)

        self.guestUser = sys_provider.get_user('guest')
        self.systemUser = sys_provider.get_user('system')

        self.methods = self.create_children(gws.ext.object.authMethod, self.cfg('methods'))
        if not self.methods:
            # if no methods configured, enable the Web method
            self.methods.append(self.create_child(gws.ext.object.authMethod, type='web'))

        self.mfAdapters = self.create_children(gws.ext.object.authMultiFactorAdapter, self.cfg('mfa'))

        # a single session object, bound to no method, shared by all guest requests
        self.guestSession = session.Object(uid='guest_session', method=None, user=self.guestUser)

        self.register_middleware('auth', depends_on=['db'])

    ##

    def enter_middleware(self, req):
        """Attach a session to the request.

        Uses the session opened by one of the auth methods, or the shared
        guest session if none of them provides one.
        """

        sess = self._try_open_session(req) or self.guestSession
        req.set_session(sess)
        gws.log.debug(f'session opened: user={req.session.user.uid!r} roles={req.session.user.roles}')

    def _try_open_session(self, req):
        """Ask each auth method, in order, to open a session for the request.

        Methods flagged ``secure`` are skipped when the request is not secure.
        The first method that returns a session decides the outcome: if that
        session has no user, or is bound to a different method, it is deleted
        and no further methods are tried.

        Returns:
            The opened session, or ``None`` if no valid session was found.
        """

        for meth in self.methods:
            if meth.secure and not req.isSecure:
                gws.log.warning(f'open_session: {meth=}: insecure_context, ignore')
                continue

            sess = meth.open_session(req)
            if not sess:
                continue

            if not sess.user:
                gws.log.warning(f'open_session: {meth=}: {sess.uid=} user not found')
                self.sessionMgr.delete(sess)
                return None

            if not sess.method or sess.method.uid != meth.uid:
                gws.log.warning(f'open_session: {meth=}: {sess.uid=} wrong method {sess.method=}')
                self.sessionMgr.delete(sess)
                return None

            gws.log.debug(f'open_session: {meth=}: ok')
            return sess

        return None

    def exit_middleware(self, req, res):
        """Let the session's method close it, then reset the request to the guest session."""

        sess = req.session
        if sess.method:
            sess.method.close_session(req, res)
        req.set_session(self.guestSession)

    ##

    def authenticate(self, method, credentials):
        """Try to authenticate the credentials against each provider in order.

        Providers that declare ``allowedMethods`` not containing the method's
        ``extType`` are skipped.

        Returns:
            The user returned by the first provider that accepts the
            credentials, or ``None`` if no provider does.
        """

        for prov in self.providers:
            if prov.allowedMethods and method.extType not in prov.allowedMethods:
                continue
            gws.log.debug(f'trying provider {prov!r}')
            user = prov.authenticate(method, credentials)
            if user:
                gws.log.debug(f'ok provider {prov!r}')
                return user

        return None

    ##

    def get_user(self, user_uid):
        """Return the user for a combined uid, or ``None`` if its provider is unknown.

        The uid is split with ``gws.u.split_uid`` into a provider uid and a
        provider-local uid; the lookup is delegated to that provider.
        """

        provider_uid, local_uid = gws.u.split_uid(user_uid)
        prov = self.get_provider(provider_uid)
        return prov.get_user(local_uid) if prov else None

    def get_provider(self, uid):
        """Return the provider with the given uid, or ``None``."""

        for obj in self.providers:
            if obj.uid == uid:
                return obj

        return None

    def get_method(self, uid=None, ext_type=None):
        """Return the first auth method matching ``uid`` or, if only ``ext_type`` is given, that extension type.

        Returns ``None`` when nothing matches.
        """

        return self._find_by_uid_or_type(self.methods, uid, ext_type)

    def get_mf_adapter(self, uid=None, ext_type=None):
        """Return the first multi-factor adapter matching ``uid`` or, if only ``ext_type`` is given, that extension type.

        Returns ``None`` when nothing matches.
        """

        return self._find_by_uid_or_type(self.mfAdapters, uid, ext_type)

    @staticmethod
    def _find_by_uid_or_type(objects, uid, ext_type):
        """Return the first object selected by ``uid`` or ``ext_type``, else ``None``.

        When only ``ext_type`` is given, objects are matched on ``extType``;
        in every other case they are matched on ``uid``, so existing uid-based
        lookups behave as before.
        """

        if uid is None and ext_type is not None:
            # previously ``ext_type`` was accepted but silently ignored
            # NOTE(review): assumes every candidate exposes ``extType``
            return next((obj for obj in objects if obj.extType == ext_type), None)

        return next((obj for obj in objects if obj.uid == uid), None)

    def serialize_user(self, user):
        """Serialize a user to a JSON string holding ``[provider uid, provider-specific data]``."""

        return gws.lib.jsonx.to_string([user.authProvider.uid, user.authProvider.serialize_user(user)])

    def unserialize_user(self, data):
        """Restore a user from a string produced by ``serialize_user``.

        Returns ``None`` if the provider recorded in the data is not known.
        """

        provider_uid, ds = gws.lib.jsonx.from_string(data)
        prov = self.get_provider(provider_uid)
        return prov.unserialize_user(ds) if prov else None