Coverage for gws-app/gws/base/database/model.py: 0%

152 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Database-based models.""" 

2 

3from typing import Optional, cast 

4 

5import gws 

6import gws.base.feature 

7import gws.base.model 

8import gws.base.model.field 

9import gws.config.util 

10import gws.gis.crs 

11import gws.lib.sa as sa 

12 

13 

class Props(gws.base.model.Props):
    """Properties of a database model; adds nothing to ``gws.base.model.Props``."""

16 

17 

class Config(gws.base.model.Config):
    """Configuration of a database model.

    Extends ``gws.base.model.Config`` with three optional settings.
    """

    # Not read directly in this file; presumably consumed by the database
    # provider lookup done in ``Object.configure_provider`` -- TODO confirm.
    dbUid: Optional[str]
    """db provider uid"""
    # ``Object.configure`` falls back to the ``_defaultTableName`` option when
    # this is empty and raises ``gws.ConfigurationError`` if both are missing.
    tableName: Optional[str]
    """table name for the model"""
    # ``Object._make_select`` wraps this text in parentheses and adds it to the
    # SELECT statement as a raw SQL condition.
    sqlFilter: Optional[str]
    """extra SQL filter"""

25 

26 

27class Object(gws.base.model.Object, gws.DatabaseModel): 

def configure(self):
    """Resolve the table name and the SQL filter, then configure the model.

    The table name is the ``tableName`` option or, when that is empty,
    the ``_defaultTableName`` option.

    Raises:
        gws.ConfigurationError: If neither option provides a table name.
    """
    table_name = self.cfg('tableName')
    if not table_name:
        table_name = self.cfg('_defaultTableName')

    # Assigned before the check so the attribute is set even when we raise.
    self.tableName = table_name
    if not table_name:
        raise gws.ConfigurationError(f'table name missing in model {self!r}')

    self.sqlFilter = self.cfg('sqlFilter')
    self.configure_model()

35 

def configure_provider(self):
    """Configure the database provider for this model.

    Delegates to ``gws.config.util.configure_database_provider_for`` and
    returns whatever that helper returns.
    """
    outcome = gws.config.util.configure_database_provider_for(self)
    return outcome

38 

39 ## 

40 

def describe(self):
    """Return the provider's description of this model's table."""
    provider = self.db
    return provider.describe(self.tableName)

43 

def table(self):
    """Return the provider's table object for this model's table name."""
    provider = self.db
    return provider.table(self.tableName)

46 

def column(self, column_name):
    """Return the column called ``column_name`` from this model's table."""
    provider = self.db
    model_table = self.table()
    return provider.column(model_table, column_name)

49 

def uid_column(self):
    """Return the column that holds the feature uid.

    Raises:
        gws.Error: If the model has no ``uidName``, or the table has no
            column with that name.
    """
    if not self.uidName:
        raise gws.Error(f'no primary key found for table {self.tableName!r}')

    if self.db.has_column(self.table(), self.uidName):
        return self.db.column(self.table(), self.uidName)

    raise gws.Error(f'invalid primary key {self.uidName!r} for table {self.tableName!r}')

56 

57 ## 

58 

def find_features(self, search, mc):
    """Select the features that match ``search``.

    Stores ``search`` and a fresh, empty ``gws.ModelDbSelect`` on ``mc``,
    calls ``before_select`` on every field, builds the statement with
    ``_make_select`` and executes it. Each result row becomes a feature via
    ``gws.base.feature.new``; afterwards every field gets ``after_select``.

    Returns:
        The list of features, or an empty list when ``_make_select``
        produces no statement.

    Raises:
        gws.ForbiddenError: If ``mc.user`` may not read this model.
    """
    if not mc.user.can_read(self):
        raise gws.ForbiddenError()

    mc.search = search
    mc.dbSelect = gws.ModelDbSelect(
        columns=[],
        geometryWhere=[],
        keywordWhere=[],
        order=[],
        where=[]
    )

    with self.db.connect() as conn:
        for field in self.fields:
            field.before_select(mc)

        statement = self._make_select(mc)
        if statement is None:
            # No statement could be built; the ``after_select`` hooks are skipped.
            gws.log.debug('empty select')
            return []

        found = [
            gws.base.feature.new(
                model=self,
                record=gws.FeatureRecord(attributes=gws.u.to_dict(row))
            )
            for row in conn.execute(statement)
        ]

        for field in self.fields:
            field.after_select(found, mc)

        return found

93 

def _make_select(self, mc: gws.ModelContext) -> Optional[sa.Select]:
    """Build the SELECT statement for ``mc.search`` and ``mc.dbSelect``.

    Sort expressions are appended to ``mc.dbSelect.order`` as a side effect.

    Returns:
        The statement, or ``None`` when the search cannot be expressed:
        uids were requested but the model has no ``uidName``, a keyword
        was given but no keyword conditions were collected, or a shape
        was given but no geometry conditions were collected.
    """
    search = mc.search
    parts = mc.dbSelect

    # @TODO this should happen on the field level
    for sort in search.sort or self.defaultSort or []:
        direction = sa.desc if sort.reverse else sa.asc
        parts.order.append(direction(self.column(sort.fieldName)))

    stmt = sa.select().select_from(self.table())

    if search.uids:
        if not self.uidName:
            return None
        stmt = stmt.where(self.uid_column().in_(search.uids))

    # Keyword conditions are alternatives, hence OR.
    if parts.keywordWhere:
        stmt = stmt.where(sa.or_(*parts.keywordWhere))
    elif search.keyword:
        return None

    # Geometry conditions are alternatives too.
    if parts.geometryWhere:
        stmt = stmt.where(sa.or_(*parts.geometryWhere))
    elif search.shape:
        return None

    stmt = stmt.where(*parts.where)
    for condition in search.extraWhere or []:
        stmt = stmt.where(condition)

    if self.sqlFilter:
        stmt = stmt.where(sa.text('(' + self.sqlFilter + ')'))

    # Keep the first occurrence of each column object. Comparison is by
    # identity, presumably because column objects overload ``==`` -- TODO confirm.
    selected = []
    for col in [*(parts.columns or []), *(search.extraColumns or [])]:
        if not any(col is seen for seen in selected):
            selected.append(col)

    stmt = stmt.add_columns(*selected)

    if parts.order:
        stmt = stmt.order_by(*parts.order)

    return stmt

