Coverage for gws-app/gws/plugin/alkis/data/indexer.py: 0%

670 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1from typing import Optional, Iterable 

2 

3import re 

4from typing import Generic, TypeVar 

5 

6import shapely 

7import shapely.strtree 

8import shapely.wkb 

9 

10import gws 

11import gws.lib.osx 

12from gws.lib.cli import ProgressIndicator 

13import gws.plugin.postgres.provider 

14 

15 

16from . import types as dt 

17from . import index 

18from . import norbit6 

19 

20from .geo_info_dok import gid6 as gid 

21 

22 

def run(ix: index.Object, data_schema: str, with_force=False, with_cache=False):
    """Build the ALKIS index ``ix`` from the source tables in ``data_schema``.

    Args:
        ix: The index object to populate.
        data_schema: Schema name handed to the ``norbit6`` reader.
        with_force: Drop the existing index first and rebuild it unconditionally.
        with_cache: Let the runner reuse/store intermediate results in its cache directory.
    """
    if not with_force:
        # a previous run already completed the index: nothing to do
        if ix.status().complete:
            return
    else:
        ix.drop()

    reader = norbit6.Object(ix.db, schema=data_schema)
    _Runner(ix, reader, with_cache).run()

32 

33 

# Type variable for the entity class stored in an ``_ObjectDict``.
T = TypeVar('T')

37 

38 

class _ObjectDict(Generic[T]):
    """Insertion-ordered registry of entities of a single class, keyed by uid."""

    def __init__(self, cls):
        # the entity class that add() instantiates
        self.cls = cls
        self.d = {}

    def add(self, uid, recs) -> T:
        """Create an entity from ``recs``, register it under its uid and return it.

        The entity is flagged historic only when every one of its records is historic.
        """
        entity = self.cls(uid=uid, recs=recs)
        entity.isHistoric = all(rec.isHistoric for rec in recs)
        self.d[entity.uid] = entity
        return entity

    def get(self, uid, default=None) -> Optional[T]:
        """Return the entity registered under ``uid``, or ``default``."""
        return self.d.get(uid, default)

    def get_many(self, uids) -> list[T]:
        """Return the known entities for ``uids`` in first-seen order, without duplicates.

        Unknown uids are skipped.
        """
        found = {}
        for uid in uids:
            if uid in found:
                continue
            entity = self.d.get(uid)
            if entity:
                found[uid] = entity
        return list(found.values())

    def get_from_ptr(self, obj: dt.Entity, attr):
        """Resolve the uid reference(s) stored in ``attr`` across all records of ``obj``.

        Each record's value is read with ``_pop`` (presumably removing it from the record).
        A value may be a list of uids or a single uid string; anything else is ignored.
        """
        collected = []
        for rec in obj.recs:
            ref = _pop(rec, attr)
            if isinstance(ref, list):
                collected += ref
            elif isinstance(ref, str):
                collected.append(ref)
        return self.get_many(collected)

    def __iter__(self) -> Iterable[T]:
        for entity in self.d.values():
            yield entity

    def __len__(self):
        return len(self.d)

81 

82 

class _ObjectMap:
    """Container for the entity registries and lookup tables an indexer collects.

    An instance is what ``_Indexer`` serializes to and loads from the cache.
    """

    def __init__(self):
        # place lookups (filled by _PlaceIndexer) and the Lage catalog (filled by _LageIndexer)
        self.placeAll: dict = {}
        self.placeIdx: dict = {}
        self.catalog: dict = {}

        # one registry per entity type
        self.Anschrift: _ObjectDict[dt.Anschrift] = _ObjectDict(dt.Anschrift)
        self.Buchungsblatt: _ObjectDict[dt.Buchungsblatt] = _ObjectDict(dt.Buchungsblatt)
        self.Buchungsstelle: _ObjectDict[dt.Buchungsstelle] = _ObjectDict(dt.Buchungsstelle)
        self.Flurstueck: _ObjectDict[dt.Flurstueck] = _ObjectDict(dt.Flurstueck)
        self.Gebaeude: _ObjectDict[dt.Gebaeude] = _ObjectDict(dt.Gebaeude)
        self.Lage: _ObjectDict[dt.Lage] = _ObjectDict(dt.Lage)
        self.Namensnummer: _ObjectDict[dt.Namensnummer] = _ObjectDict(dt.Namensnummer)
        self.Part: _ObjectDict[dt.Part] = _ObjectDict(dt.Part)
        self.Person: _ObjectDict[dt.Person] = _ObjectDict(dt.Person)

99 

100 

class _Indexer:
    """Base class for the indexing stages driven by ``_Runner``.

    A subclass fills ``self.om`` in ``collect()`` and writes index tables in ``write()``.
    When ``CACHE_KEY`` is non-empty and the runner has caching enabled, the object map
    is stored in and loaded from ``<cacheDir>/<CACHE_KEY>``.
    """

    CACHE_KEY: str = ''

    def __init__(self, runner: '_Runner'):
        self.rr = runner
        self.ix: index.Object = runner.ix
        self.om = _ObjectMap()

    def load_or_collect(self):
        """Populate ``self.om`` from the cache if possible, otherwise collect and cache it."""
        if self.load_cache():
            return
        self.collect()
        self.store_cache()

    def load_cache(self):
        """Replace ``self.om`` with the cached object map; return True on success."""
        if not (self.rr.withCache and self.CACHE_KEY):
            return False
        cpath = f'{self.rr.cacheDir}/{self.CACHE_KEY}'
        if not gws.u.is_file(cpath):
            return False
        cached = gws.u.unserialize_from_path(cpath)
        if not cached:
            return False
        gws.log.info(f'ALKIS: use cache {self.CACHE_KEY!r}')
        self.om = cached
        return True

    def store_cache(self):
        """Serialize ``self.om`` to the cache (no-op if caching is off or no key is set)."""
        if self.rr.withCache and self.CACHE_KEY:
            cpath = f'{self.rr.cacheDir}/{self.CACHE_KEY}'
            gws.u.serialize_to_path(self.om, cpath)
            gws.log.info(f'ALKIS: store cache {self.CACHE_KEY!r}')

    def collect(self):
        """Collect the stage's objects into ``self.om`` (overridden by subclasses)."""
        pass

    def write_table(self, table_id, values):
        """Create index table ``table_id`` from ``values`` unless it already exists."""
        if not self.ix.has_table(table_id):
            with ProgressIndicator(f'ALKIS: write {table_id!r}', len(values)) as progress:
                self.ix.create_table(table_id, values, progress)

    def write(self):
        """Write the stage's index tables (overridden by subclasses)."""
        pass

145 

146 

