Coverage for gws-app/gws/plugin/model_field/file/__init__.py: 0%
150 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""File field."""
3from typing import Optional, cast
5import gws
6import gws.base.database.model
7import gws.base.model.field
8import gws.lib.mime
9import gws.lib.osx
10import gws.lib.sa as sa
# Declares the 'file' type for model fields.
# NOTE(review): presumably consumed by the gws extension registry / spec
# generator -- confirm before moving or renaming this statement.
gws.ext.new.modelField('file')
class Config(gws.base.model.field.Config):
    """Configuration of a file field.

    Each option is the name of a column of the field's model; an empty
    string means the column is not used. At least one of ``contentColumn``
    and ``pathColumn`` has to be set, otherwise configuring the field raises
    a ``gws.ConfigurationError``.
    """

    # Name of the column that holds the file content.
    contentColumn: str = ''
    # Name of the column that holds the file path.
    pathColumn: str = ''
    # Name of the column that holds the file name.
    nameColumn: str = ''
class Props(gws.base.model.field.Props):
    """Properties of a file field; adds nothing to the base field props."""
class Cols(gws.Data):
    """Column objects used by a file field.

    An entry is ``None`` when the corresponding ``Config`` option is empty.
    """

    # Column configured by ``contentColumn``.
    content: Optional[sa.Column]
    # Column configured by ``pathColumn``.
    path: Optional[sa.Column]
    # Column configured by ``nameColumn``.
    name: Optional[sa.Column]
class FileInputProps(gws.Data):
    """File content together with its name.

    NOTE(review): not referenced elsewhere in this module; presumably the
    shape of an uploaded file -- confirm against the callers.
    """

    content: bytes
    name: str
class ServerFileProps(gws.Data):
    """Description of a stored file, as produced by ``Object.python_to_prop``."""

    # URL path of the 'webFile' action for this file.
    downloadUrl: str
    # Extension derived from the file's mime type.
    extension: str
    # File name, or an empty string when the file has no name.
    label: str
    # 'webFile' URL path with ``preview=1``; empty unless the mime type
    # starts with 'image'.
    previewUrl: str
    # File size, 0 when unknown.
    size: int
class ClientFileProps(gws.Data):
    """File name together with its content.

    NOTE(review): not referenced elsewhere in this module; presumably the
    shape of a file sent by the client -- confirm against the callers.
    """

    name: str
    content: bytes
class FileValue(gws.Data):
    """Python-side value of a file field.

    Instances are created from database records by ``Object.load_value`` and
    from client props by ``Object.prop_to_python``; the latter only sets
    ``content`` and ``name``.
    """

    # Value of the content column, if that column was selected.
    content: bytes
    # Value of the name column.
    name: str
    # Value of the path column.
    path: str
    # Length of the content column as computed by the database.
    size: int
class Object(gws.base.model.field.Object):
    """File field of a database model.

    The field maps up to three model columns (content, path, name) to a
    single ``FileValue``. Regular selects only fetch the content length, the
    path and the name; the content itself is fetched by
    ``handle_web_file_request``. When writing, only the content and name
    columns are updated.
    """

    model: gws.DatabaseModel

    attributeType = gws.AttributeType.file
    cols: Cols

    def __getstate__(self):
        """Return the instance state without ``cols``.

        ``cols`` is rebuilt by ``activate``.
        NOTE(review): presumably excluded because column objects should not
        be serialized -- confirm.
        """
        return gws.u.omit(vars(self), 'cols')

    def post_configure(self):
        """Resolve the configured columns once the field is configured."""
        self.configure_columns()

    def activate(self):
        """Resolve the configured columns again (``cols`` is not part of the saved state)."""
        self.configure_columns()

    def configure_columns(self):
        """Populate ``self.cols`` from the configuration.

        Raises:
            gws.ConfigurationError: If neither ``contentColumn`` nor
                ``pathColumn`` is set, or if the model has no ``uidName``.
        """
        model = cast(gws.base.database.model.Object, self.model)

        def column_for(option: str) -> Optional[sa.Column]:
            # An empty option means "this column is not used".
            column_name = self.cfg(option)
            return model.column(column_name) if column_name else None

        self.cols = Cols()
        self.cols.content = column_for('contentColumn')
        self.cols.path = column_for('pathColumn')
        self.cols.name = column_for('nameColumn')

        if self.cols.content is None and self.cols.path is None:
            raise gws.ConfigurationError('contentColumn or pathColumn must be set')

        if not self.model.uidName:
            raise gws.ConfigurationError('file fields require a primary key')

    def configure_widget(self):
        """Use the shared 'file' widget unless the base class configured one.

        Returns:
            ``True``: a widget is configured on either path.
        """
        if not super().configure_widget():
            self.widget = self.root.create_shared(gws.ext.object.modelWidget, type='file')
        return True

    ##

    def before_select(self, mc):
        """Add this field's columns, without the content, to the pending select."""
        mc.dbSelect.columns.extend(self.select_columns(False, mc))

    def after_select(self, features, mc):
        """Set the field value on each selected feature from its record."""
        for feature in features:
            self.from_record(feature, mc)

    def before_create(self, feature, mc):
        """Copy the field value into the record before it is created."""
        self.to_record(feature, mc)

    def before_update(self, feature, mc):
        """Copy the field value into the record before it is updated."""
        self.to_record(feature, mc)

    def from_record(self, feature, mc):
        """Set the feature attribute to the ``FileValue`` loaded from its record."""
        feature.set(self.name, self.load_value(feature.record.attributes, mc))

    def to_record(self, feature, mc):
        """Write the feature's file value into its record attributes.

        Does nothing when the user may not write this field or when the
        feature has no (truthy) value for it. Only the content and name
        columns are written; the path column is never written here.
        """
        if not mc.user.can_write(self):
            return

        # @TODO store in the filesystem

        fv = cast(FileValue, feature.get(self.name))
        if not fv:
            return

        if self.cols.content is not None:
            feature.record.attributes[self.cols.content.name] = fv.content
        if self.cols.name is not None:
            feature.record.attributes[self.cols.name.name] = fv.name

    # @TODO merge with scalar_field?

    def from_props(self, feature, mc):
        """Set the feature attribute from the feature's props, if present."""
        value = feature.props.attributes.get(self.name)
        if value is None:
            return
        value = self.prop_to_python(feature, value, mc)
        if value is not None:
            feature.set(self.name, value)

    def to_props(self, feature, mc):
        """Write the feature attribute into the feature's props.

        Does nothing when the user may not read this field or the feature
        has no value for it.
        """
        if not mc.user.can_read(self):
            return
        value = feature.get(self.name)
        if value is None:
            return
        value = self.python_to_prop(feature, value, mc)
        if value is not None:
            feature.props.attributes[self.name] = value

    ##

    def prop_to_python(self, feature, value, mc) -> FileValue:
        """Build a ``FileValue`` from the 'content' and 'name' of a prop value.

        Returns:
            The new ``FileValue``, or ``gws.ErrorValue`` if building it
            raises a ``ValueError``.
        """
        try:
            return FileValue(
                content=gws.u.get(value, 'content'),
                name=gws.u.get(value, 'name'),
            )
        except ValueError:
            return gws.ErrorValue

    def python_to_prop(self, feature, value, mc) -> ServerFileProps:
        """Describe a ``FileValue`` for the client.

        The result carries the label, extension and size of the file plus
        'webFile' action URLs that end with the file name. The preview URL
        is only filled for mime types starting with 'image'.
        """
        fv = cast(FileValue, value)

        mime = self.get_mime_type(fv)
        ext = gws.lib.mime.extension_for(mime)

        # Unnamed files get a generic name built from the extension.
        name = fv.name or f'gws.{ext}'

        url_args = dict(
            projectUid=mc.project.uid,
            modelUid=self.model.uid,
            fieldName=self.name,
            featureUid=feature.uid(),
        )

        preview_url = ''
        if mime.startswith('image'):
            preview_url = gws.u.action_url_path('webFile', preview=1, **url_args) + '/' + name

        download_url = gws.u.action_url_path('webFile', **url_args) + '/' + name

        return ServerFileProps(
            # @TODO use a template
            label=fv.name or '',
            extension=ext,
            size=fv.size or 0,
            previewUrl=preview_url,
            downloadUrl=download_url,
        )

    ##

    def get_mime_type(self, fv: FileValue) -> str:
        """Return the mime type for a file value.

        The type is derived from the path if there is one, otherwise from
        the name; with neither, ``gws.lib.mime.TXT`` is returned.
        """
        if fv.path:
            return gws.lib.mime.for_path(fv.path)
        if fv.name:
            return gws.lib.mime.for_path(fv.name)
        # @TODO guess mime from content?
        return gws.lib.mime.TXT

    def handle_web_file_request(
        self,
        feature_uid: str,
        preview: bool,
        mc: gws.ModelContext,
    ) -> Optional[gws.ContentResponse]:
        """Build a content response with the file of a given feature.

        Args:
            feature_uid: Value of the model's uid column to look up.
            preview: If true, the file is not sent as an attachment.
            mc: Model context, passed through to the column and value helpers.

        Returns:
            The response for the first matching row, or ``None`` if no row
            matches or no value can be loaded from it.
        """
        model = cast(gws.DatabaseModel, self.model)

        sql = sa.select(
            *self.select_columns(True, mc)
        ).where(
            model.uid_column() == feature_uid
        )

        with model.db.connect() as conn:
            rows = list(conn.execute(sql))
            if not rows:
                return None

            # Only the first matching row is served.
            fv = self.load_value(gws.u.to_dict(rows[0]), mc)
            if fv is None:
                return None

            return gws.ContentResponse(
                asAttachment=not preview,
                attachmentName=fv.name,
                content=fv.content,
                mime=self.get_mime_type(fv),
            )

    ##

    def select_columns(self, with_content, mc):
        """Return the column expressions to select for this field.

        Args:
            with_content: Whether to include the content column itself. Its
                length, labelled ``<field name>_length``, is always included
                when a content column is configured.
            mc: Model context (not used here).
        """
        cs = []

        if self.cols.content is not None:
            cs.append(sa.func.length(self.cols.content).label(f'{self.name}_length'))
            if with_content:
                cs.append(self.cols.content)

        if self.cols.path is not None:
            cs.append(self.cols.path)

        if self.cols.name is not None:
            cs.append(self.cols.name)

        return cs

    def load_value(self, attributes: dict, mc) -> Optional[FileValue]:
        """Build a ``FileValue`` from record attributes.

        Only the configured columns are read; attributes missing from
        ``attributes`` become ``None``.

        Returns:
            The value, or ``None`` if no column is configured.
        """
        d = {}

        if self.cols.content is not None:
            # The length is selected under this label by select_columns().
            d['size'] = attributes.get(f'{self.name}_length')
            d['content'] = attributes.get(self.cols.content.name)
        if self.cols.path is not None:
            d['path'] = attributes.get(self.cols.path.name)
        if self.cols.name is not None:
            d['name'] = attributes.get(self.cols.name.name)

        if not d:
            return None
        return FileValue(**d)