Coverage for gws-app/gws/base/ows/server/request.py: 0%

171 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Service Request object.""" 

2 

3from typing import Optional, Callable 

4 

5import gws 

6import gws.base.layer.core 

7import gws.base.legend 

8import gws.base.model 

9import gws.base.web 

10import gws.gis.extent 

11import gws.gis.render 

12import gws.lib.mime 

13import gws.gis.bounds 

14import gws.gis.crs 

15import gws.lib.image 

16import gws.lib.uom 

17 

18from . import core, layer_caps, error 

19 

20 

21class TemplateArgs(gws.TemplateArgs): 

22 """Arguments for service templates.""" 

23 

24 featureCollection: core.FeatureCollection 

25 layerCapsList: list[core.LayerCaps] 

26 sr: 'Object' 

27 service: gws.OwsService 

28 serviceUrl: str 

29 url_for: Callable 

30 gmlVersion: int 

31 version: str 

32 intVersion: int 

33 tileMatrixSets: list[gws.TileMatrixSet] 

34 

35 

36class Object: 

37 alwaysXY: bool 

38 bounds: gws.Bounds 

39 crs: gws.Crs 

40 isSoap: bool = False 

41 layerCapsList: list[core.LayerCaps] 

42 operation: gws.OwsOperation 

43 project: gws.Project 

44 req: gws.WebRequester 

45 service: gws.OwsService 

46 targetCrs: gws.Crs 

47 version: str 

48 xmlElement: Optional[gws.XmlElement] 

49 

50 def __init__(self, service: gws.OwsService, req: gws.WebRequester): 

51 self.service = service 

52 self.req = req 

53 self.project = self.requested_project() 

54 

55 self.operation = self.requested_operation('REQUEST') 

56 self.version = self.requested_version('VERSION,ACCEPTVERSIONS') 

57 

58 cache_key = 'layer_caps_' + gws.u.sha256([self.service.uid, self.project.uid, sorted(self.req.user.roles)]) 

59 self.layerCapsList = gws.u.get_app_global(cache_key, self.enum_layer_caps) 

60 

61 self.alwaysXY = False 

62 self.isSoap = False 

63 self.pxWidth = 0 

64 self.pxHeight = 0 

65 self.xResolution = 0 

66 self.yResolution = 0 

67 

68 # OGC 06-042, 7.2.3.5 

69 if self.service.updateSequence: 

70 s = self.string_param('UPDATESEQUENCE', default='') 

71 if s and s == self.service.updateSequence: 

72 raise error.CurrentUpdateSequence() 

73 if s and s > self.service.updateSequence: 

74 raise error.InvalidUpdateSequence() 

75 

76 def enum_layer_caps(self): 

77 lcs = [] 

78 root_layer = self.service.rootLayer or self.project.map.rootLayer 

79 self._enum_layer_caps(root_layer, lcs, []) 

80 return lcs 

81 

82 def _enum_layer_caps(self, layer: gws.Layer, lcs: list[core.LayerCaps], stack: list[core.LayerCaps]): 

83 if not self.req.user.can_read(layer) or not layer.isEnabledForOws: 

84 return 

85 

86 is_suitable = self.service.layer_is_suitable(layer) 

87 if not is_suitable and not layer.isGroup: 

88 return 

89 

90 lc = layer_caps.for_layer(layer, self.req.user, self.service) 

91 

92 # NB groups must be inspected even if not 'suitable' 

93 if layer.isGroup: 

94 lc.isGroup = True 

95 n = len(lcs) 

96 for sub_layer in layer.layers: 

97 self._enum_layer_caps(sub_layer, lcs, stack + [lc]) 

98 if not lc.children: 

99 # no empty groups 

100 return 

101 if is_suitable: 

102 lc.hasLegend = any(c.hasLegend for c in lc.children) 

103 lc.isSearchable = any(c.isSearchable for c in lc.children) 

104 lcs.insert(n, lc) 

105 else: 

106 lc.isGroup = False 

107 lcs.append(lc) 

108 for sup_lc in stack: 

109 sup_lc.leaves.append(lc) 

110 

111 if stack: 

112 stack[-1].children.append(lc) 

113 

114 ## 

115 

116 def requested_project(self) -> gws.Project: 

117 # services can be configured globally (in which case, service.project == None) 

118 # and applied to multiple projects with the projectUid param 

119 # or, configured just for a single project (service.project != None) 

120 

121 p = self.req.param('projectUid') 

122 if p: 

123 project = self.req.user.require_project(p) 

124 if self.service.project and project != self.service.project: 

125 raise gws.NotFoundError(f'ows {self.service.uid=}: wrong project={p!r}') 

126 return project 

127 

128 if self.service.project: 

129 # for in-project services, ensure the user can access the project 

130 return self.req.user.require_project(self.service.project.uid) 

131 

132 raise gws.NotFoundError(f'project not found for {self.service}') 

133 

134 def requested_version(self, param_names: str) -> str: 

135 p, val = self._get_param(param_names, default='') 

136 if not val: 

137 # the first supported version is the default 

138 return self.service.supportedVersions[0] 

139 

140 for v in gws.u.to_list(val): 

141 for ver in self.service.supportedVersions: 

142 if ver.startswith(v): 

143 return ver 

144 

145 raise error.VersionNegotiationFailed() 

146 

147 _param2verb = { 

148 'createstoredquery': gws.OwsVerb.CreateStoredQuery, 

149 'describecoverage': gws.OwsVerb.DescribeCoverage, 

150 'describefeaturetype': gws.OwsVerb.DescribeFeatureType, 

151 'describelayer': gws.OwsVerb.DescribeLayer, 

152 'describerecord': gws.OwsVerb.DescribeRecord, 

153 'describestoredqueries': gws.OwsVerb.DescribeStoredQueries, 

154 'dropstoredquery': gws.OwsVerb.DropStoredQuery, 

155 'getcapabilities': gws.OwsVerb.GetCapabilities, 

156 'getfeature': gws.OwsVerb.GetFeature, 

157 'getfeatureinfo': gws.OwsVerb.GetFeatureInfo, 

158 'getfeaturewithlock': gws.OwsVerb.GetFeatureWithLock, 

159 'getlegendgraphic': gws.OwsVerb.GetLegendGraphic, 

160 'getmap': gws.OwsVerb.GetMap, 

161 'getprint': gws.OwsVerb.GetPrint, 

162 'getpropertyvalue': gws.OwsVerb.GetPropertyValue, 

163 'getrecordbyid': gws.OwsVerb.GetRecordById, 

164 'getrecords': gws.OwsVerb.GetRecords, 

165 'gettile': gws.OwsVerb.GetTile, 

166 'liststoredqueries': gws.OwsVerb.ListStoredQueries, 

167 'lockfeature': gws.OwsVerb.LockFeature, 

168 'transaction': gws.OwsVerb.Transaction, 

169 } 

170 

171 def requested_operation(self, param_names: str) -> gws.OwsOperation: 

172 p, val = self._get_param(param_names, default=None) 

173 verb = self._param2verb.get(val.lower()) 

174 if not verb: 

175 raise error.InvalidParameterValue(p) 

176 

177 for op in self.service.supportedOperations: 

178 if op.verb == verb: 

179 return op 

180 

181 raise error.OperationNotSupported() 

182 

183 def requested_crs(self, param_names: str) -> Optional[gws.Crs]: 

184 _, val = self._get_param(param_names, default='') 

185 if not val: 

186 return 

187 

188 crs = gws.gis.crs.get(val) 

189 if not crs: 

190 raise error.InvalidCRS() 

191 

192 for b in self.service.supportedBounds: 

193 if crs == b.crs: 

194 return crs 

195 

196 raise error.InvalidCRS() 

197 

198 def requested_bounds(self, param_names: str) -> Optional[gws.Bounds]: 

199 # OGC 06-042, 7.2.3.5 

200 # OGC 00-028, 6.2.8.2.3 

201 

202 p, val = self._get_param(param_names, '') 

203 if not val: 

204 return 

205 

206 bounds = gws.gis.bounds.from_request_bbox(val, default_crs=self.crs, always_xy=self.alwaysXY) 

207 if bounds: 

208 return gws.gis.bounds.transform(bounds, self.crs) 

209 

210 raise error.InvalidParameterValue(p) 

211 

212 def requested_format(self, param_names: str) -> str: 

213 s = self.string_param(param_names, default='').strip() 

214 if s: 

215 # NB our mime types do not contain spaces 

216 return ''.join(s.split()) 

217 return '' 

218 

219 def requested_feature_count(self, param_names: str) -> int: 

220 s = self.int_param(param_names, default=0) 

221 if s <= 0: 

222 return self.service.defaultFeatureCount 

223 return min(self.service.maxFeatureCount, s) 

224 

225 ## 

226 

227 def _get_param(self, param_names, default): 

228 names = gws.u.to_list(param_names.upper()) 

229 

230 for p in names: 

231 if not self.req.has_param(p): 

232 continue 

233 val = self.req.param(p) 

234 return p, val 

235 

236 if default is not None: 

237 return '', default 

238 

239 raise error.MissingParameterValue(names[0]) 

240 

241 def string_param(self, param_names: str, values: Optional[set[str]] = None, default: Optional[str] = None) -> str: 

242 p, val = self._get_param(param_names, default) 

243 if values: 

244 val = val.lower() 

245 if val not in values: 

246 raise error.InvalidParameterValue(p) 

247 return val 

248 

249 def list_param(self, param_names: str) -> list[str]: 

250 _, val = self._get_param(param_names, '') 

251 return gws.u.to_list(val) 

252 

253 def int_param(self, param_names: str, default: Optional[int] = None) -> int: 

254 p, val = self._get_param(param_names, default) 

255 try: 

256 return int(val) 

257 except ValueError: 

258 raise error.InvalidParameterValue(p)