class _PlaceIndexer(_Indexer):
    """Index places (Administration- und Verwaltungseinheiten).

    References: https://de.wikipedia.org/wiki/Amtlicher_Gemeindeschl%C3%BCssel
    """

    CACHE_KEY = index.TABLE_PLACE

    # fallback values returned by the get_* lookups when a place is unknown
    empty1 = dt.EnumPair(code='0', text='')
    empty2 = dt.EnumPair(code='00', text='')

    def add(self, kind, ax, key_obj, **kwargs):
        """Register a place of ``kind`` built from ``ax`` and return its (code, name) pair.

        Objects with ``ax.endet`` set are skipped and None is returned.
        Extra keyword arguments (the parent places) are passed to ``dt.Place``.
        """
        if ax.endet:
            return None

        code = self.code(kind, key_obj)
        pair = dt.EnumPair(code, ax.bezeichnung)

        place = dt.Place(**kwargs)
        place.uid = kind + code
        place.kind = kind
        setattr(place, kind, pair)

        self.om.placeAll[place.uid] = place
        self.om.placeIdx[place.uid] = pair
        return pair

    def collect(self):
        """Read all administrative units and fill ``placeAll`` / ``placeIdx``.

        Levels are read top-down so each level can look up its already registered parents.
        """
        self.om.placeAll = {}
        self.om.placeIdx = {}

        for ax in self.rr.read_flat(gid.AX_Bundesland):
            self.add('land', ax, ax.schluessel)

        for ax in self.rr.read_flat(gid.AX_Regierungsbezirk):
            key = ax.schluessel
            self.add('regierungsbezirk', ax, key,
                     land=self.get_land(key))

        for ax in self.rr.read_flat(gid.AX_KreisRegion):
            key = ax.schluessel
            self.add('kreis', ax, key,
                     land=self.get_land(key),
                     regierungsbezirk=self.get_regierungsbezirk(key))

        for ax in self.rr.read_flat(gid.AX_Gemeinde):
            key = ax.gemeindekennzeichen
            self.add('gemeinde', ax, key,
                     land=self.get_land(key),
                     regierungsbezirk=self.get_regierungsbezirk(key),
                     kreis=self.get_kreis(key))

        # @TODO map Gemarkung to Gemeinde (see https://de.wikipedia.org/wiki/Liste_der_Gemarkungen_in_Nordrhein-Westfalen etc)

        for ax in self.rr.read_flat(gid.AX_Gemarkung):
            key = ax.schluessel
            if str(key.gemarkungsnummer) in self.ix.excludeGemarkung:
                continue
            self.add('gemarkung', ax, key, land=self.get_land(key))

        simple_kinds = (
            ('buchungsblattbezirk', gid.AX_Buchungsblattbezirk),
            ('dienststelle', gid.AX_Dienststelle),
        )
        for kind, cls in simple_kinds:
            for ax in self.rr.read_flat(cls):
                key = ax.schluessel
                self.add(kind, ax, key, land=self.get_land(key))

    def write(self):
        """Write every collected place to the place table."""
        rows = [
            dict(uid=place.uid, data=index.serialize(place))
            for place in self.om.placeAll.values()
        ]
        self.write_table(index.TABLE_PLACE, rows)

    def get_land(self, o):
        """Land pair for key object ``o``, or ``empty2``."""
        return self.get('land', o) or self.empty2

    def get_regierungsbezirk(self, o):
        """Regierungsbezirk pair for key object ``o``, or ``empty1``."""
        return self.get('regierungsbezirk', o) or self.empty1

    def get_kreis(self, o):
        """Kreis pair for key object ``o``, or ``empty2``."""
        return self.get('kreis', o) or self.empty2

    def get_gemeinde(self, o):
        """Gemeinde pair for key object ``o``, or ``empty1``."""
        return self.get('gemeinde', o) or self.empty1

    def get_gemarkung(self, o):
        """Gemarkung pair for key object ``o``, or ``empty1``."""
        return self.get('gemarkung', o) or self.empty1

    def get_buchungsblattbezirk(self, o):
        """Buchungsblattbezirk pair for key object ``o``, or ``empty1``."""
        return self.get('buchungsblattbezirk', o) or self.empty1

    def get_dienststelle(self, o):
        """Dienststelle pair for key object ``o``, or ``empty1``."""
        return self.get('dienststelle', o) or self.empty1

    def get(self, kind, o):
        """Look up the registered pair of ``kind`` for key object ``o`` (None if unknown)."""
        return self.om.placeIdx.get(kind + self.code(kind, o))

    def is_empty(self, p: dt.EnumPair):
        """True if ``p`` is one of the fallback pairs (code '0' or '00')."""
        return p.code in ('0', '00')

    # how the place code of each kind is composed from the fields of its key object
    CODES = {
        'land': lambda o: o.land,
        'regierungsbezirk': lambda o: o.land + (o.regierungsbezirk or '0'),
        'kreis': lambda o: o.land + (o.regierungsbezirk or '0') + o.kreis,
        'gemeinde': lambda o: o.land + (o.regierungsbezirk or '0') + o.kreis + o.gemeinde,
        'gemarkung': lambda o: o.land + o.gemarkungsnummer,
        'buchungsblattbezirk': lambda o: o.land + o.bezirk,
        'dienststelle': lambda o: o.land + o.stelle,
    }

    def code(self, kind, o):
        """Compute the place code of ``kind`` for key object ``o``."""
        return self.CODES[kind](o)

261 

262 

class _LageIndexer(_Indexer):
    """Index Lagebezeichnungen (with and without Hausnummer) and their related Gebaeude."""

    CACHE_KEY = index.TABLE_LAGE

    def collect(self):
        """Collect Lage objects, their coordinates and the Gebaeude pointing at them."""
        catalog = self.om.catalog
        for ax in self.rr.read_flat(gid.AX_LagebezeichnungKatalogeintrag):
            catalog[self.lage_key(ax.schluessel)] = ax.bezeichnung

        lage_classes = (gid.AX_LagebezeichnungMitHausnummer, gid.AX_LagebezeichnungOhneHausnummer)
        for cls in lage_classes:
            for uid, axs in self.rr.read_grouped(cls):
                recs = [
                    _from_ax(
                        dt.LageRecord,
                        ax,
                        strasse=self.strasse(ax),
                        hausnummer=index.normalize_hausnummer(ax.hausnummer),
                    )
                    for ax in axs
                ]
                self.om.Lage.add(uid, recs)

        # use the PTO (art=HNR) geometry for lage coordinates
        # PTO.dientZurDarstellungVon -> lage.uid
        for pto in self.rr.read_flat(gid.AP_PTO):
            if pto.lebenszeitintervall.endet is not None:
                continue
            if (_pop(pto, 'art') or '').upper() != 'HNR':
                continue

            uids = _pop(pto, 'dientZurDarstellungVon')
            if not (uids and isinstance(uids, list)):
                continue

            geom = _geom_of(pto)
            if not geom:
                continue

            centroid = geom.centroid
            x, y = centroid.x, centroid.y
            for la in self.om.Lage.get_many(uids):
                la.x = x
                la.y = y

        for la in self.om.Lage:
            la.gebaeudeList = []

        # read related Gebaeude records
        atts = _attributes(gid.METADATA['AX_Gebaeude'], dt.Gebaeude.PROP_KEYS)
        for uid, axs in self.rr.read_grouped(gid.AX_Gebaeude):
            recs = [
                _from_ax(
                    dt.GebaeudeRecord,
                    ax,
                    name=', '.join(ax.name) if ax.name else None,
                    amtlicheFlaeche=ax.grundflaeche or 0,
                    props=self.rr.props_from(ax, atts),
                    _zeigtAuf=ax.zeigtAuf,
                )
                for ax in axs
            ]
            self.om.Gebaeude.add(uid, recs)

        # compute the geometric area of each record, then drop the geometry (omitted for now)
        for ge in self.om.Gebaeude:
            for r in ge.recs:
                geom = _geom_of(r)
                r.geomFlaeche = round(geom.area, 2) if geom else 0
                _pop(r, 'geom')

        # AX_Gebaeude.zeigtAuf -> AX_LagebezeichnungMitHausnummer
        for ge in self.om.Gebaeude:
            for la in self.om.Lage.get_from_ptr(ge, '_zeigtAuf'):
                la.gebaeudeList.append(ge)

    def strasse(self, ax):
        """Street name of ``ax``: given inline as a string, or looked up in the catalog ('' if unknown)."""
        designation = ax.lagebezeichnung
        if isinstance(designation, str):
            return designation
        return self.om.catalog.get(self.lage_key(designation), '')

    def lage_key(self, r):
        """Catalog key built from the land/regierungsbezirk/kreis/gemeinde/lage fields of ``r``."""
        return _comma([r.land, r.regierungsbezirk, r.kreis, r.gemeinde, r.lage])

    def write(self):
        """Write all Lage objects to the Lage table."""
        rows = [
            dict(uid=la.uid, rc=len(la.recs), data=index.serialize(la))
            for la in self.om.Lage
        ]
        self.write_table(index.TABLE_LAGE, rows)

