Coverage for gws-app/gws/plugin/alkis/data/norbit6.py: 0%
154 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Read data from Norbit plugin tables (GeoInfoDok 6)."""
3import gws
4import gws.base.database
5import gws.lib.datetimex
6import gws.lib.sa as sa
7import gws.plugin.postgres.provider
9from .geo_info_dok import gid6 as gid
10from . import types as dt
class Object(dt.Reader):
    """Reader for objects stored in Norbit plugin tables (GeoInfoDok 6).

    On construction, a table of attribute readers is built for every
    ``struct`` and ``object`` entry in ``gid.METADATA``. ``read_all`` uses
    this table to convert database rows into instances of ``gid`` classes.
    """

    # Maps a metadata type name to the base name of a reader method.
    # For list attributes ``get_reader`` appends ``_list`` to that name.
    # NOTE(review): this class defines neither ``as_ax_lagebezeichnung_list``
    # nor a non-list ``as_ax_buchung_historischesflurstueck``; unless they are
    # inherited, those combinations raise AttributeError in ``get_reader``.
    # Presumably they never occur in the metadata -- confirm.
    STD_READERS = {
        'Area': 'as_float',
        'Boolean': 'as_bool',
        'CharacterString': 'as_str',
        'Integer': 'as_int',
        'DateTime': 'as_date',
        'Date': 'as_date',

        'AX_Lagebezeichnung': 'as_ax_lagebezeichnung',
        'AX_Buchung_HistorischesFlurstueck': 'as_ax_buchung_historischesflurstueck',
    }

    def __init__(self, provider: gws.plugin.postgres.provider.Object, schema='public'):
        """Store the provider and build the per-class reader table.

        Args:
            provider: Database provider used to open connections.
            schema: Name of the schema that contains the tables.
        """
        self.db = provider
        self.schema = schema

        # class name -> {attribute name -> [name, lowercase name, gid class or None, reader]}
        self.readers = {}

        for meta in gid.METADATA.values():
            if meta['kind'] not in {'struct', 'object'}:
                continue
            d = {}
            # Start with the readers of the superclasses registered so far;
            # superclasses that are not (yet) registered contribute nothing.
            for sup in meta['supers']:
                d.update(self.readers.get(sup, {}))
            for attr in meta['attributes']:
                d[attr['name']] = [
                    attr['name'],
                    attr['name'].lower(),
                    getattr(gid, attr['type'], None),
                    self.get_reader(attr)
                ]

            self.readers[meta['name']] = d

    def get_reader(self, attr):
        """Return the bound reader method for an attribute description.

        Args:
            attr: Attribute metadata with at least the keys ``type`` and ``list``.

        Returns:
            A method taking ``(cls, prop, r)``. Types listed in ``STD_READERS``
            take precedence, then the metadata ``kind`` of the type decides;
            anything else is read as a string.
        """
        typ = attr['type']
        is_list = attr['list']

        std = self.STD_READERS.get(typ)
        if std:
            return getattr(self, std + '_list' if is_list else std)

        meta = gid.METADATA.get(typ)
        if meta:
            if meta['kind'] == 'object':
                return self.as_ref_list if is_list else self.as_ref
            if meta['kind'] == 'struct':
                return self.as_struct_list if is_list else self.as_struct
            if meta['kind'] == 'enum':
                return self.as_enum_list if is_list else self.as_enum

        return self.as_str_list if is_list else self.as_str

    ##

    def count(self, cls, table_name=None):
        """Return the number of rows in the table for ``cls``.

        Args:
            cls: Class whose lowercased name is the default table name.
            table_name: Explicit table name overriding the default.

        Returns:
            The row count, or 0 if the query raises ``sa.Error``.
        """
        # NB not using db.count to avoid schema introspection
        sql = f"SELECT COUNT(*) FROM {self.schema}.{table_name or cls.__name__.lower()}"
        with self.db.connect() as conn:
            try:
                rs = list(conn.execute(sa.text(sql)))
                return rs[0][0]
            except sa.Error:
                conn.rollback()
                return 0

    def read_all(self, cls, table_name=None, uids=None):
        """Yield ``cls`` instances built from the rows of a table.

        Args:
            cls: Class to instantiate; its lowercased name is the default table name.
            table_name: Explicit table name overriding the default.
            uids: Optional ``gml_id`` value or collection of values to restrict
                the result to. A falsy value selects all rows.

        Yields:
            Objects created by ``as_struct`` with ``identifikator`` set from
            ``gml_id`` and ``geom`` set from ``wkb_geometry``.
        """
        sql = f"SELECT * FROM {self.schema}.{table_name or cls.__name__.lower()}"

        params = {}
        if uids:
            # a bare string is a single id, not a sequence of characters
            if isinstance(uids, str):
                uids = [uids]
            # One named placeholder per id: binding the whole collection to a
            # single placeholder inside IN (...) sends an array/row value
            # instead of a list of scalars.
            params = {f'uid_{n}': uid for n, uid in enumerate(uids)}
            sql += ' WHERE gml_id IN (' + ', '.join(':' + name for name in params) + ')'

        stmt = sa.text(sql)
        if params:
            stmt = stmt.bindparams(**params)

        with self.db.connect() as conn:
            for row in conn.execute(stmt, execution_options={'stream_results': True}):
                r = gws.u.to_dict(row)
                o = self.as_struct(cls, '', r)
                o.identifikator = r.get('gml_id')
                o.geom = r.get('wkb_geometry', '')
                yield o

    ##

    def as_struct(self, cls, prop, r):
        """Create a ``cls`` instance from the record ``r``.

        For every registered attribute of ``cls`` the composite key
        ``<prop>_<attribute>`` (lowercased) is used if present in ``r``,
        otherwise the plain lowercased attribute name.
        """
        d = {}

        for attr_name, attr_low_name, attr_cls, fn in self.readers[cls.__name__].values():
            comp_key = prop.lower() + '_' + attr_low_name
            if comp_key in r:
                d[attr_name] = fn(attr_cls, comp_key, r)
            else:
                d[attr_name] = fn(attr_cls, attr_low_name, r)

        o = cls()
        vars(o).update(d)
        return o

    def as_struct_list(self, cls, prop, r):
        """Create a list of ``cls`` instances from parallel arrays in ``r``.

        Each attribute value is treated as an array (see ``_array``); the
        arrays are combined element-wise and every combination is passed to
        ``as_struct``. Empty arrays are left out.
        """
        d = {}

        for attr_name, attr_low_name, attr_cls, fn in self.readers[cls.__name__].values():
            comp_key = prop.lower() + '_' + attr_low_name
            if comp_key in r:
                a = _array(r[comp_key])
            else:
                a = _array(r.get(attr_low_name))
            if a:
                d[attr_low_name] = a

        objs = []

        # zip stops at the shortest array
        for vals in zip(*d.values()):
            r2 = dict(zip(d, vals))
            o = self.as_struct(cls, '', r2)
            objs.append(o)

        return objs

    def as_ref(self, cls, prop, r):
        """Return the raw value of ``prop`` (a reference is not resolved here)."""
        return r.get(prop)

    def as_ref_list(self, cls, prop, r):
        """Return the value of ``prop`` as a list of raw references."""
        return _array(r.get(prop))

    def as_enum(self, cls, prop, r):
        """Return an ``EnumPair`` for the value of ``prop``, or ``None`` if it is not in ``cls.VALUES``."""
        v = str(r.get(prop))
        if v in cls.VALUES:
            return dt.EnumPair(v, cls.VALUES[v])
        return None

    def as_enum_list(self, cls, prop, r):
        """Return ``EnumPair`` objects for those values of ``prop`` found in ``cls.VALUES``."""
        ls = []

        for v in _array(r.get(prop)):
            v = str(v)
            if v in cls.VALUES:
                ls.append(dt.EnumPair(v, cls.VALUES[v]))

        return ls

    ##

    def as_ax_lagebezeichnung(self, cls, prop, r):
        """Return the ``unverschluesselt`` value if set, else an ``AX_VerschluesselteLagebezeichnung`` struct."""
        if r.get('unverschluesselt'):
            return r.get('unverschluesselt')
        return self.as_struct(gid.AX_VerschluesselteLagebezeichnung, prop, r)

    def as_ax_buchung_historischesflurstueck_list(self, cls, prop, r):
        """Return ``AX_Buchung_HistorischesFlurstueck`` objects with ``buchungsblattbezirk`` attached."""
        # this one is stored as
        # "blattart": [xxxx],
        # "buchungsart": ["xxxx"],
        # ....etc
        # "buchungsblattbezirk_bezirk": ["xxxx"],
        # "buchungsblattbezirk_land": ["xx"],
        # "buchungsblattkennzeichen": ["xxxx"],
        # "buchungsblattnummermitbuchstabenerweiterung": ["xxxx"],

        bbs = self.as_struct_list(gid.AX_Buchung_HistorischesFlurstueck, '', r)
        bzs = self.as_struct_list(gid.AX_Buchungsblattbezirk_Schluessel, 'buchungsblattbezirk', r)
        objs = []
        for bb, bz in zip(bbs, bzs):
            bb.buchungsblattbezirk = bz
            objs.append(bb)
        return objs

    ##

    def as_str(self, cls, prop, r):
        """Return the value of ``prop`` converted by ``_str``."""
        return _str(r.get(prop))

    def as_str_list(self, cls, prop, r):
        """Return the values of ``prop`` converted by ``_str``."""
        return [_str(v) for v in _array(r.get(prop))]

    def as_bool(self, cls, prop, r):
        """Return the value of ``prop`` converted by ``_bool``."""
        return _bool(r.get(prop))

    def as_bool_list(self, cls, prop, r):
        """Return the values of ``prop`` converted by ``_bool``."""
        return [_bool(v) for v in _array(r.get(prop))]

    def as_int(self, cls, prop, r):
        """Return the value of ``prop`` converted by ``_int``."""
        return _int(r.get(prop))

    def as_int_list(self, cls, prop, r):
        """Return the values of ``prop`` converted by ``_int``."""
        return [_int(v) for v in _array(r.get(prop))]

    def as_float(self, cls, prop, r):
        """Return the value of ``prop`` converted by ``_float``."""
        return _float(r.get(prop))

    def as_float_list(self, cls, prop, r):
        """Return the values of ``prop`` converted by ``_float``."""
        return [_float(v) for v in _array(r.get(prop))]

    def as_date(self, cls, prop, r):
        """Return the value of ``prop`` converted by ``_datetime``."""
        return _datetime(r.get(prop))

    def as_date_list(self, cls, prop, r):
        """Return the values of ``prop`` converted by ``_datetime``."""
        return [_datetime(v) for v in _array(r.get(prop))]
def _str(v):
    """Return ``v`` as a stripped string, or ``None`` if it is ``None`` or blank."""
    if v is None:
        return None
    text = str(v).strip()
    return text or None
def _bool(v):
    """Return the truth value of ``v``, passing ``None`` through unchanged."""
    if v is None:
        return None
    return bool(v)
def _int(v):
    """Return ``int(v)``, passing ``None`` through unchanged."""
    if v is None:
        return None
    return int(v)
def _float(v):
    """Return ``float(v)``, passing ``None`` through unchanged."""
    if v is None:
        return None
    return float(v)
def _datetime(v):
    """Return ``None`` for a falsy value, parse a string as ISO, else return ``v`` as is."""
    if not v:
        return None
    # only strings need parsing; other values are handed back untouched
    return gws.lib.datetimex.from_iso_string(v) if isinstance(v, str) else v
def _array(val):
    """Return ``val`` as a list: ``None`` gives ``[]``, a list is returned as is, anything else is wrapped."""
    if val is None:
        return []
    return val if isinstance(val, list) else [val]