Coverage for gws-app/gws/base/ows/client/featureinfo.py: 0%
119 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Parse WMS/WFS FeatureInfo responses."""
3import gws
4import gws.base.shape
5import gws.gis.gml
6import gws.lib.xmlx as xmlx
9def parse(text: str, default_crs: gws.Crs = None, always_xy=False) -> list[gws.FeatureRecord]:
10 gws.debug.time_start('featureinfo:parse')
11 res = _parse(text.strip(), default_crs, always_xy)
12 gws.debug.time_end()
13 return res
16def _parse(text, default_crs, always_xy):
17 xml_el = None
19 if text.startswith('<'):
20 try:
21 xml_el = xmlx.from_string(text, case_insensitive=True, normalize_namespaces=True)
22 except xmlx.Error as exc:
23 gws.log.error(f'XML parse error: {exc}')
25 if xml_el:
26 fn = _XML_FORMATS.get(xml_el.name)
27 if fn:
28 fds = fn(xml_el, default_crs, always_xy)
29 if fds is not None:
30 gws.log.debug(f'parsed with {fn.__name__} count={len(fds)}')
31 return fds
32 gws.log.error(f'XML parse error for {xml_el.name!r}')
33 return
35 # fallback for non-xml formats
36 # @TODO: json etc
38 for fn in _TEXT_FORMATS:
39 fds = fn(text, default_crs, always_xy)
40 if fds is not None:
41 gws.log.debug(f'parsed with {fn.__name__} count={len(fds)}')
42 return fds
45##
47def _parse_msgmloutput(xml_el: gws.XmlElement, default_crs, always_xy):
48 # msGMLOutput (MapServer)
49 #
50 # <msGMLOutput
51 # <LAYER_1>
52 # <gml:name>LAYER_NAME
53 # <FEATURE_1>
54 # <gml:boundedBy>
55 # ...
56 # </gml:boundedBy>
57 # <GEOMETRY>
58 # <gml:Point...
59 # </GEOMETRY>
60 # <attr>....</attr>
61 # <attr>....</attr>
62 #
64 fds = []
66 for layer_el in xml_el:
67 layer_name = layer_el.name
68 for feature_el in layer_el:
69 if _is_gml(feature_el) and feature_el.name == 'name':
70 layer_name = feature_el.text
71 else:
72 fd = _fdata_from_gml(feature_el, default_crs, always_xy)
73 fd.meta = {'layerName': layer_name}
74 fds.append(fd)
76 return fds
79def _parse_featurecollection(xml_el: gws.XmlElement, default_crs, always_xy):
80 # FeatureCollection (OGC)
81 #
82 # <FeatureCollection
83 # <wfs:member>
84 # <FEATURE gml:id=...
85 # <attr>....</attr>
86 # <attr> <nested>....</attr>
87 # <GEOMETRY>
88 # <gml:Point...
89 #
91 fds = []
93 for member_el in xml_el:
94 if member_el.name in {'member', 'featuremember'}:
95 if len(member_el) == 1 and len(member_el[0]) > 0:
96 # <wfs:member><my:feature><attr...
97 fds.append(_fdata_from_gml(member_el[0], default_crs, always_xy))
98 elif len(member_el) > 1:
99 # <wfs:member><attr...
100 fds.append(_fdata_from_gml(member_el, default_crs, always_xy))
102 return fds
105def _parse_getfeatureinforesponse(xml_el: gws.XmlElement, default_crs, always_xy):
106 # GetFeatureInfoResponse (geoserver/qgis)
107 #
108 # <GetFeatureInfoResponse>
109 # <Layer name="....">
110 # <Feature id="...">
111 # <Attribute name="..." value="..."/>
112 # <Attribute name="geometry" value="<wkt>"/>
114 fds = []
116 for layer_el in xml_el:
117 layer_name = layer_el.get('name')
118 for feature_el in layer_el:
120 fd = gws.FeatureRecord(
121 attributes={},
122 uid=feature_el.get('id'),
123 meta={'layerName': layer_name},
124 )
126 for el in feature_el:
127 if el.name != 'attribute':
128 continue
129 key = el.get('name')
130 val = el.get('value')
131 if key == 'geometry':
132 fd.shape = gws.base.shape.from_wkt(val, default_crs)
133 elif val.strip():
134 fd.attributes[key] = val.strip()
136 fds.append(fd)
138 return fds
141def _parse_featureinforesponse(xml_el: gws.XmlElement, default_crs, always_xy):
142 # FeatureInfoResponse (Arcgis)
143 #
144 # https://webhelp.esri.com/arcims/9.3/General/mergedProjects/wms_connect/wms_connector/get_featureinfo.htm
145 #
146 # <FeatureInfoResponse...
147 # <fields objectid="15111" shape="polygon"...
148 # <fields objectid="15111" shape="polygon"...
150 fds = []
152 for fields_el in xml_el:
153 if fields_el.name == 'fields':
154 fd = gws.FeatureRecord(attributes={})
155 for key, val in fields_el.attrib.items():
156 if key.lower() in {'id', 'fid'}:
157 fd.uid = val
158 elif key.lower() != 'shape':
159 fd.attributes[key] = val
160 fds.append(fd)
162 return fds
165def _parse_geobak(xml_el: gws.XmlElement, default_crs, always_xy):
166 # GeoBAK (https://www.egovernment.sachsen.de/geodaten.html)
167 #
168 # <geobak_20:Sachdatenabfrage...
169 # <geobak_20:Kartenebene>....
170 # <geobak_20:Inhalt>
171 # <geobak_20:Datensatz>
172 # <geobak_20:Attribut>
173 # <geobak_20:Name>...
174 # <geobak_20:Wert>...
175 # <geobak_20:Inhalt>
176 # <geobak_20:Datensatz>
177 # ...
179 fds = []
181 for el in xml_el:
182 fd = gws.FeatureRecord(attributes={})
184 if el.name == 'kartenebene':
185 fd.meta = {'layerName': el.text}
186 continue
188 if el.name == 'inhalt':
189 for attr_el in el[0]:
190 key = attr_el[0].text.strip()
191 val = attr_el[1].text.strip()
192 if key != 'shape' and val.lower() != 'null':
193 fd.attributes[key] = val
195 fds.append(fd)
197 return fds
200##
202_DEEP_ATTRIBUTE_DELIMITER = '.'
205def _fdata_from_gml(feature_el, default_crs, always_xy) -> gws.FeatureRecord:
206 # like GDAL does:
207 # "When reading a feature, the driver will by default only take into account
208 # the last recognized GML geometry found..." (https://gdal.org/drivers/vector/gml.html)
210 fd = gws.FeatureRecord(
211 attributes={},
212 uid=feature_el.get('id') or feature_el.get('fid'),
213 meta={'layerName': feature_el.name},
214 )
216 bbox = None
218 for el in feature_el:
219 if el.name == 'boundedby':
220 # <gml:boundedBy directly under feature
221 bbox = gws.gis.gml.parse_envelope(el[0], default_crs, always_xy)
222 elif gws.gis.gml.is_geometry_element(el):
223 # <gml:Polygon etc directly under feature
224 fd.shape = gws.gis.gml.parse_shape(el, default_crs, always_xy)
225 elif len(el) == 1 and gws.gis.gml.is_geometry_element(el[0]):
226 # <gml:Polygon etc in a wrapper tag
227 fd.shape = gws.gis.gml.parse_shape(el[0], default_crs, always_xy)
228 elif len(el) > 0:
229 # sub-feature
230 sub = _fdata_from_gml(el, default_crs, always_xy)
231 for k, v in sub.attributes.items():
232 fd.attributes[el.name + _DEEP_ATTRIBUTE_DELIMITER + k] = v
233 else:
234 # attribute <attr>text</attr>
235 s = el.text.strip()
236 if s:
237 fd.attributes[el.name] = s
239 if not fd.shape and bbox:
240 fd.shape = gws.base.shape.from_bounds(bbox)
242 return fd
245def _is_gml(el):
246 return 'gml}' in el.tag
249##
252_XML_FORMATS = {
253 'msgmloutput': _parse_msgmloutput,
254 'featurecollection': _parse_featurecollection,
255 'getfeatureinforesponse': _parse_getfeatureinforesponse,
256 'featureinforesponse': _parse_featureinforesponse,
257 'sachdatenabfrage': _parse_geobak,
258}
260_TEXT_FORMATS = [
261]