366 

367 

class _BuchungIndexer(_Indexer):
    """Index Buchungsblätter with their Buchungsstellen, Namensnummern, Personen and Anschriften."""

    CACHE_KEY = index.TABLE_BUCHUNGSBLATT

    # Buchungsblatt by buchungsblattkennzeichen (used to resolve historic Buchungen).
    # Initialised per instance: a class-level dict would be shared between indexers/runs.
    buchungsblattkennzeichenMap: dict[str, dt.Buchungsblatt]

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        self.buchungsblattkennzeichenMap = {}

    def load_cache(self):
        """Load the object map from the cache and rebuild ``buchungsblattkennzeichenMap``.

        The map is not part of the cached object map. Without rebuilding it, kennzeichen
        lookups (historic Buchungen) would silently find nothing after a cache hit.
        """
        if not super().load_cache():
            return False
        self.buchungsblattkennzeichenMap = {}
        for bb in self.om.Buchungsblatt:
            self.buchungsblattkennzeichenMap[bb.buchungsblattkennzeichen] = bb
        return True

    def collect(self):
        """Collect all Buchung-related objects and link them together."""
        for uid, axs in self.rr.read_grouped(gid.AX_Anschrift):
            self.om.Anschrift.add(uid, [
                _from_ax(
                    dt.AnschriftRecord,
                    ax,
                    ort=ax.ort_AmtlichesOrtsnamensverzeichnis or ax.ort_Post,
                    plz=ax.postleitzahlPostzustellung,
                    telefon=ax.telefon[0] if ax.telefon else None
                )
                for ax in axs
            ])

        for uid, axs in self.rr.read_grouped(gid.AX_Person):
            self.om.Person.add(uid, [
                _from_ax(
                    dt.PersonRecord,
                    ax,
                    anrede=ax.anrede.text if ax.anrede else None,
                    _hat=ax.hat,
                )
                for ax in axs
            ])

        # AX_Person.hat -> [AX_Anschrift]
        for pe in self.om.Person:
            pe.anschriftList = self.om.Anschrift.get_from_ptr(pe, '_hat')

        for uid, axs in self.rr.read_grouped(gid.AX_Namensnummer):
            self.om.Namensnummer.add(uid, [
                _from_ax(
                    dt.NamensnummerRecord,
                    ax,
                    anteil=_anteil(ax),
                    _benennt=ax.benennt,
                    _istBestandteilVon=ax.istBestandteilVon
                )
                for ax in axs
            ])

        # AX_Namensnummer.benennt -> AX_Person
        for nn in self.om.Namensnummer:
            nn.laufendeNummer = nn.recs[-1].laufendeNummerNachDIN1421
            nn.personList = self.om.Person.get_from_ptr(nn, '_benennt')

        for uid, axs in self.rr.read_grouped(gid.AX_Buchungsstelle):
            self.om.Buchungsstelle.add(uid, [
                _from_ax(
                    dt.BuchungsstelleRecord,
                    ax,
                    anteil=_anteil(ax),
                    _an=ax.an,
                    _zu=ax.zu,
                    _istBestandteilVon=ax.istBestandteilVon,
                )
                for ax in axs
            ])

        for bs in self.om.Buchungsstelle:
            bs.laufendeNummer = bs.recs[-1].laufendeNummer
            # filled later by _FsDataIndexer
            bs.fsUids = []
            bs.flurstueckskennzeichenList = []

        for uid, axs in self.rr.read_grouped(gid.AX_Buchungsblatt):
            self.om.Buchungsblatt.add(uid, [
                _from_ax(
                    dt.BuchungsblattRecord,
                    ax,
                    buchungsblattbezirk=self.rr.place.get_buchungsblattbezirk(ax.buchungsblattbezirk),
                )
                for ax in axs
            ])

        for bb in self.om.Buchungsblatt:
            bb.buchungsstelleList = []
            bb.namensnummerList = []
            bb.buchungsblattkennzeichen = bb.recs[-1].buchungsblattkennzeichen
            self.buchungsblattkennzeichenMap[bb.buchungsblattkennzeichen] = bb

        # AX_Buchungsstelle.istBestandteilVon -> AX_Buchungsblatt
        for bs in self.om.Buchungsstelle:
            bb_list = self.om.Buchungsblatt.get_from_ptr(bs, '_istBestandteilVon')
            bs.buchungsblattUids = [bb.uid for bb in bb_list]
            bs.buchungsblattkennzeichenList = [bb.buchungsblattkennzeichen for bb in bb_list]
            for bb in bb_list:
                bb.buchungsstelleList.append(bs)

        # AX_Namensnummer.istBestandteilVon -> AX_Buchungsblatt
        for nn in self.om.Namensnummer:
            bb_list = self.om.Buchungsblatt.get_from_ptr(nn, '_istBestandteilVon')
            nn.buchungsblattUids = [bb.uid for bb in bb_list]
            nn.buchungsblattkennzeichenList = [bb.buchungsblattkennzeichen for bb in bb_list]
            for bb in bb_list:
                bb.namensnummerList.append(nn)

        for bb in self.om.Buchungsblatt:
            bb.buchungsstelleList.sort(key=_sortkey_buchungsstelle)
            bb.namensnummerList.sort(key=_sortkey_namensnummer)

        # AX_Buchungsstelle.an -> [AX_Buchungsstelle]
        # AX_Buchungsstelle.zu -> [AX_Buchungsstelle]
        # see Erläuterungen zu ALKIS Version 6, page 116-119

        for bs in self.om.Buchungsstelle:
            bs.childUids = []
            bs.parentUids = []
            bs.parentkennzeichenList = []

        for bs in self.om.Buchungsstelle:
            # Collect parent uids in record order. get_many() removes duplicates and keeps
            # first-seen order, so the result is deterministic (unlike iterating a set).
            parent_uids = []
            for r in bs.recs:
                for attr in ('_an', '_zu'):
                    ref = _pop(r, attr)
                    # a reference may be a list, a single uid or missing (None)
                    if isinstance(ref, list):
                        parent_uids.extend(ref)
                    elif isinstance(ref, str):
                        parent_uids.append(ref)

            parent_knz = set()
            for parent_bs in self.om.Buchungsstelle.get_many(parent_uids):
                parent_bs.childUids.append(bs.uid)
                bs.parentUids.append(parent_bs.uid)
                for bb_knz in parent_bs.buchungsblattkennzeichenList:
                    parent_knz.add(bb_knz + '.' + parent_bs.laufendeNummer)

            bs.parentkennzeichenList = sorted(parent_knz)

    def write(self):
        """Write all Buchungsblätter to the Buchungsblatt table."""
        values = []

        for bb in self.om.Buchungsblatt:
            values.append(dict(
                uid=bb.uid,
                rc=len(bb.recs),
                data=index.serialize(bb),
            ))

        self.write_table(index.TABLE_BUCHUNGSBLATT, values)

