Coverage for gws-app/gws/base/ows/server/service.py: 0%

164 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""OWS Service.""" 

2 

3from typing import Optional, cast 

4 

5import gws 

6import gws.base.legend 

7import gws.base.template 

8import gws.base.web 

9import gws.config.util 

10import gws.gis.bounds 

11import gws.gis.crs 

12import gws.gis.extent 

13import gws.gis.gml 

14import gws.gis.render 

15import gws.gis.source 

16import gws.lib.datetimex 

17import gws.lib.image 

18import gws.lib.metadata 

19import gws.lib.mime 

20from . import core, request, error 

21 

22 

class ImageFormatConfig:
    """Image format configuration. (added in 8.1)"""

    # Element type of ``Config.imageFormats``. ``Object.configure_image_formats``
    # reads both fields with ``.get()`` and builds a ``gws.OwsImageFormat`` from
    # each entry: spaces are removed from every mime type and a missing
    # ``options`` value becomes an empty dict.

    mimeTypes: list[str]
    """Mime types for this format."""
    options: Optional[dict]
    """Image options."""

30 

31 

class Config(gws.ConfigWithAccess):
    """Configuration options of an OWS service, read by ``Object`` via ``self.cfg(...)``."""

    # NOTE(review): the bare strings after each field are attribute docstrings;
    # presumably they are picked up by the gws spec/doc tooling, including the
    # "(added in ...)" / "(deprecated in ...)" markers -- confirm before rewording.

    defaultFeatureCount: int = 1000
    """Default number of features per page."""
    # Used by ``Object.configure_bounds``; without it the project map bounds
    # (or the full extent of the first supported CRS) are used.
    extent: Optional[gws.Extent]
    """Service extent."""
    # When empty, ``Object.configure_image_formats`` falls back to PNG and JPEG.
    imageFormats: Optional[list[ImageFormatConfig]]
    """Supported image formats. (added in 8.1)"""
    maxFeatureCount: int = 10000
    """Max number of features per page. (added in 8.1)"""
    # Merged with the project and application metadata in ``Object.configure_metadata``.
    metadata: Optional[gws.Metadata]
    """Service metadata."""
    # Resolved to a layer object in ``Object.post_configure_root_layer``;
    # an empty value leaves ``rootLayer`` as None.
    rootLayerUid: str = ''
    """Root layer uid."""
    # Not read anywhere in the visible part of this module.
    searchLimit: int = 10000
    """Search limit. (deprecated in 8.1)"""
    searchTolerance: gws.UomValueStr = '10px'
    """Search pixel tolerance."""
    # The first entry is the CRS used for a configured ``extent``; when the list
    # is empty the project map CRS (or Web Mercator without a project) is used.
    supportedCrs: Optional[list[gws.CrsName]]
    """List of CRS supported by this service."""
    # presumably consumed by ``gws.config.util.configure_templates_for`` -- verify there.
    templates: Optional[list[gws.ext.config.template]]
    """XML and HTML templates."""
    updateSequence: Optional[str]
    """Service update sequence."""
    withInspireMeta: bool = False
    """Emit INSPIRE Metadata."""
    withStrictParams: bool = False
    """Use strict params checking."""

59 

60 

class Object(gws.OwsService):
    """Baseclass for OWS services.

    Collects the settings shared by OWS services (supported bounds, image
    formats, templates, metadata, root layer) and provides the request
    pipeline (``handle_request`` -> ``init_request`` -> ``dispatch_request``)
    plus helpers that build template, image, legend and feature responses.
    """

    def configure(self):
        """Copy the plain options from the config and run all ``configure_*`` steps."""
        self.project = self.find_closest(gws.ext.object.project)

        # Options that are taken over from the config unchanged,
        # stored under an attribute of the same name.
        plain_options = (
            'updateSequence',
            'withInspireMeta',
            'withStrictParams',
            'maxFeatureCount',
            'defaultFeatureCount',
            'searchTolerance',
        )
        for name in plain_options:
            setattr(self, name, self.cfg(name))

        self.configure_bounds()
        self.configure_image_formats()
        self.configure_templates()
        self.configure_operations()
        self.configure_metadata()

    def configure_image_formats(self):
        """Populate ``imageFormats`` from the config, defaulting to PNG and JPEG."""
        configured = self.cfg('imageFormats')

        if not configured:
            self.imageFormats = [
                gws.OwsImageFormat(mimeTypes=[gws.lib.mime.PNG], options={}),
                gws.OwsImageFormat(mimeTypes=[gws.lib.mime.JPEG], options={}),
            ]
            return

        self.imageFormats = [
            gws.OwsImageFormat(
                # configured mime types are stored without any spaces
                mimeTypes=[m.replace(' ', '') for m in fmt_cfg.get('mimeTypes', [])],
                options=fmt_cfg.get('options') or {},
            )
            for fmt_cfg in configured
        ]

    def configure_bounds(self):
        """Compute ``supportedBounds``: the service bounds in every supported CRS.

        Returns:
            Always ``True``.
        """
        crs_list = [gws.gis.crs.require(name) for name in self.cfg('supportedCrs', default=[])]
        if not crs_list:
            # No explicit CRS list: use the project's map CRS, or Web Mercator
            # when the service is not attached to a project.
            if self.project:
                crs_list = [self.project.map.bounds.crs]
            else:
                crs_list = [gws.gis.crs.WEBMERCATOR]

        primary_crs = crs_list[0]
        extent_cfg = self.cfg('extent')

        if extent_cfg:
            # a configured extent is taken to be in the first supported CRS
            bounds = gws.Bounds(crs=primary_crs, extent=gws.gis.extent.from_list(extent_cfg))
        elif self.project:
            bounds = self.project.map.bounds
        else:
            bounds = gws.Bounds(crs=primary_crs, extent=primary_crs.extent)

        self.supportedBounds = [gws.gis.bounds.transform(bounds, crs) for crs in crs_list]
        return True

    def configure_templates(self):
        """Delegate template configuration to ``gws.config.util.configure_templates_for``."""
        return gws.config.util.configure_templates_for(self)

    def configure_operations(self):
        """Do nothing in the base class; called from ``configure`` after the templates step."""

    def available_formats(self, verb: gws.OwsVerb):
        """Return the sorted, de-duplicated mime types available for ``verb``.

        For verbs in ``core.IMAGE_VERBS`` the mime types come from the image
        formats, otherwise from the templates whose subject is ``ows.<verb>``.
        """
        if verb in core.IMAGE_VERBS:
            mime_lists = [fmt.mimeTypes for fmt in self.imageFormats]
        else:
            mime_lists = [tpl.mimeTypes for tpl in self.templates if tpl.subject == f'ows.{verb}']

        return sorted({mime for mimes in mime_lists for mime in mimes})

    def configure_metadata(self):
        """Merge project, application and configured metadata into ``metadata``.

        Returns:
            Always ``True``.
        """
        base_md = gws.Metadata()
        project_md = self.project.metadata if self.project else None
        app_md = self.root.app.metadata
        own_md = gws.lib.metadata.from_config(self.cfg('metadata'))

        self.metadata = gws.lib.metadata.merge(base_md, project_md, app_md, own_md)
        return True

    def post_configure(self):
        """Run the post-configuration steps (currently only root layer resolution)."""
        self.post_configure_root_layer()

    def post_configure_root_layer(self):
        """Resolve ``rootLayerUid`` to a layer and reconcile it with the project.

        Leaves ``rootLayer`` as ``None`` when no uid is configured. If the
        service has no project yet, the project of the root layer is adopted.

        Raises:
            gws.NotFoundError: if the layer does not exist or belongs to a
                project other than the one of this service.
        """
        self.rootLayer = None

        uid = self.cfg('rootLayerUid')
        if not uid:
            return

        layer = self.root.get(uid, gws.ext.object.layer)
        self.rootLayer = layer
        if not layer:
            raise gws.NotFoundError(f'root layer {uid!r} not found')

        layer_project = layer.find_closest(gws.ext.object.project)

        if self.project:
            if self.project != layer_project:
                raise gws.NotFoundError(f'root layer {uid!r} does not belong to {self.project!r}')
        else:
            self.project = layer_project

    ##

    def url_path(self) -> str:
        """Return the ``owsService`` action url path, with the project uid if there is a project."""
        params = {'serviceUid': self.uid}
        if self.project:
            params['projectUid'] = self.project.uid
        return gws.u.action_url_path('owsService', **params)

    ##

    def init_request(self, req: gws.WebRequester) -> request.Object:
        """Wrap the web request into a service request object."""
        return request.Object(self, req)

    def handle_request(self, req: gws.WebRequester) -> gws.ContentResponse:
        """Create the service request for ``req`` and dispatch it."""
        return self.dispatch_request(self.init_request(req))

    def dispatch_request(self, sr: request.Object):
        """Call the method of this object named by ``sr.operation.handlerName`` with ``sr``."""
        handler = getattr(self, sr.operation.handlerName)
        return handler(sr)

    def template_response(self, sr: request.Object, mime: str = '', **kwargs) -> gws.ContentResponse:
        """Render the template registered for the current operation.

        Args:
            sr: Service request.
            mime: Mime type passed to the template lookup.
            **kwargs: Extra values added to the template arguments.

        Raises:
            error.InvalidFormat: if no matching template is found.
        """
        template = self.root.app.templateMgr.find_template(
            f'ows.{sr.operation.verb}',
            where=[self, sr.project],
            user=sr.req.user,
            mime=mime,
        )
        if not template:
            gws.log.debug(f'no template: {sr.operation.verb=} {mime=}')
            raise error.InvalidFormat()

        service_url = sr.req.url_for(self.url_path())
        # the dotted version with the dots removed, as an integer
        int_version = int(sr.version.replace('.', ''))

        template_args = request.TemplateArgs(
            sr=sr,
            service=self,
            serviceUrl=service_url,
            url_for=sr.req.url_for,
            version=sr.version,
            intVersion=int_version,
            **kwargs,
        )

        return template.render(gws.TemplateRenderInput(args=template_args))

    def image_response(self, sr: request.Object, img: Optional[gws.Image], mime: str) -> gws.ContentResponse:
        """Encode ``img`` as ``mime``; without an image, respond with an empty pixel.

        The format lookup runs first in either case, so an unsupported mime
        type raises ``error.InvalidFormat`` even when there is no image.
        """
        ifmt = self.find_image_format(mime)

        if img:
            gws.log.debug(f'image_response: {img.mode()=} {img.size()=} {mime=} {ifmt.options}')
            content = img.to_bytes(mime, ifmt.options)
        else:
            content = gws.lib.image.empty_pixel(mime)

        return gws.ContentResponse(mime=mime, content=content)

    def find_image_format(self, mime: str) -> gws.OwsImageFormat:
        """Return the image format that lists ``mime``; an empty ``mime`` selects the first format.

        Raises:
            error.InvalidFormat: if no configured format contains ``mime``.
        """
        formats = self.imageFormats
        if not mime:
            return formats[0]

        found = next((fmt for fmt in formats if mime in fmt.mimeTypes), None)
        if found is None:
            raise error.InvalidFormat()
        return found

    def render_legend(self, sr: request.Object, lcs: list[core.LayerCaps], mime: str) -> gws.ContentResponse:
        """Return the combined legend of the given layers as an image response.

        The response is obtained through ``gws.u.get_app_global`` under a key
        derived from the layer uids and the mime type.
        """
        uids = [lc.layer.uid for lc in lcs]

        def _render():
            legend = cast(gws.Legend, self.root.create_temporary(
                gws.ext.object.legend,
                type='combined',
                layerUids=uids,
            ))
            output = legend.render()
            # no legend output -> image_response() gets no image
            img = gws.base.legend.output_to_image(output) if output else None
            return self.image_response(sr, img, mime)

        cache_key = 'gws.base.ows.server.legend.' + gws.u.sha256(uids) + mime
        return gws.u.get_app_global(cache_key, _render)

    def feature_collection(self, sr: request.Object, lcs: list[core.LayerCaps], hits: int, results: list[gws.SearchResult]) -> core.FeatureCollection:
        """Build a feature collection from search results.

        Every result feature is transformed to ``sr.targetCrs`` and paired with
        its layer and the matching entry of ``lcs`` (``None`` if there is no
        layer or no matching entry).

        Args:
            sr: Service request.
            lcs: Layer caps to match result layers against.
            hits: Value stored as ``numMatched``.
            results: Search results to convert.
        """
        collection = core.FeatureCollection(
            members=[],
            timestamp=gws.lib.datetimex.now(),
            numMatched=hits,
            numReturned=len(results),
        )

        # layer caps are looked up by the identity of their layer object
        caps_by_layer_id = {id(lc.layer): lc for lc in lcs}

        for res in results:
            res.feature.transform_to(sr.targetCrs)

            if res.layer:
                layer_caps = caps_by_layer_id.get(id(res.layer))
            else:
                layer_caps = None

            collection.members.append(core.FeatureCollectionMember(
                feature=res.feature,
                layer=res.layer,
                layerCaps=layer_caps,
            ))

        return collection