Coverage for gws-app/gws/plugin/ows_client/wfs/provider.py: 0%

68 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""WFS provider. 

2 

3References: 

4 

5- wfs 1.0.0: http://portal.opengeospatial.org/files/?artifact_id=7176 Sec 13.7.3 

6- wfs 1.1.0: http://portal.opengeospatial.org/files/?artifact_id=8339 Sec 14.7.3 

7- wfs 2.0.0: http://docs.opengeospatial.org/is/09-025r2/09-025r2.html Sec 11.1.3 

8 

9See also: 

10 

11- https://docs.geoserver.org/latest/en/user/services/wfs/reference.html 

12 

13""" 

14 

15from typing import Optional, cast 

16 

17import gws 

18import gws.base.ows.client 

19import gws.base.shape 

20import gws.config.util 

21import gws.gis.bounds 

22import gws.gis.crs 

23import gws.gis.extent 

24import gws.gis.source 

25 

26from . import caps 

27 

28 

class Config(gws.base.ows.client.provider.Config):
    """WFS provider configuration.

    Extends the generic OWS client provider configuration with
    one WFS-specific switch.
    """

    withBboxCrs: Optional[bool]
    """Add CRS to bbox request parameters. (added in 8.1)"""

32 

33 

class Object(gws.base.ows.client.provider.Object):
    """WFS provider object.

    Reads the service capabilities on configuration and issues
    GetFeature requests restricted by a bounding box.
    """

    protocol = gws.OwsProtocol.WFS

    # Whether the CRS identifier is appended to the BBOX request parameter.
    withBboxCrs: bool
    # True when the version string reported by the capabilities compares >= '2'.
    isWfs2: bool

    def configure(self):
        """Populate the provider from the parsed capabilities document.

        Sets the metadata, source layers, version and operations, then
        decides whether BBOX values should carry a CRS suffix.
        """

        parsed = caps.parse(self.get_capabilities())

        self.metadata = parsed.metadata
        self.sourceLayers = parsed.sourceLayers
        self.version = parsed.version
        # NOTE: this is a plain string comparison of the version value.
        self.isWfs2 = self.version >= '2'

        self.configure_operations(parsed.operations)

        # An explicit 'withBboxCrs' setting wins; when it is absent,
        # the CRS suffix is used for WFS 2 only.
        # See also comments in qgis/qgswfsfeatureiterator.cpp buildURL.
        configured = self.cfg('withBboxCrs')
        if configured is None:
            self.withBboxCrs = self.isWfs2
        else:
            self.withBboxCrs = configured

    # Feature limit sent with the request when the search does not specify one.
    DEFAULT_GET_FEATURE_LIMIT = 100

    def get_features(self, search, source_layers):
        """Run a WFS GetFeature request and return the parsed records.

        Only spatial searches are performed. When the search carries a
        shape, the request is limited to the bounds of that shape (grown
        by the search tolerance) and the precise intersection test is done
        locally afterwards. According to the original author, this is more
        performant than WFS spatial ops (at least for qgis), and it also
        works when the provider does not support spatial ops.

        NOTE(review): the previous documentation stated that all features
        are returned when neither bounds nor a shape is given, but the
        bounds are dereferenced unconditionally below -- confirm that
        callers always supply one of them.

        Args:
            search: Search arguments; ``bounds``, ``shape``, ``tolerance``,
                ``resolution``, ``limit`` and ``extraParams`` are read.
            source_layers: Source layers whose ``name`` values are requested.

        Returns:
            A list of feature records; an empty list if the GetFeature
            operation is not available or the response could not be parsed.
        """

        area = search.bounds
        clip_shape = None

        if search.shape:
            tolerance = 0.0

            if search.tolerance:
                amount, unit = search.tolerance
                if unit == 'px':
                    # pixel tolerances are scaled by the search resolution
                    tolerance = amount * (search.resolution or 1)
                else:
                    tolerance = amount

            clip_shape = search.shape.tolerance_polygon(tolerance)
            area = clip_shape.bounds()

        request_crs = self.forceCrs or gws.gis.crs.WGS84

        extent = gws.gis.bounds.transform(area, request_crs).extent
        if request_crs.isYX and not self.alwaysXY:
            extent = gws.gis.extent.swap_xy(extent)
        bbox_value = ','.join(map(str, extent))

        srs = request_crs.urn
        if self.withBboxCrs:
            bbox_value = bbox_value + ',' + srs

        # WFS 2 parameter names differ from those of the earlier versions
        limit_key = 'COUNT' if self.isWfs2 else 'MAXFEATURES'
        type_key = 'TYPENAMES' if self.isWfs2 else 'TYPENAME'

        params = {
            'BBOX': bbox_value,
            limit_key: search.limit or self.DEFAULT_GET_FEATURE_LIMIT,
            'SRSNAME': srs,
            type_key: [layer.name for layer in source_layers],
            'VERSION': self.version,
        }

        if search.extraParams:
            params = gws.u.merge(params, gws.u.to_upper_dict(search.extraParams))

        operation = self.get_operation(gws.OwsVerb.GetFeature)
        if not operation:
            return []

        # an OUTPUTFORMAT already present in params (from extraParams) is kept
        if operation.preferredFormat:
            params.setdefault('OUTPUTFORMAT', operation.preferredFormat)

        request_args = self.prepare_operation(operation, params=params)
        text = gws.base.ows.client.request.get_text(request_args)

        records = gws.base.ows.client.featureinfo.parse(
            text,
            default_crs=request_crs,
            always_xy=self.alwaysXY,
        )

        if records is None:
            gws.log.debug(f'get_features: NOT_PARSED params={params!r}')
            return []

        gws.log.debug(f'get_features: FOUND={len(records)} params={params!r}')

        # bring the returned geometries back into the CRS of the search bounds
        for record in records:
            if record.shape:
                record.shape = record.shape.transformed_to(area.crs)

        if not clip_shape:
            return records

        # records without a shape are kept, the others must intersect the search shape
        filtered = []
        for record in records:
            if record.shape and not record.shape.intersects(clip_shape):
                continue
            filtered.append(record)

        gws.log.debug(f'get_features: FILTERED={len(filtered)}')
        return filtered