508 

509 

class _PartIndexer(_Indexer):
    """Index 'parts': intersections of geometric ALKIS objects with Flurstück geometries.

    Object classes are selected per ``dt.Part.KIND`` from ``gid.METADATA``. Each object's
    geometry is intersected with every Flurstück it overlaps, producing one ``dt.Part``
    per (object, Flurstück) pair.
    """

    CACHE_KEY = index.TABLE_PART
    # intersections with a (rounded) area below this are ignored
    MIN_PART_AREA = 1

    # Initialised per instance in __init__/collect: class-level lists would be shared
    # between indexer instances and accumulate across runs.
    parts: list[dt.Part]
    fs_list: list
    fs_geom: list

    stree: shapely.strtree.STRtree

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        self.parts = []
        self.fs_list = []
        self.fs_geom = []

    def load_cache(self):
        """Load the object map from the cache, including the computed parts.

        ``write()`` works on ``self.parts``, which ``collect()`` stores on the object map
        as ``partList``. A cache written without it would write an empty table, so in
        that case the cache is ignored and the parts are recomputed.
        """
        if not super().load_cache():
            return False
        parts = getattr(self.om, 'partList', None)
        if parts is None:
            self.om = _ObjectMap()
            return False
        self.parts = parts
        return True

    def collect(self):
        """Collect part objects, intersect them with the Flurstücke and sort the results."""
        self.parts = []
        self.fs_list = []
        self.fs_geom = []

        for kind in dt.Part.KIND:
            self.collect_kind(kind)

        for fs in self.rr.fsdata.om.Flurstueck:
            self.fs_list.append(fs)
            # NB take only the most recent fs geometry into account
            self.fs_geom.append(_geom_of(fs.recs[-1]))

        self.stree = shapely.strtree.STRtree(self.fs_geom)

        with ProgressIndicator(f'ALKIS: parts', len(self.om.Part)) as progress:
            for pa in self.om.Part:
                self.compute_intersections(pa)
                progress.update(1)

        self.parts.sort(key=_sortkey_part)

        for pa in self.parts:
            pa.isHistoric = all(r.isHistoric for r in pa.recs)

        # keep the computed parts on the object map so that they are cached with it
        self.om.partList = self.parts

    def collect_kind(self, kind):
        """Collect all geometric object classes whose metadata key matches ``kind``."""
        _, key = dt.Part.KIND[kind]
        classes = [
            getattr(gid, meta['name'])
            for meta in gid.METADATA.values()
            if (
                    meta['kind'] == 'object'
                    and meta['geom']
                    and re.search(key + r'/\w+/', meta['key'])
            )
        ]

        for cls in classes:
            self.collect_class(kind, cls)

    def collect_class(self, kind, cls):
        """Read all objects of ``cls`` into ``om.Part``, tagged with ``kind`` and the class name."""
        meta = gid.METADATA[cls.__name__]
        atts = _attributes(meta, dt.Part.PROP_KEYS)

        for uid, axs in self.rr.read_grouped(cls):
            pa = self.om.Part.add(uid, [
                _from_ax(
                    dt.PartRecord,
                    ax,
                    props=self.rr.props_from(ax, atts),
                )
                for ax in axs
            ])
            pa.kind = kind
            pa.name = dt.EnumPair(meta['uid'], meta['title'])

    def compute_intersections(self, pa: dt.Part):
        """Intersect every record geometry of ``pa`` with overlapping Flurstücke and add the parts."""
        parts_map = {}

        for r in pa.recs:
            geom = _geom_of(r)
            if not geom:
                continue

            for i in self.stree.query(geom):
                part_geom = shapely.intersection(self.fs_geom[i], geom)
                part_area = round(part_geom.area, 2)
                if part_area < self.MIN_PART_AREA:
                    continue

                fs = self.fs_list[i]
                fs_rec = fs.recs[-1]

                part = parts_map.setdefault(fs.uid, dt.Part(
                    uid=pa.uid,
                    recs=[],
                    kind=pa.kind,
                    name=pa.name,
                    fs=fs.uid,
                ))

                # computed area corrected with respect to FS's "amtlicheFlaeche"
                # (defensive: do not divide by a zero geometric FS area)
                if fs_rec.geomFlaeche:
                    part_area_corrected = round(
                        fs_rec.amtlicheFlaeche * (part_area / fs_rec.geomFlaeche),
                        2)
                else:
                    part_area_corrected = part_area

                part.recs.append(dt.PartRecord(
                    uid=r.uid,
                    beginnt=r.beginnt,
                    endet=r.endet,
                    anlass=r.anlass,
                    props=r.props,
                    geomFlaeche=part_area,
                    amtlicheFlaeche=part_area_corrected,
                    isHistoric=r.endet is not None,
                ))

                part.geom = shapely.wkb.dumps(part_geom, srid=self.ix.crs.srid, hex=True)
                part.geomFlaeche = part_area
                part.amtlicheFlaeche = part_area_corrected

        self.parts.extend(parts_map.values())

    def write(self):
        """Write all computed parts to the part table (geometry in its own column)."""
        values = []

        for n, pa in enumerate(self.parts, 1):
            # serialize without the geometry, then restore it
            geom = _pop(pa, 'geom')
            data = index.serialize(pa)
            pa.geom = geom
            values.append(dict(
                n=n,
                fs=pa.fs,
                uid=pa.uid,
                beginnt=pa.recs[-1].beginnt,
                endet=pa.recs[-1].endet,
                kind=pa.kind,
                name=pa.name.text,
                parthistoric=pa.isHistoric,
                data=data,
                geom=geom,
            ))

        self.write_table(index.TABLE_PART, values)

