Coverage for gws-app/gws/plugin/alkis/data/indexer.py: 0%
670 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1from typing import Optional, Iterable
3import re
4from typing import Generic, TypeVar
6import shapely
7import shapely.strtree
8import shapely.wkb
10import gws
11import gws.lib.osx
12from gws.lib.cli import ProgressIndicator
13import gws.plugin.postgres.provider
16from . import types as dt
17from . import index
18from . import norbit6
20from .geo_info_dok import gid6 as gid
def run(ix: index.Object, data_schema: str, with_force=False, with_cache=False):
    """Build the ALKIS index ``ix`` from the data in ``data_schema``.

    Args:
        ix: the index to populate.
        data_schema: database schema read by the NorBIT reader.
        with_force: drop the existing index first and rebuild it.
        with_cache: let the runner use its on-disk cache for intermediate results.

    Without ``with_force``, an index whose status is already complete is left alone.
    """
    if not with_force and ix.status().complete:
        return
    if with_force:
        ix.drop()

    reader = norbit6.Object(ix.db, schema=data_schema)
    _Runner(ix, reader, with_cache).run()
##

# type variable for the entity class stored in an _ObjectDict
T = TypeVar("T")
class _ObjectDict(Generic[T]):
    """Insertion-ordered collection of entities of one class, keyed by uid.

    Entities are created with ``cls(uid=..., recs=...)``.
    """

    def __init__(self, cls):
        self.d = {}
        self.cls = cls

    def add(self, uid, recs) -> T:
        """Create an entity from its records, store it under its uid and return it.

        The entity is flagged ``isHistoric`` when all of its records are historic.
        """
        obj = self.cls(uid=uid, recs=recs)
        obj.isHistoric = all(rec.isHistoric for rec in recs)
        self.d[obj.uid] = obj
        return obj

    def get(self, uid, default=None) -> Optional[T]:
        """Return the entity stored under ``uid``, or ``default``."""
        return self.d.get(uid, default)

    def get_many(self, uids) -> list[T]:
        """Return the stored entities for ``uids``.

        Duplicates are removed (first occurrence wins) and unknown uids are skipped.
        """
        found = {}
        for uid in uids:
            if uid in found:
                continue
            obj = self.d.get(uid)
            if obj:
                found[uid] = obj
        return list(found.values())

    def get_from_ptr(self, obj: dt.Entity, attr):
        """Resolve the reference attribute ``attr`` stored on the records of ``obj``.

        The attribute is removed from every record. A list value contributes all
        of its uids, a str value a single uid, and anything else is ignored.
        """
        uids = []
        for rec in obj.recs:
            ref = _pop(rec, attr)
            if isinstance(ref, list):
                uids += ref
            elif isinstance(ref, str):
                uids.append(ref)
        return self.get_many(uids)

    def __iter__(self) -> Iterable[T]:
        return iter(self.d.values())

    def __len__(self):
        return len(self.d)
class _ObjectMap:
    """All objects collected by one indexing stage.

    This is the object that ``_Indexer`` stores in and restores from the cache.
    """

    def __init__(self):
        # entity collections, keyed by uid
        self.Anschrift: _ObjectDict[dt.Anschrift] = _ObjectDict(dt.Anschrift)
        self.Buchungsblatt: _ObjectDict[dt.Buchungsblatt] = _ObjectDict(dt.Buchungsblatt)
        self.Buchungsstelle: _ObjectDict[dt.Buchungsstelle] = _ObjectDict(dt.Buchungsstelle)
        self.Flurstueck: _ObjectDict[dt.Flurstueck] = _ObjectDict(dt.Flurstueck)
        self.Gebaeude: _ObjectDict[dt.Gebaeude] = _ObjectDict(dt.Gebaeude)
        self.Lage: _ObjectDict[dt.Lage] = _ObjectDict(dt.Lage)
        self.Namensnummer: _ObjectDict[dt.Namensnummer] = _ObjectDict(dt.Namensnummer)
        self.Part: _ObjectDict[dt.Part] = _ObjectDict(dt.Part)
        self.Person: _ObjectDict[dt.Person] = _ObjectDict(dt.Person)

        # places (filled by _PlaceIndexer): uid -> dt.Place and uid -> dt.EnumPair
        self.placeAll: dict = dict()
        self.placeIdx: dict = dict()

        # Lagebezeichnung catalog (filled by _LageIndexer): lage key -> bezeichnung
        self.catalog: dict = dict()
class _Indexer:
    """Base class for one stage of the ALKIS indexing pipeline.

    A stage gathers objects into its own ``_ObjectMap`` (``collect``) and writes
    its results into the index (``write``). The object map can optionally be
    stored in and restored from the runner's cache directory, under ``CACHE_KEY``.
    Subclasses override ``collect`` and ``write``.
    """

    # cache file name; an empty key disables caching for the stage
    CACHE_KEY: str = ''

    def __init__(self, runner: '_Runner'):
        self.rr = runner
        self.ix: index.Object = runner.ix
        self.om = _ObjectMap()

    def load_or_collect(self):
        """Restore the object map from the cache, or collect it and store it in the cache."""
        if self.load_cache():
            return
        self.collect()
        self.store_cache()

    def _cache_path(self) -> str:
        # location of this stage's cache file
        return self.rr.cacheDir + '/' + self.CACHE_KEY

    def load_cache(self):
        """Replace ``self.om`` with the cached object map; return True if that happened."""
        if not (self.rr.withCache and self.CACHE_KEY):
            return False
        path = self._cache_path()
        if not gws.u.is_file(path):
            return False
        cached = gws.u.unserialize_from_path(path)
        if not cached:
            return False
        gws.log.info(f'ALKIS: use cache {self.CACHE_KEY!r}')
        self.om = cached
        return True

    def store_cache(self):
        """Serialize ``self.om`` to the cache file, if caching is enabled for this stage."""
        if not (self.rr.withCache and self.CACHE_KEY):
            return
        gws.u.serialize_to_path(self.om, self._cache_path())
        gws.log.info(f'ALKIS: store cache {self.CACHE_KEY!r}')

    def collect(self):
        """Fill ``self.om``; overridden by subclasses."""
        pass

    def write_table(self, table_id, values):
        """Create the index table ``table_id`` from ``values``, unless it already exists."""
        if self.ix.has_table(table_id):
            return
        with ProgressIndicator(f'ALKIS: write {table_id!r}', len(values)) as progress:
            self.ix.create_table(table_id, values, progress)

    def write(self):
        """Write the collected data to the index; overridden by subclasses."""
        pass
