Coverage for gws-app/gws/plugin/model_field/related_linked_feature_list/__init__.py: 0%

88 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Related Linked Feature List field 

2 

3Represents an M:N relationship between two models via a link table ("associative entity"):: 

4 

5 +---------+ +---------------+ +---------+ 

6 | table A | | link table | | table B | 

7 +---------+ +---------------+ +---------+ 

8 | key_a |-------<<| a b |>>-------| key_b | 

9 +---------+ +---------------+ +---------+ 

10 

11""" 

12 

13import gws 

14import gws.base.database.model 

15import gws.base.model 

16import gws.base.model.related_field as related_field 

17import gws.lib.sa as sa 

18 

19gws.ext.new.modelField('relatedLinkedFeatureList') 

20 

21 

22class Config(related_field.Config): 

23 fromColumn: str = '' 

24 """key column in this table, primary key by default""" 

25 toModel: str 

26 """related model""" 

27 toColumn: str = '' 

28 """key column in the related table, primary key by default""" 

29 linkTableName: str 

30 """link table name""" 

31 linkFromColumn: str 

32 """link key column for this model""" 

33 linkToColumn: str 

34 """link key column for the related model""" 

35 

36 

37class Props(related_field.Props): 

38 pass 

39 

40 

41class Object(related_field.Object): 

42 attributeType = gws.AttributeType.featurelist 

43 

44 def configure_relationship(self): 

45 to_mod = self.get_model(self.cfg('toModel')) 

46 link_tab = self.model.db.table(self.cfg('linkTableName')) 

47 

48 self.rel = related_field.Relationship( 

49 src=related_field.RelRef( 

50 model=self.model, 

51 table=self.model.table(), 

52 key=self.column_or_uid(self.model, self.cfg('fromColumn')), 

53 uid=self.model.uid_column(), 

54 ), 

55 tos=[ 

56 related_field.RelRef( 

57 model=to_mod, 

58 table=to_mod.table(), 

59 key=self.column_or_uid(to_mod, self.cfg('toColumn')), 

60 uid=to_mod.uid_column(), 

61 ) 

62 ], 

63 link=related_field.Link( 

64 table=link_tab, 

65 fromKey=self.model.db.column(link_tab, self.cfg('linkFromColumn')), 

66 toKey=self.model.db.column(link_tab, self.cfg('linkToColumn')), 

67 ) 

68 ) 

69 self.rel.to = self.rel.tos[0] 

70 

71 ## 

72 

73 def after_select(self, features, mc): 

74 if not mc.user.can_read(self) or mc.relDepth >= mc.maxDepth: 

75 return 

76 

77 for f in features: 

78 f.set(self.name, []) 

79 

80 uid_to_f = {f.uid(): f for f in features} 

81 

82 sql = sa.select( 

83 self.rel.to.uid, 

84 self.rel.src.uid, 

85 ).select_from( 

86 self.rel.to.table.join( 

87 self.rel.link.table, self.rel.link.toKey.__eq__(self.rel.to.key) 

88 ).join( 

89 self.rel.src.table, self.rel.link.fromKey.__eq__(self.rel.src.key) 

90 ) 

91 ).where( 

92 self.rel.src.uid.in_(uid_to_f) 

93 ) 

94 

95 r_to_uids = {} 

96 with self.model.db.connect() as conn: 

97 for r, u in conn.execute(sql): 

98 r_to_uids.setdefault(str(r), []).append(str(u)) 

99 

100 for to_feature in self.rel.to.model.get_features(r_to_uids, gws.base.model.secondary_context(mc)): 

101 for uid in r_to_uids.get(to_feature.uid(), []): 

102 feature = uid_to_f.get(uid) 

103 feature.get(self.name).append(to_feature) 

104 

105 def after_create_related(self, to_feature, mc): 

106 if to_feature.model != self.rel.to.model: 

107 return 

108 

109 right_key = self.key_for_uid(self.rel.to.model, self.rel.to.key, to_feature.insertedPrimaryKey, mc) 

110 new_links = set() 

111 

112 for feature in to_feature.createWithFeatures: 

113 if feature.model == self.model: 

114 key = self.key_for_uid(self.rel.src.model, self.rel.src.key, feature.uid(), mc) 

115 new_links.add((key, right_key)) 

116 

117 self.create_links(new_links, mc) 

118 

119 def after_create(self, feature, mc): 

120 key = self.key_for_uid(self.model, self.rel.src.key, feature.insertedPrimaryKey, mc) 

121 self.after_write(feature, key, mc) 

122 

123 def after_update(self, feature, mc): 

124 key = self.key_for_uid(self.model, self.rel.src.key, feature.uid(), mc) 

125 self.after_write(feature, key, mc) 

126 

127 def after_write(self, feature, key, mc: gws.ModelContext): 

128 if not mc.user.can_write(self) or mc.relDepth >= mc.maxDepth: 

129 return 

130 

131 cur_links = self.get_links([key], mc) 

132 to_uids = set( 

133 to_feature.uid() 

134 for to_feature in feature.get(self.name, []) 

135 ) 

136 

137 sql = sa.select(self.rel.to.uid, self.rel.to.key).where(self.rel.to.uid.in_(to_uids)) 

138 with self.rel.to.model.db.connect() as conn: 

139 r_uid_to_key = {str(u): k for u, k in conn.execute(sql)} 

140 

141 new_links = set() 

142 

143 for to_feature in feature.get(self.name, []): 

144 right_key = r_uid_to_key.get(to_feature.uid()) 

145 new_links.add((key, right_key)) 

146 

147 self.create_links(new_links - cur_links, mc) 

148 self.delete_links(cur_links - new_links, mc) 

149 

150 def get_links(self, left_keys, mc): 

151 sql = sa.select( 

152 self.rel.link.fromKey, 

153 self.rel.link.toKey, 

154 ).where( 

155 self.rel.link.fromKey.in_(left_keys) 

156 ) 

157 with self.model.db.connect() as conn: 

158 return set((lk, rk) for lk, rk in conn.execute(sql)) 

159 

160 def create_links(self, links, mc): 

161 sql = sa.insert(self.rel.link.table) 

162 values = [ 

163 { 

164 self.rel.link.fromKey.name: lk, 

165 self.rel.link.toKey.name: rk 

166 } 

167 for lk, rk in links 

168 ] 

169 if values: 

170 with self.model.db.connect() as conn: 

171 conn.execute(sql, values) 

172 

173 def delete_links(self, links, mc): 

174 with self.model.db.connect() as conn: 

175 for lk, rk in links: 

176 sql = sa.delete( 

177 self.rel.link.table 

178 ).where( 

179 self.rel.link.fromKey.__eq__(lk), 

180 self.rel.link.toKey.__eq__(rk) 

181 ) 

182 conn.execute(sql)