Coverage for gws-app/gws/lib/net/__init__.py: 35%
208 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1from typing import Optional
3import cgi
4import re
5import requests
6import requests.structures
7import urllib.parse
8import certifi
10import gws
11import gws.lib.osx
14##
class Error(gws.Error):
    """Base class for the errors defined in this module."""
class HTTPError(Error):
    """Error raised by ``HTTPResponse.raise_if_failed`` for a failed request."""
class Timeout(Error):
    """Timeout error type of this module."""
28##
class Url(gws.Data):
    """Components of a url, as filled in by ``parse_url``."""

    # text after '#'
    fragment: str
    # host name without credentials and port
    hostname: str
    # full network location as returned by ``urlsplit``
    netloc: str
    # query parameters, keyed by lower-cased name (first occurrence wins)
    params: dict
    # unquoted password, or an empty string
    password: str
    # path component
    path: str
    # result of ``gws.lib.osx.parse_path`` applied to ``path``
    pathparts: dict
    # port number as a string, or an empty string
    port: str
    # query string as a list of (name, value) pairs
    qsl: list
    # raw query string
    query: str
    # url scheme, or an empty string
    scheme: str
    # the url string that was actually parsed
    url: str
    # unquoted user name, or an empty string
    username: str
def parse_url(url: str, **kwargs) -> Url:
    """Split a url string into its components.

    A url that does not start with ``scheme://`` or ``//`` is prefixed
    with ``//`` before parsing, so that its first segment is read as a host.

    Args:
        url: The url string.
        **kwargs: Extra values that are applied to the result last
            and thus override the parsed ones.

    Returns:
        An ``Url`` object.
    """

    if not is_abs_url(url):
        url = '//' + url

    parts = urllib.parse.urlsplit(url)

    res = Url(
        fragment=parts.fragment or '',
        hostname=parts.hostname or '',
        netloc=parts.netloc or '',
        params={},
        password=parts.password or '',
        path=parts.path or '',
        pathparts={},
        port=str(parts.port or ''),
        qsl=[],
        query=parts.query or '',
        scheme=parts.scheme or '',
        url=url,
        username=parts.username or '',
    )

    if res.path:
        res.pathparts = gws.lib.osx.parse_path(res.path)

    if res.query:
        res.qsl = urllib.parse.parse_qsl(res.query)
        # parameter names are case-insensitive here, the first occurrence wins
        for name, value in res.qsl:
            res.params.setdefault(name.lower(), value)

    if res.username:
        # credentials come percent-encoded from urlsplit
        res.username = unquote(res.username)
        res.password = unquote(res.get('password', ''))

    res.update(**kwargs)
    return res
def make_url(u: Optional[Url | dict] = None, **kwargs) -> str:
    """Assemble a url string from its components.

    Args:
        u: An ``Url`` object or a dict with the same keys.
        **kwargs: Components that are merged over ``u``.

    Returns:
        The url string. It always contains ``//``, optionally preceded by the scheme.
    """

    p = gws.u.merge({}, u, kwargs)
    chunks = []

    if p.get('scheme'):
        chunks.append(p['scheme'] + ':')

    chunks.append('//')

    if p.get('username'):
        chunks.append(quote_param(p['username']) + ':' + quote_param(p.get('password', '')) + '@')

    if not p.get('hostname'):
        # no host: the path starts right after the (empty) authority
        chunks.append('/')
    else:
        chunks.append(p['hostname'])
        if p.get('port'):
            chunks.append(':' + str(p['port']))
        if p.get('path'):
            chunks.append('/')

    if p.get('path'):
        # the separating slash was already emitted above
        chunks.append(quote_path(p['path'].lstrip('/')))

    if p.get('params'):
        chunks.append('?' + make_qs(p['params']))

    if p.get('fragment'):
        chunks.append('#' + p['fragment'].lstrip('#'))

    return ''.join(chunks)
def parse_qs(x) -> dict:
    """Parse a query string into a dict of ``name -> list of values``."""

    parsed = urllib.parse.parse_qs(x)
    return parsed
def make_qs(x) -> str:
    """Build a query string from a dict or a list of pairs.

    Keys and values are converted to bytes before quoting: bytes are taken as is,
    strings are encoded as utf8, booleans become ``true``/``false``,
    other iterables are joined with a comma, anything else is stringified.

    Args:
        x: A dict'able value or a list/tuple of (key, value) pairs.

    Returns:
        The query string.
    """

    def _as_bytes(val):
        if isinstance(val, (bytes, bytearray)):
            return val
        if isinstance(val, str):
            return val.encode('utf8')
        if val is True:
            return b'true'
        if val is False:
            return b'false'
        try:
            # iterables become a comma-separated list
            return b','.join(_as_bytes(item) for item in val)
        except TypeError:
            # not iterable: fall back to the string representation
            return str(val).encode('utf8')

    pairs = x if isinstance(x, (list, tuple)) else gws.u.to_dict(x).items()
    quote = urllib.parse.quote_from_bytes

    return '&'.join(
        quote(_as_bytes(key)) + '=' + quote(_as_bytes(val))
        for key, val in pairs
    )