641 

642 

class _FsDataIndexer(_Indexer):
    """Index Flurstücke (current and historic) with their Lage, Gebaeude and Buchung data."""

    CACHE_KEY = index.TABLE_FLURSTUECK

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # outcome statistics of record(); only filled when collect() runs (not on a cache hit)
        self.counts = {
            'ok': 0,
            'no_geometry': 0,
            'excluded': 0,
        }

    def collect(self):
        """Collect all Flurstück objects and resolve their references."""
        for uid, axs in self.rr.read_grouped(gid.AX_Flurstueck):
            recs = gws.u.compact(self.record(ax) for ax in axs)
            if recs:
                self.om.Flurstueck.add(uid, recs)

        for uid, axs in self.rr.read_grouped(gid.AX_HistorischesFlurstueck):
            recs = gws.u.compact(self.record(ax) for ax in axs)
            if not recs:
                continue
            # For a historic FS, 'beginnt' is basically when the history begins
            # (see comments for AX_HistorischesFlurstueck in gid6).
            # We set F.endet=F.beginnt to designate this one as 'historic'.
            for r in recs:
                r.endet = r.beginnt
                r.isHistoric = True
            self.om.Flurstueck.add(uid, recs)

        for fs in self.om.Flurstueck:
            fs.flurstueckskennzeichen = fs.recs[-1].flurstueckskennzeichen
            fs.fsnummer = index.make_fsnummer(fs.recs[-1])
            fs.x = fs.recs[-1].x
            fs.y = fs.recs[-1].y
            self.process_lage(fs)
            self.process_gebaeude(fs)
            self.process_buchung(fs)

        self._link_vorgaenger()

    def _link_vorgaenger(self):
        """Mark every FS listed in a 'nachfolgerFlurstueckskennzeichen' as having a "vorgaenger" FS.

        It is an M:N relation, therefore 'vorgaengerFlurstueckskennzeichen' is also a list.
        """
        k_to_fs = {
            fs.flurstueckskennzeichen: fs
            for fs in self.om.Flurstueck
        }
        for fs in self.om.Flurstueck:
            nf_keys = fs.recs[-1].nachfolgerFlurstueckskennzeichen
            if not nf_keys:
                continue
            for nf_key in nf_keys:
                nf_fs = k_to_fs.get(nf_key)
                if not nf_fs:
                    gws.log.warning(f'ALKIS: nachfolgerFlurstueck missing {fs.flurstueckskennzeichen!r}->{nf_key!r}')
                    continue
                nf_rec = nf_fs.recs[-1]
                if not nf_rec.vorgaengerFlurstueckskennzeichen:
                    nf_rec.vorgaengerFlurstueckskennzeichen = []
                # avoid duplicate entries when a key is listed more than once
                if fs.flurstueckskennzeichen not in nf_rec.vorgaengerFlurstueckskennzeichen:
                    nf_rec.vorgaengerFlurstueckskennzeichen.append(fs.flurstueckskennzeichen)

    def record(self, ax):
        """Convert an AX Flurstück object into a FlurstueckRecord.

        Returns None (and counts it) if the record refers to an unknown
        Gemarkung/Gemeinde or has no geometry.
        """
        r: dt.FlurstueckRecord = _from_ax(
            dt.FlurstueckRecord,
            ax,
            amtlicheFlaeche=ax.amtlicheFlaeche or 0,
            flurnummer=_str(ax.flurnummer),
            zaehler=_str(ax.flurstuecksnummer.zaehler),
            nenner=_str(ax.flurstuecksnummer.nenner),
            zustaendigeStelle=[self.rr.place.get_dienststelle(p) for p in (ax.zustaendigeStelle or [])],

            _weistAuf=ax.weistAuf,
            _zeigtAuf=ax.zeigtAuf,
            _istGebucht=ax.istGebucht,
            _buchung=ax.buchung,
        )

        # basic data

        r.gemarkung = self.rr.place.get_gemarkung(ax.gemarkung)
        r.gemeinde = self.rr.place.get_gemeinde(ax.gemeindezugehoerigkeit)
        r.regierungsbezirk = self.rr.place.get_regierungsbezirk(ax.gemeindezugehoerigkeit)
        r.kreis = self.rr.place.get_kreis(ax.gemeindezugehoerigkeit)
        r.land = self.rr.place.get_land(ax.gemeindezugehoerigkeit)

        if self.rr.place.is_empty(r.gemarkung) or self.rr.place.is_empty(r.gemeinde):
            # exclude Flurstücke that refer to Gemeinde/Gemarkung
            # which do not exist in the reference AX tables
            self.counts['excluded'] += 1
            return None

        # geometry

        geom = _geom_of(r)
        if not geom:
            self.counts['no_geometry'] += 1
            return None

        r.geomFlaeche = round(geom.area, 2)
        r.x = round(geom.centroid.x, 2)
        r.y = round(geom.centroid.y, 2)

        self.counts['ok'] += 1
        return r

    def process_lage(self, fs: dt.Flurstueck):
        """Attach the Lage objects the FS records point to."""
        fs.lageList = []

        # AX_Flurstueck.weistAuf -> AX_LagebezeichnungMitHausnummer
        # AX_Flurstueck.zeigtAuf -> AX_LagebezeichnungOhneHausnummer
        fs.lageList.extend(self.rr.lage.om.Lage.get_from_ptr(fs, '_weistAuf'))
        fs.lageList.extend(self.rr.lage.om.Lage.get_from_ptr(fs, '_zeigtAuf'))

    def process_gebaeude(self, fs: dt.Flurstueck):
        """Attach the (deduplicated, sorted) Gebaeude of the FS's Lage objects and sum their areas."""
        ge_map = {}

        for la in fs.lageList:
            for ge in la.gebaeudeList:
                ge_map[ge.uid] = ge

        fs.gebaeudeList = list(ge_map.values())
        fs.gebaeudeList.sort(key=_sortkey_gebaeude)

        # areas only count Gebaeude whose latest record has not ended
        fs.gebaeudeAmtlicheFlaeche = sum(ge.recs[-1].amtlicheFlaeche for ge in fs.gebaeudeList if not ge.recs[-1].endet)
        fs.gebaeudeGeomFlaeche = sum(ge.recs[-1].geomFlaeche for ge in fs.gebaeudeList if not ge.recs[-1].endet)

    def process_buchung(self, fs: dt.Flurstueck):
        """Build ``fs.buchungList``: the FS's Buchungsstellen grouped by Buchungsblatt."""
        bs_historic_map = {}
        bs_seen = set()
        buchung_map = {}

        # for each Flurstück record, we collect all related Buchungsstellen (with respect to parent-child relations)
        # then group Buchungsstellen by their Buchungsblatt
        # and create Buchung objects for a FS

        for r in fs.recs:
            hist_buchung = _pop(r, '_buchung')
            if hist_buchung:
                bs_list = self.historic_buchungsstelle_list(r, hist_buchung)
            else:
                bs_list = self.buchungsstelle_list(r)

            for bs in bs_list:
                # a Buchungsstelle referred to by an expired Flurstück might not be expired itself,
                # so we have to track its state separately by wrapping it in a BuchungsstelleReference.
                # A reference takes the historic state of the last FS record that refers to it.
                bs_historic_map[bs.uid] = r.isHistoric

                if bs.uid in bs_seen:
                    continue
                bs_seen.add(bs.uid)

                # populate Flurstück references in a Buchungsstelle
                if fs.uid not in bs.fsUids:
                    bs.fsUids.append(fs.uid)
                    bs.flurstueckskennzeichenList.append(fs.flurstueckskennzeichen)

                # create Buchung records by grouping Buchungsstellen
                for bb_uid in bs.buchungsblattUids:
                    bu = buchung_map.setdefault(bb_uid, dt.Buchung(recs=[], buchungsblattUid=bb_uid))
                    bu.recs.append(dt.BuchungsstelleReference(buchungsstelle=bs))

        fs.buchungList = list(buchung_map.values())

        for bu in fs.buchungList:
            for ref in bu.recs:
                ref.isHistoric = bs_historic_map[ref.buchungsstelle.uid]
            bu.isHistoric = all(ref.isHistoric for ref in bu.recs)

        return fs

    def historic_buchungsstelle_list(self, r: dt.FlurstueckRecord, hist_buchung):
        """Create fake historic Buchungsstellen for an AX_HistorischesFlurstueck 'buchung' reference.

        Entries whose Buchungsblatt kennzeichen is unknown are skipped.
        """
        bs_list = []

        for bu in hist_buchung:
            bb = self.rr.buchung.buchungsblattkennzeichenMap.get(bu.buchungsblattkennzeichen)
            if not bb:
                continue
            # create a fake historic Buchungstelle
            bs_list.append(dt.Buchungsstelle(
                uid=bb.uid + '_' + bu.laufendeNummerDerBuchungsstelle,
                recs=[
                    dt.BuchungsstelleRecord(
                        endet=r.endet,
                        laufendeNummer=bu.laufendeNummerDerBuchungsstelle,
                        isHistoric=True,
                    )
                ],
                buchungsblattUids=[bb.uid],
                buchungsblattkennzeichenList=[bb.buchungsblattkennzeichen],
                parentUids=[],
                childUids=[],
                fsUids=[],
                parentkennzeichenList=[],
                flurstueckskennzeichenList=[],
                laufendeNummer=bu.laufendeNummerDerBuchungsstelle,
                isHistoric=True,
            ))

        return bs_list

    def buchungsstelle_list(self, r: dt.FlurstueckRecord):
        """Return the Buchungsstelle of ``r`` together with all its ancestors and descendants.

        Order: ancestors (farthest first), the Buchungsstelle itself, then descendants
        breadth-first (siblings sorted by their Buchungsblatt). Each traversal tracks
        visited nodes, so cyclic an/zu references cannot cause an endless loop.
        """
        # AX_Flurstueck.istGebucht -> AX_Buchungsstelle

        bs_dict = self.rr.buchung.om.Buchungsstelle
        this_bs = bs_dict.get(_pop(r, '_istGebucht'))
        if not this_bs:
            return []

        bs_list = []

        # A Flurstück points to a Buchungsstelle (F.istGebucht -> B).
        # A Buchungsstelle can have parent (B.an -> parent.uid) and child (child.an -> B.uid) Buchungsstellen
        # (these references are populated in _BuchungIndexer above).
        # Our task here, given F.istGebucht -> B, collect B's parents and children
        # These are Buchungsstellen that directly or indirectly mention the current Flurstück.

        queue: list[dt.Buchungsstelle] = [this_bs]
        seen = {this_bs.uid}
        while queue:
            bs = queue.pop(0)
            bs_list.insert(0, bs)
            for uid in bs.parentUids:
                parent_bs = bs_dict.get(uid)
                if parent_bs and parent_bs.uid not in seen:
                    seen.add(parent_bs.uid)
                    queue.append(parent_bs)

        # remove this_bs (inserted first, hence now the last element)
        bs_list.pop()

        queue = [this_bs]
        seen = {this_bs.uid}
        while queue:
            bs = queue.pop(0)
            bs_list.append(bs)
            # sort related (child) Buchungsstellen by their BB-Kennzeichen
            child_bs_list = [c for c in bs_dict.get_many(bs.childUids) if c.uid not in seen]
            child_bs_list.sort(key=_sortkey_buchungsstelle_by_bblatt)
            for c in child_bs_list:
                seen.add(c.uid)
            queue.extend(child_bs_list)

        return bs_list

    def write(self):
        """Write all Flurstücke to the Flurstück table (latest geometry in its own column)."""
        values = []

        for fs in self.om.Flurstueck:
            # serialize without geometries, then restore them
            geoms = [_pop(r, 'geom') for r in fs.recs]
            data = index.serialize(fs)
            for r, g in zip(fs.recs, geoms):
                r.geom = g

            values.append(dict(
                uid=fs.uid,
                rc=len(fs.recs),
                fshistoric=fs.isHistoric,
                data=data,
                geom=geoms[-1],
            ))

        self.write_table(index.TABLE_FLURSTUECK, values)

