Coverage for gws-app/gws/base/web/wsgi.py: 23%

194 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1from typing import Any 

2 

3import gzip 

4import io 

5import os 

6import werkzeug.utils 

7import werkzeug.wrappers 

8import werkzeug.wsgi 

9 

10import gws 

11import gws.lib.jsonx 

12import gws.lib.mime 

13import gws.lib.vendor.umsgpack as umsgpack 

14 

15from . import error 

16 

17 

class Responder(gws.WebResponder):
    """Web responder that delegates to a werkzeug ``Response`` object."""

    def __init__(self, **kwargs):
        """Wrap an existing werkzeug response or create a new one.

        Args:
            **kwargs: If ``wz`` is given, it is used as the wrapped response
                object as is. Otherwise all keyword arguments are passed
                verbatim to ``werkzeug.wrappers.Response``.
        """
        if 'wz' in kwargs:
            self._wz = kwargs['wz']
        else:
            self._wz = werkzeug.wrappers.Response(**kwargs)

        # the raw ``response`` argument (if any) is kept for inspection by callers
        self.response = kwargs.get('response')
        self.status = self._wz.status_code

    def __repr__(self):
        return f'<Responder {self._wz}>'

    def send_response(self, environ, start_response):
        """Invoke the wrapped response as a WSGI application and return its result."""
        return self._wz(environ, start_response)

    def set_cookie(self, key, value, **kwargs):
        """Set a cookie on the wrapped response; ``kwargs`` are passed through."""
        self._wz.set_cookie(key, value, **kwargs)

    def delete_cookie(self, key, **kwargs):
        """Delete a cookie on the wrapped response; ``kwargs`` are passed through."""
        self._wz.delete_cookie(key, **kwargs)

    def add_header(self, key, value):
        """Add a header to the wrapped response."""
        self._wz.headers.add(key, value)

    def set_status(self, status):
        """Set the HTTP status code.

        Args:
            status: Status code, converted with ``int()``.
        """
        self._wz.status_code = int(status)
        # keep the public attribute in sync with the wrapped response,
        # otherwise ``self.status`` would still report the initial code
        self.status = self._wz.status_code

44 

45 