class _PlaceIndexer(_Indexer):
    """Index places (Administration- und Verwaltungseinheiten).

    Each place is stored as a ``dt.Place`` under the uid ``kind + code``.
    ``placeIdx`` maps the same uid to the ``dt.EnumPair(code, bezeichnung)``
    that the ``get_*`` helpers return to other indexers.

    References: https://de.wikipedia.org/wiki/Amtlicher_Gemeindeschl%C3%BCssel
    """

    CACHE_KEY = index.TABLE_PLACE

    # placeholders returned by the get_* helpers when a place is unknown
    empty1 = dt.EnumPair(code='0', text='')
    empty2 = dt.EnumPair(code='00', text='')

    def add(self, kind, ax, key_obj, **kwargs):
        """Register the place ``ax`` of the given ``kind``.

        Args:
            kind: place kind (a key of ``CODES``).
            ax: the AX object; skipped when ``ax.endet`` is set.
            key_obj: object the place code is computed from.
            **kwargs: passed to ``dt.Place`` (the enclosing places).

        Returns:
            The place's EnumPair, or None if it was skipped.
        """
        if ax.endet:
            return None

        code = self.code(kind, key_obj)
        pair = dt.EnumPair(code, ax.bezeichnung)

        place = dt.Place(**kwargs)
        place.uid = kind + code
        place.kind = kind
        setattr(place, kind, pair)

        self.om.placeAll[place.uid] = place
        self.om.placeIdx[place.uid] = pair
        return pair

    def collect(self):
        self.om.placeAll = {}
        self.om.placeIdx = {}

        # higher-level places are read first so that lower levels can refer to them
        for ax in self.rr.read_flat(gid.AX_Bundesland):
            self.add('land', ax, ax.schluessel)

        for ax in self.rr.read_flat(gid.AX_Regierungsbezirk):
            key = ax.schluessel
            self.add('regierungsbezirk', ax, key, land=self.get_land(key))

        for ax in self.rr.read_flat(gid.AX_KreisRegion):
            key = ax.schluessel
            self.add(
                'kreis', ax, key,
                land=self.get_land(key),
                regierungsbezirk=self.get_regierungsbezirk(key),
            )

        for ax in self.rr.read_flat(gid.AX_Gemeinde):
            key = ax.gemeindekennzeichen
            self.add(
                'gemeinde', ax, key,
                land=self.get_land(key),
                regierungsbezirk=self.get_regierungsbezirk(key),
                kreis=self.get_kreis(key),
            )

        # @TODO map Gemarkung to Gemeinde (see https://de.wikipedia.org/wiki/Liste_der_Gemarkungen_in_Nordrhein-Westfalen etc)

        for ax in self.rr.read_flat(gid.AX_Gemarkung):
            key = ax.schluessel
            if str(key.gemarkungsnummer) in self.ix.excludeGemarkung:
                continue
            self.add('gemarkung', ax, key, land=self.get_land(key))

        for cls, kind in ((gid.AX_Buchungsblattbezirk, 'buchungsblattbezirk'), (gid.AX_Dienststelle, 'dienststelle')):
            for ax in self.rr.read_flat(cls):
                key = ax.schluessel
                self.add(kind, ax, key, land=self.get_land(key))

    def write(self):
        rows = [
            dict(uid=place.uid, data=index.serialize(place))
            for place in self.om.placeAll.values()
        ]
        self.write_table(index.TABLE_PLACE, rows)

    def get_land(self, o):
        return self.get('land', o) or self.empty2

    def get_regierungsbezirk(self, o):
        return self.get('regierungsbezirk', o) or self.empty1

    def get_kreis(self, o):
        return self.get('kreis', o) or self.empty2

    def get_gemeinde(self, o):
        return self.get('gemeinde', o) or self.empty1

    def get_gemarkung(self, o):
        return self.get('gemarkung', o) or self.empty1

    def get_buchungsblattbezirk(self, o):
        return self.get('buchungsblattbezirk', o) or self.empty1

    def get_dienststelle(self, o):
        return self.get('dienststelle', o) or self.empty1

    def get(self, kind, o):
        """Return the EnumPair of the place of ``kind`` that ``o`` refers to, or None."""
        uid = kind + self.code(kind, o)
        return self.om.placeIdx.get(uid)

    def is_empty(self, p: dt.EnumPair):
        """True if ``p`` is one of the placeholder pairs (code '0' or '00')."""
        return p.code in ('0', '00')

    # place code builders, by kind; a missing regierungsbezirk is coded as '0'
    CODES = {
        'land': lambda o: o.land,
        'regierungsbezirk': lambda o: o.land + (o.regierungsbezirk or '0'),
        'kreis': lambda o: o.land + (o.regierungsbezirk or '0') + o.kreis,
        'gemeinde': lambda o: o.land + (o.regierungsbezirk or '0') + o.kreis + o.gemeinde,
        'gemarkung': lambda o: o.land + o.gemarkungsnummer,
        'buchungsblattbezirk': lambda o: o.land + o.bezirk,
        'dienststelle': lambda o: o.land + o.stelle,
    }

    def code(self, kind, o):
        builder = self.CODES[kind]
        return builder(o)
