Coverage for gws-app/gws/plugin/auth_method/web/core.py: 0%

153 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Web authorisation method.""" 

2 

3from typing import Optional, cast 

4 

5import gws 

6import gws.base.auth 

7import gws.base.web 

8 

# NOTE(review): presumably a marker that ties the classes in this module to the
# 'web' auth method extension; confirm against the gws.ext machinery.
gws.ext.new.authMethod('web')

10 

11 

class Config(gws.base.auth.method.Config):
    """Web-based authorization options"""

    # The defaults declared here are also the fallbacks that `Object.configure`
    # passes to `self.cfg(...)` when an option is not configured.
    cookieName: str = 'auth'
    """name for the cookie"""
    cookiePath: str = '/'
    """cookie path"""

19 

20 

21## 

22 

class UserResponse(gws.Response):
    """Response that carries the props of a user, if there is one."""

    # Not referenced elsewhere in this module; presumably used by the
    # accompanying action code -- confirm.
    user: Optional[gws.base.auth.user.Props]

25 

26 

class LogoutResponse(gws.Response):
    """Empty response returned by `Object.handle_logout`."""

    pass

29 

30 

class LoginRequest(gws.Request):
    """Credentials submitted to `Object.handle_login`.

    The request object is handed as-is to the auth manager's `authenticate`.
    """

    username: str
    password: str

34 

35 

class LoginResponse(gws.Response):
    """Result of a login or of an MFA step.

    A direct (non-MFA) login fills only `user`; responses built by
    `Object._mfa_response` fill only the `mfa*` fields.
    """

    # Props of the logged-in user (set by `handle_login` on a direct login).
    user: Optional[gws.base.auth.user.Props]
    # State of the multi-factor transaction, copied from the transaction.
    mfaState: Optional[gws.AuthMultiFactorState]
    # Message of the multi-factor transaction, copied from the transaction.
    mfaMessage: str = ''
    # Result of the adapter's `check_restart` for the transaction.
    mfaCanRestart: bool = False

41 

42 

class MfaVerifyRequest(gws.Request):
    """Request submitted to `Object.handle_mfa_verify`."""

    # Passed unchanged to the MFA adapter's `verify`; its schema is defined by
    # the adapter and is not visible in this module.
    payload: dict

45 

46 

47## 

48 

# uid of the placeholder session that stands for "the client's session is gone".
# `Object.close_session` compares against it to decide to delete the cookie.
_DELETED_SESSION = 'web:deleted'

50 

51 

class Object(gws.base.auth.method.Object):
    """Cookie-based web authentication method.

    The session uid travels in a cookie (`cookieName` / `cookiePath`).
    `open_session` resolves the cookie to a stored session, `close_session`
    writes or deletes the cookie, and the `handle_*` methods implement login,
    logout and the multi-factor (MFA) steps.

    A placeholder session with the uid `_DELETED_SESSION` marks requests whose
    cookie must be removed from the client.
    """

    cookieName: str
    cookiePath: str

    deletedSession: gws.base.auth.session.Object

    def configure(self):
        """Set the method uid and read the cookie options, defaulting to `Config`."""
        # NOTE(review): 'self' inside this uid looks like a search-and-replace
        # slip (the module lives under `auth_method`). Left untouched because
        # the uid may be referenced by stored data -- confirm before changing.
        self.uid = 'gws.plugin.self.web'
        self.cookieName = self.cfg('cookieName', default=Config.cookieName)
        self.cookiePath = self.cfg('cookiePath', default=Config.cookiePath)

    def activate(self):
        """Create the placeholder session, owned by the guest user."""
        am = self.root.app.authMgr
        self.deletedSession = gws.base.auth.session.Object(
            uid=_DELETED_SESSION,
            method=self,
            user=am.guestUser,
        )

    def open_session(self, req):
        """Resolve the session cookie of a request.

        Returns:
            None if the request has no session cookie, the stored session if
            the cookie refers to a valid one, otherwise the placeholder
            "deleted" session (so that `close_session` removes the cookie).
        """
        am = self.root.app.authMgr

        sid = req.cookie(self.cookieName)
        if not sid:
            return

        sess = am.sessionMgr.get_valid(sid)

        if not sess:
            gws.log.debug(f'open_session: {sid=} not found or invalid')
            return self.deletedSession

        return sess

    def close_session(self, req, res):
        """Reflect the request's final session in the response cookie.

        The placeholder session deletes the cookie. Any other session is
        written to the cookie and saved, but only for responses with a
        status below 400.
        """
        sess = req.session
        if not sess:
            return

        if sess.uid == _DELETED_SESSION:
            gws.log.debug('session cookie=deleted')
            res.delete_cookie(
                self.cookieName,
                path=self.cookiePath)
            return

        if res.status < 400:
            gws.log.debug(f'session cookie={sess.uid!r}')
            # `self.secure` is not set in this class; presumably provided by
            # the base auth method.
            res.set_cookie(
                self.cookieName,
                sess.uid,
                path=self.cookiePath,
                secure=self.secure,
                httponly=True)
            self.root.app.authMgr.sessionMgr.save(sess)

    def handle_login(self, req: gws.WebRequester, p: LoginRequest) -> LoginResponse:
        """Authenticate the credentials in `p` and start a session.

        If the user has an MFA method configured, only an MFA transaction is
        started and the response describes its state; the real login happens
        in `handle_mfa_verify`.

        Raises:
            gws.ForbiddenError: if a user is already logged in, the method
                requires a secure context and the request is not secure,
                authentication fails, or the MFA transaction cannot start.
        """
        if not req.user.isGuest:
            raise gws.ForbiddenError(f'login: already logged-in {req.user.uid=}')

        if self.secure and not req.isSecure:
            raise gws.ForbiddenError('login: insecure_context, ignored')

        user = self.root.app.authMgr.authenticate(self, p)
        if not user:
            raise gws.ForbiddenError('login: user not found')

        if user.mfaUid:
            mfa = self._mfa_start(req, user)
            gws.log.info(f'LOGGED_IN (MFA pending): {user.uid=} {user.roles=}')
            return self._mfa_response(mfa)

        self._finalize_login(req, user)
        return LoginResponse(user=gws.props_of(user, user))

    def handle_mfa_verify(self, req: gws.WebRequester, p: MfaVerifyRequest) -> LoginResponse:
        """Verify an MFA payload against the transaction stored in the session.

        State `ok` completes the login, `retry` keeps the transaction open;
        any other state deletes the session and fails.

        Raises:
            gws.ForbiddenError: if the transaction cannot be loaded or the
                verification ends in a state other than `ok` or `retry`.
        """
        try:
            mfa = self._mfa_verify(req, p.payload)
        except gws.ForbiddenError:
            self._delete_session(req)
            raise

        if mfa.state == gws.AuthMultiFactorState.ok:
            self._finalize_login(req, mfa.user)
            return self._mfa_response(mfa)

        if mfa.state == gws.AuthMultiFactorState.retry:
            return self._mfa_response(mfa)

        self._delete_session(req)
        raise gws.ForbiddenError(f'MFA: verify failed {mfa.state=}')

    def handle_mfa_restart(self, req: gws.WebRequester, p: gws.Request) -> LoginResponse:
        """Restart the MFA transaction stored in the session.

        Raises:
            gws.ForbiddenError: if the transaction cannot be loaded or
                restarted; the session is deleted in that case.
        """
        try:
            mfa = self._mfa_restart(req)
        except gws.ForbiddenError:
            self._delete_session(req)
            raise

        return self._mfa_response(mfa)

    def handle_logout(self, req: gws.WebRequester) -> LogoutResponse:
        """Delete the current session.

        Raises:
            gws.ForbiddenError: if a non-guest session was opened by a
                different auth method.
        """
        if req.user.isGuest:
            self._delete_session(req)
            return LogoutResponse()

        if req.session.method != self:
            raise gws.ForbiddenError(f'wrong method for logout: {req.session.method!r}')

        # Remember the uid before the session is swapped for the placeholder,
        # in case `set_session` also replaces `req.user` with the guest user.
        user_uid = req.user.uid
        self._delete_session(req)

        gws.log.info(f'LOGGED_OUT: user={user_uid!r}')
        return LogoutResponse()

    ##

    def _delete_session(self, req: gws.WebRequester):
        """Delete the request's stored session and switch to the placeholder."""
        am = self.root.app.authMgr
        am.sessionMgr.delete(req.session)
        req.set_session(self.deletedSession)

    def _finalize_login(self, req: gws.WebRequester, user: gws.User):
        """Replace the request's session with a fresh one for `user`."""
        self._delete_session(req)
        am = self.root.app.authMgr
        req.set_session(am.sessionMgr.create(self, user))
        gws.log.info(f'LOGGED_IN: {user.uid=} {user.roles=}')

    ##

    def _mfa_start(self, req: gws.WebRequester, user: gws.User) -> gws.AuthMultiFactorTransaction:
        """Start an MFA transaction for `user` and store it in a new guest session.

        Raises:
            gws.ForbiddenError: if the user's MFA adapter is unknown or the
                adapter fails to start a transaction.
        """
        am = self.root.app.authMgr

        adapter = am.get_mf_adapter(user.mfaUid)
        if not adapter:
            raise gws.ForbiddenError(f'MFA: {user.mfaUid=} unknown')

        mfa = adapter.start(user)
        if not mfa:
            raise gws.ForbiddenError(f'MFA: {user.mfaUid=} start failed')

        # Until verification succeeds the session belongs to the guest user;
        # it only carries the pending transaction.
        req.set_session(am.sessionMgr.create(self, am.guestUser))

        self._mfa_store(req, mfa)
        return mfa

    def _mfa_verify(self, req: gws.WebRequester, payload: dict) -> gws.AuthMultiFactorTransaction:
        """Load the stored transaction, let its adapter verify `payload`, store the result."""
        mfa = self._mfa_load(req)
        mfa = mfa.adapter.verify(mfa, payload)

        self._mfa_store(req, mfa)
        return mfa

    def _mfa_restart(self, req: gws.WebRequester) -> gws.AuthMultiFactorTransaction:
        """Load the stored transaction, let its adapter restart it, store the result.

        Raises:
            gws.ForbiddenError: if the adapter returns no transaction.
        """
        mfa = self._mfa_load(req)
        mfa = mfa.adapter.restart(mfa)
        if not mfa:
            raise gws.ForbiddenError('MFA: restart failed')

        self._mfa_store(req, mfa)
        return mfa

    def _mfa_store(self, req: gws.WebRequester, mfa: gws.AuthMultiFactorTransaction):
        """Save the transaction in the session, with user and adapter replaced by storable values."""
        am = self.root.app.authMgr

        sess_mfa = gws.u.merge({}, mfa)
        sess_mfa['user'] = am.serialize_user(mfa.user)
        sess_mfa['adapter'] = mfa.adapter.uid
        req.session.set('AuthMultiFactorTransaction', sess_mfa)

    def _mfa_load(self, req: gws.WebRequester) -> gws.AuthMultiFactorTransaction:
        """Rebuild the transaction stored by `_mfa_store`.

        Raises:
            gws.ForbiddenError: if the session holds no transaction, its
                adapter or user can no longer be resolved, or the adapter
                rejects the transaction's state.
        """
        am = self.root.app.authMgr

        sess_mfa = req.session.get('AuthMultiFactorTransaction')
        if not sess_mfa:
            raise gws.ForbiddenError('MFA: transaction not found')

        # Same guard as in `_mfa_start`: an unresolvable adapter must surface
        # as ForbiddenError (which the handlers catch), not as AttributeError.
        adapter_uid = sess_mfa['adapter']
        adapter = am.get_mf_adapter(adapter_uid)
        if not adapter:
            raise gws.ForbiddenError(f'MFA: adapter {adapter_uid!r} unknown')

        mfa = gws.AuthMultiFactorTransaction(sess_mfa)
        mfa.adapter = adapter
        mfa.user = am.unserialize_user(sess_mfa['user'])

        # Assumes `unserialize_user` yields a falsy value when the user cannot
        # be restored; never let such a value reach `_finalize_login`.
        if not mfa.user:
            raise gws.ForbiddenError('MFA: user not found')

        if not mfa.adapter.check_state(mfa):
            raise gws.ForbiddenError('MFA: invalid transaction in session')

        return mfa

    def _mfa_response(self, mfa: gws.AuthMultiFactorTransaction) -> LoginResponse:
        """Build the client-facing description of an MFA transaction."""
        return LoginResponse(
            mfaState=mfa.state,
            mfaMessage=mfa.message,
            mfaCanRestart=mfa.adapter.check_restart(mfa),
        )

242 mfaMessage=mfa.message, 

243 mfaCanRestart=mfa.adapter.check_restart(mfa), 

244 )