Coverage for gws-app/gws/base/auth/manager.py: 31%

87 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Authorization and session manager.""" 

2 

3from typing import Optional, cast 

4 

5import gws 

6import gws.config 

7import gws.lib.jsonx 

8 

9from . import session, system_provider 

10 

11 

class Config(gws.Config):
    """Authentication and authorization options"""

    methods: Optional[list[gws.ext.config.authMethod]]
    """authorization methods"""
    providers: Optional[list[gws.ext.config.authProvider]]
    """authorization providers"""
    mfa: Optional[list[gws.ext.config.authMultiFactorAdapter]]
    """multi-factor authentication adapters"""
    session: Optional[gws.ext.config.authSessionManager]
    """session options"""

23 

24 

# Passed as ``type=`` to ``create_child`` when ``Object.configure`` builds the session manager
# (presumably the fallback type used when the 'session' config names none -- TODO confirm).
25_DEFAULT_SESSION_TYPE = 'sqlite' 

26 

27 

class Object(gws.AuthManager):
    """Authorization manager.

    Holds the session manager, the auth providers, the auth methods and the
    multi-factor adapters created from the configuration, and registers
    itself as the ``auth`` middleware, which attaches a session to each request.
    """

    def configure(self):
        """Create all child objects from the configuration and register the middleware."""
        self.sessionMgr = self.create_child(gws.ext.object.authSessionManager, self.cfg('session'), type=_DEFAULT_SESSION_TYPE)

        self.providers = self.create_children(gws.ext.object.authProvider, self.cfg('providers'))

        # the system provider is always appended after the configured providers
        sys_provider = self.create_child(system_provider.Object)
        self.providers.append(sys_provider)

        self.guestUser = sys_provider.get_user('guest')
        self.systemUser = sys_provider.get_user('system')

        self.methods = self.create_children(gws.ext.object.authMethod, self.cfg('methods'))
        if not self.methods:
            # if no methods configured, enable the Web method
            self.methods.append(self.create_child(gws.ext.object.authMethod, type='web'))

        self.mfAdapters = self.create_children(gws.ext.object.authMultiFactorAdapter, self.cfg('mfa'))

        # shared session object used whenever no method yields a valid session
        self.guestSession = session.Object(uid='guest_session', method=None, user=self.guestUser)

        self.register_middleware('auth', depends_on=['db'])

    ##

    def enter_middleware(self, req):
        """Attach a session to the request, falling back to the guest session."""
        sess = self._try_open_session(req) or self.guestSession
        req.set_session(sess)
        gws.log.debug(f'session opened: user={req.session.user.uid!r} roles={req.session.user.roles}')

    def _try_open_session(self, req):
        """Ask each auth method in turn to open a session for the request.

        Methods flagged ``secure`` are skipped when the request is not secure.
        The first session returned by a method decides the outcome: a session
        without a user, or one whose method does not match the method that
        opened it, is deleted and the search stops without trying the
        remaining methods.

        Returns:
            The opened session, or ``None`` if no valid session was found.
        """
        for meth in self.methods:
            if meth.secure and not req.isSecure:
                gws.log.warning(f'open_session: {meth=}: insecure_context, ignore')
                continue

            sess = meth.open_session(req)
            if not sess:
                continue

            if not sess.user:
                gws.log.warning(f'open_session: {meth=}: {sess.uid=} user not found')
                self.sessionMgr.delete(sess)
                return None

            if not sess.method or sess.method.uid != meth.uid:
                gws.log.warning(f'open_session: {meth=}: {sess.uid=} wrong method {sess.method=}')
                self.sessionMgr.delete(sess)
                return None

            gws.log.debug(f'open_session: {meth=}: ok')
            return sess

        return None

    def exit_middleware(self, req, res):
        """Let the session's method close the session, then reset the request to the guest session."""
        sess = req.session
        if sess.method:
            sess.method.close_session(req, res)
        req.set_session(self.guestSession)

    ##

    def authenticate(self, method, credentials):
        """Authenticate credentials against the providers, in order.

        A provider with a non-empty ``allowedMethods`` is skipped unless it
        contains ``method.extType``.

        Returns:
            The first truthy user returned by a provider, or ``None``.
        """
        for prov in self.providers:
            if prov.allowedMethods and method.extType not in prov.allowedMethods:
                continue
            gws.log.debug(f'trying provider {prov!r}')
            user = prov.authenticate(method, credentials)
            if user:
                gws.log.debug(f'ok provider {prov!r}')
                return user
        return None

    ##

    def get_user(self, user_uid):
        """Resolve a combined user uid to a user via its provider, or ``None`` if the provider is unknown."""
        provider_uid, local_uid = gws.u.split_uid(user_uid)
        prov = self.get_provider(provider_uid)
        return prov.get_user(local_uid) if prov else None

    def get_provider(self, uid):
        """Return the provider with the given uid, or ``None``."""
        for obj in self.providers:
            if obj.uid == uid:
                return obj
        return None

    def get_method(self, uid=None, ext_type=None):
        """Return the auth method with the given uid or, if no uid is given, of the given extension type.

        Returns ``None`` when nothing matches.
        """
        return self._find(self.methods, uid, ext_type)

    def get_mf_adapter(self, uid=None, ext_type=None):
        """Return the multi-factor adapter with the given uid or, if no uid is given, of the given extension type.

        Returns ``None`` when nothing matches.
        """
        return self._find(self.mfAdapters, uid, ext_type)

    def _find(self, objects, uid, ext_type):
        """Find the first object matching ``uid``, or ``ext_type`` when only the type is given.

        The uid lookup is used whenever a uid is passed (or no type is passed),
        so lookups by uid behave exactly as they did before ``ext_type`` was honored.
        """
        if uid is None and ext_type is not None:
            return next((obj for obj in objects if obj.extType == ext_type), None)
        return next((obj for obj in objects if obj.uid == uid), None)

    def serialize_user(self, user):
        """Return a JSON string holding the provider uid and the provider-specific serialized user."""
        return gws.lib.jsonx.to_string([user.authProvider.uid, user.authProvider.serialize_user(user)])

    def unserialize_user(self, data):
        """Restore a user from a string produced by ``serialize_user``, or ``None`` if the provider is unknown."""
        provider_uid, ds = gws.lib.jsonx.from_string(data)
        prov = self.get_provider(provider_uid)
        return prov.unserialize_user(ds) if prov else None

128 prov = self.get_provider(provider_uid) 

129 return prov.unserialize_user(ds) if prov else None