Coverage for gws-app/gws/plugin/qfield/action.py: 0%

119 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1from typing import Optional, cast 

2 

3import gws 

4import gws.config 

5import gws.base.action 

6import gws.lib.mime 

7import gws.lib.datetimex 

8import gws.lib.zipx 

9import gws.lib.osx 

10 

11from . import core 

12 

13gws.ext.new.action('qfield') 

14 

15 

16class Config(gws.ConfigWithAccess): 

17 """QField action. (added in 8.1)""" 

18 

19 packages: list[core.PackageConfig] 

20 withDbInDCIM: bool = False 

21 withSerialPrefix: bool = False 

22 

23 

24class Props(gws.base.action.Props): 

25 pass 

26 

27 

28class DownloadRequest(gws.Request): 

29 packageUid: Optional[str] 

30 omitStatic: bool = False 

31 omitData: bool = False 

32 

33 

34class PackageParams(gws.CliParams): 

35 projectUid: str 

36 """Project uid.""" 

37 packageUid: str 

38 """Package uid.""" 

39 out: str 

40 """Output filename.""" 

41 

42 

43class DownloadResponse(gws.Response): 

44 data: bytes 

45 

46 

47class UploadRequest(gws.Request): 

48 packageUid: Optional[str] 

49 data: Optional[bytes] 

50 

51 

52class UploadResponse(gws.Response): 

53 pass 

54 

55 

56_SERIAL_PREFIX_END_DATE = '2033-01-01' 

57 

58 

59class Object(gws.base.action.Object): 

60 packages: dict[str, core.Package] 

61 withDbInDCIM: bool 

62 withSerialPrefix: bool 

63 

64 def configure(self): 

65 self.packages = { 

66 p.uid: p 

67 for p in self.create_children(core.Package, self.cfg('packages')) 

68 } 

69 self.withDbInDCIM = self.cfg('withDbInDCIM', default=False) 

70 self.withSerialPrefix = self.cfg('withSerialPrefix', default=False) 

71 

72 @gws.ext.command.get('qfieldDownload') 

73 def http_download(self, req: gws.WebRequester, p: DownloadRequest) -> gws.ContentResponse: 

74 b = self._do_download(req, p) 

75 return gws.ContentResponse(content=b, mime=gws.lib.mime.ZIP) 

76 

77 @gws.ext.command.api('qfieldDownload') 

78 def api_download(self, req: gws.WebRequester, p: DownloadRequest) -> DownloadResponse: 

79 b = self._do_download(req, p) 

80 return DownloadResponse(data=b) 

81 

82 @gws.ext.command.cli('qfieldPackage') 

83 def cli_package(self, p: PackageParams): 

84 """Create a package for QField.""" 

85 

86 root = gws.config.load() 

87 user = root.app.authMgr.systemUser 

88 action = cast(Object, root.find_first(self.__class__)) 

89 

90 args = action.prepare_export(DownloadRequest( 

91 projectUid=p.projectUid, 

92 packageUid=p.packageUid, 

93 ), user) 

94 args.package.mapCacheLifeTime = 0 

95 

96 action.exec_export(args) 

97 b = action.end_export(args) 

98 gws.u.write_file_b(p.out, b) 

99 

100 @gws.ext.command.post('qfieldUpload') 

101 def http_upload(self, req: gws.WebRequester, p: UploadRequest) -> gws.ContentResponse: 

102 self._do_upload(req, p, req.data()) 

103 return gws.ContentResponse(content='ok\n') 

104 

105 @gws.ext.command.api('qfieldUpload') 

106 def api_upload(self, req: gws.WebRequester, p: UploadRequest) -> UploadResponse: 

107 self._do_upload(req, p, p.data) 

108 return UploadResponse() 

109 

110 ## 

111 

112 def _do_download(self, req: gws.WebRequester, p: DownloadRequest) -> bytes: 

113 args = self.prepare_export(p, req.user) 

114 self.exec_export(args) 