902 

903 

class _FsIndexIndexer(_Indexer):
    """Build the flat search index tables (Flurstück, Lage, Buchungsblatt, Person, geometry)."""

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # Rows per index table, in write order. Initialised per instance: a class-level
        # dict would be shared between instances, so rows would accumulate across runs.
        self.entries: dict[str, list[dict]] = {
            index.TABLE_INDEXFLURSTUECK: [],
            index.TABLE_INDEXLAGE: [],
            index.TABLE_INDEXBUCHUNGSBLATT: [],
            index.TABLE_INDEXPERSON: [],
            index.TABLE_INDEXGEOM: [],
        }

    def collect(self):
        """Create index rows for every record of every collected Flurstück."""
        with ProgressIndicator(f'ALKIS: creating indexes', len(self.rr.fsdata.om.Flurstueck)) as progress:
            for fs in self.rr.fsdata.om.Flurstueck:
                for r in fs.recs:
                    self.add(fs, r)
                progress.update(1)

    def add(self, fs: dt.Flurstueck, r: dt.FlurstueckRecord):
        """Append the index rows for one Flurstück record ``r`` of ``fs``."""
        base = dict(
            fs=r.uid,
            fshistoric=r.isHistoric,
        )

        places = dict(
            land=r.land.text,
            land_t=index.text_key(r.land.text),
            landcode=r.land.code,

            regierungsbezirk=r.regierungsbezirk.text,
            regierungsbezirk_t=index.text_key(r.regierungsbezirk.text),
            regierungsbezirkcode=r.regierungsbezirk.code,

            kreis=r.kreis.text,
            kreis_t=index.text_key(r.kreis.text),
            kreiscode=r.kreis.code,

            gemeinde=r.gemeinde.text,
            gemeinde_t=index.text_key(r.gemeinde.text),
            gemeindecode=r.gemeinde.code,

            gemarkung=r.gemarkung.text,
            gemarkung_t=index.text_key(r.gemarkung.text),
            gemarkungcode=r.gemarkung.code,

        )

        self.entries[index.TABLE_INDEXFLURSTUECK].append(dict(
            **base,
            **places,

            amtlicheflaeche=r.amtlicheFlaeche,
            geomflaeche=r.geomFlaeche,

            flurnummer=r.flurnummer,
            zaehler=r.zaehler,
            nenner=r.nenner,
            flurstuecksfolge=r.flurstuecksfolge,
            flurstueckskennzeichen=r.flurstueckskennzeichen,

            x=r.x,
            y=r.y,
        ))

        self.entries[index.TABLE_INDEXGEOM].append(dict(
            **base,
            geomflaeche=r.geomFlaeche,
            x=r.x,
            y=r.y,
            geom=r.geom,
        ))

        for la in fs.lageList:
            for la_r in la.recs:
                self.entries[index.TABLE_INDEXLAGE].append(dict(
                    **base,
                    **places,
                    lageuid=la_r.uid,
                    lagehistoric=la_r.isHistoric,
                    strasse=la_r.strasse,
                    strasse_t=index.strasse_key(la_r.strasse),
                    hausnummer=la_r.hausnummer,
                    # fall back to the FS coordinates when the Lage has none
                    x=la.x or r.x,
                    y=la.y or r.y,
                ))

        for bu in fs.buchungList:
            bb = self.rr.buchung.om.Buchungsblatt.get(bu.buchungsblattUid)
            if not bb:
                # defensive: skip a Buchung whose Buchungsblatt is unknown
                continue

            for bb_r in bb.recs:
                self.entries[index.TABLE_INDEXBUCHUNGSBLATT].append(dict(
                    **base,
                    buchungsblattuid=bb_r.uid,
                    buchungsblattkennzeichen=bb_r.buchungsblattkennzeichen,
                    buchungsblatthistoric=bu.isHistoric,
                ))

            # each Person is indexed only once per Buchungsblatt
            pe_uids = set()

            for nn in bb.namensnummerList:
                for pe in nn.personList:
                    if pe.uid in pe_uids:
                        continue
                    pe_uids.add(pe.uid)
                    for pe_r in pe.recs:
                        self.entries[index.TABLE_INDEXPERSON].append(dict(
                            **base,
                            personuid=pe_r.uid,
                            personhistoric=pe_r.isHistoric,
                            name=pe_r.nachnameOderFirma,
                            name_t=index.text_key(pe_r.nachnameOderFirma),
                            vorname=pe_r.vorname,
                            vorname_t=index.text_key(pe_r.vorname),
                        ))

    def write(self):
        """Number the rows (column ``n``) and write every index table that does not exist yet."""
        for table_id, values in self.entries.items():
            if not self.ix.has_table(table_id):
                for n, v in enumerate(values, 1):
                    v['n'] = n
            self.write_table(table_id, values)

