Coverage for gws-app/gws/base/model/related_field.py: 0%
158 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Generic related field."""
3from typing import Optional, Iterable, Any, cast
5import gws
6import gws.lib.sa as sa
8from . import field
class Config(field.Config):
    """Configuration of a generic related field.

    Adds no options of its own to the base field configuration.
    """
class Props(field.Props):
    """Client-side properties of a generic related field.

    Adds no properties of its own to the base field properties.
    """
class Link(gws.Data):
    """A table together with two of its key columns."""

    # The table itself.
    table: sa.Table
    # NOTE(review): presumably the column pointing to the source side of a relationship - confirm.
    fromKey: sa.Column
    # NOTE(review): presumably the column pointing to the target side of a relationship - confirm.
    toKey: sa.Column
class RelRef(gws.Data):
    """One side of a relationship: a database model, its table and two columns."""

    # Model on this side of the relationship.
    model: gws.DatabaseModel
    # Table on this side of the relationship.
    table: sa.Table
    # Column used as the relationship key.
    key: sa.Column
    # NOTE(review): presumably the feature uid column of ``model`` - confirm.
    uid: sa.Column
class Relationship(gws.Data):
    """Description of a relationship between a source and one or more targets."""

    # Source side of the relationship.
    src: RelRef
    # Single target side.
    to: RelRef
    # All target sides; this is the list the field iterates over.
    tos: list[RelRef]
    # Link table description.
    link: Link
    # NOTE(review): presumably tells whether related records are deleted with their owner - confirm.
    deleteCascade: bool = False
class Object(field.Object):
    """Generic model field whose value is a related feature or a list of related features.

    The relationship description (``rel``) is expected to be set up by
    ``configure_relationship``, which does nothing in this class and is meant
    to be overridden.

    Besides the props conversion (``to_props`` / ``from_props``), the class
    provides small SQL helpers that read and write key columns of related
    tables.
    """

    model: gws.DatabaseModel
    rel: Relationship

    def __getstate__(self):
        """Return the instance attributes without ``rel``."""
        # NOTE(review): presumably ``rel`` is left out because ``activate``
        # rebuilds it via ``configure_relationship`` - confirm.
        return gws.u.omit(vars(self), 'rel')

    def post_configure(self):
        """Set up the relationship once the field is configured."""
        self.configure_relationship()

    def activate(self):
        """Set up the relationship again when the field is activated."""
        self.configure_relationship()

    def configure_relationship(self):
        """Populate ``self.rel``. Does nothing in this class."""
        pass

    def configure_widget(self):
        """Configure the field widget, falling back to a shared feature widget.

        If the parent class does not configure a widget, a shared
        ``featureSelect`` widget is used for ``feature`` attributes and a
        shared ``featureList`` widget for ``featurelist`` attributes.

        Returns:
            ``True`` if a widget was configured (by the parent or by the
            fallback), ``False`` otherwise.
        """
        # Report success when the parent did the job, so that a subclass using
        # the same ``if not super().configure_widget()`` idiom does not
        # overwrite the configured widget.
        if super().configure_widget():
            return True

        if self.attributeType == gws.AttributeType.feature:
            self.widget = self.root.create_shared(gws.ext.object.modelWidget, type='featureSelect')
            return True

        if self.attributeType == gws.AttributeType.featurelist:
            self.widget = self.root.create_shared(gws.ext.object.modelWidget, type='featureList')
            return True

        return False

    def get_model(self, uid: str) -> gws.DatabaseModel:
        """Look up a model by its uid.

        Args:
            uid: Model uid.

        Returns:
            The object registered under ``uid``, cast to a database model.

        Raises:
            gws.ConfigurationError: If nothing is registered under ``uid``.
        """
        mod = self.root.get(uid)
        if not mod:
            raise gws.ConfigurationError(f'model {uid!r} not found')
        return cast(gws.DatabaseModel, mod)

    def find_relatable_features(self, search, mc):
        """Run ``search`` against every target model and return all found features as one list."""
        return [
            f
            for to in self.rel.tos
            for f in to.model.find_features(search, mc)
        ]

    def related_field(self, to: RelRef) -> Optional[gws.ModelField]:
        """Find the field of ``to.model`` whose relationship source matches ``to``.

        Args:
            to: Target side of our relationship.

        Returns:
            The first field of ``to.model`` that has a ``rel`` with the same
            source model and source key as ``to``, or ``None``.
        """
        for fld in to.model.fields:
            rel2 = cast(Relationship, getattr(fld, 'rel', None))
            if not rel2:
                continue
            # NOTE(review): the keys are SQLAlchemy columns, so ``==`` builds
            # an expression which is then evaluated for truth - confirm this
            # acts as the intended "same column" check.
            if rel2.src.model == to.model and rel2.src.key == to.key:
                return fld
        return None

    def do_init_related(self, to_feature, mc):
        """Pre-fill the back-reference of a feature that is created together with our features.

        Takes the features of our model from ``to_feature.createWithFeatures``
        and stores them in the matching related field of ``to_feature``:
        the first one for a ``feature`` field, all of them for a
        ``featurelist`` field.
        """
        our_features = [f for f in to_feature.createWithFeatures if f.model == self.model]
        if not our_features:
            return

        for to in self.rel.tos:
            if to.model != to_feature.model:
                continue
            fld = self.related_field(to)
            if not fld:
                continue
            if fld.attributeType == gws.AttributeType.feature:
                to_feature.attributes[fld.name] = our_features[0]
            elif fld.attributeType == gws.AttributeType.featurelist:
                to_feature.attributes.setdefault(fld.name, []).extend(our_features)

    def related_models(self):
        """Return the models of all relationship targets."""
        return [to.model for to in self.rel.tos]

    ##

    def to_props(self, feature, mc):
        """Convert the related feature(s) of ``feature`` to props.

        Does nothing if the user cannot read this field, if the relation depth
        limit is reached, or if the field has no value. Otherwise stores a
        list of props (``featurelist``) or the first props object in
        ``feature.props.attributes``.
        """
        if not mc.user.can_read(self) or mc.relDepth >= mc.maxDepth:
            return

        value = feature.get(self.name)
        if not value:
            return
        if not isinstance(value, list):
            value = [value]

        mc2 = gws.base.model.secondary_context(mc)
        res = []

        for v in value:
            related = cast(gws.Feature, v)
            if related:
                p = related.model.feature_to_props(related, mc2)
                if p:
                    res.append(p)

        if self.attributeType == gws.AttributeType.featurelist:
            feature.props.attributes[self.name] = res
        elif res:
            feature.props.attributes[self.name] = res[0]

    def from_props(self, feature, mc):
        """Convert related props in ``feature.props.attributes`` to features.

        Does nothing if the relation depth limit is reached or if there are no
        props for this field. Props whose ``modelUid`` is not one of the target
        models are skipped. Stores a list of features (``featurelist``) or the
        first feature on ``feature``.
        """
        if mc.relDepth >= mc.maxDepth:
            return

        value = feature.props.attributes.get(self.name)
        if not value:
            return
        if not isinstance(value, list):
            value = [value]

        mc2 = gws.base.model.secondary_context(mc)
        res = []
        to_model_map: dict[str, gws.Model] = {to.model.uid: to.model for to in self.rel.tos}

        for v in value:
            rel_props = cast(gws.FeatureProps, gws.u.to_data_object(v))
            if rel_props:
                to_model = to_model_map.get(rel_props.modelUid)
                if to_model:
                    related = to_model.feature_from_props(rel_props, mc2)
                    if related:
                        res.append(related)

        if self.attributeType == gws.AttributeType.featurelist:
            feature.set(self.name, res)
        elif res:
            feature.set(self.name, res[0])

    ##

    def key_for_uid(
        self,
        model: gws.DatabaseModel,
        key_column: sa.Column,
        uid: gws.FeatureUid,
        mc: gws.ModelContext
    ):
        """Return the value of ``key_column`` for the record with the given uid.

        Args:
            model: Model to query.
            key_column: Column to read.
            uid: Feature uid to look for.
            mc: Model context (not used here).

        Returns:
            The key value of the first matching row, or ``None`` if no row matches.
        """
        sql = sa.select(key_column).where(model.uid_column() == uid)
        with model.db.connect() as conn:
            rs = list(conn.execute(sql))
            return rs[0][0] if rs else None

    def update_key_for_uids(
        self,
        model: gws.DatabaseModel,
        key_column: sa.Column,
        uids: list[gws.FeatureUid],
        key: Any,
        mc: gws.ModelContext
    ):
        """Set ``key_column`` to ``key`` for all records with the given uids.

        Args:
            model: Model to update.
            key_column: Column to write.
            uids: Feature uids of the records to update.
            key: New key value.
            mc: Model context (not used here).
        """
        sql = sa.update(
            model.table()
        ).values({
            key_column: key
        }).where(
            model.uid_column().in_(uids)
        )
        # NOTE(review): there is no explicit commit here; presumably the
        # connection from ``model.db.connect()`` commits on exit - confirm.
        with model.db.connect() as conn:
            conn.execute(sql)

    def uids_to_keys(
        self,
        mc: gws.ModelContext,
        model: gws.DatabaseModel,
        key_column: sa.Column,
        uids: Optional[Iterable[gws.FeatureUid]] = None,
        keys: Optional[Iterable[Any]] = None,
    ):
        """Map feature uids to key column values.

        Rows are selected by uid if ``uids`` is non-empty, otherwise by key.
        ``None`` values in the filter are ignored.

        Args:
            mc: Model context (not used here).
            model: Model to query.
            key_column: Column to read.
            uids: Feature uids to select.
            keys: Key values to select, used when ``uids`` is empty or omitted.

        Returns:
            A dict ``{str(uid): key}``; empty if there is nothing to look up.
        """
        if uids:
            filter_column = model.uid_column()
            wanted = {v for v in uids if v is not None}
        else:
            filter_column = key_column
            # ``keys`` defaults to None: treat that as "nothing to look up"
            # instead of failing on iteration.
            wanted = {v for v in keys or () if v is not None}

        if not wanted:
            return {}

        sql = sa.select(model.uid_column(), key_column).where(filter_column.in_(wanted))

        with model.db.connect() as conn:
            return {str(uid): key for uid, key in conn.execute(sql)}

    def uid_and_key_for_uids(
        self,
        model: gws.DatabaseModel,
        key_column: sa.Column,
        uids: Iterable[gws.FeatureUid],
        mc: gws.ModelContext
    ) -> set[tuple[gws.FeatureUid, gws.FeatureUid]]:
        """Return ``(uid, key)`` pairs for the records with the given uids.

        Args:
            model: Model to query.
            key_column: Column to read.
            uids: Feature uids to select; ``None`` values are ignored.
            mc: Model context (not used here).

        Returns:
            A set of ``(uid, key)`` tuples.
        """
        vs = {v for v in uids if v is not None}
        sql = sa.select(model.uid_column(), key_column).where(model.uid_column().in_(vs))
        with model.db.connect() as conn:
            return {(uid, key) for uid, key in conn.execute(sql)}

    def uid_and_key_for_keys(
        self,
        model: gws.DatabaseModel,
        key_column: sa.Column,
        keys: Iterable[gws.FeatureUid],
        mc: gws.ModelContext
    ) -> set[tuple[gws.FeatureUid, gws.FeatureUid]]:
        """Return ``(uid, key)`` pairs for the records with the given key values.

        Args:
            model: Model to query.
            key_column: Column to read and to filter on.
            keys: Key values to select; ``None`` values are ignored.
            mc: Model context (not used here).

        Returns:
            A set of ``(uid, key)`` tuples.
        """
        vs = {v for v in keys if v is not None}
        sql = sa.select(model.uid_column(), key_column).where(key_column.in_(vs))
        with model.db.connect() as conn:
            return {(uid, key) for uid, key in conn.execute(sql)}

    def update_uid_and_key(
        self,
        model: gws.DatabaseModel,
        key_column: sa.Column,
        uid_and_key: Iterable[tuple[gws.FeatureUid, gws.FeatureUid]],
        mc: gws.ModelContext
    ):
        """Set ``key_column`` individually for each ``(uid, key)`` pair.

        Args:
            model: Model to update.
            key_column: Column to write.
            uid_and_key: Pairs of feature uid and the new key value for that record.
            mc: Model context (not used here).
        """
        # NOTE(review): there is no explicit commit here; presumably the
        # connection from ``model.db.connect()`` commits on exit - confirm.
        with model.db.connect() as conn:
            for uid, key in uid_and_key:
                sql = sa.update(
                    model.table()
                ).values({
                    key_column: key
                }).where(
                    model.uid_column() == uid
                )
                conn.execute(sql)

    def drop_uid_and_key(
        self,
        model: gws.DatabaseModel,
        key_column: sa.Column,
        uids: Iterable[gws.FeatureUid],
        delete: bool,
        mc: gws.ModelContext,
    ):
        """Delete the records with the given uids or reset their key column.

        Args:
            model: Model to update.
            key_column: Column to set to ``None`` when ``delete`` is false.
            uids: Feature uids of the affected records; ``None`` values are ignored.
            delete: If true, delete the records instead of resetting the key.
            mc: Model context (not used here).
        """
        vs = {v for v in uids if v is not None}
        if not vs:
            return

        if delete:
            sql = sa.delete(
                model.table()
            ).where(
                model.uid_column().in_(vs)
            )
        else:
            sql = sa.update(
                model.table()
            ).values({
                key_column.name: None
            }).where(
                model.uid_column().in_(vs)
            )

        # NOTE(review): there is no explicit commit here; presumably the
        # connection from ``model.db.connect()`` commits on exit - confirm.
        with model.db.connect() as conn:
            conn.execute(sql)

    def get_related(
        self,
        model: gws.DatabaseModel,
        uids: Iterable[gws.FeatureUid],
        mc: gws.ModelContext
    ) -> list[gws.Feature]:
        """Load the features with the given uids from ``model`` in a secondary context."""
        return model.get_features(uids, gws.base.model.secondary_context(mc))

    def column_or_uid(self, model, cfg):
        """Return the column of ``model`` named by ``cfg``, or its uid column if ``cfg`` is empty."""
        return model.column(cfg) if cfg else model.uid_column()