class _LageIndexer(_Indexer):
    """Index Lagebezeichnungen (with and without Hausnummer).

    Coordinates come from the HNR presentation objects, and related Gebaeude
    are attached via ``AX_Gebaeude.zeigtAuf``.
    """

    CACHE_KEY = index.TABLE_LAGE

    def collect(self):
        # the steps depend on each other and must run in this order
        self._collect_catalog()
        self._collect_lage()
        self._apply_pto_coordinates()
        self._collect_gebaeude()
        self._link_gebaeude()

    def _collect_catalog(self):
        # catalog of Lagebezeichnungen, keyed by lage_key(), used by strasse()
        for ax in self.rr.read_flat(gid.AX_LagebezeichnungKatalogeintrag):
            self.om.catalog[self.lage_key(ax.schluessel)] = ax.bezeichnung

    def _collect_lage(self):
        # Lage entities from both Lagebezeichnung classes
        for cls in (gid.AX_LagebezeichnungMitHausnummer, gid.AX_LagebezeichnungOhneHausnummer):
            for uid, axs in self.rr.read_grouped(cls):
                recs = [
                    _from_ax(
                        dt.LageRecord,
                        ax,
                        strasse=self.strasse(ax),
                        hausnummer=index.normalize_hausnummer(ax.hausnummer),
                    )
                    for ax in axs
                ]
                self.om.Lage.add(uid, recs)

    def _apply_pto_coordinates(self):
        # use the PTO (art=HNR) geometry for lage coordinates
        # PTO.dientZurDarstellungVon -> lage.uid
        for pto in self.rr.read_flat(gid.AP_PTO):
            if pto.lebenszeitintervall.endet is not None:
                continue
            if (_pop(pto, 'art') or '').upper() != 'HNR':
                continue

            uids = _pop(pto, 'dientZurDarstellungVon')
            if not uids or not isinstance(uids, list):
                continue

            geom = _geom_of(pto)
            if not geom:
                continue

            center = geom.centroid
            for la in self.om.Lage.get_many(uids):
                la.x = center.x
                la.y = center.y

    def _collect_gebaeude(self):
        # read related Gebaeude records
        atts = _attributes(gid.METADATA['AX_Gebaeude'], dt.Gebaeude.PROP_KEYS)

        for uid, axs in self.rr.read_grouped(gid.AX_Gebaeude):
            recs = [
                _from_ax(
                    dt.GebaeudeRecord,
                    ax,
                    name=', '.join(ax.name) if ax.name else None,
                    amtlicheFlaeche=ax.grundflaeche or 0,
                    props=self.rr.props_from(ax, atts),
                    _zeigtAuf=ax.zeigtAuf,
                )
                for ax in axs
            ]
            self.om.Gebaeude.add(uid, recs)

        # keep only the computed area: Gebaeude geometries are omitted for now
        for ge in self.om.Gebaeude:
            for rec in ge.recs:
                geom = _geom_of(rec)
                rec.geomFlaeche = round(geom.area, 2) if geom else 0
                _pop(rec, 'geom')

    def _link_gebaeude(self):
        for la in self.om.Lage:
            la.gebaeudeList = []

        # AX_Gebaeude.zeigtAuf -> AX_LagebezeichnungMitHausnummer
        for ge in self.om.Gebaeude:
            for la in self.om.Lage.get_from_ptr(ge, '_zeigtAuf'):
                la.gebaeudeList.append(ge)

    def strasse(self, ax):
        """Return the street name of a Lagebezeichnung.

        The name is either given inline as a str, or looked up in the catalog
        ('' if it is not there).
        """
        lb = ax.lagebezeichnung
        if isinstance(lb, str):
            return lb
        return self.om.catalog.get(self.lage_key(lb), '')

    def lage_key(self, r):
        """Catalog key: land, regierungsbezirk, kreis, gemeinde and lage, comma-joined."""
        return _comma([r.land, r.regierungsbezirk, r.kreis, r.gemeinde, r.lage])

    def write(self):
        rows = [
            dict(uid=la.uid, rc=len(la.recs), data=index.serialize(la))
            for la in self.om.Lage
        ]
        self.write_table(index.TABLE_LAGE, rows)
class _BuchungIndexer(_Indexer):
    """Index Buchungsblätter with their Buchungsstellen, Namensnummern, Personen and Anschriften.

    Reads AX_Anschrift, AX_Person, AX_Namensnummer, AX_Buchungsstelle and
    AX_Buchungsblatt, and resolves the references between them.
    """

    CACHE_KEY = index.TABLE_BUCHUNGSBLATT

    # Buchungsblatt by its 'buchungsblattkennzeichen' (used by _FsDataIndexer for historic bookings).
    # It is not part of the cached object map, so load_cache rebuilds it.
    buchungsblattkennzeichenMap: dict[str, dt.Buchungsblatt]

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # an instance attribute: a class-level {} would be shared by every runner in the process
        self.buchungsblattkennzeichenMap = {}

    def load_cache(self):
        """Restore the object map from the cache, then rebuild ``buchungsblattkennzeichenMap`` from it."""
        if not super().load_cache():
            return False
        self.buchungsblattkennzeichenMap = {
            bb.buchungsblattkennzeichen: bb
            for bb in self.om.Buchungsblatt
        }
        return True

    def collect(self):
        self.buchungsblattkennzeichenMap = {}

        for uid, axs in self.rr.read_grouped(gid.AX_Anschrift):
            self.om.Anschrift.add(uid, [
                _from_ax(
                    dt.AnschriftRecord,
                    ax,
                    ort=ax.ort_AmtlichesOrtsnamensverzeichnis or ax.ort_Post,
                    plz=ax.postleitzahlPostzustellung,
                    telefon=ax.telefon[0] if ax.telefon else None,
                )
                for ax in axs
            ])

        for uid, axs in self.rr.read_grouped(gid.AX_Person):
            self.om.Person.add(uid, [
                _from_ax(
                    dt.PersonRecord,
                    ax,
                    anrede=ax.anrede.text if ax.anrede else None,
                    _hat=ax.hat,
                )
                for ax in axs
            ])

        # AX_Person.hat -> [AX_Anschrift]
        for pe in self.om.Person:
            pe.anschriftList = self.om.Anschrift.get_from_ptr(pe, '_hat')

        for uid, axs in self.rr.read_grouped(gid.AX_Namensnummer):
            self.om.Namensnummer.add(uid, [
                _from_ax(
                    dt.NamensnummerRecord,
                    ax,
                    anteil=_anteil(ax),
                    _benennt=ax.benennt,
                    _istBestandteilVon=ax.istBestandteilVon,
                )
                for ax in axs
            ])

        # AX_Namensnummer.benennt -> AX_Person
        for nn in self.om.Namensnummer:
            nn.laufendeNummer = nn.recs[-1].laufendeNummerNachDIN1421
            nn.personList = self.om.Person.get_from_ptr(nn, '_benennt')

        for uid, axs in self.rr.read_grouped(gid.AX_Buchungsstelle):
            self.om.Buchungsstelle.add(uid, [
                _from_ax(
                    dt.BuchungsstelleRecord,
                    ax,
                    anteil=_anteil(ax),
                    _an=ax.an,
                    _zu=ax.zu,
                    _istBestandteilVon=ax.istBestandteilVon,
                )
                for ax in axs
            ])

        # Flurstück references, filled in later by _FsDataIndexer
        for bs in self.om.Buchungsstelle:
            bs.laufendeNummer = bs.recs[-1].laufendeNummer
            bs.fsUids = []
            bs.flurstueckskennzeichenList = []

        for uid, axs in self.rr.read_grouped(gid.AX_Buchungsblatt):
            self.om.Buchungsblatt.add(uid, [
                _from_ax(
                    dt.BuchungsblattRecord,
                    ax,
                    buchungsblattbezirk=self.rr.place.get_buchungsblattbezirk(ax.buchungsblattbezirk),
                )
                for ax in axs
            ])

        for bb in self.om.Buchungsblatt:
            bb.buchungsstelleList = []
            bb.namensnummerList = []
            bb.buchungsblattkennzeichen = bb.recs[-1].buchungsblattkennzeichen
            self.buchungsblattkennzeichenMap[bb.buchungsblattkennzeichen] = bb

        # AX_Buchungsstelle.istBestandteilVon -> AX_Buchungsblatt
        for bs in self.om.Buchungsstelle:
            bb_list = self.om.Buchungsblatt.get_from_ptr(bs, '_istBestandteilVon')
            bs.buchungsblattUids = [bb.uid for bb in bb_list]
            bs.buchungsblattkennzeichenList = [bb.buchungsblattkennzeichen for bb in bb_list]
            for bb in bb_list:
                bb.buchungsstelleList.append(bs)

        # AX_Namensnummer.istBestandteilVon -> AX_Buchungsblatt
        for nn in self.om.Namensnummer:
            bb_list = self.om.Buchungsblatt.get_from_ptr(nn, '_istBestandteilVon')
            nn.buchungsblattUids = [bb.uid for bb in bb_list]
            nn.buchungsblattkennzeichenList = [bb.buchungsblattkennzeichen for bb in bb_list]
            for bb in bb_list:
                bb.namensnummerList.append(nn)

        for bb in self.om.Buchungsblatt:
            bb.buchungsstelleList.sort(key=_sortkey_buchungsstelle)
            bb.namensnummerList.sort(key=_sortkey_namensnummer)

        # AX_Buchungsstelle.an -> [AX_Buchungsstelle]
        # AX_Buchungsstelle.zu -> [AX_Buchungsstelle]
        # see Erläuterungen zu ALKIS Version 6, page 116-119

        for bs in self.om.Buchungsstelle:
            bs.childUids = []
            bs.parentUids = []
            bs.parentkennzeichenList = []

        for bs in self.om.Buchungsstelle:
            parent_uids = set()
            parent_knz = set()

            for r in bs.recs:
                parent_uids.update(self._ptr_uids(_pop(r, '_an')))
                parent_uids.update(self._ptr_uids(_pop(r, '_zu')))

            # sorted: the iteration order of a set of str varies between processes
            # (hash randomization), which would make parentUids/childUids order unstable
            for parent_bs in self.om.Buchungsstelle.get_many(sorted(parent_uids)):
                parent_bs.childUids.append(bs.uid)
                bs.parentUids.append(parent_bs.uid)
                for bb_knz in parent_bs.buchungsblattkennzeichenList:
                    parent_knz.add(bb_knz + '.' + parent_bs.laufendeNummer)

            bs.parentkennzeichenList = sorted(parent_knz)

    @staticmethod
    def _ptr_uids(v) -> list:
        """Normalize a reference value to a list of uids.

        None or empty gives [], a single str gives [str], and any other iterable
        gives a list of its items.
        """
        if not v:
            return []
        if isinstance(v, str):
            return [v]
        return list(v)

    def write(self):
        values = []

        for bb in self.om.Buchungsblatt:
            values.append(dict(
                uid=bb.uid,
                rc=len(bb.recs),
                data=index.serialize(bb),
            ))

        self.write_table(index.TABLE_BUCHUNGSBLATT, values)
