Coverage for gws-app/gws/plugin/model_field/file/__init__.py: 0%

150 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""File field.""" 

2 

3from typing import Optional, cast 

4 

5import gws 

6import gws.base.database.model 

7import gws.base.model.field 

8import gws.lib.mime 

9import gws.lib.osx 

10import gws.lib.sa as sa 

11 

# Module-level marker naming the 'file' model field type.
# NOTE(review): presumably consumed by the gws extension/spec machinery to
# bind the classes below to type 'file' -- confirm against gws.ext.
12gws.ext.new.modelField('file') 

13 

14 

class Config(gws.base.model.field.Config):
    """Configuration of the file field.

    Each option names a column of the underlying database model; an empty
    string means "not configured". ``Object.configure_columns`` requires
    that ``contentColumn`` or ``pathColumn`` is set.
    """

    # Column the file content is read from and written to.
    contentColumn: str = ''
    # Column whose value is loaded as the file path.
    pathColumn: str = ''
    # Column the file name is read from and written to.
    nameColumn: str = ''

19 

20 

class Props(gws.base.model.field.Props):
    """Properties of the file field; adds nothing to the base field props."""

23 

24 

class Cols(gws.Data):
    """Resolved database columns of a file field.

    Filled by ``Object.configure_columns``; an entry is ``None`` when the
    corresponding config option is empty.
    """

    # Resolved from the ``contentColumn`` option.
    content: Optional[sa.Column]
    # Resolved from the ``pathColumn`` option.
    path: Optional[sa.Column]
    # Resolved from the ``nameColumn`` option.
    name: Optional[sa.Column]

29 

30 

class FileInputProps(gws.Data):
    """File content together with a file name.

    NOTE(review): not referenced by the other definitions in this module;
    presumably part of the request/spec types -- confirm before removing.
    """

    content: bytes
    name: str

34 

35 

class ServerFileProps(gws.Data):
    """Description of a stored file, as built by ``Object.python_to_prop``."""

    # URL path of the 'webFile' action for this file.
    downloadUrl: str
    # Extension derived from the file's mime type.
    extension: str
    # File name, or an empty string when the file has no name.
    label: str
    # Like ``downloadUrl`` with ``preview=1``; left empty unless the mime
    # type starts with 'image'.
    previewUrl: str
    # Size of the file, 0 when unknown.
    size: int

42 

43 

class ClientFileProps(gws.Data):
    """File name together with file content.

    NOTE(review): not referenced by the other definitions in this module;
    ``Object.prop_to_python`` reads the same two keys ('content', 'name')
    from the incoming value -- presumably this is its declared shape.
    """

    name: str
    content: bytes

47 

48 

class FileValue(gws.Data):
    """Python-side value of a file field.

    Built either from client props (``Object.prop_to_python``: content and
    name only) or from a database record (``Object.load_value``).
    """

    # File body; only loaded from the database when the content column is
    # explicitly selected.
    content: bytes
    # File name.
    name: str
    # File path, taken from the path column.
    path: str
    # Value of the SQL ``length()`` of the content column.
    size: int

54 

55 

class Object(gws.base.model.field.Object):
    """Model field that exposes a file kept in database columns.

    The file content, path and name are read from the columns named by the
    ``contentColumn``, ``pathColumn`` and ``nameColumn`` options. Inside the
    model the value is a ``FileValue``; towards the client it is converted
    to ``ServerFileProps``.
    """

    model: gws.DatabaseModel

    attributeType = gws.AttributeType.file
    cols: Cols

    def __getstate__(self):
        """Return the instance state without the ``cols`` entry."""
        # ``cols`` is rebuilt by ``configure_columns`` (called from
        # ``activate``), so it is dropped from the saved state.
        state = vars(self)
        return gws.u.omit(state, 'cols')

    def post_configure(self):
        """Resolve the configured columns."""
        self.configure_columns()

    def activate(self):
        """Resolve the configured columns again (``cols`` is not part of the saved state)."""
        self.configure_columns()

    def configure_columns(self):
        """Look up the configured columns in the database model.

        Raises:
            gws.ConfigurationError: if neither a content nor a path column
                is configured, or if the model has no ``uidName``.
        """
        db_model = cast(gws.base.database.model.Object, self.model)

        def resolve(option):
            # An empty option means "no such column".
            column_name = self.cfg(option)
            return db_model.column(column_name) if column_name else None

        self.cols = Cols()
        self.cols.content = resolve('contentColumn')
        self.cols.path = resolve('pathColumn')
        self.cols.name = resolve('nameColumn')

        # Explicit ``is None`` tests: these are column objects, not booleans.
        has_content = self.cols.content is not None
        has_path = self.cols.path is not None
        if not has_content and not has_path:
            raise gws.ConfigurationError('contentColumn or pathColumn must be set')

        if not self.model.uidName:
            raise gws.ConfigurationError('file fields require a primary key')

    def configure_widget(self):
        """Use a shared 'file' widget unless the base class configured one.

        Returns:
            ``True`` if the fallback widget was created, otherwise ``None``.
        """
        if super().configure_widget():
            return None
        self.widget = self.root.create_shared(gws.ext.object.modelWidget, type='file')
        return True

    ##

    def before_select(self, mc):
        """Add this field's columns (without the file content) to the select."""
        extra_columns = self.select_columns(False, mc)
        mc.dbSelect.columns.extend(extra_columns)

    def after_select(self, features, mc):
        """Populate the field value of every selected feature."""
        for selected in features:
            self.from_record(selected, mc)

    def before_create(self, feature, mc):
        """Copy the field value into the record before it is created."""
        self.to_record(feature, mc)

    def before_update(self, feature, mc):
        """Copy the field value into the record before it is updated."""
        self.to_record(feature, mc)

    def from_record(self, feature, mc):
        """Set the feature attribute from the feature's record attributes."""
        loaded = self.load_value(feature.record.attributes, mc)
        feature.set(self.name, loaded)

    def to_record(self, feature, mc):
        """Write file content and name into the feature's record.

        Does nothing when the user may not write this field or when the
        feature has no value for it. The path column is never written.
        """
        if not mc.user.can_write(self):
            return

        # @TODO store in the filesystem

        file_value = cast(FileValue, feature.get(self.name))
        if not file_value:
            return

        targets = (
            (self.cols.content, 'content'),
            (self.cols.name, 'name'),
        )
        for column, key in targets:
            if column is not None:
                feature.record.attributes[column.name] = getattr(file_value, key)

    # @TODO merge with scalar_field?

    def from_props(self, feature, mc):
        """Convert the incoming prop value, if any, and set it on the feature."""
        raw = feature.props.attributes.get(self.name)
        if raw is None:
            return
        converted = self.prop_to_python(feature, raw, mc)
        if converted is not None:
            feature.set(self.name, converted)

    def to_props(self, feature, mc):
        """Convert the feature value, if any, and store it in the feature props.

        Does nothing when the user may not read this field.
        """
        if not mc.user.can_read(self):
            return
        current = feature.get(self.name)
        if current is None:
            return
        converted = self.python_to_prop(feature, current, mc)
        if converted is not None:
            feature.props.attributes[self.name] = converted

    ##

    def prop_to_python(self, feature, value, mc) -> FileValue:
        """Build a ``FileValue`` from the 'content' and 'name' of ``value``.

        Returns ``gws.ErrorValue`` if a ``ValueError`` is raised on the way.
        """
        try:
            content = gws.u.get(value, 'content')
            name = gws.u.get(value, 'name')
            return FileValue(content=content, name=name)
        except ValueError:
            return gws.ErrorValue

    def python_to_prop(self, feature, value, mc) -> ServerFileProps:
        """Describe a ``FileValue`` for the client.

        The download URL always points to the 'webFile' action; a preview
        URL is added only when the mime type starts with 'image'.
        """
        file_value = cast(FileValue, value)

        mime_type = self.get_mime_type(file_value)
        extension = gws.lib.mime.extension_for(mime_type)

        props = ServerFileProps(
            # @TODO use a template
            label=file_value.name or '',
            extension=extension,
            size=file_value.size or 0,
            previewUrl='',
            downloadUrl='',
        )

        # Unnamed files get a generic name built from the extension.
        file_name = file_value.name or f'gws.{extension}'

        url_args = {
            'projectUid': mc.project.uid,
            'modelUid': self.model.uid,
            'fieldName': self.name,
            'featureUid': feature.uid(),
        }

        def file_url(**extra):
            # The file name is appended as the last path segment.
            return gws.u.action_url_path('webFile', **extra, **url_args) + '/' + file_name

        if mime_type.startswith('image'):
            props.previewUrl = file_url(preview=1)

        props.downloadUrl = file_url()

        return props

    ##

    def get_mime_type(self, fv: FileValue) -> str:
        """Derive the mime type from the file path, else from the file name.

        Falls back to ``gws.lib.mime.TXT`` when neither is available.
        """
        source = fv.path or fv.name
        if source:
            return gws.lib.mime.for_path(source)
        # @TODO guess mime from content?
        return gws.lib.mime.TXT

    def handle_web_file_request(self, feature_uid: str, preview: bool, mc: gws.ModelContext) -> Optional[gws.ContentResponse]:
        """Load the file of one feature and wrap it in a content response.

        Args:
            feature_uid: Value matched against the model's uid column.
            preview: If true, the content is not sent as an attachment.
            mc: Model context, passed through to the column and value helpers.

        Returns:
            The response for the first matching row, or ``None`` when no
            row matches.
        """
        db_model = cast(gws.DatabaseModel, self.model)

        columns = self.select_columns(True, mc)
        condition = db_model.uid_column() == feature_uid
        sql = sa.select(*columns).where(condition)

        with self.model.db.connect() as conn:
            rows = list(conn.execute(sql))
            if not rows:
                return None

            # Only the first matching row is served.
            file_value = self.load_value(gws.u.to_dict(rows[0]), mc)
            return gws.ContentResponse(
                asAttachment=not preview,
                attachmentName=file_value.name,
                content=file_value.content,
                mime=self.get_mime_type(file_value),
            )

    ##

    def select_columns(self, with_content, mc):
        """List the column expressions to select for this field.

        When a content column exists, its SQL ``length()`` is always
        selected, labelled ``<field name>_length``; the content itself is
        selected only if ``with_content`` is true. Path and name columns
        follow when configured.
        """
        selected = []

        content_col = self.cols.content
        if content_col is not None:
            selected.append(sa.func.length(content_col).label(f'{self.name}_length'))
            if with_content:
                selected.append(content_col)

        selected.extend(
            column
            for column in (self.cols.path, self.cols.name)
            if column is not None
        )

        return selected

    def load_value(self, attributes: dict, mc) -> FileValue:
        """Build a ``FileValue`` from record attributes.

        Only the keys of configured columns are filled in; missing
        attributes become ``None``. Returns ``None`` if no column is
        configured at all.
        """
        values = {}

        content_col = self.cols.content
        if content_col is not None:
            values['size'] = attributes.get(f'{self.name}_length')
            values['content'] = attributes.get(content_col.name)

        for key in ('path', 'name'):
            column = getattr(self.cols, key)
            if column is not None:
                values[key] = attributes.get(column.name)

        return FileValue(**values) if values else None