Coverage for gws-app/gws/base/database/model.py: 0%
152 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Database-based models."""
3from typing import Optional, cast
5import gws
6import gws.base.feature
7import gws.base.model
8import gws.base.model.field
9import gws.config.util
10import gws.gis.crs
11import gws.lib.sa as sa
14class Props(gws.base.model.Props):
15 pass
18class Config(gws.base.model.Config):
19 dbUid: Optional[str]
20 """db provider uid"""
21 tableName: Optional[str]
22 """table name for the model"""
23 sqlFilter: Optional[str]
24 """extra SQL filter"""
27class Object(gws.base.model.Object, gws.DatabaseModel):
28 def configure(self):
29 self.tableName = self.cfg('tableName') or self.cfg('_defaultTableName')
30 if not self.tableName:
31 raise gws.ConfigurationError(f'table name missing in model {self!r}')
33 self.sqlFilter = self.cfg('sqlFilter')
34 self.configure_model()
36 def configure_provider(self):
37 return gws.config.util.configure_database_provider_for(self)
39 ##
41 def describe(self):
42 return self.db.describe(self.tableName)
44 def table(self):
45 return self.db.table(self.tableName)
47 def column(self, column_name):
48 return self.db.column(self.table(), column_name)
50 def uid_column(self):
51 if not self.uidName:
52 raise gws.Error(f'no primary key found for table {self.tableName!r}')
53 if not self.db.has_column(self.table(), self.uidName):
54 raise gws.Error(f'invalid primary key {self.uidName!r} for table {self.tableName!r}')
55 return self.db.column(self.table(), self.uidName)
57 ##
59 def find_features(self, search, mc):
60 if not mc.user.can_read(self):
61 raise gws.ForbiddenError()
63 mc.search = search
64 mc.dbSelect = gws.ModelDbSelect(
65 columns=[],
66 geometryWhere=[],
67 keywordWhere=[],
68 order=[],
69 where=[]
70 )
72 with self.db.connect() as conn:
73 for fld in self.fields:
74 fld.before_select(mc)
76 sql = self._make_select(mc)
77 if sql is None:
78 gws.log.debug('empty select')
79 return []
81 features = []
83 for row in conn.execute(sql):
84 features.append(gws.base.feature.new(
85 model=self,
86 record=gws.FeatureRecord(attributes=gws.u.to_dict(row))
87 ))
89 for fld in self.fields:
90 fld.after_select(features, mc)
92 return features
94 def _make_select(self, mc: gws.ModelContext) -> Optional[sa.Select]:
96 # @TODO this should happen on the field level
97 sorts = mc.search.sort or self.defaultSort or []
98 for s in sorts:
99 fn = sa.desc if s.reverse else sa.asc
100 mc.dbSelect.order.append(fn(self.column(s.fieldName)))
102 sel = sa.select().select_from(self.table())
104 if mc.search.uids:
105 if not self.uidName:
106 return
107 sel = sel.where(self.uid_column().in_(mc.search.uids))
109 if mc.search.keyword and not mc.dbSelect.keywordWhere:
110 return
111 if mc.dbSelect.keywordWhere:
112 sel = sel.where(sa.or_(*mc.dbSelect.keywordWhere))
114 if mc.search.shape and not mc.dbSelect.geometryWhere:
115 return
116 if mc.dbSelect.geometryWhere:
117 sel = sel.where(sa.or_(*mc.dbSelect.geometryWhere))
119 sel = sel.where(*mc.dbSelect.where)
120 if mc.search.extraWhere:
121 for w in mc.search.extraWhere:
122 sel = sel.where(w)
124 if self.sqlFilter:
125 sel = sel.where(sa.text('(' + self.sqlFilter + ')'))
127 cols = []
128 for col in mc.dbSelect.columns or []:
129 if any(col is c for c in cols):
130 continue
131 cols.append(col)
132 for col in mc.search.extraColumns or []:
133 if any(col is c for c in cols):
134 continue
135 cols.append(col)
137 sel = sel.add_columns(*cols)
139 if mc.dbSelect.order:
140 sel = sel.order_by(*mc.dbSelect.order)
142 return sel
144 ##
146 def init_feature(self, feature, mc):
147 if not mc.user.can_create(self):
148 raise gws.ForbiddenError()
150 for fld in self.fields:
151 fld.do_init(feature, mc)
153 for rf in feature.createWithFeatures:
154 for fld in rf.model.fields:
155 fld.do_init_related(feature, mc)
157 def create_feature(self, feature, mc):
158 if not mc.user.can_create(self):
159 raise gws.ForbiddenError()
161 feature.record = gws.FeatureRecord(attributes={}, meta={})
163 related_models = []
164 for from_feature in feature.createWithFeatures:
165 if from_feature.model not in related_models:
166 related_models.append(from_feature.model)
168 with self.db.connect() as conn:
169 for m in related_models:
170 for fld in m.fields:
171 fld.before_create_related(feature, mc)
173 for fld in self.fields:
174 fld.before_create(feature, mc)
176 sql = sa.insert(self.table())
177 rs = conn.execute(sql, feature.record.attributes)
178 feature.insertedPrimaryKey = rs.inserted_primary_key[0]
180 for fld in self.fields:
181 fld.after_create(feature, mc)
183 for m in related_models:
184 for fld in m.fields:
185 fld.after_create_related(feature, mc)
187 conn.commit()
189 return feature.insertedPrimaryKey
191 def update_feature(self, feature, mc):
192 if not mc.user.can_write(self):
193 raise gws.ForbiddenError()
195 feature.record = gws.FeatureRecord(attributes={}, meta={})
197 with self.db.connect() as conn:
198 for fld in self.fields:
199 fld.before_update(feature, mc)
201 if not feature.record.attributes:
202 return feature.uid()
204 sql = self.table().update().where(
205 self.uid_column().__eq__(feature.uid())
206 ).values(
207 feature.record.attributes
208 )
209 conn.execute(sql)
211 for fld in self.fields:
212 fld.after_update(feature, mc)
214 conn.commit()
216 return feature.uid()
218 def delete_feature(self, feature, mc):
219 if not mc.user.can_delete(self):
220 raise gws.ForbiddenError()
222 with self.db.connect() as conn:
223 for fld in self.fields:
224 fld.before_delete(feature, mc)
226 sql = sa.delete(self.table()).where(
227 self.uid_column().__eq__(feature.uid())
228 )
230 conn.execute(sql)
232 for fld in self.fields:
233 fld.after_delete(feature, mc)
235 conn.commit()
237 return feature.uid()