Coverage for gws-app/gws/plugin/model_field/related_multi_feature_list/__init__.py: 0%

93 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Related Multi Feature List field 

2 

3Represents a 1:M relationship between a "parent" and multiple "child" tables ::

4 

5 +---------+ +------------+ 

6 | parent | | child 1 | 

7 +---------+ +------------+ 

8 | key |-------<<| parent_key | 

9 | | +------------+ 

10 | | 

11 | | +------------+ 

12 | | | child 2 | 

13 | | +------------+ 

14 | |-------<<| parent_key | 

15 | | +------------+ 

16 | | 

17 | | +------------+ 

18 | | | child 3 | 

19 | | +------------+ 

20 | |-------<<| parent_key | 

21 +---------+ +------------+ 

22 

23 

24 

25""" 

26 

27import gws 

28import gws.base.database.model 

29import gws.base.model 

30import gws.base.model.related_field as related_field 

31import gws.lib.sa as sa 

32 

33gws.ext.new.modelField('relatedMultiFeatureList') 

34 

35 

class RelatedItem(gws.Data):
    """One related ("child") model together with the column that links it to the parent."""

    # Reference to the related model; resolved with ``get_model`` when the relationship is configured.
    toModel: str
    # Name of the column in the related model that is matched against the parent key.
    toColumn: str

39 

40 

class Config(related_field.Config):
    """Configuration of the field: the parent key column and the list of related models."""

    fromColumn: str = ''
    """key column in this table, primary key by default"""
    related: list[RelatedItem]
    """related models and keys"""

46 

47 

class Props(related_field.Props):
    """Properties of the field; nothing is added to ``related_field.Props``."""

50 

51 

class Object(related_field.Object):
    """Feature-list field that links one parent feature to children in several models.

    The relationship is kept in ``self.rel``: ``self.rel.src`` refers to the model
    this field belongs to (the parent), ``self.rel.tos`` holds one reference per
    configured related (child) model.
    """

    attributeType = gws.AttributeType.featurelist

    def configure_relationship(self):
        """Build ``self.rel`` from the ``fromColumn`` and ``related`` config options."""
        src = related_field.RelRef(
            model=self.model,
            table=self.model.table(),
            key=self.column_or_uid(self.model, self.cfg('fromColumn')),
            uid=self.model.uid_column(),
        )
        self.rel = related_field.Relationship(src=src, tos=[])

        for item in self.cfg('related'):
            to_model = self.get_model(item.toModel)
            self.rel.tos.append(related_field.RelRef(
                model=to_model,
                table=to_model.table(),
                key=to_model.column(item.toColumn),
                uid=to_model.uid_column(),
            ))

    ##

    def before_create_related(self, to_feature, mc):
        """Pre-fill the parent key attribute of a child feature that is about to be created.

        The first feature in ``to_feature.createWithFeatures`` that belongs to this
        field's model is taken as the parent. Its key is written into the record
        attribute named after the key column of the matching related model.
        """
        for feature in to_feature.createWithFeatures:
            if feature.model == self.model:
                key = self.key_for_uid(self.rel.src.model, self.rel.src.key, feature.uid(), mc)
                for to in self.rel.tos:
                    if to_feature.model == to.model:
                        to_feature.record.attributes[to.key.name] = key
                        return

    def after_select(self, features, mc):
        """Populate this field on the selected parent features with their children.

        Does nothing when the user may not read the field or when the relation
        depth limit (``mc.maxDepth``) has been reached. Otherwise every feature
        gets a list under ``self.name`` holding the child features of all
        related models.
        """
        if not mc.user.can_read(self) or mc.relDepth >= mc.maxDepth:
            return

        # Nothing to populate: avoid one useless query per related model.
        if not features:
            return

        for f in features:
            f.set(self.name, [])

        uid_to_f = {f.uid(): f for f in features}

        for to in self.rel.tos:
            # Pairs of (child uid, parent uid) for the children of the given parents.
            sql = sa.select(
                to.uid,
                self.rel.src.uid,
            ).select_from(
                to.table.join(
                    self.rel.src.table, self.rel.src.key.__eq__(to.key)
                )
            ).where(
                self.rel.src.uid.in_(list(uid_to_f))
            )

            # child uid -> uids of the parents it is linked to (both as strings)
            parents_by_child = {}
            with self.model.db.connect() as conn:
                for child_uid, parent_uid in conn.execute(sql):
                    parents_by_child.setdefault(str(child_uid), []).append(str(parent_uid))

            # Iterating the dict yields its keys, i.e. the child uids to load.
            children = to.model.get_features(parents_by_child, gws.base.model.secondary_context(mc))
            for to_feature in children:
                for uid in parents_by_child.get(to_feature.uid(), []):
                    feature = uid_to_f.get(uid)
                    feature.get(self.name).append(to_feature)

    def after_create(self, feature, mc):
        """Synchronize child links after the parent feature has been inserted."""
        key = self.key_for_uid(self.model, self.rel.src.key, feature.insertedPrimaryKey, mc)
        self.after_write(feature, key, mc)

    def after_update(self, feature, mc):
        """Synchronize child links after the parent feature has been updated."""
        key = self.key_for_uid(self.model, self.rel.src.key, feature.uid(), mc)
        self.after_write(feature, key, mc)

    def after_write(self, feature: gws.Feature, key, mc: gws.ModelContext):
        """Make the stored child links match the feature list held by ``feature``.

        For every related model the user may edit, children present in the
        feature list but not yet linked get the parent key written, and
        children that are linked but no longer listed are passed to ``drop_links``.

        Args:
            feature: The parent feature that was written.
            key: Value of the parent key column for this feature.
            mc: Model context of the operation.
        """
        if not mc.user.can_write(self) or mc.relDepth >= mc.maxDepth:
            return

        # A NULL key cannot own children. Comparing the child key column with
        # None renders as ``IS NULL`` and would match every unlinked child,
        # which would then be dropped (or deleted with cascade) below.
        if key is None:
            return

        for to in self.rel.tos:
            if not mc.user.can_edit(to.model):
                continue

            cur_uids = self.to_uids_for_key(to, key, mc)

            new_uids = {
                to_feature.uid()
                for to_feature in feature.get(self.name, [])
                if to_feature.model == to.model
            }

            ins_uids = new_uids - cur_uids
            if ins_uids:
                sql = sa.update(to.table).values({to.key.name: key}).where(to.uid.in_(ins_uids))
                with to.model.db.connect() as conn:
                    conn.execute(sql)

            self.drop_links(to, cur_uids - new_uids, mc)

    def before_delete(self, feature, mc):
        """Remember the parent key on ``mc`` so that ``after_delete`` can use it.

        NOTE(review): presumably needed because the key can no longer be looked
        up once the parent row is gone - confirm. Only one key is stored per
        field and context, so a later call overwrites an earlier one.
        """
        if not mc.user.can_write(self) or mc.relDepth >= mc.maxDepth:
            return

        key = self.key_for_uid(self.model, self.rel.src.key, feature.uid(), mc)
        setattr(mc, f'_DELETED_KEY_{self.uid}', key)

    def after_delete(self, features, mc):
        """Drop the links of all children that referred to the deleted parent key.

        Reads the key stored by ``before_delete``; raises ``AttributeError`` if
        no key was stored on ``mc`` for this field.
        """
        if not mc.user.can_write(self) or mc.relDepth >= mc.maxDepth:
            return

        key = getattr(mc, f'_DELETED_KEY_{self.uid}')

        # No parent key means no linked children. Without this guard the
        # lookup below would match all children whose key column is NULL.
        if key is None:
            return

        for to in self.rel.tos:
            if not mc.user.can_edit(to.model):
                continue
            cur_uids = self.to_uids_for_key(to, key, mc)
            self.drop_links(to, cur_uids, mc)

    def to_uids_for_key(self, to: related_field.RelRef, key, mc):
        """Return the uids (as a set of strings) of the children in ``to`` linked to ``key``.

        A ``None`` key yields an empty set: SQLAlchemy turns ``column == None``
        into ``column IS NULL``, which would select unlinked children instead.
        """
        if key is None:
            return set()

        sql = sa.select(to.uid).where(to.key.__eq__(key))
        with to.model.db.connect() as conn:
            return {str(row[0]) for row in conn.execute(sql)}

    def drop_links(self, to: related_field.RelRef, to_uids, mc):
        """Detach the given children from their parent.

        With ``self.rel.deleteCascade`` set the child rows are deleted,
        otherwise their key column is set to NULL. An empty ``to_uids`` is a no-op.
        """
        if not to_uids:
            return
        if self.rel.deleteCascade:
            sql = sa.delete(to.table).where(to.uid.in_(to_uids))
        else:
            sql = sa.update(to.table).values({to.key.name: None}).where(to.uid.in_(to_uids))
        with to.model.db.connect() as conn:
            conn.execute(sql)