Coverage for gws-app/gws/base/ows/client/featureinfo.py: 0%

119 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Parse WMS/WFS FeatureInfo responses.""" 

2 

3import gws 

4import gws.base.shape 

5import gws.gis.gml 

6import gws.lib.xmlx as xmlx 

7 

8 

def parse(text: str, default_crs: gws.Crs = None, always_xy=False) -> list[gws.FeatureRecord]:
    """Parse a WMS/WFS FeatureInfo response into feature records.

    Args:
        text: Raw response body. Leading and trailing whitespace is stripped
            before parsing.
        default_crs: CRS handed through to the format-specific parsers.
        always_xy: Axis-order flag handed through to the format-specific parsers.

    Returns:
        The result of ``_parse``: a list of feature records, or ``None`` when
        no parser produced a result.
    """
    gws.debug.time_start('featureinfo:parse')
    try:
        return _parse(text.strip(), default_crs, always_xy)
    finally:
        # Close the timer even when parsing raises, so that every
        # time_start() is paired with a time_end().
        gws.debug.time_end()

14 

15 

def _parse(text, default_crs, always_xy):
    """Detect the response format and dispatch to the matching parser.

    Text starting with ``<`` is parsed as XML and dispatched on the root
    element name via ``_XML_FORMATS``. Anything else (or XML that failed to
    parse) is offered to each function in ``_TEXT_FORMATS`` in turn.

    Returns:
        The list returned by the first parser that yields a non-``None``
        result, or ``None`` if no parser handled the input.
    """
    xml_el = None

    if text.startswith('<'):
        try:
            xml_el = xmlx.from_string(text, case_insensitive=True, normalize_namespaces=True)
        except xmlx.Error as exc:
            gws.log.error(f'XML parse error: {exc}')

    # Compare with None explicitly. With ElementTree-style elements a root
    # without children tests as false, which would make an empty but valid
    # response (e.g. a FeatureCollection with no members) skip the dispatch.
    # NOTE(review): assumes the xmlx element follows ElementTree truthiness - confirm.
    if xml_el is not None:
        fn = _XML_FORMATS.get(xml_el.name)
        if fn:
            fds = fn(xml_el, default_crs, always_xy)
            if fds is not None:
                gws.log.debug(f'parsed with {fn.__name__} count={len(fds)}')
                return fds
        # either no parser is registered for this root element or the parser gave up
        gws.log.error(f'XML parse error for {xml_el.name!r}')
        return None

    # fallback for non-xml formats
    # @TODO: json etc

    for fn in _TEXT_FORMATS:
        fds = fn(text, default_crs, always_xy)
        if fds is not None:
            gws.log.debug(f'parsed with {fn.__name__} count={len(fds)}')
            return fds

    return None

43 

44 

45## 

46 

def _parse_msgmloutput(xml_el: gws.XmlElement, default_crs, always_xy):
    """Collect feature records from a MapServer ``msGMLOutput`` document.

    Every child of a layer element is turned into a record with
    ``_fdata_from_gml``, except a ``gml:name`` child, which replaces the
    layer name used for the records that follow it.
    """
    # msGMLOutput (MapServer)
    #
    # <msGMLOutput
    #   <LAYER_1>
    #       <gml:name>LAYER_NAME
    #       <FEATURE_1>
    #           <gml:boundedBy>
    #               ...
    #           </gml:boundedBy>
    #           <GEOMETRY>
    #               <gml:Point...
    #           </GEOMETRY>
    #           <attr>....</attr>
    #           <attr>....</attr>
    #

    records = []

    for layer_el in xml_el:
        # the layer tag itself is the name until a <gml:name> child overrides it
        current_layer = layer_el.name

        for child_el in layer_el:
            if _is_gml(child_el) and child_el.name == 'name':
                current_layer = child_el.text
                continue

            record = _fdata_from_gml(child_el, default_crs, always_xy)
            record.meta = {'layerName': current_layer}
            records.append(record)

    return records

77 

78 

def _parse_featurecollection(xml_el: gws.XmlElement, default_crs, always_xy):
    """Collect feature records from an OGC ``FeatureCollection`` document.

    Only ``member`` / ``featuremember`` children are considered. A member
    either wraps a single feature element or carries the feature's
    properties directly.
    """
    # FeatureCollection (OGC)
    #
    # <FeatureCollection
    #   <wfs:member>
    #       <FEATURE gml:id=...
    #           <attr>....</attr>
    #           <attr> <nested>....</attr>
    #           <GEOMETRY>
    #               <gml:Point...
    #

    member_names = {'member', 'featuremember'}
    records = []

    for member_el in xml_el:
        if member_el.name not in member_names:
            continue

        child_count = len(member_el)

        if child_count == 1 and len(member_el[0]) > 0:
            # <wfs:member><my:feature><attr...
            feature_el = member_el[0]
        elif child_count > 1:
            # <wfs:member><attr...
            feature_el = member_el
        else:
            # empty member, or a single leaf child: nothing to read
            continue

        records.append(_fdata_from_gml(feature_el, default_crs, always_xy))

    return records

103 

104 

def _parse_getfeatureinforesponse(xml_el: gws.XmlElement, default_crs, always_xy):
    """Collect feature records from a ``GetFeatureInfoResponse`` document.

    Each ``Feature`` under a ``Layer`` becomes one record. ``Attribute``
    children supply the values: the one named ``geometry`` is read as WKT
    into the record's shape, all others are stored as stripped strings.
    Attributes whose ``value`` is missing or blank are skipped.
    """
    # GetFeatureInfoResponse (geoserver/qgis)
    #
    # <GetFeatureInfoResponse>
    #  <Layer name="....">
    #   <Feature id="...">
    #    <Attribute name="..." value="..."/>
    #    <Attribute name="geometry" value="<wkt>"/>

    fds = []

    for layer_el in xml_el:
        layer_name = layer_el.get('name')
        for feature_el in layer_el:

            fd = gws.FeatureRecord(
                attributes={},
                uid=feature_el.get('id'),
                meta={'layerName': layer_name},
            )

            for el in feature_el:
                if el.name != 'attribute':
                    continue
                key = el.get('name')
                val = el.get('value')
                # An <Attribute> without a value (or with a blank one) carries
                # no data; skipping it also avoids calling .strip() on None
                # and handing an empty string to the WKT reader.
                if val is None or not val.strip():
                    continue
                if key == 'geometry':
                    fd.shape = gws.base.shape.from_wkt(val, default_crs)
                else:
                    fd.attributes[key] = val.strip()

            fds.append(fd)

    return fds

139 

140 

def _parse_featureinforesponse(xml_el: gws.XmlElement, default_crs, always_xy):
    """Collect feature records from an ArcGIS ``FeatureInfoResponse`` document.

    Each ``fields`` child becomes one record built from its XML attributes:
    ``id`` / ``fid`` (any letter case) set the uid, ``shape`` is dropped and
    everything else is kept as a feature attribute under its original name.
    """
    # FeatureInfoResponse (Arcgis)
    #
    # https://webhelp.esri.com/arcims/9.3/General/mergedProjects/wms_connect/wms_connector/get_featureinfo.htm
    #
    # <FeatureInfoResponse...
    #     <fields objectid="15111" shape="polygon"...
    #     <fields objectid="15111" shape="polygon"...

    records = []

    for fields_el in xml_el:
        if fields_el.name != 'fields':
            continue

        record = gws.FeatureRecord(attributes={})

        for attr_name, attr_value in fields_el.attrib.items():
            lowered = attr_name.lower()
            if lowered in {'id', 'fid'}:
                record.uid = attr_value
            elif lowered != 'shape':
                record.attributes[attr_name] = attr_value

        records.append(record)

    return records

163 

164 

def _parse_geobak(xml_el: gws.XmlElement, default_crs, always_xy):
    """Collect feature records from a GeoBAK ``Sachdatenabfrage`` document.

    A ``Kartenebene`` element names the layer for the ``Inhalt`` elements
    that follow it. Each ``Inhalt`` element yields one record whose
    attributes come from the Name/Wert pairs of its first child. Pairs named
    ``shape`` or valued ``null`` (any letter case) are dropped. Elements
    other than ``Kartenebene`` and ``Inhalt`` are ignored.
    """
    # GeoBAK (https://www.egovernment.sachsen.de/geodaten.html)
    #
    # <geobak_20:Sachdatenabfrage...
    #     <geobak_20:Kartenebene>....
    #     <geobak_20:Inhalt>
    #         <geobak_20:Datensatz>
    #             <geobak_20:Attribut>
    #                 <geobak_20:Name>...
    #                 <geobak_20:Wert>...
    #     <geobak_20:Inhalt>
    #         <geobak_20:Datensatz>
    #           ...

    fds = []

    # The layer name arrives in its own element *before* the records it
    # applies to, so it has to be remembered across loop iterations.
    layer_name = None

    for el in xml_el:
        if el.name == 'kartenebene':
            layer_name = el.text
            continue

        if el.name != 'inhalt':
            continue

        fd = gws.FeatureRecord(attributes={})
        if layer_name is not None:
            fd.meta = {'layerName': layer_name}

        # el[0] is the Datensatz; each of its children is a Name/Wert pair
        for attr_el in el[0]:
            key = attr_el[0].text.strip()
            val = attr_el[1].text.strip()
            if key != 'shape' and val.lower() != 'null':
                fd.attributes[key] = val

        fds.append(fd)

    return fds

198 

199 

200## 

201 

# Separator between a nested element's name and the attribute names found inside it.
_DEEP_ATTRIBUTE_DELIMITER = '.'


def _fdata_from_gml(feature_el, default_crs, always_xy) -> gws.FeatureRecord:
    """Build a feature record from a GML feature element.

    Children of ``feature_el`` are classified as follows:

    - ``boundedBy``: its first child is parsed as an envelope, used as a
      fallback shape when no geometry is found;
    - a geometry element, bare or inside a single-child wrapper: parsed into
      the record's shape (a later geometry overwrites an earlier one);
    - any other element with children: parsed recursively, its attributes
      are stored under ``<element name>.<attribute name>``;
    - a leaf element: stored as an attribute if its stripped text is non-empty.

    The uid is taken from the ``id`` or ``fid`` XML attribute and the layer
    name from the element's own name.
    """
    # like GDAL does:
    # "When reading a feature, the driver will by default only take into account
    # the last recognized GML geometry found..." (https://gdal.org/drivers/vector/gml.html)

    fd = gws.FeatureRecord(
        attributes={},
        uid=feature_el.get('id') or feature_el.get('fid'),
        meta={'layerName': feature_el.name},
    )

    bbox = None

    for el in feature_el:
        if el.name == 'boundedby':
            # <gml:boundedBy directly under feature
            # (an empty boundedBy has no envelope to read, so it is ignored)
            if len(el) > 0:
                bbox = gws.gis.gml.parse_envelope(el[0], default_crs, always_xy)
        elif gws.gis.gml.is_geometry_element(el):
            # <gml:Polygon etc directly under feature
            fd.shape = gws.gis.gml.parse_shape(el, default_crs, always_xy)
        elif len(el) == 1 and gws.gis.gml.is_geometry_element(el[0]):
            # <gml:Polygon etc in a wrapper tag
            fd.shape = gws.gis.gml.parse_shape(el[0], default_crs, always_xy)
        elif len(el) > 0:
            # sub-feature
            sub = _fdata_from_gml(el, default_crs, always_xy)
            for k, v in sub.attributes.items():
                fd.attributes[el.name + _DEEP_ATTRIBUTE_DELIMITER + k] = v
        else:
            # attribute <attr>text</attr>
            # (text may be None for an empty element such as <attr/>)
            s = (el.text or '').strip()
            if s:
                fd.attributes[el.name] = s

    if not fd.shape and bbox:
        fd.shape = gws.base.shape.from_bounds(bbox)

    return fd

243 

244 

def _is_gml(el):
    """Tell whether the element's tag contains the marker ``gml}``."""
    tag = el.tag
    return 'gml}' in tag

247 

248 

249## 

250 

251 

# Parsers for XML responses, keyed by the root element name that _parse
# looks up (xml_el.name). Each parser takes (xml_el, default_crs, always_xy).
_XML_FORMATS = {
    'msgmloutput': _parse_msgmloutput,
    'featurecollection': _parse_featurecollection,
    'getfeatureinforesponse': _parse_getfeatureinforesponse,
    'featureinforesponse': _parse_featureinforesponse,
    'sachdatenabfrage': _parse_geobak,
}

# Parsers for non-XML responses, tried in list order by _parse with
# (text, default_crs, always_xy). None are registered at the moment.
_TEXT_FORMATS = [
]