Coverage for gws-app/gws/base/shape/__init__.py: 32%

205 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Shape object. 

2 

3The Shape object implements the IShape protocol (georefenced geometry). 

4Internally, it holds a pointer to a Shapely geometry object and a Crs object. 

5""" 

6 

7# @TODO support for SQL/MM extensions 

8 

9import struct 

10import shapely.geometry 

11import shapely.ops 

12import shapely.wkb 

13import shapely.wkt 

14 

15import gws 

16import gws.gis.crs 

17import gws.lib.sa as sa 

18 

19_TOLERANCE_QUAD_SEGS = 6 

20_MIN_TOLERANCE_POLYGON = 0.01 # 1 cm for metric projections 

21 

22 

23class Error(gws.Error): 

24 pass 

25 

26 

27def from_wkt(wkt: str, default_crs: gws.Crs = None) -> gws.Shape: 

28 """Creates a shape object from a WKT string. 

29 

30 Args: 

31 wkt: A WKT or EWKT string. 

32 default_crs: Default Crs. 

33 

34 Returns: 

35 A Shape object. 

36 """ 

37 

38 if wkt.startswith('SRID='): 

39 # EWKT 

40 c = wkt.index(';') 

41 srid = wkt[len('SRID='):c] 

42 crs = gws.gis.crs.get(int(srid)) 

43 wkt = wkt[c + 1:] 

44 elif default_crs: 

45 crs = default_crs 

46 else: 

47 raise Error('missing or invalid crs for WKT') 

48 

49 return Shape(shapely.wkt.loads(wkt), crs) 

50 

51 

52def from_wkb(wkb: bytes, default_crs: gws.Crs = None) -> gws.Shape: 

53 """Creates a shape object from a WKB byte string. 

54 

55 Args: 

56 wkb: A WKB or EWKB byte string. 

57 default_crs: Default Crs. 

58 

59 Returns: 

60 A Shape object. 

61 """ 

62 

63 return _from_wkb(wkb, default_crs) 

64 

65 

66def from_wkb_hex(wkb: str, default_crs: gws.Crs = None) -> gws.Shape: 

67 """Creates a shape object from a hex-encoded WKB string. 

68 

69 Args: 

70 wkb: A hex-encoded WKB or EWKB byte string. 

71 default_crs: Default Crs. 

72 

73 Returns: 

74 A Shape object. 

75 """ 

76 

77 return _from_wkb(bytes.fromhex(wkb), default_crs) 

78 

79 

80def _from_wkb(wkb: bytes, default_crs): 

81 # http://libgeos.org/specifications/wkb/#extended-wkb 

82 

83 byte_order = wkb[0] 

84 header = struct.unpack('<cLL' if byte_order == 1 else '>cLL', wkb[:9]) 

85 

86 if header[1] & 0x20000000: 

87 crs = gws.gis.crs.get(header[2]) 

88 elif default_crs: 

89 crs = default_crs 

90 else: 

91 raise Error('missing or invalid crs for WKB') 

92 

93 geom = shapely.wkb.loads(wkb) 

94 return Shape(geom, crs) 

95 

96 

97def from_wkb_element(element: sa.geo.WKBElement, default_crs: gws.Crs = None): 

98 data = element.data 

99 if isinstance(data, str): 

100 wkb = bytes.fromhex(data) 

101 else: 

102 wkb = bytes(data) 

103 crs = gws.gis.crs.get(element.srid) 

104 return _from_wkb(wkb, crs or default_crs) 

105 

106 

107def from_geojson(geojson: dict, crs: gws.Crs, always_xy=False) -> gws.Shape: 

108 """Creates a shape object from a GeoJSON geometry dict. 

109 