class _PartIndexer(_Indexer):
    """Index Flurstück parts: the intersections of geometric ALKIS objects with Flurstücke.

    For each kind in ``dt.Part.KIND``, every geometric object class whose
    metadata key matches the kind's key pattern is read. Each object is then
    intersected with the latest geometry of every Flurstück. This yields one
    ``dt.Part`` per (object, Flurstück) pair whose rounded area is at least
    ``MIN_PART_AREA``.
    """

    CACHE_KEY = index.TABLE_PART
    # intersections with a smaller (rounded) area are ignored
    MIN_PART_AREA = 1

    # computed parts, sorted by _sortkey_part
    parts: list[dt.Part]
    # Flurstücke and their latest geometries; list indexes match the STRtree indexes
    fs_list: list
    fs_geom: list
    stree: shapely.strtree.STRtree

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # instance attributes: class-level lists would be shared by, and accumulate across, all runners
        self.parts = []
        self.fs_list = []
        self.fs_geom = []

    def load_or_collect(self):
        """Load the objects from the cache (or collect them), then compute the parts.

        Only ``self.om`` is cached. ``self.parts`` is derived from it and from the
        Flurstück data, so it is computed in both cases; otherwise a cache hit
        would leave ``self.parts`` empty.
        """
        if not self.load_cache():
            self.collect_objects()
            self.store_cache()
        self.compute_parts()

    def collect(self):
        """Collect the objects and compute the parts (without using the cache)."""
        self.collect_objects()
        self.compute_parts()

    def collect_objects(self):
        """Read the objects of all part kinds into ``self.om.Part``."""
        for kind in dt.Part.KIND:
            self.collect_kind(kind)

    def compute_parts(self):
        """Intersect every collected object with the Flurstücke and fill ``self.parts``."""
        self.parts = []
        self.fs_list = []
        self.fs_geom = []

        for fs in self.rr.fsdata.om.Flurstueck:
            self.fs_list.append(fs)
            # NB take only the most recent fs geometry into account
            self.fs_geom.append(_geom_of(fs.recs[-1]))

        self.stree = shapely.strtree.STRtree(self.fs_geom)

        with ProgressIndicator('ALKIS: parts', len(self.om.Part)) as progress:
            for pa in self.om.Part:
                self.compute_intersections(pa)
                progress.update(1)

        self.parts.sort(key=_sortkey_part)

        for pa in self.parts:
            pa.isHistoric = all(r.isHistoric for r in pa.recs)

    def collect_kind(self, kind):
        """Collect every geometric object class whose metadata key matches ``kind``'s key."""
        _, key = dt.Part.KIND[kind]
        classes = [
            getattr(gid, meta['name'])
            for meta in gid.METADATA.values()
            if (
                meta['kind'] == 'object'
                and meta['geom']
                and re.search(key + r'/\w+/', meta['key'])
            )
        ]

        for cls in classes:
            self.collect_class(kind, cls)

    def collect_class(self, kind, cls):
        """Add all objects of ``cls`` to ``self.om.Part`` as parts of ``kind``."""
        meta = gid.METADATA[cls.__name__]
        atts = _attributes(meta, dt.Part.PROP_KEYS)

        for uid, axs in self.rr.read_grouped(cls):
            pa = self.om.Part.add(uid, [
                _from_ax(
                    dt.PartRecord,
                    ax,
                    props=self.rr.props_from(ax, atts),
                )
                for ax in axs
            ])
            pa.kind = kind
            pa.name = dt.EnumPair(meta['uid'], meta['title'])

    def compute_intersections(self, pa: dt.Part):
        """Intersect all records of object ``pa`` with the Flurstücke; append one Part per hit Flurstück."""
        parts_map = {}

        for r in pa.recs:
            geom = _geom_of(r)
            if not geom:
                continue

            # candidate Flurstücke, as indexes into fs_geom / fs_list
            for i in self.stree.query(geom):
                part_geom = shapely.intersection(self.fs_geom[i], geom)
                part_area = round(part_geom.area, 2)
                if part_area < self.MIN_PART_AREA:
                    continue

                fs = self.fs_list[i]

                part = parts_map.setdefault(fs.uid, dt.Part(
                    uid=pa.uid,
                    recs=[],
                    kind=pa.kind,
                    name=pa.name,
                    fs=fs.uid,
                ))

                # computed area corrected with respect to FS's "amtlicheFlaeche"
                part_area_corrected = round(
                    fs.recs[-1].amtlicheFlaeche * (part_area / fs.recs[-1].geomFlaeche),
                    2)

                part.recs.append(dt.PartRecord(
                    uid=r.uid,
                    beginnt=r.beginnt,
                    endet=r.endet,
                    anlass=r.anlass,
                    props=r.props,
                    geomFlaeche=part_area,
                    amtlicheFlaeche=part_area_corrected,
                    isHistoric=r.endet is not None,
                ))

                # geometry and areas of the latest processed record
                part.geom = shapely.wkb.dumps(part_geom, srid=self.ix.crs.srid, hex=True)
                part.geomFlaeche = part_area
                part.amtlicheFlaeche = part_area_corrected

        self.parts.extend(parts_map.values())

    def write(self):
        values = []

        for n, pa in enumerate(self.parts, 1):
            # the geometry goes into its own column, not into the serialized data
            geom = _pop(pa, 'geom')
            data = index.serialize(pa)
            pa.geom = geom
            values.append(dict(
                n=n,
                fs=pa.fs,
                uid=pa.uid,
                beginnt=pa.recs[-1].beginnt,
                endet=pa.recs[-1].endet,
                kind=pa.kind,
                name=pa.name.text,
                parthistoric=pa.isHistoric,
                data=data,
                geom=geom,
            ))

        self.write_table(index.TABLE_PART, values)
