Coverage for gws-app/gws/base/ows/server/service.py: 0%
164 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""OWS Service."""
3from typing import Optional, cast
5import gws
6import gws.base.legend
7import gws.base.template
8import gws.base.web
9import gws.config.util
10import gws.gis.bounds
11import gws.gis.crs
12import gws.gis.extent
13import gws.gis.gml
14import gws.gis.render
15import gws.gis.source
16import gws.lib.datetimex
17import gws.lib.image
18import gws.lib.metadata
19import gws.lib.mime
20from . import core, request, error
class ImageFormatConfig:
    """Image format configuration. (added in 8.1)"""

    # NOTE(review): the bare strings that follow each field are kept verbatim;
    # presumably they are picked up by the project's spec/doc tooling --
    # confirm before rewording them.

    # Mime type names under which this format is offered. The service
    # strips blanks from each entry when it builds its format list.
    mimeTypes: list[str]
    """Mime types for this format."""

    # Optional encoder options; the service falls back to an empty dict
    # when this is missing or empty.
    options: Optional[dict]
    """Image options."""
class Config(gws.ConfigWithAccess):
    # Configuration options shared by OWS services.
    #
    # NOTE(review): the bare strings that follow each field are kept verbatim;
    # presumably they are picked up by the project's spec/doc tooling --
    # confirm before rewording them. Field order is unchanged on purpose.

    # --- feature paging ---------------------------------------------------

    defaultFeatureCount: int = 1000
    """Default number of features per page."""

    maxFeatureCount: int = 10000
    """Max number of features per page. (added in 8.1)"""

    # Declared as deprecated; the service class in this module does not
    # read it.
    searchLimit: int = 10000
    """Search limit. (deprecated in 8.1)"""

    searchTolerance: gws.UomValueStr = '10px'
    """Search pixel tolerance."""

    # --- spatial setup ----------------------------------------------------

    # When omitted, the service falls back to the project map bounds or,
    # without a project, to the extent of its first CRS.
    extent: Optional[gws.Extent]
    """Service extent."""

    # When omitted or empty, the service uses the project map CRS or,
    # without a project, web mercator.
    supportedCrs: Optional[list[gws.CrsName]]
    """List of CRS supported by this service."""

    # An empty uid means "no explicit root layer".
    rootLayerUid: str = ''
    """Root layer uid."""

    # --- output -----------------------------------------------------------

    # When omitted or empty, the service offers PNG and JPEG.
    imageFormats: Optional[list[ImageFormatConfig]]
    """Supported image formats. (added in 8.1)"""

    templates: Optional[list[gws.ext.config.template]]
    """XML and HTML templates."""

    # --- service description ----------------------------------------------

    # Merged by the service on top of project and application metadata.
    metadata: Optional[gws.Metadata]
    """Service metadata."""

    updateSequence: Optional[str]
    """Service update sequence."""

    withInspireMeta: bool = False
    """Emit INSPIRE Metadata."""

    withStrictParams: bool = False
    """Use strict params checking."""
class Object(gws.OwsService):
    """Base class for OWS services.

    Provides the shared configuration steps (bounds, image formats,
    templates, metadata, root layer) and the request plumbing
    (``handle_request`` -> ``init_request`` -> ``dispatch_request``) together
    with helpers that build template, image, legend and feature-collection
    responses. Concrete services are expected to supply the operation
    handlers that ``dispatch_request`` looks up by name.
    """

    ##
    # Configuration
    ##

    def configure(self):
        """Copy plain options from the config and run the ``configure_*`` steps."""
        # Closest enclosing project, if any. ``post_configure_root_layer``
        # may fill this in later from the root layer.
        self.project = self.find_closest(gws.ext.object.project)

        # Options that are taken over from the configuration unchanged.
        self.updateSequence = self.cfg('updateSequence')
        self.withInspireMeta = self.cfg('withInspireMeta')
        self.withStrictParams = self.cfg('withStrictParams')

        self.maxFeatureCount = self.cfg('maxFeatureCount')
        self.defaultFeatureCount = self.cfg('defaultFeatureCount')
        self.searchTolerance = self.cfg('searchTolerance')

        # The order of these steps is kept exactly as in the original code.
        self.configure_bounds()
        self.configure_image_formats()
        self.configure_templates()
        self.configure_operations()
        self.configure_metadata()

    def configure_image_formats(self):
        """Populate ``self.imageFormats`` from the config, defaulting to PNG and JPEG."""
        configured = self.cfg('imageFormats')

        if not configured:
            # Nothing configured: offer the two built-in formats.
            self.imageFormats = [
                gws.OwsImageFormat(mimeTypes=[gws.lib.mime.PNG], options={}),
                gws.OwsImageFormat(mimeTypes=[gws.lib.mime.JPEG], options={}),
            ]
            return

        self.imageFormats = [
            gws.OwsImageFormat(
                # Blanks are removed from every configured mime type.
                mimeTypes=[name.replace(' ', '') for name in item.get('mimeTypes', [])],
                options=item.get('options') or {},
            )
            for item in configured
        ]

    def configure_bounds(self):
        """Compute ``self.supportedBounds``, one entry per supported CRS.

        Returns:
            Always ``True``.
        """
        crs_list = [gws.gis.crs.require(name) for name in self.cfg('supportedCrs', default=[])]
        if not crs_list:
            # No CRS configured: use the project's map CRS, or web mercator
            # when the service is not attached to a project.
            if self.project:
                crs_list = [self.project.map.bounds.crs]
            else:
                crs_list = [gws.gis.crs.WEBMERCATOR]

        primary_crs = crs_list[0]
        configured_extent = self.cfg('extent')

        if configured_extent:
            # An explicit extent is interpreted in the first CRS.
            base_bounds = gws.Bounds(crs=primary_crs, extent=gws.gis.extent.from_list(configured_extent))
        elif self.project:
            base_bounds = self.project.map.bounds
        else:
            base_bounds = gws.Bounds(crs=primary_crs, extent=primary_crs.extent)

        self.supportedBounds = [gws.gis.bounds.transform(base_bounds, crs) for crs in crs_list]
        return True

    def configure_templates(self):
        """Delegate template setup to ``gws.config.util.configure_templates_for``."""
        return gws.config.util.configure_templates_for(self)

    def configure_operations(self):
        """Hook that does nothing here; presumably overridden by concrete services."""

    def available_formats(self, verb: gws.OwsVerb):
        """Return the sorted, de-duplicated mime types offered for ``verb``.

        Image verbs (those in ``core.IMAGE_VERBS``) are answered from the
        image formats; any other verb is answered from the templates whose
        subject is ``ows.<verb>``.
        """
        if verb in core.IMAGE_VERBS:
            mimes = {mime for fmt in self.imageFormats for mime in fmt.mimeTypes}
        else:
            subject = f'ows.{verb}'
            mimes = {
                mime
                for tpl in self.templates
                if tpl.subject == subject
                for mime in tpl.mimeTypes
            }
        return sorted(mimes)

    def configure_metadata(self):
        """Merge project, application and configured metadata into ``self.metadata``.

        Returns:
            Always ``True``.
        """
        # The argument order is kept as in the original; how later entries
        # combine with earlier ones is decided by ``gws.lib.metadata.merge``.
        sources = [
            gws.Metadata(),
            self.project.metadata if self.project else None,
            self.root.app.metadata,
            gws.lib.metadata.from_config(self.cfg('metadata')),
        ]
        self.metadata = gws.lib.metadata.merge(*sources)
        return True

    def post_configure(self):
        """Run the configuration steps that are deferred to ``post_configure``."""
        self.post_configure_root_layer()

    def post_configure_root_layer(self):
        """Resolve ``rootLayerUid`` into ``self.rootLayer`` and reconcile the project.

        Raises:
            gws.NotFoundError: if the layer cannot be found, or if it belongs
                to a project other than the one this service is attached to.
        """
        self.rootLayer = None

        uid = self.cfg('rootLayerUid')
        if not uid:
            return

        self.rootLayer = self.root.get(uid, gws.ext.object.layer)
        if not self.rootLayer:
            raise gws.NotFoundError(f'root layer {uid!r} not found')

        layer_project = self.rootLayer.find_closest(gws.ext.object.project)

        if not self.project:
            # A service without a project adopts the project of its root layer.
            self.project = layer_project
        elif self.project != layer_project:
            raise gws.NotFoundError(f'root layer {uid!r} does not belong to {self.project!r}')

    ##
    # URLs
    ##

    def url_path(self) -> str:
        """Return the ``owsService`` action path, project-qualified when a project is set."""
        params = {'serviceUid': self.uid}
        if self.project:
            params['projectUid'] = self.project.uid
        return gws.u.action_url_path('owsService', **params)

    ##
    # Request handling
    ##

    def init_request(self, req: gws.WebRequester) -> request.Object:
        """Wrap the web request into a service request object."""
        return request.Object(self, req)

    def handle_request(self, req: gws.WebRequester) -> gws.ContentResponse:
        """Entry point: build the service request and dispatch it."""
        return self.dispatch_request(self.init_request(req))

    def dispatch_request(self, sr: request.Object):
        """Invoke the method named by ``sr.operation.handlerName`` with ``sr``."""
        handler = getattr(self, sr.operation.handlerName)
        return handler(sr)

    def template_response(self, sr: request.Object, mime: str = '', **kwargs) -> gws.ContentResponse:
        """Render the ``ows.<verb>`` template for the current operation.

        Args:
            sr: Service request.
            mime: Mime type passed to the template lookup.
            **kwargs: Extra values passed through to the template arguments.

        Raises:
            error.InvalidFormat: if no template is found.
        """
        subject = f'ows.{sr.operation.verb}'
        tpl = self.root.app.templateMgr.find_template(
            subject,
            where=[self, sr.project],
            user=sr.req.user,
            mime=mime,
        )
        if not tpl:
            gws.log.debug(f'no template: {sr.operation.verb=} {mime=}')
            raise error.InvalidFormat()

        service_url = sr.req.url_for(self.url_path())
        # The version string with its dots removed, as an integer.
        int_version = int(sr.version.replace('.', ''))

        args = request.TemplateArgs(
            sr=sr,
            service=self,
            serviceUrl=service_url,
            url_for=sr.req.url_for,
            version=sr.version,
            intVersion=int_version,
            **kwargs,
        )
        return tpl.render(gws.TemplateRenderInput(args=args))

    def image_response(self, sr: request.Object, img: Optional[gws.Image], mime: str) -> gws.ContentResponse:
        """Encode ``img`` as ``mime``, or return an empty pixel when there is no image.

        Raises:
            error.InvalidFormat: if ``mime`` is not a supported image format.
        """
        # NOTE(review): for an empty ``mime`` a default format is resolved
        # here, yet the empty ``mime`` itself is still passed on below --
        # confirm that this is intended.
        ifmt = self.find_image_format(mime)

        if img:
            gws.log.debug(f'image_response: {img.mode()=} {img.size()=} {mime=} {ifmt.options}')
            content = img.to_bytes(mime, ifmt.options)
        else:
            content = gws.lib.image.empty_pixel(mime)

        return gws.ContentResponse(mime=mime, content=content)

    def find_image_format(self, mime: str) -> gws.OwsImageFormat:
        """Return the image format that lists ``mime``; the first format if ``mime`` is empty.

        Raises:
            error.InvalidFormat: if no configured format lists ``mime``.
        """
        if not mime:
            return self.imageFormats[0]

        match = next((fmt for fmt in self.imageFormats if mime in fmt.mimeTypes), None)
        if match is None:
            raise error.InvalidFormat()
        return match

    def render_legend(self, sr: request.Object, lcs: list[core.LayerCaps], mime: str) -> gws.ContentResponse:
        """Return a combined legend image response for the given layers.

        The response is obtained through ``gws.u.get_app_global`` under a key
        derived from the layer uids and the mime type.
        """
        uids = [lc.layer.uid for lc in lcs]
        cache_key = 'gws.base.ows.server.legend.' + gws.u.sha256(uids) + mime

        def _render():
            legend = cast(gws.Legend, self.root.create_temporary(
                gws.ext.object.legend,
                type='combined',
                layerUids=uids,
            ))
            output = legend.render()
            # Without render output, fall through to the "no image" response.
            img = gws.base.legend.output_to_image(output) if output else None
            return self.image_response(sr, img, mime)

        return gws.u.get_app_global(cache_key, _render)

    def feature_collection(self, sr: request.Object, lcs: list[core.LayerCaps], hits: int, results: list[gws.SearchResult]) -> core.FeatureCollection:
        """Build a feature collection from search results.

        Each result's feature is passed to ``transform_to(sr.targetCrs)`` and
        paired with the layer caps whose layer is the same object as the
        result's layer, if any.

        Args:
            sr: Service request; provides the target CRS.
            lcs: Layer caps to match result layers against.
            hits: Value reported as ``numMatched``.
            results: Search results to convert.
        """
        fc = core.FeatureCollection(
            members=[],
            timestamp=gws.lib.datetimex.now(),
            numMatched=hits,
            numReturned=len(results),
        )

        # Layers are matched by object identity, not by equality.
        caps_by_layer_id = {id(lc.layer): lc for lc in lcs}

        for res in results:
            res.feature.transform_to(sr.targetCrs)
            member = core.FeatureCollectionMember(
                feature=res.feature,
                layer=res.layer,
                layerCaps=caps_by_layer_id.get(id(res.layer)) if res.layer else None,
            )
            fc.members.append(member)

        return fc