143 

144 ## 

145 

def init_feature(self, feature, mc):
    """Initialize a new ``feature`` through the field hooks.

    Calls ``do_init`` on every field of this model, then ``do_init_related``
    on every field of the model of each entry in
    ``feature.createWithFeatures``.

    Raises:
        gws.ForbiddenError: If ``mc.user`` may not create features here.
    """
    if not mc.user.can_create(self):
        raise gws.ForbiddenError()

    for own_field in self.fields:
        own_field.do_init(feature, mc)

    for related_feature in feature.createWithFeatures:
        related_fields = related_feature.model.fields
        for related_field in related_fields:
            related_field.do_init_related(feature, mc)

156 

def create_feature(self, feature, mc):
    """Insert ``feature`` as a new row of the model's table.

    ``feature.record`` is reset to an empty record first. Within one
    connection the hooks run in this order: ``before_create_related`` for
    the fields of each distinct model found in
    ``feature.createWithFeatures``, ``before_create`` for this model's
    fields, the INSERT of ``feature.record.attributes``, ``after_create``,
    ``after_create_related``, and finally the commit.

    Returns:
        The first element of the inserted primary key, which is also
        stored in ``feature.insertedPrimaryKey``.

    Raises:
        gws.ForbiddenError: If ``mc.user`` may not create features here.
    """
    if not mc.user.can_create(self):
        raise gws.ForbiddenError()

    feature.record = gws.FeatureRecord(attributes={}, meta={})

    # Distinct models of the related features, in first-seen order.
    distinct_models = []
    for source_feature in feature.createWithFeatures:
        if source_feature.model in distinct_models:
            continue
        distinct_models.append(source_feature.model)

    with self.db.connect() as conn:
        for model in distinct_models:
            for field in model.fields:
                field.before_create_related(feature, mc)

        for field in self.fields:
            field.before_create(feature, mc)

        statement = sa.insert(self.table())
        result = conn.execute(statement, feature.record.attributes)
        feature.insertedPrimaryKey = result.inserted_primary_key[0]

        for field in self.fields:
            field.after_create(feature, mc)

        for model in distinct_models:
            for field in model.fields:
                field.after_create_related(feature, mc)

        conn.commit()

    return feature.insertedPrimaryKey

190 

def update_feature(self, feature, mc):
    """Update the table row that belongs to ``feature``.

    ``feature.record`` is reset to an empty record and is expected to be
    filled by the fields' ``before_update`` hooks. If no attributes were
    collected, nothing is written. Otherwise the row whose uid column
    equals ``feature.uid()`` is updated, the ``after_update`` hooks run
    and the connection is committed.

    Returns:
        ``feature.uid()``.

    Raises:
        gws.ForbiddenError: If ``mc.user`` may not write to this model.
    """
    if not mc.user.can_write(self):
        raise gws.ForbiddenError()

    feature.record = gws.FeatureRecord(attributes={}, meta={})

    with self.db.connect() as conn:
        for fld in self.fields:
            fld.before_update(feature, mc)

        # Nothing to write: skip the UPDATE, the ``after_update`` hooks
        # and the commit.
        if not feature.record.attributes:
            return feature.uid()

        # ``==`` on a column builds the SQL comparison; there is no need to
        # call the ``__eq__`` dunder directly.
        sql = (
            self.table()
            .update()
            .where(self.uid_column() == feature.uid())
            .values(feature.record.attributes)
        )
        conn.execute(sql)

        for fld in self.fields:
            fld.after_update(feature, mc)

        conn.commit()

    return feature.uid()

217 

def delete_feature(self, feature, mc):
    """Delete the table row that belongs to ``feature``.

    Within one connection: ``before_delete`` runs for every field, the row
    whose uid column equals ``feature.uid()`` is deleted, ``after_delete``
    runs for every field, and the connection is committed.

    Returns:
        ``feature.uid()``.

    Raises:
        gws.ForbiddenError: If ``mc.user`` may not delete from this model.
    """
    if not mc.user.can_delete(self):
        raise gws.ForbiddenError()

    with self.db.connect() as conn:
        for fld in self.fields:
            fld.before_delete(feature, mc)

        # ``==`` on a column builds the SQL comparison; there is no need to
        # call the ``__eq__`` dunder directly.
        sql = sa.delete(self.table()).where(self.uid_column() == feature.uid())
        conn.execute(sql)

        for fld in self.fields:
            fld.after_delete(feature, mc)

        conn.commit()

    return feature.uid()