class _FsDataIndexer(_Indexer):
    """Collect Flurstücke (current and historic) and link them to Lage, Gebaeude and Buchung data."""

    CACHE_KEY = index.TABLE_FLURSTUECK

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # statistics from record(): accepted records and the reasons for rejected ones
        self.counts = {
            'ok': 0,
            'no_geometry': 0,
            'excluded': 0,
        }

    def collect(self):
        for uid, axs in self.rr.read_grouped(gid.AX_Flurstueck):
            recs = gws.u.compact(self.record(ax) for ax in axs)
            if recs:
                self.om.Flurstueck.add(uid, recs)

        for uid, axs in self.rr.read_grouped(gid.AX_HistorischesFlurstueck):
            recs = gws.u.compact(self.record(ax) for ax in axs)
            if not recs:
                continue
            # For a historic FS, 'beginnt' is basically when the history beginnt
            # (see comments for AX_HistorischesFlurstueck in gid6).
            # we set F.endet=F.beginnt to designate this one as 'historic'
            for r in recs:
                r.endet = r.beginnt
                r.isHistoric = True
            self.om.Flurstueck.add(uid, recs)

        for fs in self.om.Flurstueck:
            fs.flurstueckskennzeichen = fs.recs[-1].flurstueckskennzeichen
            fs.fsnummer = index.make_fsnummer(fs.recs[-1])
            fs.x = fs.recs[-1].x
            fs.y = fs.recs[-1].y
            self.process_lage(fs)
            self.process_gebaeude(fs)
            self.process_buchung(fs)

        # check the 'nachfolgerFlurstueckskennzeichen' array
        # and mark each referenced FS as a "vorgaenger" FS
        # It is a M:N relation, therefore 'vorgaengerFlurstueckskennzeichen' is also an array

        k_to_fs = {
            fs.flurstueckskennzeichen: fs
            for fs in self.om.Flurstueck
        }
        for fs in self.om.Flurstueck:
            nf_keys = fs.recs[-1].nachfolgerFlurstueckskennzeichen
            if not nf_keys:
                continue
            for nf_key in nf_keys:
                nf_fs = k_to_fs.get(nf_key)
                if not nf_fs:
                    gws.log.warning(f'ALKIS: nachfolgerFlurstueck missing {fs.flurstueckskennzeichen!r}->{nf_key!r}')
                    continue
                if not nf_fs.recs[-1].vorgaengerFlurstueckskennzeichen:
                    nf_fs.recs[-1].vorgaengerFlurstueckskennzeichen = []
                nf_fs.recs[-1].vorgaengerFlurstueckskennzeichen.append(fs.flurstueckskennzeichen)

    def record(self, ax):
        """Build a FlurstueckRecord from ``ax``.

        Returns None, and counts the reason in ``self.counts``, when the Gemeinde
        or Gemarkung is unknown, or when there is no geometry.
        """
        r: dt.FlurstueckRecord = _from_ax(
            dt.FlurstueckRecord,
            ax,
            amtlicheFlaeche=ax.amtlicheFlaeche or 0,
            flurnummer=_str(ax.flurnummer),
            zaehler=_str(ax.flurstuecksnummer.zaehler),
            nenner=_str(ax.flurstuecksnummer.nenner),
            zustaendigeStelle=[self.rr.place.get_dienststelle(p) for p in (ax.zustaendigeStelle or [])],

            _weistAuf=ax.weistAuf,
            _zeigtAuf=ax.zeigtAuf,
            _istGebucht=ax.istGebucht,
            _buchung=ax.buchung,
        )

        # basic data

        r.gemarkung = self.rr.place.get_gemarkung(ax.gemarkung)
        r.gemeinde = self.rr.place.get_gemeinde(ax.gemeindezugehoerigkeit)
        r.regierungsbezirk = self.rr.place.get_regierungsbezirk(ax.gemeindezugehoerigkeit)
        r.kreis = self.rr.place.get_kreis(ax.gemeindezugehoerigkeit)
        r.land = self.rr.place.get_land(ax.gemeindezugehoerigkeit)

        if self.rr.place.is_empty(r.gemarkung) or self.rr.place.is_empty(r.gemeinde):
            # exclude Flurstücke that refer to Gemeinde/Gemarkung
            # which do not exist in the reference AX tables
            self.counts['excluded'] += 1
            return None

        # geometry

        geom = _geom_of(r)
        if not geom:
            self.counts['no_geometry'] += 1
            return None

        r.geomFlaeche = round(geom.area, 2)
        r.x = round(geom.centroid.x, 2)
        r.y = round(geom.centroid.y, 2)

        self.counts['ok'] += 1
        return r

    def process_lage(self, fs: dt.Flurstueck):
        fs.lageList = []

        # AX_Flurstueck.weistAuf -> AX_LagebezeichnungMitHausnummer
        # AX_Flurstueck.zeigtAuf -> AX_LagebezeichnungOhneHausnummer
        fs.lageList.extend(self.rr.lage.om.Lage.get_from_ptr(fs, '_weistAuf'))
        fs.lageList.extend(self.rr.lage.om.Lage.get_from_ptr(fs, '_zeigtAuf'))

    def process_gebaeude(self, fs: dt.Flurstueck):
        """Attach the Gebaeude of all Lage of ``fs`` and sum up the areas of the non-ended ones."""
        ge_map = {}

        for la in fs.lageList:
            for ge in la.gebaeudeList:
                ge_map[ge.uid] = ge

        fs.gebaeudeList = list(ge_map.values())
        fs.gebaeudeList.sort(key=_sortkey_gebaeude)

        fs.gebaeudeAmtlicheFlaeche = sum(ge.recs[-1].amtlicheFlaeche for ge in fs.gebaeudeList if not ge.recs[-1].endet)
        fs.gebaeudeGeomFlaeche = sum(ge.recs[-1].geomFlaeche for ge in fs.gebaeudeList if not ge.recs[-1].endet)

    def process_buchung(self, fs: dt.Flurstueck):
        bs_historic_map = {}
        bs_seen = set()
        buchung_map = {}

        # for each Flurstück record, we collect all related Buchungsstellen (with respect to parent-child relations)
        # then group Buchungsstellen by their Buchungsblatt
        # and create Buchung objects for a FS

        for r in fs.recs:
            hist_buchung = _pop(r, '_buchung')
            if hist_buchung:
                bs_list = self.historic_buchungsstelle_list(r, hist_buchung)
            else:
                bs_list = self.buchungsstelle_list(r)

            for bs in bs_list:
                # a Buchungsstelle referred to by an expired Flurstück might not be expired itself,
                # so we have to track its state separately by wrapping it in a BuchungsstelleReference
                # a BuchungsstelleReference is historic if its Flurstück is
                bs_historic_map[bs.uid] = r.isHistoric

                if bs.uid in bs_seen:
                    continue
                bs_seen.add(bs.uid)

                # populate Flurstück references in a Buchungsstelle
                if fs.uid not in bs.fsUids:
                    bs.fsUids.append(fs.uid)
                    bs.flurstueckskennzeichenList.append(fs.flurstueckskennzeichen)

                # create Buchung records by grouping Buchungsstellen
                for bb_uid in bs.buchungsblattUids:
                    bu = buchung_map.setdefault(bb_uid, dt.Buchung(recs=[], buchungsblattUid=bb_uid))
                    bu.recs.append(dt.BuchungsstelleReference(buchungsstelle=bs))

        fs.buchungList = list(buchung_map.values())

        for bu in fs.buchungList:
            for ref in bu.recs:
                ref.isHistoric = bs_historic_map[ref.buchungsstelle.uid]
            bu.isHistoric = all(ref.isHistoric for ref in bu.recs)

        return fs

    def historic_buchungsstelle_list(self, r: dt.FlurstueckRecord, hist_buchung):
        """Build placeholder historic Buchungsstellen from the 'buchung' entries of an AX_HistorischesFlurstueck.

        Entries whose Buchungsblatt-Kennzeichen is unknown are skipped.
        """
        bs_list = []

        for bu in hist_buchung:
            bb = self.rr.buchung.buchungsblattkennzeichenMap.get(bu.buchungsblattkennzeichen)
            if not bb:
                continue
            # create a fake historic Buchungstelle
            bs_list.append(dt.Buchungsstelle(
                uid=bb.uid + '_' + bu.laufendeNummerDerBuchungsstelle,
                recs=[
                    dt.BuchungsstelleRecord(
                        endet=r.endet,
                        laufendeNummer=bu.laufendeNummerDerBuchungsstelle,
                        isHistoric=True,
                    )
                ],
                buchungsblattUids=[bb.uid],
                buchungsblattkennzeichenList=[bb.buchungsblattkennzeichen],
                parentUids=[],
                childUids=[],
                fsUids=[],
                parentkennzeichenList=[],
                flurstueckskennzeichenList=[],
                laufendeNummer=bu.laufendeNummerDerBuchungsstelle,
                isHistoric=True,
            ))

        return bs_list

    def buchungsstelle_list(self, r: dt.FlurstueckRecord):
        """Return the Buchungsstellen that directly or indirectly book the Flurstück record ``r``.

        A Flurstück points to a Buchungsstelle (F.istGebucht -> B); ``_istGebucht``
        is removed from ``r``. A Buchungsstelle can have parent (B.an -> parent.uid)
        and child (child.an -> B.uid) Buchungsstellen; these references are
        populated in _BuchungIndexer.

        The result lists B's ancestors first (the most distant one first), then
        B itself, then B's descendants breadth-first, with siblings sorted by
        Buchungsblatt-Kennzeichen. Each Buchungsstelle is visited at most once
        per direction, so cyclic references in the data cannot cause an endless loop.
        """
        from collections import deque

        bs_dict = self.rr.buchung.om.Buchungsstelle

        # AX_Flurstueck.istGebucht -> AX_Buchungsstelle
        this_bs = bs_dict.get(_pop(r, '_istGebucht'))
        if not this_bs:
            return []

        # ancestors, breadth-first; each newly found one is prepended,
        # so the most distant ancestors end up at the front
        ancestors = deque()
        seen = {this_bs.uid}
        queue = deque([this_bs])
        while queue:
            bs = queue.popleft()
            for uid in bs.parentUids:
                if uid in seen:
                    continue
                seen.add(uid)
                parent_bs = bs_dict.get(uid)
                if parent_bs:
                    ancestors.appendleft(parent_bs)
                    queue.append(parent_bs)

        bs_list = list(ancestors)

        # this_bs and its descendants, breadth-first
        seen = {this_bs.uid}
        queue = deque([this_bs])
        while queue:
            bs = queue.popleft()
            bs_list.append(bs)
            # sort related (child) Buchungsstellen by their BB-Kennzeichen
            child_bs_list = [c for c in bs_dict.get_many(bs.childUids) if c.uid not in seen]
            child_bs_list.sort(key=_sortkey_buchungsstelle_by_bblatt)
            seen.update(c.uid for c in child_bs_list)
            queue.extend(child_bs_list)

        # if len(bs_list) > 1:
        #     gws.log.debug(f'bs chain: {r.uid=} {this_bs.uid=} {[bs.uid for bs in bs_list]}')

        return bs_list

    def write(self):
        values = []

        for fs in self.om.Flurstueck:
            # geometries go into their own column, not into the serialized data
            geoms = [_pop(r, 'geom') for r in fs.recs]
            data = index.serialize(fs)
            for r, g in zip(fs.recs, geoms):
                r.geom = g

            values.append(dict(
                uid=fs.uid,
                rc=len(fs.recs),
                fshistoric=fs.isHistoric,
                data=data,
                geom=geoms[-1],
            ))

        self.write_table(index.TABLE_FLURSTUECK, values)
