Coverage for gws-app/gws/base/model/related_field.py: 0%

158 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Generic related field.""" 

2 

3from typing import Optional, Iterable, Any, cast 

4 

5import gws 

6import gws.lib.sa as sa 

7 

8from . import field 

9 

10 

11class Config(field.Config): 

12 pass 

13 

14 

15class Props(field.Props): 

16 pass 

17 

18 

19class Link(gws.Data): 

20 table: sa.Table 

21 fromKey: sa.Column 

22 toKey: sa.Column 

23 

24 

25class RelRef(gws.Data): 

26 model: gws.DatabaseModel 

27 table: sa.Table 

28 key: sa.Column 

29 uid: sa.Column 

30 

31 

32class Relationship(gws.Data): 

33 src: RelRef 

34 to: RelRef 

35 tos: list[RelRef] 

36 link: Link 

37 deleteCascade: bool = False 

38 

39 

40class Object(field.Object): 

41 model: gws.DatabaseModel 

42 rel: Relationship 

43 

44 def __getstate__(self): 

45 return gws.u.omit(vars(self), 'rel') 

46 

47 def post_configure(self): 

48 self.configure_relationship() 

49 

50 def activate(self): 

51 self.configure_relationship() 

52 

53 def configure_relationship(self): 

54 pass 

55 

56 def configure_widget(self): 

57 if not super().configure_widget(): 

58 if self.attributeType == gws.AttributeType.feature: 

59 self.widget = self.root.create_shared(gws.ext.object.modelWidget, type='featureSelect') 

60 return True 

61 if self.attributeType == gws.AttributeType.featurelist: 

62 self.widget = self.root.create_shared(gws.ext.object.modelWidget, type='featureList') 

63 return True 

64 

65 def get_model(self, uid: str) -> gws.DatabaseModel: 

66 mod = self.root.get(uid) 

67 if not mod: 

68 raise gws.ConfigurationError(f'model {uid!r} not found') 

69 return cast(gws.DatabaseModel, mod) 

70 

71 def find_relatable_features(self, search, mc): 

72 return [ 

73 f 

74 for to in self.rel.tos 

75 for f in to.model.find_features(search, mc) 

76 ] 

77 

78 def related_field(self, to: RelRef) -> Optional[gws.ModelField]: 

79 for fld in to.model.fields: 

80 rel2 = cast(Relationship, getattr(fld, 'rel', None)) 

81 if not rel2: 

82 continue 

83 if rel2.src.model == to.model and rel2.src.key == to.key: 

84 return fld 

85 

86 def do_init_related(self, to_feature, mc): 

87 our_features = [f for f in to_feature.createWithFeatures if f.model == self.model] 

88 if not our_features: 

89 return 

90 

91 for to in self.rel.tos: 

92 if to.model == to_feature.model: 

93 fld = self.related_field(to) 

94 if fld: 

95 if fld.attributeType == gws.AttributeType.feature: 

96 to_feature.attributes[fld.name] = our_features[0] 

97 if fld.attributeType == gws.AttributeType.featurelist: 

98 to_feature.attributes.setdefault(fld.name, []).extend(our_features) 

99 

100 def related_models(self): 

101 return [to.model for to in self.rel.tos] 

102 

103 ## 

104 

105 def to_props(self, feature, mc): 

106 if not mc.user.can_read(self) or mc.relDepth >= mc.maxDepth: 

107 return 

108 

109 value = feature.get(self.name) 

110 if not value: 

111 return 

112 if not isinstance(value, list): 

113 value = [value] 

114 

115 mc2 = gws.base.model.secondary_context(mc) 

116 res = [] 

117 

118 for v in value: 

119 related = cast(gws.Feature, v) 

120 if related: 

121 p = related.model.feature_to_props(related, mc2) 

122 if p: 

123 res.append(p) 

124 

125 if self.attributeType == gws.AttributeType.featurelist: 

126 feature.props.attributes[self.name] = res 

127 elif res: 

128 feature.props.attributes[self.name] = res[0] 

129 

130 def from_props(self, feature, mc): 

131 if mc.relDepth >= mc.maxDepth: 

132 return 

133 

134 value = feature.props.attributes.get(self.name) 

135 if not value: 

136 return 

137 if not isinstance(value, list): 

138 value = [value] 

139 

140 mc2 = gws.base.model.secondary_context(mc) 

141 res = [] 

142 to_model_map: dict[str, gws.Model] = {to.model.uid: to.model for to in self.rel.tos} 

143 

144 for v in value: 

145 rel_props = cast(gws.FeatureProps, gws.u.to_data_object(v)) 

146 if rel_props: 

147 to_model = to_model_map.get(rel_props.modelUid) 

148 if to_model: 

149 related = to_model.feature_from_props(rel_props, mc2) 

150 if related: 

151 res.append(related) 

152 

153 if self.attributeType == gws.AttributeType.featurelist: 

154 feature.set(self.name, res) 

155 elif res: 

156 feature.set(self.name, res[0]) 

157 

158 ## 

159 

160 def key_for_uid( 

161 self, 

162 model: gws.DatabaseModel, 

163 key_column: sa.Column, 

164 uid: gws.FeatureUid, 

165 mc: gws.ModelContext 

166 ): 

167 sql = sa.select(key_column).where(model.uid_column().__eq__(uid)) 

168 with model.db.connect() as conn: 

169 rs = list(conn.execute(sql)) 

170 return rs[0][0] if rs else None 

171 

172 def update_key_for_uids( 

173 self, 

174 model: gws.DatabaseModel, 

175 key_column: sa.Column, 

176 uids: list[gws.FeatureUid], 

177 key: Any, 

178 mc: gws.ModelContext 

179 ): 

180 

181 sql = sa.update( 

182 model.table() 

183 ).values({ 

184 key_column: key 

185 }).where( 

186 model.uid_column().in_(uids) 

187 ) 

188 with model.db.connect() as conn: 

189 conn.execute(sql) 

190 

191 def uids_to_keys( 

192 self, 

193 mc: gws.ModelContext, 

194 model: gws.DatabaseModel, 

195 key_column: sa.Column, 

196 uids: Optional[Iterable[gws.FeatureUid]] = None, 

197 keys: Optional[Iterable[Any]] = None, 

198 ): 

199 

200 if uids: 

201 uids = set(v for v in uids if v is not None) 

202 sql = sa.select(model.uid_column(), key_column).where(model.uid_column().in_(uids)) 

203 else: 

204 keys = set(v for v in keys if v is not None) 

205 sql = sa.select(model.uid_column(), key_column).where(key_column.in_(keys)) 

206 

207 with model.db.connect() as conn: 

208 return {str(uid): key for uid, key in conn.execute(sql)} 

209 

210 def uid_and_key_for_uids( 

211 self, 

212 model: gws.DatabaseModel, 

213 key_column: sa.Column, 

214 uids: Iterable[gws.FeatureUid], 

215 mc: gws.ModelContext 

216 ) -> set[tuple[gws.FeatureUid, gws.FeatureUid]]: 

217 

218 vs = set(v for v in uids if v is not None) 

219 sql = sa.select(model.uid_column(), key_column).where(model.uid_column().in_(vs)) 

220 with model.db.connect() as conn: 

221 return set((uid, key) for uid, key in conn.execute(sql)) 

222 

223 def uid_and_key_for_keys( 

224 self, 

225 model: gws.DatabaseModel, 

226 key_column: sa.Column, 

227 keys: Iterable[gws.FeatureUid], 

228 mc: gws.ModelContext 

229 ) -> set[tuple[gws.FeatureUid, gws.FeatureUid]]: 

230 

231 vs = set(v for v in keys if v is not None) 

232 sql = sa.select(model.uid_column(), key_column).where(key_column.in_(vs)) 

233 with model.db.connect() as conn: 

234 return set((uid, key) for uid, key in conn.execute(sql)) 

235 

236 def update_uid_and_key( 

237 self, 

238 model: gws.DatabaseModel, 

239 key_column: sa.Column, 

240 uid_and_key: Iterable[tuple[gws.FeatureUid, gws.FeatureUid]], 

241 mc: gws.ModelContext 

242 ): 

243 

244 with model.db.connect() as conn: 

245 for uid, key in uid_and_key: 

246 sql = sa.update( 

247 model.table() 

248 ).values({ 

249 key_column: key 

250 }).where( 

251 model.uid_column().__eq__(uid) 

252 ) 

253 conn.execute(sql) 

254 

255 def drop_uid_and_key( 

256 self, 

257 model: gws.DatabaseModel, 

258 key_column: sa.Column, 

259 uids: Iterable[gws.FeatureUid], 

260 delete: bool, 

261 mc: gws.ModelContext, 

262 ): 

263 

264 vs = set(v for v in uids if v is not None) 

265 if not vs: 

266 return 

267 

268 if delete: 

269 sql = sa.delete( 

270 model.table() 

271 ).where( 

272 model.uid_column().in_(vs) 

273 ) 

274 else: 

275 sql = sa.update( 

276 model.table() 

277 ).values({ 

278 key_column.name: None 

279 }).where( 

280 model.uid_column().in_(vs) 

281 ) 

282 

283 with model.db.connect() as conn: 

284 conn.execute(sql) 

285 

286 def get_related( 

287 self, 

288 model: gws.DatabaseModel, 

289 uids: Iterable[gws.FeatureUid], 

290 mc: gws.ModelContext 

291 ) -> list[gws.Feature]: 

292 return model.get_features(uids, gws.base.model.secondary_context(mc)) 

293 

294 def column_or_uid(self, model, cfg): 

295 return model.column(cfg) if cfg else model.uid_column()