Coverage for gws-app/gws/base/ows/server/action.py: 0%

52 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""OWS server action.""" 

2 

3from typing import Optional, cast 

4 

5import gws 

6import gws.base.action 

7import gws.base.web 

8import gws.lib.mime 

9import gws.lib.xmlx 

10 

11from . import core, layer_caps, error 

12 

13gws.ext.new.action('ows') 

14 

15 

class GetServiceRequest(gws.Request):
    """Request parameters for the ``owsService`` command."""

    # Uid of the OWS service object that should handle the request.
    serviceUid: str

18 

19 

class GetSchemaRequest(gws.Request):
    """Request parameters for the ``owsXml`` command."""

    # Identifier of the XML namespace to build a schema for;
    # a trailing '.xsd' is stripped before the namespace lookup.
    namespace: str

22 

23 

class Config(gws.base.action.Config):
    """OWS server action"""

    # Declares no options of its own; everything comes from the base action config.

26 

27 

class Object(gws.base.action.Object):
    """Action object exposing the ``owsService`` and ``owsXml`` GET commands."""

    @gws.ext.command.get('owsService')
    def get_service(self, req: gws.WebRequester, p: GetServiceRequest) -> gws.ContentResponse:
        """Pass an OWS request to the service identified by ``p.serviceUid``.

        Args:
            req: The incoming web request.
            p: Request parameters carrying the service uid.

        Returns:
            The response produced by the service, or an error response
            if handling the request failed.

        Raises:
            gws.NotFoundError: If no service with the given uid exists.
        """
        srv = cast(gws.OwsService, self.root.get(p.serviceUid, gws.ext.object.owsService))
        if not srv:
            raise gws.NotFoundError(f'{p.serviceUid=} not found')

        try:
            # The permission check is deliberately inside the ``try``: a forbidden
            # request is reported as an OWS error document, like any other failure.
            if not req.user.can_use(srv):
                raise gws.ForbiddenError(f'{p.serviceUid=} forbidden')
            return srv.handle_request(req)
        except Exception as exc:
            err = error.from_exception(exc)
            verb = req.param('REQUEST')
            # Image-producing verbs get an image error response, everything else gets XML.
            if verb in core.IMAGE_VERBS:
                return err.to_image_response()
            return err.to_xml_response('ows' if srv.isOwsCommon else 'ogc')

    @gws.ext.command.get('owsXml')
    def get_schema(self, req: gws.WebRequester, p: GetSchemaRequest) -> gws.ContentResponse:
        """Return the XML schema for the namespace named in ``p.namespace``.

        Args:
            req: The incoming web request.
            p: Request parameters carrying the namespace identifier.

        Returns:
            An XML content response with the schema, or an XML error
            response if the schema could not be created.
        """
        try:
            content = self._make_schema(req, p)
        except Exception as exc:
            return error.from_exception(exc).to_xml_response()
        return gws.ContentResponse(mime=gws.lib.mime.XML, content=content)

    def _make_schema(self, req, p) -> str:
        """Build the schema document for the requested namespace.

        Collects capabilities of all layers that the requesting user can read
        and whose OWS XML namespace matches the requested one, then renders
        them as a schema.

        Args:
            req: The incoming web request; ``req.user`` is used for access checks.
            p: Request parameters; ``p.namespace`` may end with ``.xsd``.

        Returns:
            The schema as a string, including the XML declaration,
            namespace declarations and schema locations.

        Raises:
            gws.NotFoundError: If the namespace is unknown or no schema
                could be created for it.
        """
        xmlns = p.namespace
        if xmlns.endswith('.xsd'):
            xmlns = xmlns[:-4]
        ns = gws.lib.xmlx.namespace.find_by_xmlns(xmlns)
        if not ns:
            raise gws.NotFoundError(f'namespace not found: {p.namespace=}')

        lcs = []

        # The loop variable must not be named ``p``: that would rebind the request
        # parameter and break the ``p.namespace`` lookup in the error message below.
        for obj in self.root.find_all(gws.ext.object.layer):
            layer = cast(gws.Layer, obj)
            if req.user.can_read(layer) and layer.ows.xmlNamespace and layer.ows.xmlNamespace.xmlns == ns.xmlns:
                lcs.append(layer_caps.for_layer(layer, req.user))

        xml = layer_caps.xml_schema(lcs, req.user)
        if not xml:
            raise gws.NotFoundError(f'cannot create schema: {p.namespace=}')

        return xml.to_string(
            with_xml_declaration=True,
            with_namespace_declarations=True,
            with_schema_locations=True
        )

77 )