def quote_param(s: str) -> str:
    """Percent-encode a string, leaving no reserved characters (not even ``/``) as is."""

    no_safe_chars = ''
    return urllib.parse.quote(s, no_safe_chars)
def quote_path(s: str) -> str:
    """Percent-encode a url path, keeping the ``/`` separators as is."""

    keep = '/'
    return urllib.parse.quote(s, keep)
def unquote(s: str) -> str:
    """Decode the percent-escapes in a string."""

    decoded = urllib.parse.unquote(s)
    return decoded
def add_params(url: str, params: dict = None, **kwargs) -> str:
    """Add query parameters to a url.

    Args:
        url: The url string.
        params: Parameters to add.
        **kwargs: More parameters, applied after ``params``.

    Returns:
        The url, rebuilt with the combined parameters.
    """

    parsed = parse_url(url)
    for extra in (params, kwargs):
        if extra:
            parsed.params.update(extra)
    return make_url(parsed)
def extract_params(url: str) -> tuple[str, dict]:
    """Separate a url from its query parameters.

    Returns:
        A tuple ``(url rebuilt without the query string, parameters dict)``.
    """

    parsed = parse_url(url)
    # detach the parameters so that make_url does not emit a query string
    params, parsed.params = parsed.params, None
    return make_url(parsed), params
def is_abs_url(url) -> bool:
    """Check if a url starts with ``scheme://`` or with ``//``.

    The scheme is matched case-insensitively and may contain letters, digits,
    ``+``, ``.`` and ``-`` after the first letter, so that urls like
    ``HTTP://host`` or ``svn+ssh://host`` are recognized as absolute.

    Args:
        url: The url string.

    Returns:
        ``True`` if the url is absolute or protocol-relative.
    """

    return bool(re.match(r'^([a-z][a-z0-9+.\-]*:)?//', url, re.IGNORECASE))
191##
class HTTPResponse:
    """Result of an HTTP request.

    Built either from a ``requests.Response`` or, when there is none
    (e.g. the request failed before a response arrived), from a plain text and a status code.
    """

    def __init__(self, ok: bool, url: str, res: requests.Response = None, text: str = None, status_code=0):
        self.ok = ok
        self.url = url

        if res is None:
            # synthetic response: the text is stored as utf8 plain text
            self.content_type, self.content_encoding = 'text/plain', 'utf8'
            self.content = b'' if text is None else text.encode('utf8')
            self.status_code = status_code
            return

        self.content_type, self.content_encoding = _parse_content_type(res.headers)
        self.content = res.content
        self.status_code = res.status_code

    @property
    def text(self) -> str:
        """Response content decoded to a string; decoded once and then cached."""
        try:
            return self._text
        except AttributeError:
            pass
        self._text = _get_text(self.content, self.content_encoding)
        return self._text

    def raise_if_failed(self):
        """Raise an ``HTTPError`` with the status code and the text if the request was not ok."""
        if self.ok:
            return
        raise HTTPError(self.status_code, self.text)
