Coverage for gws-app/gws/plugin/alkis/data/norbit6.py: 0%

154 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Read data from Norbit plugin tables (GeoInfoDok 6).""" 

2 

3import gws 

4import gws.base.database 

5import gws.lib.datetimex 

6import gws.lib.sa as sa 

7import gws.plugin.postgres.provider 

8 

9from .geo_info_dok import gid6 as gid 

10from . import types as dt 

11 

12 

class Object(dt.Reader):
    """Reader that turns rows of Norbit plugin tables into GeoInfoDok 6 objects.

    On construction, a per-class table of attribute readers is derived from
    ``gid.METADATA``. Each reader is a bound method with the uniform signature
    ``(cls, prop, r)`` where ``cls`` is the attribute class looked up in
    ``gid`` (or ``None``), ``prop`` is the key to read and ``r`` is the row
    as a dict.
    """

    # Maps a metadata type name to the base name of the reader method.
    # For list attributes, ``'_list'`` is appended to the method name.
    STD_READERS = {
        'Area': 'as_float',
        'Boolean': 'as_bool',
        'CharacterString': 'as_str',
        'Integer': 'as_int',
        'DateTime': 'as_date',
        'Date': 'as_date',

        'AX_Lagebezeichnung': 'as_ax_lagebezeichnung',
        'AX_Buchung_HistorischesFlurstueck': 'as_ax_buchung_historischesflurstueck',
    }

    def __init__(self, provider: gws.plugin.postgres.provider.Object, schema='public'):
        """Store the database provider and build the reader tables.

        Args:
            provider: Database provider used to open connections.
            schema: Name of the schema that contains the tables.
        """
        self.db = provider
        self.schema = schema

        # {class name: {attribute name: [name, lowercase name, attribute class, reader]}}
        self.readers = {}

        for meta in gid.METADATA.values():
            if meta['kind'] not in {'struct', 'object'}:
                continue
            d = {}
            # Inherit readers of super classes that have already been processed;
            # supers that are not (yet) in the table contribute nothing.
            for sup in meta['supers']:
                d.update(self.readers.get(sup, {}))
            for attr in meta['attributes']:
                d[attr['name']] = [
                    attr['name'],
                    attr['name'].lower(),
                    getattr(gid, attr['type'], None),
                    self.get_reader(attr)
                ]

            self.readers[meta['name']] = d

    def get_reader(self, attr):
        """Pick the reader method for a metadata attribute description.

        Args:
            attr: Attribute metadata; its ``'type'`` and ``'list'`` entries are read.

        Returns:
            A bound reader method. Types listed in ``STD_READERS`` take
            precedence, then the ``'kind'`` of the type in ``gid.METADATA``
            decides; everything else is read as a string.

        Raises:
            AttributeError: If ``STD_READERS`` names a method that does not
                exist in the required (scalar or list) variant.
        """
        typ = attr['type']
        is_list = attr['list']

        std = self.STD_READERS.get(typ)
        if std:
            return getattr(self, std + '_list' if is_list else std)

        meta = gid.METADATA.get(typ)
        if meta:
            if meta['kind'] == 'object':
                return self.as_ref_list if is_list else self.as_ref
            if meta['kind'] == 'struct':
                return self.as_struct_list if is_list else self.as_struct
            if meta['kind'] == 'enum':
                return self.as_enum_list if is_list else self.as_enum

        return self.as_str_list if is_list else self.as_str

    ##

    def count(self, cls, table_name=None):
        """Return the number of rows in the table for ``cls``.

        Args:
            cls: Class whose lowercased name is the default table name.
            table_name: Explicit table name, overrides the default.

        Returns:
            The row count, or 0 if the query fails with ``sa.Error``.
        """
        # NB not using db.count to avoid schema introspection
        sql = f"SELECT COUNT(*) FROM {self.schema}.{table_name or cls.__name__.lower()}"
        with self.db.connect() as conn:
            try:
                rs = list(conn.execute(sa.text(sql)))
                return rs[0][0]
            except sa.Error:
                conn.rollback()
                return 0

    def read_all(self, cls, table_name=None, uids=None):
        """Yield one ``cls`` object per row of the table.

        Args:
            cls: Class to instantiate; its lowercased name is the default table name.
            table_name: Explicit table name, overrides the default.
            uids: Optional ``gml_id`` value or collection of values to
                restrict the result to.

        Yields:
            Objects built with ``as_struct``, with ``identifikator`` set from
            the ``gml_id`` column and ``geom`` from ``wkb_geometry``
            (empty string if that column is absent).
        """
        sql = f"SELECT * FROM {self.schema}.{table_name or cls.__name__.lower()}"
        if uids:
            # 'IN (:uids)' would bind the whole collection to a single
            # placeholder, which cannot match more than one id. Bind a list
            # and compare with '= ANY(...)' instead; the postgres driver
            # adapts a Python list to an SQL array.
            # A bare string is treated as a single id, not as a sequence of characters.
            uid_list = [uids] if isinstance(uids, str) else list(uids)
            sql = sa.text(sql + ' WHERE gml_id = ANY(:uids)').bindparams(uids=uid_list)
        else:
            sql = sa.text(sql)

        with self.db.connect() as conn:
            for row in conn.execute(sql, execution_options={'stream_results': True}):
                r = gws.u.to_dict(row)
                o = self.as_struct(cls, '', r)
                o.identifikator = r.get('gml_id')
                o.geom = r.get('wkb_geometry', '')
                yield o

    ##

    def as_struct(self, cls, prop, r):
        """Build a ``cls`` instance from the row ``r``.

        Each attribute is read from the key ``<prop>_<attribute>`` (all
        lowercase) when the row has it, otherwise from the plain lowercase
        attribute name.
        """
        d = {}

        for attr_name, attr_low_name, attr_cls, fn in self.readers[cls.__name__].values():
            comp_key = prop.lower() + '_' + attr_low_name
            if comp_key in r:
                d[attr_name] = fn(attr_cls, comp_key, r)
            else:
                d[attr_name] = fn(attr_cls, attr_low_name, r)

        o = cls()
        vars(o).update(d)
        return o

    def as_struct_list(self, cls, prop, r):
        """Build a list of ``cls`` instances from parallel array columns.

        Every attribute of ``cls`` with a non-empty array in the row
        contributes one column; the columns are zipped, so the result has as
        many objects as the shortest column.
        """
        d = {}

        for attr_name, attr_low_name, attr_cls, fn in self.readers[cls.__name__].values():
            comp_key = prop.lower() + '_' + attr_low_name
            if comp_key in r:
                a = _array(r[comp_key])
            else:
                a = _array(r.get(attr_low_name))
            if a:
                d[attr_low_name] = a

        objs = []

        for vals in zip(*d.values()):
            # One synthetic row per position, keyed by the plain lowercase names.
            r2 = dict(zip(d, vals))
            o = self.as_struct(cls, '', r2)
            objs.append(o)

        return objs

    def as_ref(self, cls, prop, r):
        """Return the raw value stored under ``prop``."""
        return r.get(prop)

    def as_ref_list(self, cls, prop, r):
        """Return the value stored under ``prop`` as a list."""
        return _array(r.get(prop))

    def as_enum(self, cls, prop, r):
        """Return an ``EnumPair`` for the value under ``prop``.

        Returns ``None`` if the stringified value is not a key of ``cls.VALUES``.
        """
        v = str(r.get(prop))
        if v in cls.VALUES:
            return dt.EnumPair(v, cls.VALUES[v])

    def as_enum_list(self, cls, prop, r):
        """Return ``EnumPair`` objects for the values under ``prop``.

        Values that are not keys of ``cls.VALUES`` are skipped.
        """
        ls = []

        for v in _array(r.get(prop)):
            v = str(v)
            if v in cls.VALUES:
                ls.append(dt.EnumPair(v, cls.VALUES[v]))

        return ls

    ##

    def as_ax_lagebezeichnung(self, cls, prop, r):
        """Read an ``AX_Lagebezeichnung`` value.

        Returns the ``unverschluesselt`` column when it is set, otherwise an
        ``AX_VerschluesselteLagebezeichnung`` struct read from the row.
        """
        if r.get('unverschluesselt'):
            return r.get('unverschluesselt')
        return self.as_struct(gid.AX_VerschluesselteLagebezeichnung, prop, r)

    def as_ax_buchung_historischesflurstueck_list(self, cls, prop, r):
        """Read a list of ``AX_Buchung_HistorischesFlurstueck`` structs.

        The ``buchungsblattbezirk`` of each struct is read separately from
        the ``buchungsblattbezirk_*`` columns and attached by position.
        """
        # this one is stored as
        # "blattart": [xxxx],
        # "buchungsart": ["xxxx"],
        # ....etc
        # "buchungsblattbezirk_bezirk": ["xxxx"],
        # "buchungsblattbezirk_land": ["xx"],
        # "buchungsblattkennzeichen": ["xxxx"],
        # "buchungsblattnummermitbuchstabenerweiterung": ["xxxx"],

        bbs = self.as_struct_list(gid.AX_Buchung_HistorischesFlurstueck, '', r)
        bzs = self.as_struct_list(gid.AX_Buchungsblattbezirk_Schluessel, 'buchungsblattbezirk', r)
        objs = []
        for bb, bz in zip(bbs, bzs):
            bb.buchungsblattbezirk = bz
            objs.append(bb)
        return objs

    ##

    def as_str(self, cls, prop, r):
        """Return the value under ``prop`` as a stripped string, or ``None``."""
        return _str(r.get(prop))

    def as_str_list(self, cls, prop, r):
        """Return the values under ``prop`` as a list of stripped strings."""
        return [_str(v) for v in _array(r.get(prop))]

    def as_bool(self, cls, prop, r):
        """Return the value under ``prop`` as a bool, or ``None``."""
        return _bool(r.get(prop))

    def as_bool_list(self, cls, prop, r):
        """Return the values under ``prop`` as a list of bools."""
        return [_bool(v) for v in _array(r.get(prop))]

    def as_int(self, cls, prop, r):
        """Return the value under ``prop`` as an int, or ``None``."""
        return _int(r.get(prop))

    def as_int_list(self, cls, prop, r):
        """Return the values under ``prop`` as a list of ints."""
        return [_int(v) for v in _array(r.get(prop))]

    def as_float(self, cls, prop, r):
        """Return the value under ``prop`` as a float, or ``None``."""
        return _float(r.get(prop))

    def as_float_list(self, cls, prop, r):
        """Return the values under ``prop`` as a list of floats."""
        return [_float(v) for v in _array(r.get(prop))]

    def as_date(self, cls, prop, r):
        """Return the value under ``prop`` as a date/time value, or ``None``."""
        return _datetime(r.get(prop))

    def as_date_list(self, cls, prop, r):
        """Return the values under ``prop`` as a list of date/time values."""
        return [_datetime(v) for v in _array(r.get(prop))]

210 

211 

def _str(v):
    """Convert a value to a stripped string.

    Returns ``None`` for ``None`` and for values whose string form is empty
    after stripping whitespace.
    """
    if v is None:
        return None
    text = str(v).strip()
    return text if text else None

217 

218 

def _bool(v):
    """Convert a value to ``bool``, passing ``None`` through unchanged."""
    if v is None:
        return None
    return bool(v)

221 

222 

def _int(v):
    """Convert a value to ``int``, passing ``None`` through unchanged."""
    if v is None:
        return None
    return int(v)

225 

226 

def _float(v):
    """Convert a value to ``float``, passing ``None`` through unchanged."""
    if v is None:
        return None
    return float(v)

229 

230 

def _datetime(v):
    """Normalize a date/time value.

    Falsy values become ``None``, non-empty strings are parsed with
    ``gws.lib.datetimex.from_iso_string``, anything else is returned as is.
    """
    if v and isinstance(v, str):
        return gws.lib.datetimex.from_iso_string(v)
    return v if v else None

237 

238 

def _array(val):
    """Coerce a value to a list.

    ``None`` becomes an empty list, a list is returned unchanged (same
    object), and any other value is wrapped in a one-element list.
    """
    if val is None:
        return []
    return val if isinstance(val, list) else [val]

244 return [val]