class _FsIndexIndexer(_Indexer):
    """Build the flat search tables from the collected Flurstücke.

    Covers the Flurstück, Lage, Buchungsblatt, Person and geometry tables, with
    one row per Flurstück record and related item.
    """

    # rows per index table, filled by collect() and written by write()
    entries: dict[str, list[dict]]

    def __init__(self, runner: '_Runner'):
        super().__init__(runner)
        # an instance attribute: a class-level dict of lists would accumulate rows across runs
        self.entries = {
            index.TABLE_INDEXFLURSTUECK: [],
            index.TABLE_INDEXLAGE: [],
            index.TABLE_INDEXBUCHUNGSBLATT: [],
            index.TABLE_INDEXPERSON: [],
            index.TABLE_INDEXGEOM: [],
        }

    def collect(self):
        with ProgressIndicator(f'ALKIS: creating indexes', len(self.rr.fsdata.om.Flurstueck)) as progress:
            for fs in self.rr.fsdata.om.Flurstueck:
                for r in fs.recs:
                    self.add(fs, r)
                progress.update(1)

    def add(self, fs: dt.Flurstueck, r: dt.FlurstueckRecord):
        """Append the index rows for record ``r`` of Flurstück ``fs``."""
        base = dict(
            fs=r.uid,
            fshistoric=r.isHistoric,
        )

        places = dict(
            land=r.land.text,
            land_t=index.text_key(r.land.text),
            landcode=r.land.code,

            regierungsbezirk=r.regierungsbezirk.text,
            regierungsbezirk_t=index.text_key(r.regierungsbezirk.text),
            regierungsbezirkcode=r.regierungsbezirk.code,

            kreis=r.kreis.text,
            kreis_t=index.text_key(r.kreis.text),
            kreiscode=r.kreis.code,

            gemeinde=r.gemeinde.text,
            gemeinde_t=index.text_key(r.gemeinde.text),
            gemeindecode=r.gemeinde.code,

            gemarkung=r.gemarkung.text,
            gemarkung_t=index.text_key(r.gemarkung.text),
            gemarkungcode=r.gemarkung.code,
        )

        self.entries[index.TABLE_INDEXFLURSTUECK].append(dict(
            **base,
            **places,

            amtlicheflaeche=r.amtlicheFlaeche,
            geomflaeche=r.geomFlaeche,

            flurnummer=r.flurnummer,
            zaehler=r.zaehler,
            nenner=r.nenner,
            flurstuecksfolge=r.flurstuecksfolge,
            flurstueckskennzeichen=r.flurstueckskennzeichen,

            x=r.x,
            y=r.y,
        ))

        self.entries[index.TABLE_INDEXGEOM].append(dict(
            **base,
            geomflaeche=r.geomFlaeche,
            x=r.x,
            y=r.y,
            geom=r.geom,
        ))

        for la in fs.lageList:
            for la_r in la.recs:
                self.entries[index.TABLE_INDEXLAGE].append(dict(
                    **base,
                    **places,
                    lageuid=la_r.uid,
                    lagehistoric=la_r.isHistoric,
                    strasse=la_r.strasse,
                    strasse_t=index.strasse_key(la_r.strasse),
                    hausnummer=la_r.hausnummer,
                    # fall back to the Flurstück coordinates when the Lage has none
                    x=la.x or r.x,
                    y=la.y or r.y,
                ))

        for bu in fs.buchungList:
            bb = self.rr.buchung.om.Buchungsblatt.get(bu.buchungsblattUid)
            if not bb:
                # the Buchung refers to a Buchungsblatt that is not in the collected data
                continue

            for bb_r in bb.recs:
                self.entries[index.TABLE_INDEXBUCHUNGSBLATT].append(dict(
                    **base,
                    buchungsblattuid=bb_r.uid,
                    buchungsblattkennzeichen=bb_r.buchungsblattkennzeichen,
                    buchungsblatthistoric=bu.isHistoric,
                ))

            # each Person once per Buchungsblatt
            pe_uids = set()

            for nn in bb.namensnummerList:
                for pe in nn.personList:
                    if pe.uid in pe_uids:
                        continue
                    pe_uids.add(pe.uid)
                    for pe_r in pe.recs:
                        self.entries[index.TABLE_INDEXPERSON].append(dict(
                            **base,
                            personuid=pe_r.uid,
                            personhistoric=pe_r.isHistoric,
                            name=pe_r.nachnameOderFirma,
                            name_t=index.text_key(pe_r.nachnameOderFirma),
                            vorname=pe_r.vorname,
                            vorname_t=index.text_key(pe_r.vorname),
                        ))

    def write(self):
        for table_id, values in self.entries.items():
            if not self.ix.has_table(table_id):
                # 1-based running row number
                for n, v in enumerate(values, 1):
                    v['n'] = n
                self.write_table(table_id, values)