1023 

1024 

class _Runner:
    """Coordinate one complete ALKIS indexing run.

    Holds the target index and the data reader, creates the individual
    indexers (each one receives this runner), executes their collect and
    write steps in a fixed order, and provides record-reading helpers with
    an optional on-disk cache.
    """

    def __init__(self, ix: index.Object, reader: dt.Reader, with_cache=False):
        self.ix: index.Object = ix
        self.reader: dt.Reader = reader

        # Optional on-disk cache for reader results; the directory is only
        # created when caching is switched on.
        self.withCache = with_cache
        self.cacheDir = gws.c.CACHE_DIR + '/alkis'
        if self.withCache:
            gws.u.ensure_dir(self.cacheDir)

        # Indexers are created in this exact order, each with a reference
        # back to the runner.
        self.place = _PlaceIndexer(self)
        self.lage = _LageIndexer(self)
        self.buchung = _BuchungIndexer(self)
        self.part = _PartIndexer(self)
        self.fsdata = _FsDataIndexer(self)
        self.fsindex = _FsIndexIndexer(self)

        # Baseline resident set size; memory_info reports growth relative to it.
        self.initMemory = gws.lib.osx.process_rss_size()

    def run(self):
        """Collect all index data, then write the tables of every indexer.

        Everything happens inside one ``self.ix.db.connect()`` context and one
        progress indicator. Memory growth is reported (when positive) after
        each collect step.
        """
        with self.ix.db.connect() as _conn:
            with ProgressIndicator(f'ALKIS: indexing'):
                for step in (self.place, self.buchung, self.lage):
                    step.load_or_collect()
                    self.memory_info()

                self.fsdata.load_or_collect()
                gws.log.info(f'ALKIS: fs counts: {self.fsdata.counts}')
                self.memory_info()

                self.part.load_or_collect()
                self.memory_info()

                # Unlike the others, fsindex is driven via collect(), not load_or_collect().
                self.fsindex.collect()
                self.memory_info()

                for step in (self.place, self.buchung, self.lage, self.fsdata, self.part, self.fsindex):
                    step.write()

    def memory_info(self):
        """Log how much the process RSS grew since this runner was created.

        Nothing is logged unless the growth is positive. ``stacklevel=2`` is
        passed through to ``gws.log``, presumably (as in stdlib logging) so the
        record is attributed to the caller rather than to this helper.
        """
        used = gws.lib.osx.process_rss_size() - self.initMemory
        if used > 0:
            gws.log.info(f'ALKIS: memory used: {used:.2f} MB', stacklevel=2)

    def read_flat(self, cls):
        """Return all ``cls`` records as a flat list (cached on disk if enabled)."""
        return self._load_cached('/flat_', cls, self._read_flat)

    def _read_flat(self, cls):
        """Read every ``cls`` record from the reader, showing progress.

        Returns an empty list (and logs a warning) when the reader reports
        no records.
        """
        total = self.reader.count(cls)
        if total <= 0:
            gws.log.warning(f'ALKIS: read {cls.__name__}: empty table')
            return []

        records = []
        with ProgressIndicator(f'ALKIS: read {cls.__name__}', total) as progress:
            for rec in self.reader.read_all(cls):
                records.append(rec)
                progress.update(1)
        return records

    def read_grouped(self, cls):
        """Return ``cls`` records grouped by ``identifikator`` (cached if enabled).

        The result is a list of ``(identifikator, records)`` pairs in which
        each record list is ordered by ``lebenszeitintervall.beginnt``.
        """
        return self._load_cached('/grouped_', cls, self._read_grouped)

    def _read_grouped(self, cls):
        """Read every ``cls`` record and bucket the records by ``identifikator``.

        Returns an empty list (and logs a warning) when the reader reports
        no records.
        """
        total = self.reader.count(cls)
        if total <= 0:
            gws.log.warning(f'ALKIS: read {cls.__name__}: empty table')
            return []

        by_uid = {}
        with ProgressIndicator(f'ALKIS: read {cls.__name__}', total) as progress:
            for rec in self.reader.read_all(cls):
                uid = rec.identifikator
                if uid not in by_uid:
                    by_uid[uid] = []
                by_uid[uid].append(rec)
                progress.update(1)

        for recs in by_uid.values():
            recs.sort(key=_sortkey_lebenszeitintervall)

        return list(by_uid.items())

    def _load_cached(self, prefix, cls, produce):
        """Return ``produce(cls)``, going through the on-disk cache when enabled.

        The cache file is ``self.cacheDir + prefix + cls.__name__``. With
        caching on, an existing file is unserialized and returned directly;
        otherwise the result is computed and, with caching on, serialized to
        that file.
        """
        # NOTE(review): the cache path depends only on prefix and class name,
        # so a cache written from different source data would be reused as is.
        cpath = self.cacheDir + prefix + cls.__name__
        if self.withCache and gws.u.is_file(cpath):
            return gws.u.unserialize_from_path(cpath)

        result = produce(cls)
        if self.withCache:
            gws.u.serialize_to_path(result, cpath)
        return result

    def props_from(self, ax, atts):
        """Build ``[title, value]`` pairs for the attributes of ``ax`` listed in ``atts``.

        Each entry of ``atts`` is read via its ``'name'`` and ``'title'`` keys.
        Values that are ``gid.Object`` instances are first converted with
        ``object_prop_value``; values that ``gws.u.is_empty`` considers empty
        are skipped.
        """
        props = []
        for att in atts:
            val = getattr(ax, att['name'], None)
            if isinstance(val, gid.Object):
                val = self.object_prop_value(val)
            if gws.u.is_empty(val):
                continue
            props.append([att['title'], val])
        return props

    def object_prop_value(self, o):
        """Return a display value for an attribute that is itself a gid object.

        Not implemented yet: always returns None, so such attributes are
        presumably dropped by the emptiness check in props_from.
        """
        # @TODO handle properties which are objects, e.g.
        #   gid.AX_Bundesland_Schluessel, gid.AX_Dienststelle_Schluessel,
        #   gid.AX_Gemarkung_Schluessel, gid.AX_Gemeindekennzeichen,
        #   gid.AX_Kreis_Schluessel, gid.AX_Regierungsbezirk_Schluessel,
        #   gid.AX_KennzifferGrabloch, gid.AX_Lagebezeichnung,
        #   gid.AX_Tagesabschnitt, gid.AX_Verwaltungsgemeinschaft_Schluessel
        return None

