Coverage for gws-app/gws/lib/net/__init__.py: 35%

208 statements  

coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

from typing import Optional

import cgi
import re
import requests
import requests.structures
import urllib.parse
import certifi

import gws
import gws.lib.osx


##


class Error(gws.Error):
    pass


class HTTPError(Error):
    pass


class Timeout(Error):
    pass


##


class Url(gws.Data):
    """Parsed URL components."""

    fragment: str
    hostname: str
    netloc: str
    params: dict
    password: str
    path: str
    pathparts: dict
    port: str
    qsl: list
    query: str
    scheme: str
    url: str
    username: str



def parse_url(url: str, **kwargs) -> Url:
    """Parse a URL string and return a Url object."""

    if not is_abs_url(url):
        url = '//' + url

    us = urllib.parse.urlsplit(url)
    u = Url(
        fragment=us.fragment or '',
        hostname=us.hostname or '',
        netloc=us.netloc or '',
        params={},
        password=us.password or '',
        path=us.path or '',
        pathparts={},
        port=str(us.port or ''),
        qsl=[],
        query=us.query or '',
        scheme=us.scheme or '',
        url=url,
        username=us.username or '',
    )

    if u.path:
        u.pathparts = gws.lib.osx.parse_path(u.path)

    if u.query:
        u.qsl = urllib.parse.parse_qsl(u.query)
        for k, v in u.qsl:
            u.params.setdefault(k.lower(), v)

    if u.username:
        u.username = unquote(u.username)
        u.password = unquote(u.get('password', ''))

    u.update(**kwargs)
    return u

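# Usage sketch (added for illustration, not part of the original module; the URL
# and values below are invented):
#
#   u = parse_url('https://user:secret@example.org:8080/maps/demo?SERVICE=WMS&SERVICE=WFS#top')
#   u.scheme    == 'https'
#   u.hostname  == 'example.org'
#   u.port      == '8080'                # ports are kept as strings
#   u.params    == {'service': 'WMS'}    # keys are lower-cased, the first value wins
#   u.fragment  == 'top'

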

def make_url(u: Optional[Url | dict] = None, **kwargs) -> str:
    """Build a URL string from a Url object or a dict of components."""

    p = gws.u.merge({}, u, kwargs)

    s = ''

    if p.get('scheme'):
        s += p['scheme'] + ':'

    s += '//'

    if p.get('username'):
        s += quote_param(p['username']) + ':' + quote_param(p.get('password', '')) + '@'

    if p.get('hostname'):
        s += p['hostname']
        if p.get('port'):
            s += ':' + str(p['port'])
        if p.get('path'):
            s += '/'
    else:
        s += '/'

    if p.get('path'):
        s += quote_path(p['path'].lstrip('/'))

    if p.get('params'):
        s += '?' + make_qs(p['params'])

    if p.get('fragment'):
        s += '#' + p['fragment'].lstrip('#')

    return s

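# Usage sketch (illustrative, not part of the original module):
#
#   make_url(scheme='https', hostname='example.org', port=8080, path='/maps/demo', params={'SERVICE': 'WMS'})
#   == 'https://example.org:8080/maps/demo?SERVICE=WMS'

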

def parse_qs(x) -> dict:
    return urllib.parse.parse_qs(x)



def make_qs(x) -> str:
    """Convert a dict or a list of pairs to a query string.

    Keys and values are stringified and UTF-8 encoded; list values are joined
    with commas, booleans become 'true'/'false'.

    Args:
        x: A dict-like object or a list of (key, value) pairs.

    Returns:
        The query string.
    """

    p = []
    items = x if isinstance(x, (list, tuple)) else gws.u.to_dict(x).items()

    def _value(v):
        if isinstance(v, (bytes, bytearray)):
            return v
        if isinstance(v, str):
            return v.encode('utf8')
        if v is True:
            return b'true'
        if v is False:
            return b'false'
        try:
            return b','.join(_value(y) for y in v)
        except TypeError:
            return str(v).encode('utf8')

    for k, v in items:
        k = urllib.parse.quote_from_bytes(_value(k))
        v = urllib.parse.quote_from_bytes(_value(v))
        p.append(k + '=' + v)

    return '&'.join(p)

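# Usage sketch (illustrative, not part of the original module):
#
#   make_qs({'SERVICE': 'WMS', 'LAYERS': ['roads', 'rivers'], 'TRANSPARENT': True})
#   == 'SERVICE=WMS&LAYERS=roads%2Crivers&TRANSPARENT=true'

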

def quote_param(s: str) -> str:
    return urllib.parse.quote(s, safe='')


def quote_path(s: str) -> str:
    return urllib.parse.quote(s, safe='/')


def unquote(s: str) -> str:
    return urllib.parse.unquote(s)



def add_params(url: str, params: Optional[dict] = None, **kwargs) -> str:
    """Add or update query parameters in a URL."""

    u = parse_url(url)
    if params:
        u.params.update(params)
    u.params.update(kwargs)
    return make_url(u)


def extract_params(url: str) -> tuple[str, dict]:
    """Strip query parameters from a URL, return the bare URL and the parameters."""

    u = parse_url(url)
    params = u.params
    u.params = None
    return make_url(u), params

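# Usage sketch (illustrative, not part of the original module). Note that query
# keys coming from the parsed URL are lower-cased:
#
#   add_params('https://example.org/wms?SERVICE=WMS', request='GetCapabilities')
#   == 'https://example.org/wms?service=WMS&request=GetCapabilities'
#
#   extract_params('https://example.org/wms?SERVICE=WMS&VERSION=1.3.0')
#   == ('https://example.org/wms', {'service': 'WMS', 'version': '1.3.0'})

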

def is_abs_url(url):
    """Check whether a URL starts with a scheme ('xxx://') or '//'."""
    return re.match(r'^([a-z]+:|)//', url)


##



class HTTPResponse:
    """HTTP response wrapper."""

    def __init__(self, ok: bool, url: str, res: requests.Response = None, text: str = None, status_code=0):
        self.ok = ok
        self.url = url
        if res is not None:
            self.content_type, self.content_encoding = _parse_content_type(res.headers)
            self.content = res.content
            self.status_code = res.status_code
        else:
            self.content_type, self.content_encoding = 'text/plain', 'utf8'
            self.content = text.encode('utf8') if text is not None else b''
            self.status_code = status_code

    @property
    def text(self) -> str:
        if not hasattr(self, '_text'):
            setattr(self, '_text', _get_text(self.content, self.content_encoding))
        return getattr(self, '_text')

    def raise_if_failed(self):
        if not self.ok:
            raise HTTPError(self.status_code, self.text)



def _get_text(content, encoding) -> str:
    if encoding:
        try:
            return str(content, encoding=encoding, errors='strict')
        except UnicodeDecodeError:
            pass

    # some folks serve utf8 content without a header, in which case requests thinks it's ISO-8859-1
    # (see http://docs.python-requests.org/en/master/user/advanced/#encodings)
    #
    # 'apparent_encoding' is not always reliable
    #
    # therefore, when there's no header, we try utf8 first and then ISO-8859-1

    try:
        return str(content, encoding='utf8', errors='strict')
    except UnicodeDecodeError:
        pass

    try:
        return str(content, encoding='ISO-8859-1', errors='strict')
    except UnicodeDecodeError:
        pass

    # both failed, do utf8 with replace

    gws.log.warning('decode failed')
    return str(content, encoding='utf8', errors='replace')



def _parse_content_type(headers):
    # copied from requests.utils.get_encoding_from_headers, but with no ISO-8859-1 default

    content_type = headers.get('content-type')

    if not content_type:
        # https://www.w3.org/Protocols/rfc2616/rfc2616-sec7.html#sec7.2.1
        return 'application/octet-stream', None

    ctype, params = cgi.parse_header(content_type)
    if 'charset' not in params:
        return ctype, None

    enc = params['charset'].strip("'\"")

    # make sure this is a valid python encoding
    try:
        str(b'.', encoding=enc, errors='strict')
    except LookupError:
        gws.log.warning(f'invalid content-type encoding {enc!r}')
        return ctype, None

    return ctype, enc



##

# @TODO locking for caches



def http_request(url, **kwargs) -> HTTPResponse:
    """Perform an HTTP request and return an HTTPResponse.

    A 'params' keyword is merged into the URL query string. GET responses can be
    cached on disk: pass 'max_age' to reuse a cached copy that is recent enough.
    Remaining keyword arguments are forwarded to the 'requests' library.
    """

    kwargs = dict(kwargs)

    if 'params' in kwargs:
        url = add_params(url, kwargs.pop('params'))

    method = kwargs.pop('method', 'GET').upper()
    max_age = kwargs.pop('max_age', 0)
    cache_path = _cache_path(url)

    if method == 'GET' and max_age:
        age = gws.lib.osx.file_age(cache_path)
        if 0 <= age < max_age:
            gws.log.debug(f'HTTP_CACHED_{method}: url={url!r} path={cache_path!r} age={age}')
            return gws.u.unserialize_from_path(cache_path)

    gws.debug.time_start(f'HTTP_{method}={url!r}')
    res = _http_request(method, url, kwargs)
    gws.debug.time_end()

    if method == 'GET' and max_age and res.ok:
        gws.u.serialize_to_path(res, cache_path)

    return res

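# Usage sketch (illustrative; the endpoint and parameters are invented):
#
#   res = http_request(
#       'https://example.org/wms',
#       params={'SERVICE': 'WMS', 'REQUEST': 'GetCapabilities'},
#       max_age=3600,          # serve a previously cached GET response if it's fresh enough
#       timeout=30,
#   )
#   res.raise_if_failed()      # raises HTTPError unless the status code was 2xx
#   capabilities_xml = res.text

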

_DEFAULT_CONNECT_TIMEOUT = 60
_DEFAULT_READ_TIMEOUT = 60

_USER_AGENT = 'GBD WebSuite (https://gbd-websuite.de)'



def _http_request(method, url, kwargs) -> HTTPResponse:
    kwargs['stream'] = False

    if 'verify' not in kwargs:
        kwargs['verify'] = certifi.where()

    timeout = kwargs.get('timeout', (_DEFAULT_CONNECT_TIMEOUT, _DEFAULT_READ_TIMEOUT))
    if isinstance(timeout, (int, float)):
        timeout = int(timeout), int(timeout)
    kwargs['timeout'] = timeout

    if 'headers' not in kwargs:
        kwargs['headers'] = {}
    kwargs['headers'].setdefault('User-Agent', _USER_AGENT)

    try:
        res = requests.request(method, url, **kwargs)
        if 200 <= res.status_code < 300:
            gws.log.debug(f'HTTP_OK_{method}: url={url!r} status={res.status_code!r}')
            return HTTPResponse(ok=True, url=url, res=res)
        gws.log.error(f'HTTP_FAILED_{method}: ({res.status_code!r}) url={url!r}')
        return HTTPResponse(ok=False, url=url, res=res)
    except requests.ConnectionError as exc:
        gws.log.error(f'HTTP_FAILED_{method}: (ConnectionError) url={url!r}')
        return HTTPResponse(ok=False, url=url, text=repr(exc), status_code=900)
    except requests.Timeout as exc:
        gws.log.error(f'HTTP_FAILED_{method}: (Timeout) url={url!r}')
        return HTTPResponse(ok=False, url=url, text=repr(exc), status_code=901)
    except requests.RequestException as exc:
        gws.log.error(f'HTTP_FAILED_{method}: (Generic: {exc!r}) url={url!r}')
        return HTTPResponse(ok=False, url=url, text=repr(exc), status_code=999)



def _cache_path(url):
    return gws.c.NET_CACHE_DIR + '/' + gws.u.sha256(url)