Coverage for gws-app/gws/plugin/auth_method/web/core.py: 0%
153 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Web authorisation method."""
3from typing import Optional, cast
5import gws
6import gws.base.auth
7import gws.base.web
9gws.ext.new.authMethod('web')
12class Config(gws.base.auth.method.Config):
13 """Web-based authorization options"""
15 cookieName: str = 'auth'
16 """name for the cookie"""
17 cookiePath: str = '/'
18 """cookie path"""
21##
23class UserResponse(gws.Response):
24 user: Optional[gws.base.auth.user.Props]
27class LogoutResponse(gws.Response):
28 pass
31class LoginRequest(gws.Request):
32 username: str
33 password: str
36class LoginResponse(gws.Response):
37 user: Optional[gws.base.auth.user.Props]
38 mfaState: Optional[gws.AuthMultiFactorState]
39 mfaMessage: str = ''
40 mfaCanRestart: bool = False
43class MfaVerifyRequest(gws.Request):
44 payload: dict
47##
49_DELETED_SESSION = 'web:deleted'
52class Object(gws.base.auth.method.Object):
53 cookieName: str
54 cookiePath: str
56 deletedSession: gws.base.auth.session.Object
58 def configure(self):
59 self.uid = 'gws.plugin.self.web'
60 self.cookieName = self.cfg('cookieName', default=Config.cookieName)
61 self.cookiePath = self.cfg('cookiePath', default=Config.cookiePath)
63 def activate(self):
64 am = self.root.app.authMgr
65 self.deletedSession = gws.base.auth.session.Object(
66 uid=_DELETED_SESSION,
67 method=self,
68 user=am.guestUser,
69 )
71 def open_session(self, req):
72 am = self.root.app.authMgr
74 sid = req.cookie(self.cookieName)
75 if not sid:
76 return
78 sess = am.sessionMgr.get_valid(sid)
80 if not sess:
81 gws.log.debug(f'open_session: {sid=} not found or invalid')
82 return self.deletedSession
84 return sess
86 def close_session(self, req, res):
87 am = self.root.app.authMgr
89 sess = getattr(req, 'session')
90 if not sess:
91 return
93 if sess.uid == _DELETED_SESSION:
94 gws.log.debug('session cookie=deleted')
95 res.delete_cookie(
96 self.cookieName,
97 path=self.cookiePath)
98 return
100 if res.status < 400:
101 gws.log.debug(f'session cookie={sess.uid!r}')
102 res.set_cookie(
103 self.cookieName,
104 sess.uid,
105 path=self.cookiePath,
106 secure=self.secure,
107 httponly=True)
108 am.sessionMgr.save(sess)
110 def handle_login(self, req: gws.WebRequester, p: LoginRequest) -> LoginResponse:
111 if not req.user.isGuest:
112 raise gws.ForbiddenError(f'login: already logged-in {req.user.uid=}')
114 if self.secure and not req.isSecure:
115 raise gws.ForbiddenError('login: insecure_context, ignored')
117 user = self.root.app.authMgr.authenticate(self, p)
118 if not user:
119 raise gws.ForbiddenError('login: user not found')
121 if user.mfaUid:
122 mfa = self._mfa_start(req, user)
123 gws.log.info(f'LOGGED_IN (MFA pending): {user.uid=} {user.roles=}')
124 return self._mfa_response(mfa)
126 self._finalize_login(req, user)
127 return LoginResponse(user=gws.props_of(user, user))
129 def handle_mfa_verify(self, req: gws.WebRequester, p: MfaVerifyRequest) -> LoginResponse:
130 try:
131 mfa = self._mfa_verify(req, p.payload)
132 except gws.ForbiddenError:
133 self._delete_session(req)
134 raise
136 if mfa.state == gws.AuthMultiFactorState.ok:
137 self._finalize_login(req, mfa.user)
138 return self._mfa_response(mfa)
140 if mfa.state == gws.AuthMultiFactorState.retry:
141 return self._mfa_response(mfa)
143 self._delete_session(req)
144 raise gws.ForbiddenError(f'MFA: verify failed {mfa.state=}')
146 def handle_mfa_restart(self, req: gws.WebRequester, p: gws.Request) -> LoginResponse:
147 try:
148 mfa = self._mfa_restart(req)
149 except gws.ForbiddenError:
150 self._delete_session(req)
151 raise
153 return self._mfa_response(mfa)
155 def handle_logout(self, req: gws.WebRequester) -> LogoutResponse:
156 if req.user.isGuest:
157 self._delete_session(req)
158 return LogoutResponse()
160 if req.session.method != self:
161 raise gws.ForbiddenError(f'wrong method for logout: {req.session.method!r}')
163 self._delete_session(req)
165 gws.log.info(f'LOGGED_OUT: user={req.user.uid!r}')
166 return LogoutResponse()
168 ##
170 def _delete_session(self, req: gws.WebRequester):
171 am = self.root.app.authMgr
172 am.sessionMgr.delete(req.session)
173 req.set_session(self.deletedSession)
175 def _finalize_login(self, req: gws.WebRequester, user: gws.User):
176 self._delete_session(req)
177 am = self.root.app.authMgr
178 req.set_session(am.sessionMgr.create(self, user))
179 gws.log.info(f'LOGGED_IN: {user.uid=} {user.roles=}')
181 ##
183 def _mfa_start(self, req: gws.WebRequester, user: gws.User) -> gws.AuthMultiFactorTransaction:
184 am = self.root.app.authMgr
186 adapter = am.get_mf_adapter(user.mfaUid)
187 if not adapter:
188 raise gws.ForbiddenError(f'MFA: {user.mfaUid=} unknown')
190 mfa = adapter.start(user)
191 if not mfa:
192 raise gws.ForbiddenError(f'MFA: {user.mfaUid=} start failed')
194 req.set_session(am.sessionMgr.create(self, am.guestUser))
196 self._mfa_store(req, mfa)
197 return mfa
199 def _mfa_verify(self, req: gws.WebRequester, payload: dict) -> gws.AuthMultiFactorTransaction:
200 mfa = self._mfa_load(req)
201 mfa = mfa.adapter.verify(mfa, payload)
203 self._mfa_store(req, mfa)
204 return mfa
206 def _mfa_restart(self, req: gws.WebRequester) -> gws.AuthMultiFactorTransaction:
207 mfa = self._mfa_load(req)
208 mfa = mfa.adapter.restart(mfa)
209 if not mfa:
210 raise gws.ForbiddenError(f'MFA: restart failed')
212 self._mfa_store(req, mfa)
213 return mfa
215 def _mfa_store(self, req: gws.WebRequester, mfa: gws.AuthMultiFactorTransaction):
216 am = self.root.app.authMgr
218 sess_mfa = gws.u.merge({}, mfa)
219 sess_mfa['user'] = am.serialize_user(mfa.user)
220 sess_mfa['adapter'] = mfa.adapter.uid
221 req.session.set('AuthMultiFactorTransaction', sess_mfa)
223 def _mfa_load(self, req: gws.WebRequester) -> gws.AuthMultiFactorTransaction:
224 am = self.root.app.authMgr
226 sess_mfa = req.session.get('AuthMultiFactorTransaction')
227 if not sess_mfa:
228 raise gws.ForbiddenError(f'MFA: transaction not found')
230 mfa = gws.AuthMultiFactorTransaction(sess_mfa)
231 mfa.adapter = am.get_mf_adapter(sess_mfa['adapter'])
232 mfa.user = am.unserialize_user(sess_mfa['user'])
234 if not mfa.adapter.check_state(mfa):
235 raise gws.ForbiddenError(f'MFA: invalid transaction in session')
237 return mfa
239 def _mfa_response(self, mfa: gws.AuthMultiFactorTransaction) -> LoginResponse:
240 return LoginResponse(
241 mfaState=mfa.state,
242 mfaMessage=mfa.message,
243 mfaCanRestart=mfa.adapter.check_restart(mfa),
244 )