Coverage for gws-app/gws/base/ows/server/action.py: 0%
52 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""OWS server action."""
3from typing import Optional, cast
5import gws
6import gws.base.action
7import gws.base.web
8import gws.lib.mime
9import gws.lib.xmlx
11from . import core, layer_caps, error
# Extension marker naming this module's action type as 'ows'.
# NOTE(review): presumably consumed by the gws spec/extension machinery
# rather than having a runtime effect of its own -- confirm.
gws.ext.new.action('ows')
class GetServiceRequest(gws.Request):
    """Parameters of the ``owsService`` command."""

    # Uid of the OWS service object that should handle the request.
    serviceUid: str
class GetSchemaRequest(gws.Request):
    """Parameters of the ``owsXml`` command."""

    # XML namespace identifier to build a schema for; a trailing '.xsd'
    # is accepted and stripped before the namespace lookup.
    namespace: str
class Config(gws.base.action.Config):
    """OWS server action"""

    # Declares no options of its own; everything comes from the base action config.
class Object(gws.base.action.Object):
    """Action object exposing OWS services and XML schemas for layer namespaces."""

    @gws.ext.command.get('owsService')
    def get_service(self, req: gws.WebRequester, p: GetServiceRequest) -> gws.ContentResponse:
        """Pass the request to the OWS service identified by ``p.serviceUid``.

        Args:
            req: The current web request.
            p: Command parameters carrying the service uid.

        Returns:
            The response produced by the service, or an error response
            (image or XML) if handling the request fails.

        Raises:
            gws.NotFoundError: If no service object with the given uid exists.
        """
        srv = cast(gws.OwsService, self.root.get(p.serviceUid, gws.ext.object.owsService))
        if not srv:
            raise gws.NotFoundError(f'{p.serviceUid=} not found')

        try:
            if not req.user.can_use(srv):
                raise gws.ForbiddenError(f'{p.serviceUid=} forbidden')
            return srv.handle_request(req)
        except Exception as exc:
            # Deliberately broad: once the service is known, every failure
            # (including the permission error raised above) is turned into
            # an error response instead of propagating to the caller.
            err = error.from_exception(exc)
            verb = req.param('REQUEST')
            if verb in core.IMAGE_VERBS:
                # Verbs listed in IMAGE_VERBS get the error rendered as an image.
                return err.to_image_response()
            return err.to_xml_response('ows' if srv.isOwsCommon else 'ogc')

    @gws.ext.command.get('owsXml')
    def get_schema(self, req: gws.WebRequester, p: GetSchemaRequest) -> gws.ContentResponse:
        """Return an XML schema for the namespace given in ``p.namespace``.

        Args:
            req: The current web request.
            p: Command parameters carrying the namespace identifier.

        Returns:
            An XML content response with the schema, or an XML error
            response if the schema could not be built.
        """
        try:
            content = self._make_schema(req, p)
        except Exception as exc:
            return error.from_exception(exc).to_xml_response()
        return gws.ContentResponse(mime=gws.lib.mime.XML, content=content)

    def _make_schema(self, req: gws.WebRequester, p: GetSchemaRequest) -> str:
        """Build the schema document for all readable layers in a namespace.

        Args:
            req: The current web request; its user is used for permission checks.
            p: Command parameters; ``p.namespace`` may end with ``.xsd``.

        Returns:
            The serialized schema, including the XML declaration,
            namespace declarations and schema locations.

        Raises:
            gws.NotFoundError: If the namespace is unknown or no schema
                could be created for it.
        """
        s = p.namespace
        if s.endswith('.xsd'):
            s = s[:-4]
        ns = gws.lib.xmlx.namespace.find_by_xmlns(s)
        if not ns:
            raise gws.NotFoundError(f'namespace not found: {p.namespace=}')

        lcs = []

        # The loop variable must not be called ``p``: that would rebind the
        # request parameter and break the error message further below.
        for obj in self.root.find_all(gws.ext.object.layer):
            layer = cast(gws.Layer, obj)
            if req.user.can_read(layer) and layer.ows.xmlNamespace and layer.ows.xmlNamespace.xmlns == ns.xmlns:
                lcs.append(layer_caps.for_layer(layer, req.user))

        xml = layer_caps.xml_schema(lcs, req.user)
        if not xml:
            raise gws.NotFoundError(f'cannot create schema: {p.namespace=}')

        return xml.to_string(
            with_xml_declaration=True,
            with_namespace_declarations=True,
            with_schema_locations=True
        )