1154 

1155 

1156def _from_ax(cls, ax, **kwargs): 

1157 d = {} 

1158 

1159 if ax: 

1160 for k in cls.__annotations__: 

1161 v = getattr(ax, k, None) 

1162 if v: 

1163 d[k] = v 

1164 

1165 d['uid'] = ax.identifikator 

1166 d['beginnt'] = ax.lebenszeitintervall.beginnt 

1167 d['endet'] = ax.lebenszeitintervall.endet 

1168 d['isHistoric'] = d['endet'] is not None 

1169 if ax.anlass and ax.anlass[0].code != '000000': 

1170 d['anlass'] = ax.anlass[0] 

1171 if ax.geom: 

1172 d['geom'] = ax.geom 

1173 

1174 d.update(kwargs) 

1175 return cls(**d) 

1176 

1177 

1178def _anteil(ax): 

1179 try: 

1180 z = float(ax.anteil.zaehler) 

1181 z = str(int(z) if z.is_integer() else z) 

1182 n = float(ax.anteil.nenner) 

1183 n = str(int(n) if n.is_integer() else n) 

1184 return z + '/' + n 

1185 except (AttributeError, ValueError, TypeError): 

1186 pass 

1187 

1188 

1189def _attributes(meta, keys): 

1190 return sorted( 

1191 [a for a in meta['attributes'] if a['name'] in keys], 

1192 key=lambda a: a['title'] 

1193 ) 

1194 

1195 

def _geom_of(o):
    """Decode ``o.geom`` (hex-encoded WKB) into a shapely geometry.

    Logs a warning naming the object's class and uid, and returns None,
    when ``o`` has no geometry.
    """
    if o.geom:
        return shapely.wkb.loads(o.geom, hex=True)
    gws.log.warning(f'{o.__class__.__name__}:{o.uid}: no geometry')
    return None

1201 

1202 

1203def _pop(obj, attr): 

1204 v = getattr(obj, attr, None) 

1205 try: 

1206 delattr(obj, attr) 

1207 except AttributeError: 

1208 pass 

1209 return v 

1210 

1211 

1212def _sortkey_beginnt(o): 

1213 return o.beginnt 

1214 

1215 

1216def _sortkey_lebenszeitintervall(o): 

1217 return o.lebenszeitintervall.beginnt 

1218 

1219 

def _sortkey_namensnummer(nn: dt.Namensnummer):
    """Sort key for a Namensnummer, based on the last entry of ``nn.recs``.

    Orders by the natural-sort key of ``laufendeNummerNachDIN1421``, then
    by ``beginnt``.
    """
    last = nn.recs[-1]
    return _natkey(last.laufendeNummerNachDIN1421), last.beginnt

1222 

1223 

def _sortkey_buchungsstelle(bs: dt.Buchungsstelle):
    """Sort key for a Buchungsstelle, based on the last entry of ``bs.recs``.

    Orders by the natural-sort key of ``laufendeNummer``, then by ``beginnt``.
    """
    last = bs.recs[-1]
    return _natkey(last.laufendeNummer), last.beginnt

1226 

1227 

def _sortkey_buchungsstelle_by_bblatt(bs: dt.Buchungsstelle):
    """Sort key for a Buchungsstelle by its Buchungsblatt.

    Uses the first entry of ``buchungsblattkennzeichenList``, then the
    ``beginnt`` of the last record. Raises IndexError when either list is
    empty.
    """
    first_blatt = bs.buchungsblattkennzeichenList[0]
    return first_blatt, bs.recs[-1].beginnt

1230 

1231 

def _sortkey_part(pa: dt.Part):
    """Sort key for a Part: its name text, then its area, largest first."""
    label = pa.name.text
    return label, -pa.geomFlaeche

1234 

1235 

def _sortkey_gebaeude(ge: dt.Gebaeude):
    """Sort key for a Gebaeude, based on the last entry of ``ge.recs``.

    The primary key is ``beginnt``. Area is only a tie-breaker: buildings
    with the same ``beginnt`` are ordered by ``geomFlaeche``, largest first.
    (An earlier comment described this as "sort by area", which it is not.)
    """
    last = ge.recs[-1]
    return last.beginnt, -last.geomFlaeche

1239 

1240 

1241def _natkey(v): 

1242 if not v: 

1243 return [] 

1244 return [ 

1245 '{:080d}'.format(int(digits)) if digits else chars.lower() 

1246 for digits, chars in re.findall(r'(\d+)|(\D+)', v.strip()) 

1247 ] 

1248 

1249 

1250def _comma(a): 

1251 return ','.join(str(s) if s is not None else '' for s in a) 

1252 

1253 

1254def _str(x): 

1255 return None if x is None else str(x)