Coverage for gws-app/gws/base/ows/client/request.py: 0%
35 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1import gws
2import gws.lib.net
4from . import error
6_ows_error_strings = '<ServiceException', '<ServerException', '<ows:ExceptionReport'
class Args(gws.Data):
    """Arguments describing a single OWS service request (consumed by `get`)."""

    # HTTP method; `get` falls back to `gws.RequestMethod.GET` when this is empty
    method: gws.RequestMethod
    # HTTP headers handed to the request as is
    headers: dict
    # extra query parameters; `get` upper-cases the keys before merging them
    params: dict
    # service protocol; sent upper-cased as the `SERVICE` parameter
    protocol: gws.OwsProtocol
    # service URL
    url: str
    # operation name; sent as the `REQUEST` parameter
    verb: gws.OwsVerb
    # service version; sent as the `VERSION` parameter when non-empty
    version: str
def get_url(url: str, **kwargs) -> gws.lib.net.HTTPResponse:
    """Request a URL and raise if the service answered with an error.

    Args:
        url: The URL to request.
        **kwargs: Passed through to ``gws.lib.net.http_request``.

    Returns:
        The response object.

    Raises:
        error.Error: If the beginning of the body contains one of the
            OWS error markers, or if ``raise_if_failed`` raises.
    """

    response = gws.lib.net.http_request(url, **kwargs)

    # Some servers deliver OWS error documents with the status 200,
    # so the body is inspected no matter what the status code says.
    # Responses can be big images, therefore only XML-looking bodies
    # are examined, and only their first 1024 bytes are decoded.
    looks_like_xml = response.content.startswith(b'<') or 'xml' in response.content_type

    if looks_like_xml:
        head = str(response.content[:1024], encoding='utf8', errors='ignore')
        haystack = head.lower()
        if any(marker.lower() in haystack for marker in _ows_error_strings):
            raise error.Error(head)

    try:
        response.raise_if_failed()
    except Exception as exc:
        # re-raise any failure as the package-level error, keeping the cause
        raise error.Error(*exc.args) from exc

    return response
def get(args: Args, **kwargs) -> gws.lib.net.HTTPResponse:
    """Get a raw service response.

    Builds the query from ``args`` (``SERVICE``, ``REQUEST``, optional
    ``VERSION`` and the upper-cased extra parameters) and delegates to `get_url`.

    Args:
        args: Request arguments.
        **kwargs: Additional keyword arguments for `get_url`.

    Returns:
        The response object.
    """

    query = {'SERVICE': str(args.protocol).upper(), 'REQUEST': args.verb}

    if args.version:
        query['VERSION'] = args.version

    if args.params:
        # extra parameters may override the defaults set above
        query.update(gws.u.to_upper_dict(args.params))

    http_method = args.method or gws.RequestMethod.GET

    return get_url(
        args.url,
        method=http_method,
        params=query,
        headers=args.headers,
        **kwargs,
    )
def get_text(args: Args, **kwargs) -> str:
    """Get a service response and return its ``text`` attribute.

    Args:
        args: Request arguments.
        **kwargs: Additional keyword arguments for `get`.
    """

    return get(args, **kwargs).text