class _Runner:
    """Runs the indexing stages and gives them (optionally cached) access to the reader."""

    def __init__(self, ix: index.Object, reader: dt.Reader, with_cache=False):
        self.ix: index.Object = ix
        self.reader: dt.Reader = reader

        self.withCache = with_cache
        self.cacheDir = gws.c.CACHE_DIR + '/alkis'
        if self.withCache:
            gws.u.ensure_dir(self.cacheDir)

        # indexing stages; run() determines the order in which they are executed
        self.place = _PlaceIndexer(self)
        self.lage = _LageIndexer(self)
        self.buchung = _BuchungIndexer(self)
        self.part = _PartIndexer(self)
        self.fsdata = _FsDataIndexer(self)
        self.fsindex = _FsIndexIndexer(self)

        # baseline for memory_info()
        self.initMemory = gws.lib.osx.process_rss_size()

    def run(self):
        """Collect all stages (later stages use earlier results), then write all tables."""
        with self.ix.db.connect():
            with ProgressIndicator(f'ALKIS: indexing'):
                self.place.load_or_collect()
                self.memory_info()

                self.buchung.load_or_collect()
                self.memory_info()

                self.lage.load_or_collect()
                self.memory_info()

                self.fsdata.load_or_collect()
                gws.log.info(f'ALKIS: fs counts: {self.fsdata.counts}')
                self.memory_info()

                self.part.load_or_collect()
                self.memory_info()

                self.fsindex.collect()
                self.memory_info()

                self.place.write()
                self.buchung.write()
                self.lage.write()
                self.fsdata.write()
                self.part.write()
                self.fsindex.write()

    def memory_info(self):
        """Log the RSS growth since the runner was created (only when positive)."""
        used = gws.lib.osx.process_rss_size() - self.initMemory
        if used <= 0:
            return
        gws.log.info(f'ALKIS: memory used: {used:.2f} MB', stacklevel=2)

    def _read_cached(self, prefix, cls, read_fn):
        # return the cached result for (prefix, cls), or call read_fn and cache its result
        path = self.cacheDir + prefix + cls.__name__
        if self.withCache and gws.u.is_file(path):
            return gws.u.unserialize_from_path(path)

        result = read_fn(cls)
        if self.withCache:
            gws.u.serialize_to_path(result, path)
        return result

    def read_flat(self, cls):
        """Return all objects of ``cls`` as a list."""
        return self._read_cached('/flat_', cls, self._read_flat)

    def _read_flat(self, cls):
        total = self.reader.count(cls)
        if total <= 0:
            gws.log.warning(f'ALKIS: read {cls.__name__}: empty table')
            return []

        objects = []
        with ProgressIndicator(f'ALKIS: read {cls.__name__}', total) as progress:
            for ax in self.reader.read_all(cls):
                objects.append(ax)
                progress.update(1)
        return objects

    def read_grouped(self, cls):
        """Return the objects of ``cls`` as ``(identifikator, [objects])`` pairs.

        Each group is sorted by ``lebenszeitintervall.beginnt``.
        """
        return self._read_cached('/grouped_', cls, self._read_grouped)

    def _read_grouped(self, cls):
        total = self.reader.count(cls)
        if total <= 0:
            gws.log.warning(f'ALKIS: read {cls.__name__}: empty table')
            return []

        by_uid = {}
        with ProgressIndicator(f'ALKIS: read {cls.__name__}', total) as progress:
            for ax in self.reader.read_all(cls):
                by_uid.setdefault(ax.identifikator, []).append(ax)
                progress.update(1)

        for group in by_uid.values():
            group.sort(key=_sortkey_lebenszeitintervall)

        return list(by_uid.items())

    def props_from(self, ax, atts):
        """Return ``[title, value]`` pairs for the attributes ``atts`` of ``ax``.

        Values for which ``gws.u.is_empty`` is true are skipped. Object-valued
        attributes are passed through ``object_prop_value``, which currently
        returns None.
        """
        props = []
        for att in atts:
            value = getattr(ax, att['name'], None)
            if isinstance(value, gid.Object):
                value = self.object_prop_value(value)
            if gws.u.is_empty(value):
                continue
            props.append([att['title'], value])
        return props

    def object_prop_value(self, o):
        # @TODO handle properties which are objects
        # gid.AX_Bundesland_Schluessel
        # gid.AX_Dienststelle_Schluessel
        # gid.AX_Gemarkung_Schluessel
        # gid.AX_Gemeindekennzeichen
        # gid.AX_Kreis_Schluessel
        # gid.AX_Regierungsbezirk_Schluessel
        # gid.AX_KennzifferGrabloch
        # gid.AX_Lagebezeichnung
        # gid.AX_Tagesabschnitt
        # gid.AX_Verwaltungsgemeinschaft_Schluessel
        return None