218def _get_text(content, encoding) -> str:
219 if encoding:
220 try:
221 return str(content, encoding=encoding, errors='strict')
222 except UnicodeDecodeError:
223 pass
225 # some folks serve utf8 content without a header, in which case requests thinks it's ISO-8859-1
226 # (see http://docs.python-requests.org/en/master/user/advanced/#encodings)
227 #
228 # 'apparent_encoding' is not always reliable
229 #
230 # therefore when there's no header, we try utf8 first, and then ISO-8859-1
232 try:
233 return str(content, encoding='utf8', errors='strict')
234 except UnicodeDecodeError:
235 pass
237 try:
238 return str(content, encoding='ISO-8859-1', errors='strict')
239 except UnicodeDecodeError:
240 pass
242 # both failed, do utf8 with replace
244 gws.log.warning(f'decode failed')
245 return str(content, encoding='utf8', errors='replace')
248def _parse_content_type(headers):
249 # copied from requests.utils.get_encoding_from_headers, but with no ISO-8859-1 default
251 content_type = headers.get('content-type')
253 if not content_type:
254 # https://www.w3.org/Protocols/rfc2616/rfc2616-sec7.html#sec7.2.1
255 return 'application/octet-stream', None
257 ctype, params = cgi.parse_header(content_type)
258 if 'charset' not in params:
259 return ctype, None
261 enc = params['charset'].strip("'\"")
263 # make sure this is a valid python encoding
264 try:
265 str(b'.', encoding=enc, errors='strict')
266 except LookupError:
267 gws.log.warning(f'invalid content-type encoding {enc!r}')
268 return ctype, None
270 return ctype, enc
273##
275# @TODO locking for caches
def http_request(url, **kwargs) -> HTTPResponse:
    """Perform an HTTP request, optionally using the file cache for GET requests.

    Args:
        url: The url.
        **kwargs: Request options. ``params`` is added to the url, ``method`` defaults to GET,
            ``max_age`` is the maximum age of a cached response that can still be reused.
            The remaining options are passed on to ``_http_request``.

    Returns:
        An ``HTTPResponse`` object, fresh or restored from the cache.
    """

    opts = dict(kwargs)

    if 'params' in opts:
        url = add_params(url, opts.pop('params'))

    method = opts.pop('method', 'GET').upper()
    max_age = opts.pop('max_age', 0)
    cache_path = _cache_path(url)

    # only GET requests with a non-zero max_age are cached
    cacheable = method == 'GET' and max_age

    if cacheable:
        age = gws.lib.osx.file_age(cache_path)
        if 0 <= age < max_age:
            gws.log.debug(f'HTTP_CACHED_{method}: url={url!r} path={cache_path!r} age={age}')
            return gws.u.unserialize_from_path(cache_path)

    gws.debug.time_start(f'HTTP_{method}={url!r}')
    response = _http_request(method, url, opts)
    gws.debug.time_end()

    if cacheable and response.ok:
        gws.u.serialize_to_path(response, cache_path)

    return response
304_DEFAULT_CONNECT_TIMEOUT = 60
305_DEFAULT_READ_TIMEOUT = 60
307_USER_AGENT = f'GBD WebSuite (https://gbd-websuite.de)'
def _http_request(method, url, kwargs) -> HTTPResponse:
    """Perform a request with ``requests`` and wrap the outcome in an ``HTTPResponse``.

    Request errors are not raised, but returned as a failed response with a
    synthetic status code: 901 for timeouts, 900 for connection errors,
    999 for other ``requests`` errors.

    Args:
        method: The upper-cased HTTP method.
        url: The url.
        kwargs: A dict of options for ``requests.request``. It is not modified.

    Returns:
        An ``HTTPResponse`` object; ``ok`` is set for 2xx status codes only.
    """

    # work on copies, so that neither the options nor the headers of the caller are changed
    kwargs = dict(kwargs)
    kwargs['stream'] = False

    if 'verify' not in kwargs:
        kwargs['verify'] = certifi.where()

    timeout = kwargs.get('timeout', (_DEFAULT_CONNECT_TIMEOUT, _DEFAULT_READ_TIMEOUT))
    if isinstance(timeout, (int, float)):
        # a single number applies to both connect and read;
        # keep floats as they are, truncating would turn e.g. 0.5 into an invalid 0
        timeout = timeout, timeout
    kwargs['timeout'] = timeout

    headers = kwargs.get('headers')
    headers = headers.copy() if headers else {}
    headers.setdefault('User-Agent', _USER_AGENT)
    kwargs['headers'] = headers

    try:
        res = requests.request(method, url, **kwargs)
        if 200 <= res.status_code < 300:
            gws.log.debug(f'HTTP_OK_{method}: url={url!r} status={res.status_code!r}')
            return HTTPResponse(ok=True, url=url, res=res)
        gws.log.error(f'HTTP_FAILED_{method}: ({res.status_code!r}) url={url!r}')
        return HTTPResponse(ok=False, url=url, res=res)
    except requests.Timeout as exc:
        # must come before ConnectionError: a connect timeout is a subclass of both
        gws.log.error(f'HTTP_FAILED_{method}: (Timeout) url={url!r}')
        return HTTPResponse(ok=False, url=url, text=repr(exc), status_code=901)
    except requests.ConnectionError as exc:
        gws.log.error(f'HTTP_FAILED_{method}: (ConnectionError) url={url!r}')
        return HTTPResponse(ok=False, url=url, text=repr(exc), status_code=900)
    except requests.RequestException as exc:
        gws.log.error(f'HTTP_FAILED_{method}: (Generic: {exc!r}) url={url!r}')
        return HTTPResponse(ok=False, url=url, text=repr(exc), status_code=999)
def _cache_path(url):
    """Return the cache file path for a url: the net cache directory plus the sha256 of the url."""

    return '/'.join((gws.c.NET_CACHE_DIR, gws.u.sha256(url)))