Coverage for gws-app/gws/base/ows/client/request.py: 0%

35 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1import gws 

2import gws.lib.net 

3 

4from . import error 

5 

6_ows_error_strings = '<ServiceException', '<ServerException', '<ows:ExceptionReport' 

7 

8 

class Args(gws.Data):
    """Arguments describing a single OWS request, as consumed by ``get``."""

    # HTTP method; ``get`` falls back to ``gws.RequestMethod.GET`` when this is unset.
    method: gws.RequestMethod
    # Passed as ``headers`` to the underlying HTTP request.
    headers: dict
    # Extra query parameters; ``get`` upper-cases the keys and merges them over the defaults.
    params: dict
    # Its upper-cased string form is sent as the ``SERVICE`` parameter.
    protocol: gws.OwsProtocol
    # URL the request is sent to.
    url: str
    # Sent as the ``REQUEST`` parameter.
    verb: gws.OwsVerb
    # Sent as the ``VERSION`` parameter, only when non-empty.
    version: str

17 

18 

def get_url(url: str, **kwargs) -> gws.lib.net.HTTPResponse:
    """Request a URL and convert OWS error documents and failed responses into ``error.Error``.

    Args:
        url: The URL to request.
        **kwargs: Passed through unchanged to ``gws.lib.net.http_request``.

    Returns:
        The response object, when no error was detected.

    Raises:
        error.Error: If the beginning of the body contains one of the OWS error
            markers, or if ``raise_if_failed`` on the response raises.
    """
    response = gws.lib.net.http_request(url, **kwargs)

    # Some services deliver an OWS error document with status 200, so the body is
    # inspected for an error marker regardless of the status code.
    # The body may be a large image, hence only its first 1024 bytes are decoded.
    starts_like_markup = response.content.startswith(b'<')
    if starts_like_markup or 'xml' in response.content_type:
        head = str(response.content[:1024], encoding='utf8', errors='ignore')
        haystack = head.lower()
        if any(marker.lower() in haystack for marker in _ows_error_strings):
            raise error.Error(head)

    try:
        response.raise_if_failed()
    except Exception as exc:
        # Re-raise as the package error type, keeping the original as the cause.
        raise error.Error(*exc.args) from exc

    return response

37 

38 

def get(args: Args, **kwargs) -> gws.lib.net.HTTPResponse:
    """Get a raw service response.

    Args:
        args: Request description; see ``Args``.
        **kwargs: Passed through to ``get_url``.

    Returns:
        The response returned by ``get_url``.
    """

    query = {'SERVICE': str(args.protocol).upper(), 'REQUEST': args.verb}

    version = args.version
    if version:
        query['VERSION'] = version

    # Caller-supplied parameters are merged last, so they override the defaults above.
    extra = args.params
    if extra:
        query.update(gws.u.to_upper_dict(extra))

    method = args.method or gws.RequestMethod.GET
    return get_url(
        args.url,
        method=method,
        params=query,
        headers=args.headers,
        **kwargs,
    )

52 

53 

def get_text(args: Args, **kwargs) -> str:
    """Perform the request described by ``args`` via ``get`` and return the response's ``text``."""
    return get(args, **kwargs).text