class Requester(gws.WebRequester):
    """Web requester that delegates to a werkzeug ``Request`` object."""

    # MIME types of the supported structured payload formats, keyed by struct type
    _struct_mime = {
        'json': 'application/json',
        'msgpack': 'application/msgpack',
    }

    # name of the parameter that carries the command when it is not in the path
    _cmd_param_name = 'cmd'

    def __init__(self, root: gws.Root, environ: dict, site: gws.WebSite, **kwargs):
        """Create a requester for a WSGI environment.

        Args:
            root: Root object; used for configuration and the auth manager.
            environ: WSGI environment (ignored if ``wz`` is given).
            site: Web site this request belongs to.
            **kwargs: If ``wz`` is given, it is used as the wrapped request object.
        """
        if 'wz' in kwargs:
            self._wz = kwargs['wz']
        else:
            self._wz = werkzeug.wrappers.Request(environ)

        # this is also set in nginx (see server/ini), but we need this for unzipping (see data() below)
        self._wz.max_content_length = int(root.app.cfg('server.web.maxRequestLength', default=1)) * 1024 * 1024

        self.root = root
        self.site = site

        self.environ = self._wz.environ
        self.method = self._wz.method.upper()
        self.isSecure = self._wz.is_secure

        # every request starts as a guest until set_session() is called
        self.session = root.app.authMgr.guestSession
        self.user = root.app.authMgr.guestUser

        self.isPost = self.method == 'POST'
        self.isGet = self.method == 'GET'

        # structured input is only recognised for POST requests
        self.inputType = None
        if self.isPost:
            self.inputType = self._struct_type(self.header('content-type'))

        # the output format follows the 'accept' header, falling back to the input format
        self.outputType = None
        if self.inputType:
            self.outputType = self._struct_type(self.header('accept')) or self.inputType

        self.isApi = self.inputType is not None

        # lazily populated by _parse()
        self._parsed_params = {}
        self._parsed_params_lc = {}
        self._parsed_struct = {}
        self._parsed_command = ''
        self._parsed = False

    def __repr__(self):
        return f'<Requester {self._wz}>'

    def params(self):
        """Return the request parameters (query string and path-encoded) as a dict."""
        self._parse()
        return self._parsed_params

    def struct(self):
        """Return the decoded structured payload (empty dict for non-API requests)."""
        self._parse()
        return self._parsed_struct

    def command(self):
        """Return the command name taken from the path, the payload or the parameters."""
        self._parse()
        return self._parsed_command

    def data(self):
        """Return the raw request body as bytes, or ``None`` for non-POST requests.

        A body sent with ``content-encoding: gzip`` is decompressed; at most
        ``max_content_length`` bytes of decompressed data are read.

        Raises:
            error.BadRequest: If the gzip-encoded body cannot be decompressed.
        """
        if not self.isPost:
            return None

        data = self._wz.get_data(as_text=False, parse_form_data=False)

        if self.root.app.developer_option('request.log_all'):
            debug_dir = gws.u.ensure_dir(f'{gws.c.VAR_DIR}/debug')
            # the timestamp must be interpolated, otherwise every dump overwrites the same file
            gws.u.write_file_b(f'{debug_dir}/request_{gws.u.mstime()}', data)

        if self.header('content-encoding') == 'gzip':
            # local import: only needed to recognise decompression errors
            import zlib

            try:
                with gzip.GzipFile(fileobj=io.BytesIO(data)) as fp:
                    return fp.read(self._wz.max_content_length)
            except (OSError, EOFError, zlib.error) as exc:
                # a corrupt or truncated body is a client error, not a server error
                gws.log.error('malformed gzip request')
                raise error.BadRequest() from exc

        return data

    def text(self):
        """Return the request body decoded to ``str``, or ``None`` for non-POST requests.

        Raises:
            error.BadRequest: If the body cannot be decoded with the requested charset.
        """
        data = self.data()
        if data is None:
            return None

        # NOTE(review): this reads a header literally named 'charset'; the charset is
        # normally a parameter of 'content-type' - confirm this is intended.
        charset = self.header('charset', 'utf-8')
        try:
            return data.decode(encoding=charset, errors='strict')
        except (UnicodeDecodeError, LookupError) as exc:
            # LookupError: the client supplied an unknown charset name
            gws.log.error('post data decoding error')
            raise error.BadRequest() from exc

    def env(self, key, default=''):
        """Return a value from the WSGI environment."""
        return self._wz.environ.get(key, default)

    def has_param(self, key):
        """Check whether a parameter is present (case-insensitive)."""
        self._parse()
        return key.lower() in self._parsed_params_lc

    def param(self, key, default=''):
        """Return a parameter value (case-insensitive) or ``default``."""
        # parse lazily like the other accessors, so that the result does not
        # depend on whether params()/has_param() happened to be called before
        self._parse()
        return self._parsed_params_lc.get(key.lower(), default)

    def header(self, key, default=''):
        """Return a request header value or ``default``."""
        return self._wz.headers.get(key, default)

    def cookie(self, key, default=''):
        """Return a request cookie value or ``default``."""
        return self._wz.cookies.get(key, default)

    def content_responder(self, response):
        """Create a Responder that serves inline content or a file.

        Args:
            response: Response description; the attributes ``mime``, ``status``,
                ``attachmentName``, ``asAttachment``, ``contentPath``, ``content``
                and ``headers`` are read.

        Returns:
            A ``Responder`` object.
        """
        args: dict = {
            'mimetype': response.mime,
            'status': response.status or 200,
            'headers': {},
            'direct_passthrough': False,
        }

        attach_name = None

        if response.attachmentName:
            attach_name = response.attachmentName
        elif response.asAttachment:
            # no explicit name: derive one from the path or the mime type
            if response.contentPath:
                attach_name = os.path.basename(response.contentPath)
            elif response.mime:
                ext = gws.lib.mime.extension_for(response.mime)
                attach_name = 'download.' + ext
            else:
                attach_name = 'download'

        if attach_name:
            args['headers']['Content-Disposition'] = f'attachment; filename="{attach_name}"'
            args['mimetype'] = args['mimetype'] or gws.lib.mime.for_path(attach_name)

        if response.contentPath:
            # stat the file before opening it, so that a failure here cannot leak an open handle
            args['headers']['Content-Length'] = str(os.path.getsize(response.contentPath))
            args['mimetype'] = args['mimetype'] or gws.lib.mime.for_path(response.contentPath)
            args['response'] = werkzeug.wsgi.wrap_file(self.environ, open(response.contentPath, 'rb'))
            args['direct_passthrough'] = True
        else:
            args['response'] = response.content

        # explicit headers take precedence over the computed ones
        if response.headers:
            args['headers'].update(response.headers)

        return Responder(**args)

    def redirect_responder(self, response):
        """Create a Responder that redirects to ``response.location`` (302 by default)."""
        wz = werkzeug.utils.redirect(response.location, response.status or 302)
        if response.headers:
            wz.headers.update(response.headers)
        return Responder(wz=wz)

    def api_responder(self, response):
        """Create a Responder with ``response`` encoded in the output format (json by default)."""
        typ = self.outputType or 'json'
        return Responder(
            response=self._encode_struct(response, typ),
            mimetype=self._struct_mime[typ],
            status=response.status or 200,
        )

    def error_responder(self, exc):
        """Create a Responder for an exception.

        Anything that is not an ``error.HTTPException`` is reported as
        ``error.InternalServerError``.
        """
        err = exc if isinstance(exc, error.HTTPException) else error.InternalServerError()
        return Responder(wz=err.get_response(self._wz.environ))

    ##

    def url_for(self, path, **kwargs):
        """Delegate URL creation to the site object."""
        return self.site.url_for(self, path, **kwargs)

    ##

    def set_session(self, sess):
        """Attach a session and its user to this request."""
        self.session = sess
        self.user = sess.user

    ##

    def _parse(self):
        """Parse the path, parameters and payload once and cache the results.

        Raises:
            error.NotFound: If the path is not the server endpoint or below it.
            error.BadRequest: If the structured payload is malformed.
        """
        if self._parsed:
            return

        # the server only understands requests to /_ or /_/commandName
        # GET params can be given as query string or encoded in the path
        # like _/commandName/param1/value1/param2/value2 etc

        path = self._wz.path
        path_parts = None

        if path == gws.c.SERVER_ENDPOINT:
            # example.com/_
            # the cmd param is expected to be in the query string or json
            cmd = ''
        elif path.startswith(gws.c.SERVER_ENDPOINT + '/'):
            # example.com/_/someCommand
            # the cmd param is in the url
            path_parts = path.split('/')
            cmd = path_parts[2]
            path_parts = path_parts[3:]
        else:
            raise error.NotFound(f'invalid request path: {path!r}')

        if self.inputType:
            self._parsed_struct = self._decode_struct(self.inputType)
            if not cmd:
                try:
                    cmd = self._parsed_struct.pop(self._cmd_param_name, '')
                except (AttributeError, TypeError) as exc:
                    # the payload is not a mapping (e.g. an array or a scalar)
                    gws.log.error('malformed request payload')
                    raise error.BadRequest() from exc
            self._parsed_command = cmd
        else:
            d = dict(self._wz.args)
            if path_parts:
                # path-encoded params come in name/value pairs; a trailing unpaired name is ignored
                for n in range(1, len(path_parts), 2):
                    d[path_parts[n - 1]] = path_parts[n]
            self._parsed_command = cmd or d.pop(self._cmd_param_name, '')
            self._parsed_params = d
            self._parsed_params_lc = {k.lower(): v for k, v in d.items()}

        self._parsed = True

    def _struct_type(self, header):
        """Map a content-type/accept header value to 'json', 'msgpack' or ``None``."""
        if header:
            header = header.lower()
            if header.startswith(self._struct_mime['json']):
                return 'json'
            if header.startswith(self._struct_mime['msgpack']):
                return 'msgpack'
        return None

    def _encode_struct(self, data, typ):
        """Serialize ``data`` in the given struct type.

        Raises:
            ValueError: If ``typ`` is neither 'json' nor 'msgpack'.
        """
        if typ == 'json':
            return gws.lib.jsonx.to_string(data, pretty=True)
        if typ == 'msgpack':
            return umsgpack.dumps(data, default=gws.u.to_dict)
        raise ValueError('invalid struct type')

    def _decode_struct(self, typ):
        """Decode the request body as the given struct type.

        Raises:
            error.BadRequest: If the body is malformed or ``typ`` is unknown.
        """
        if typ == 'json':
            try:
                s = self.data().decode(encoding='utf-8', errors='strict')
                return gws.lib.jsonx.from_string(s)
            except (UnicodeDecodeError, gws.lib.jsonx.Error) as exc:
                gws.log.error('malformed json request')
                raise error.BadRequest() from exc

        if typ == 'msgpack':
            try:
                return umsgpack.loads(self.data())
            except (TypeError, umsgpack.UnpackException) as exc:
                gws.log.error('malformed msgpack request')
                raise error.BadRequest() from exc

        gws.log.error('invalid struct type')
        raise error.BadRequest()