115 return self.end_export(args) 

116 

117 def prepare_export(self, p: DownloadRequest, user: gws.User) -> core.ExportArgs: 

118 project = user.require_project(p.projectUid) 

119 package = self._get_package(p.packageUid, user, gws.Access.read) 

120 base_dir = gws.u.ensure_dir(f'{gws.c.VAR_DIR}/qfield/{gws.u.random_string(32)}') 

121 

122 name_prefix = '' 

123 if self.withSerialPrefix: 

124 minutes = (gws.lib.datetimex.to_timestamp(gws.lib.datetimex.from_iso_string(_SERIAL_PREFIX_END_DATE)) - gws.u.stime()) // 60 

125 name_prefix = '{:06d}'.format(minutes) + '_' + gws.lib.datetimex.now().strftime('%d.%m.%y') + '_' 

126 

127 db_file_name = f'{name_prefix}{package.uid}.{core.GPKG_EXT}' 

128 db_path = db_file_name 

129 if self.withDbInDCIM: 

130 db_path = f'DCIM/{db_file_name}' 

131 gws.u.ensure_dir(f'{base_dir}/DCIM') 

132 

133 return core.ExportArgs( 

134 package=package, 

135 project=project, 

136 user=user, 

137 baseDir=base_dir, 

138 qgisFileName=f'{name_prefix}{package.uid}', 

139 dbFileName=db_file_name, 

140 dbPath=db_path, 

141 withBaseMap=not p.omitStatic, 

142 withData=not p.omitData, 

143 withMedia=not p.omitStatic, 

144 withQgis=not p.omitData, 

145 ) 

146 

147 def exec_export(self, args: core.ExportArgs): 

148 core.Exporter().run(args) 

149 

150 def end_export(self, args: core.ExportArgs) -> bytes: 

151 b = gws.lib.zipx.zip_to_bytes(args.baseDir) 

152 

153 if not self.root.app.developer_option('qfield.keep_temp_dirs'): 

154 gws.lib.osx.rmdir(args.baseDir) 

155 

156 return b 

157 

158 ## 

159 

160 def _do_upload(self, req: gws.WebRequester, p: UploadRequest, data: bytes): 

161 args = self.prepare_import(p, req.user, data) 

162 self.exec_import(args) 

163 return self.end_import(args) 

164 

165 def prepare_import(self, p: UploadRequest, user: gws.User, data: bytes): 

166 project = user.require_project(p.projectUid) 

167 package = self._get_package(p.packageUid, user, gws.Access.write) 

168 base_dir = gws.u.ensure_dir(f'{gws.c.VAR_DIR}/qfield/{gws.u.random_string(32)}') 

169 

170 if data.startswith(b'SQLite'): 

171 gws.u.write_file_b(f'{base_dir}/{package.uid}.{core.GPKG_EXT}', data) 

172 else: 

173 gws.lib.zipx.unzip_bytes(data, base_dir, flat=True) 

174 

175 db_file_name = f'{package.uid}.{core.GPKG_EXT}' 

176 

177 return core.ImportArgs( 

178 package=package, 

179 project=project, 

180 user=user, 

181 baseDir=base_dir, 

182 dbFileName=db_file_name, 

183 ) 

184 

185 def exec_import(self, args: core.ImportArgs): 

186 core.Importer().run(args) 

187 

188 def end_import(self, args: core.ImportArgs): 

189 if not self.root.app.developer_option('qfield.keep_temp_dirs'): 

190 gws.lib.osx.rmdir(args.baseDir) 

191 

192 ## 

193 

194 def _get_package(self, uid: str, user: gws.User, access: gws.Access) -> core.Package: 

195 pkg = self.packages.get(uid) if uid else gws.u.first(self.packages.values()) 

196 if not pkg: 

197 raise gws.NotFoundError(f'package {uid} not found') 

198 if not user.can(access, pkg): 

199 raise gws.ForbiddenError(f'package {uid} forbidden') 

200 return pkg