110 Parses a dict as a GeoJSON geometry object (https://www.rfc-editor.org/rfc/rfc7946#section-3.1). 

111 

112 The coordinates are assumed to be in the projection order, unless ``always_xy`` is ``True``. 

113 

114 Args: 

115 geojson: A GeoJSON geometry dict 

116 crs: A Crs object. 

117 always_xy: If ``True``, coordinates are assumed to be in the XY (lon/lat) order 

118 

119 Returns: 

120 A Shape object. 

121 """ 

122 

123 geom = _shapely_shape(geojson) 

124 if crs.isYX and not always_xy: 

125 geom = _swap_xy(geom) 

126 return Shape(geom, crs) 

127 

128 

129def from_props(props: gws.Props) -> gws.Shape: 

130 """Creates a Shape from a properties object. 

131 

132 Args: 

133 props: A properties object. 

134 Returns: 

135 A Shape object. 

136 """ 

137 

138 crs = gws.gis.crs.get(props.get('crs')) 

139 if not crs: 

140 raise Error('missing or invalid crs') 

141 geom = _shapely_shape(props.get('geometry')) 

142 return Shape(geom, crs) 

143 

144 

145def from_dict(d: dict) -> gws.Shape: 

146 """Creates a Shape from a dictionary. 

147 

148 Args: 

149 d: A dictionary with the keys 'crs' and 'geometry'. 

150 Returns: 

151 A Shape object. 

152 """ 

153 

154 crs = gws.gis.crs.get(d.get('crs')) 

155 if not crs: 

156 raise Error('missing or invalid crs') 

157 geom = _shapely_shape(d.get('geometry')) 

158 return Shape(geom, crs) 

159 

160 

161def from_extent(extent: gws.Extent, crs: gws.Crs, always_xy=False) -> gws.Shape: 

162 """Creates a polygon Shape from an extent. 

163 

164 Args: 

165 extent: A hex-encoded WKB byte string. 

166 crs: A Crs object. 

167 always_xy: If ``True``, coordinates are assumed to be in the XY (lon/lat) order 

168 

169 Returns: 

170 A Shape object. 

171 """ 

172 

173 geom = shapely.geometry.box(*extent) 

174 if crs.isYX and not always_xy: 

175 geom = _swap_xy(geom) 

176 return Shape(geom, crs) 

177 

178 

179def from_bounds(bounds: gws.Bounds) -> gws.Shape: 

180 """Creates a polygon Shape from a Bounds object. 

181 

182 Args: 

183 bounds: A Bounds object. 

184 

185 Returns: 

186 A Shape object. 

187 """ 

188 

189 return Shape(shapely.geometry.box(*bounds.extent), bounds.crs) 

190 

191 

192def from_xy(x: float, y: float, crs: gws.Crs) -> gws.Shape: 

193 """Creates a point Shape from coordinates. 

194 

195 Args: 

196 x: X coordinate (lon/easting) 

197 y: Y coordinate (lat/northing) 

198 crs: A Crs object. 

199 

200 Returns: 

201 A Shape object. 

202 """ 

203 

204 return Shape(shapely.geometry.Point(x, y), crs) 

205 

206 

207def _swap_xy(geom): 

208 def f(x, y): 

209 return y, x 

210 

211 return shapely.ops.transform(f, geom) 

212 

213 

214_CIRCLE_RESOLUTION = 64 

215 

216 

217def _shapely_shape(d): 

218 if d.get('type').upper() == 'CIRCLE': 

219 geom = shapely.geometry.Point(d.get('center')) 

220 return geom.buffer( 

221 d.get('radius'), 

222 resolution=_CIRCLE_RESOLUTION, 

223 cap_style=shapely.geometry.CAP_STYLE.round, 

224 join_style=shapely.geometry.JOIN_STYLE.round) 

225 

226 return shapely.geometry.shape(d) 

227 

228 

229## 

230 

231 

232class Props(gws.Props): 

233 """Shape properties object.""" 

234 crs: str 

235 geometry: dict 

236 

237 

238## 

239 

240 

241class Shape(gws.Shape): 

242 geom: shapely.geometry.base.BaseGeometry 

243 

244 def __init__(self, geom, crs: gws.Crs): 

245 super().__init__() 

246 self.geom = geom 

247 self.crs = crs 

248 self.type = self.geom.geom_type.lower() 

249 self.x = getattr(self.geom, 'x', None) 

250 self.y = getattr(self.geom, 'y', None) 

251 

252 def __str__(self): 

253 return '{Geometry:' + self.geom.geom_type.upper() + '}' 

254 

255 def area(self): 

256 return getattr(self.geom, 'area', 0) 

257 

258 def bounds(self): 

259 return gws.Bounds(crs=self.crs, extent=self.geom.bounds) 

260 

261 def centroid(self): 

262 return Shape(self.geom.centroid, self.crs) 

263 

264 def to_wkb(self): 

265 return shapely.wkb.dumps(self.geom) 

266 

267 def to_wkb_hex(self): 

268 return shapely.wkb.dumps(self.geom, hex=True) 

269 

270 def to_ewkb(self): 

271 return shapely.wkb.dumps(self.geom, srid=self.crs.srid) 

272 

273 def to_ewkb_hex(self): 

274 return shapely.wkb.dumps(self.geom, srid=self.crs.srid, hex=True) 

275 

276 def to_wkt(self): 

277 return shapely.wkt.dumps(self.geom) 

278 

279 def to_ewkt(self): 

280 return f'SRID={self.crs.srid};' + self.to_wkt() 

281 

282 def to_geojson(self, always_xy=False): 

283 geom = self.geom 

284 if self.crs.isYX and not always_xy: 

285 geom = _swap_xy(geom) 

286 return shapely.geometry.mapping(geom) 

287 

288 def to_props(self): 

289 return gws.ShapeProps( 

290 crs=self.crs.epsg, 

291 geometry=shapely.geometry.mapping(self.geom)) 

292 

293 def is_empty(self): 

294 return self.geom.is_empty 

295 

296 def is_ring(self): 

297 return self.geom.is_ring 

298 

299 def is_simple(self): 

300 return self.geom.is_simple 

301 

302 def is_valid(self): 

303 return self.geom.is_valid 

304 

305 def equals(self, other): 

306 return self._binary_predicate(other, 'equals') 

307 

308 def contains(self, other): 

309 return self._binary_predicate(other, 'contains') 

310 

311 def covers(self, other): 

312 return self._binary_predicate(other, 'covers') 

313 

314 def covered_by(self, other): 

315 return self._binary_predicate(other, 'covered_by') 

316 

317 def crosses(self, other): 

318 return self._binary_predicate(other, 'crosses') 

319 

320 def disjoint(self, other): 

321 return self._binary_predicate(other, 'disjoint') 

322 

323 def intersects(self, other): 

324 return self._binary_predicate(other, 'intersects') 

325 

326 def overlaps(self, other): 

327 return self._binary_predicate(other, 'overlaps') 

328 

329 def touches(self, other): 

330 return self._binary_predicate(other, 'touches') 

331 

332 def within(self, other): 

333 return self._binary_predicate(other, 'within') 

334 

335 def _binary_predicate(self, other, op): 

336 s = other.transformed_to(self.crs) 

337 return getattr(self.geom, op)(getattr(s, 'geom')) 

338 

339 def union(self, others): 

340 if not others: 

341 return self 

342 

343 geoms = [self.geom] 

344 for s in others: 

345 s = s.transformed_to(self.crs) 

346 geoms.append(getattr(s, 'geom')) 

347 

348 geom = shapely.ops.unary_union(geoms) 

349 return Shape(geom, self.crs) 

350 

351 def intersection(self, *others): 

352 if not others: 

353 return self 

354 

355 geom = self.geom 

356 for s in others: 

357 s = s.transformed_to(self.crs) 

358 geom = geom.intersection(getattr(s, 'geom')) 

359 

360 return Shape(geom, self.crs) 

361 

362 def to_multi(self): 

363 if self.type == gws.GeometryType.point: 

364 return Shape(shapely.geometry.MultiPoint([self.geom]), self.crs) 

365 if self.type == gws.GeometryType.linestring: 

366 return Shape(shapely.geometry.MultiLineString([self.geom]), self.crs) 

367 if self.type == gws.GeometryType.polygon: 

368 return Shape(shapely.geometry.MultiPolygon([self.geom]), self.crs) 

369 return self 

370 

371 def to_type(self, new_type: gws.GeometryType): 

372 if new_type == self.type: 

373 return self 

374 if new_type == gws.GeometryType.geometry: 

375 return self 

376 if self.type == gws.GeometryType.point and new_type == gws.GeometryType.multipoint: 

377 return self.to_multi() 

378 if self.type == gws.GeometryType.linestring and new_type == gws.GeometryType.multilinestring: 

379 return self.to_multi() 

380 if self.type == gws.GeometryType.polygon and new_type == gws.GeometryType.multipolygon: 

381 return self.to_multi() 

382 raise Error(f'cannot convert {self.type!r} to {new_type!r}') 

383 

384 def tolerance_polygon(self, tolerance=None, quad_segs=None): 

385 is_poly = self.type in (gws.GeometryType.polygon, gws.GeometryType.multipolygon) 

386 

387 if not tolerance and is_poly: 

388 return self 

389 

390 # we need a polygon even if tolerance = 0 

391 tolerance = tolerance or _MIN_TOLERANCE_POLYGON 

392 quad_segs = quad_segs or _TOLERANCE_QUAD_SEGS 

393 

394 if is_poly: 

395 cs = shapely.geometry.CAP_STYLE.flat 

396 js = shapely.geometry.JOIN_STYLE.mitre 

397 else: 

398 cs = shapely.geometry.CAP_STYLE.round 

399 js = shapely.geometry.JOIN_STYLE.round 

400 

401 geom = self.geom.buffer(tolerance, quad_segs, cap_style=cs, join_style=js) 

402 return Shape(geom, self.crs) 

403 

404 def transformed_to(self, crs): 

405 if crs == self.crs: 

406 return self 

407 tr = self.crs.transformer(crs) 

408 dg = shapely.ops.transform(tr, self.geom) 

409 return Shape(dg, crs)