1156def _from_ax(cls, ax, **kwargs):
1157 d = {}
1159 if ax:
1160 for k in cls.__annotations__:
1161 v = getattr(ax, k, None)
1162 if v:
1163 d[k] = v
1165 d['uid'] = ax.identifikator
1166 d['beginnt'] = ax.lebenszeitintervall.beginnt
1167 d['endet'] = ax.lebenszeitintervall.endet
1168 d['isHistoric'] = d['endet'] is not None
1169 if ax.anlass and ax.anlass[0].code != '000000':
1170 d['anlass'] = ax.anlass[0]
1171 if ax.geom:
1172 d['geom'] = ax.geom
1174 d.update(kwargs)
1175 return cls(**d)
1178def _anteil(ax):
1179 try:
1180 z = float(ax.anteil.zaehler)
1181 z = str(int(z) if z.is_integer() else z)
1182 n = float(ax.anteil.nenner)
1183 n = str(int(n) if n.is_integer() else n)
1184 return z + '/' + n
1185 except (AttributeError, ValueError, TypeError):
1186 pass
1189def _attributes(meta, keys):
1190 return sorted(
1191 [a for a in meta['attributes'] if a['name'] in keys],
1192 key=lambda a: a['title']
1193 )
def _geom_of(o):
    """Return ``o.geom`` (hex WKB) as a shapely geometry, or None (with a warning) if it is empty."""
    wkb_hex = o.geom
    if wkb_hex:
        return shapely.wkb.loads(wkb_hex, hex=True)
    gws.log.warning(f'{o.__class__.__name__}:{o.uid}: no geometry')
    return None
1203def _pop(obj, attr):
1204 v = getattr(obj, attr, None)
1205 try:
1206 delattr(obj, attr)
1207 except AttributeError:
1208 pass
1209 return v
1212def _sortkey_beginnt(o):
1213 return o.beginnt
1216def _sortkey_lebenszeitintervall(o):
1217 return o.lebenszeitintervall.beginnt
def _sortkey_namensnummer(nn: dt.Namensnummer):
    """Sort key: natural order of the latest record's laufendeNummerNachDIN1421, then its beginnt."""
    latest = nn.recs[-1]
    return _natkey(latest.laufendeNummerNachDIN1421), latest.beginnt
def _sortkey_buchungsstelle(bs: dt.Buchungsstelle):
    """Sort key: natural order of the latest record's laufendeNummer, then its beginnt."""
    latest = bs.recs[-1]
    return _natkey(latest.laufendeNummer), latest.beginnt
1228def _sortkey_buchungsstelle_by_bblatt(bs: dt.Buchungsstelle):
1229 return bs.buchungsblattkennzeichenList[0], bs.recs[-1].beginnt
def _sortkey_part(pa: dt.Part):
    """Sort key: part name text ascending, then area (geomFlaeche) descending."""
    name = pa.name.text
    return name, -pa.geomFlaeche
def _sortkey_gebaeude(ge: dt.Gebaeude):
    """Sort key for Gebaeude, based on the latest record."""
    # sort Gebaeude by 'beginnt' first,
    # then Gebaeude with the same 'beginnt' by area (big->small)
    return ge.recs[-1].beginnt, -ge.recs[-1].geomFlaeche
1241def _natkey(v):
1242 if not v:
1243 return []
1244 return [
1245 '{:080d}'.format(int(digits)) if digits else chars.lower()
1246 for digits, chars in re.findall(r'(\d+)|(\D+)', v.strip())
1247 ]
1250def _comma(a):
1251 return ','.join(str(s) if s is not None else '' for s in a)
1254def _str(x):
1255 return None if x is None else str(x)