Coverage for gws-app/gws/plugin/model_field/related_multi_feature_list/__init__.py: 0%
93 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Related Multi Feature List field
3Represents a 1:M relationship betweens a "parent" and multiple "child" tables ::
5 +---------+ +------------+
6 | parent | | child 1 |
7 +---------+ +------------+
8 | key |-------<<| parent_key |
9 | | +------------+
10 | |
11 | | +------------+
12 | | | child 2 |
13 | | +------------+
14 | |-------<<| parent_key |
15 | | +------------+
16 | |
17 | | +------------+
18 | | | child 3 |
19 | | +------------+
20 | |-------<<| parent_key |
21 +---------+ +------------+
25"""
27import gws
28import gws.base.database.model
29import gws.base.model
30import gws.base.model.related_field as related_field
31import gws.lib.sa as sa
33gws.ext.new.modelField('relatedMultiFeatureList')
36class RelatedItem(gws.Data):
37 toModel: str
38 toColumn: str
41class Config(related_field.Config):
42 fromColumn: str = ''
43 """key column in this table, primary key by default"""
44 related: list[RelatedItem]
45 """related models and keys"""
48class Props(related_field.Props):
49 pass
52class Object(related_field.Object):
53 attributeType = gws.AttributeType.featurelist
55 def configure_relationship(self):
56 self.rel = related_field.Relationship(
57 src=related_field.RelRef(
58 model=self.model,
59 table=self.model.table(),
60 key=self.column_or_uid(self.model, self.cfg('fromColumn')),
61 uid=self.model.uid_column(),
62 ),
63 tos=[]
64 )
66 for c in self.cfg('related'):
67 to_mod = self.get_model(c.toModel)
68 self.rel.tos.append(related_field.RelRef(
69 model=to_mod,
70 table=to_mod.table(),
71 key=to_mod.column(c.toColumn),
72 uid=to_mod.uid_column(),
73 ))
75 ##
77 def before_create_related(self, to_feature, mc):
78 for feature in to_feature.createWithFeatures:
79 if feature.model == self.model:
80 key = self.key_for_uid(self.rel.src.model, self.rel.src.key, feature.uid(), mc)
81 for to in self.rel.tos:
82 if to_feature.model == to.model:
83 to_feature.record.attributes[to.key.name] = key
84 return
86 def after_select(self, features, mc):
87 if not mc.user.can_read(self) or mc.relDepth >= mc.maxDepth:
88 return
90 for f in features:
91 f.set(self.name, [])
93 uid_to_f = {f.uid(): f for f in features}
95 for to in self.rel.tos:
96 sql = sa.select(
97 to.uid,
98 self.rel.src.uid,
99 ).select_from(
100 to.table.join(
101 self.rel.src.table, self.rel.src.key.__eq__(to.key)
102 )
103 ).where(
104 self.rel.src.uid.in_(uid_to_f)
105 )
107 r_to_uids = {}
108 with self.model.db.connect() as conn:
109 for r, u in conn.execute(sql):
110 r_to_uids.setdefault(str(r), []).append(str(u))
112 for to_feature in to.model.get_features(r_to_uids, gws.base.model.secondary_context(mc)):
113 for uid in r_to_uids.get(to_feature.uid(), []):
114 feature = uid_to_f.get(uid)
115 feature.get(self.name).append(to_feature)
117 def after_create(self, feature, mc):
118 key = self.key_for_uid(self.model, self.rel.src.key, feature.insertedPrimaryKey, mc)
119 self.after_write(feature, key, mc)
121 def after_update(self, feature, mc):
122 key = self.key_for_uid(self.model, self.rel.src.key, feature.uid(), mc)
123 self.after_write(feature, key, mc)
125 def after_write(self, feature: gws.Feature, key, mc: gws.ModelContext):
126 if not mc.user.can_write(self) or mc.relDepth >= mc.maxDepth:
127 return
129 for to in self.rel.tos:
130 if not mc.user.can_edit(to.model):
131 continue
133 cur_uids = self.to_uids_for_key(to, key, mc)
135 new_uids = set(
136 to_feature.uid()
137 for to_feature in feature.get(self.name, [])
138 if to_feature.model == to.model
139 )
141 ins_uids = new_uids - cur_uids
142 if ins_uids:
143 sql = sa.update(to.table).values({to.key.name: key}).where(to.uid.in_(ins_uids))
144 with to.model.db.connect() as conn:
145 conn.execute(sql)
147 self.drop_links(to, cur_uids - new_uids, mc)
149 def before_delete(self, feature, mc):
150 if not mc.user.can_write(self) or mc.relDepth >= mc.maxDepth:
151 return
153 key = self.key_for_uid(self.model, self.rel.src.key, feature.uid(), mc)
154 setattr(mc, f'_DELETED_KEY_{self.uid}', key)
156 def after_delete(self, features, mc):
157 if not mc.user.can_write(self) or mc.relDepth >= mc.maxDepth:
158 return
160 key = getattr(mc, f'_DELETED_KEY_{self.uid}')
162 for to in self.rel.tos:
163 if not mc.user.can_edit(to.model):
164 continue
165 cur_uids = self.to_uids_for_key(to, key, mc)
166 self.drop_links(to, cur_uids, mc)
168 def to_uids_for_key(self, to: related_field.RelRef, key, mc):
169 sql = sa.select(to.uid).where(to.key.__eq__(key))
170 with to.model.db.connect() as conn:
171 return set(str(u[0]) for u in conn.execute(sql))
173 def drop_links(self, to: related_field.RelRef, to_uids, mc):
174 if not to_uids:
175 return
176 if self.rel.deleteCascade:
177 sql = sa.delete(to.table).where(to.uid.in_(to_uids))
178 else:
179 sql = sa.update(to.table).values({to.key.name: None}).where(to.uid.in_(to_uids))
180 with to.model.db.connect() as conn:
181 conn.execute(sql)