Coverage for gws-app/gws/test/util.py: 48%

235 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Test utilities.""" 

2 

3import contextlib 

4import os 

5import re 

6import typing 

7from typing import Optional 

8 

9import pytest 

10import requests 

11import werkzeug.test 

12 

13import gws 

14import gws.base.auth 

15import gws.base.feature 

16import gws.base.shape 

17import gws.base.web.wsgi_app 

18import gws.config 

19import gws.gis.crs 

20import gws.lib.cli as cli 

21import gws.lib.jsonx 

22import gws.lib.net 

23import gws.lib.sa as sa 

24import gws.lib.vendor.slon 

25import gws.spec.runtime 

26import gws.test.mock 

27 

28## 

29 

# Convenience aliases exposed by this module.
mock = gws.test.mock

# pytest shortcuts
fixture = pytest.fixture
raises = pytest.raises
monkey_patch = pytest.MonkeyPatch.context

cast = typing.cast

# NOTE: this rebinds the name of the ``exec`` builtin at module level.
exec = cli.exec

40 

41## 

42 

# Global test options, keyed by option name. Starts out empty.
OPTIONS = {}


def option(name, default=None):
    """Look up a test option.

    Args:
        name: Key in ``OPTIONS``.
        default: Value returned when the key is not present.

    Returns:
        The stored option value, or ``default``.
    """
    try:
        return OPTIONS[name]
    except KeyError:
        return default

48 

49 

50## 

51 

52 

def _config_defaults():
    """Build the default configuration text that ``gws_root`` prepends to test configs.

    Declares a postgres database provider with the uid ``GWS_TEST_POSTGRES_PROVIDER``;
    its connection parameters are taken from the ``service.postgres.*`` entries
    of ``OPTIONS`` and embedded using their ``repr``.

    Raises:
        KeyError: If one of the required ``service.postgres.*`` options is missing.
    """
    # looked up in the same order in which the values appear in the template
    pg_opts = {
        key: OPTIONS[f'service.postgres.{key}']
        for key in ('host', 'port', 'user', 'password', 'database')
    }
    return f'''
        database.providers+ {{ 
            uid "GWS_TEST_POSTGRES_PROVIDER" 
            type "postgres" 
            host {pg_opts['host']!r}
            port {pg_opts['port']!r}
            username {pg_opts['user']!r}
            password {pg_opts['password']!r}
            database {pg_opts['database']!r}
            schemaCacheLifeTime 0 
        }}
    '''

66 

67 

def _to_data(x):
    """Recursively turn dicts found inside ``x`` into ``gws.Data`` objects.

    - ``gws.Data`` objects are updated in place (each attribute is converted) and returned.
    - dicts become new ``gws.Data`` objects with one attribute per key.
    - lists and tuples are rebuilt as a list / tuple of converted elements.
    - anything else is returned unchanged.
    """
    if isinstance(x, gws.Data):
        # setattr only rebinds existing attributes, so iterating the live mapping is fine
        for name, value in vars(x).items():
            setattr(x, name, _to_data(value))
        return x

    if isinstance(x, dict):
        converted = gws.Data()
        for name, value in x.items():
            setattr(converted, name, _to_data(value))
        return converted

    if isinstance(x, list):
        return list(map(_to_data, x))

    if isinstance(x, tuple):
        return tuple(map(_to_data, x))

    return x

83 

84 

# Result of ``gws.spec.runtime.get_spec``, loaded on the first ``gws_specs()`` call.
_GWS_SPEC_DICT = None


def gws_specs() -> gws.SpecRuntime:
    """Return a spec runtime object for the application manifest.

    The spec data is loaded once from ``{BASE_DIR}/config/MANIFEST.json``
    (with ``read_cache`` and ``write_cache`` set to False) and stored in a
    module-level variable; each call wraps it in a new ``gws.spec.runtime.Object``.
    """
    global _GWS_SPEC_DICT

    if _GWS_SPEC_DICT is not None:
        return gws.spec.runtime.Object(_GWS_SPEC_DICT)

    manifest_path = f"{option('BASE_DIR')}/config/MANIFEST.json"
    _GWS_SPEC_DICT = gws.spec.runtime.get_spec(manifest_path, read_cache=False, write_cache=False)
    return gws.spec.runtime.Object(_GWS_SPEC_DICT)

100 

101 

def gws_root(config: str = '', specs: gws.SpecRuntime = None, activate=True, defaults=True):
    """Create a root object from a configuration text.

    Args:
        config: Configuration source; it is parsed with the slon parser.
        specs: Spec runtime to use; ``gws_specs()`` is used when omitted.
        activate: If true, the initialized root is passed through ``gws.config.activate``.
        defaults: If true, the default test configuration (``_config_defaults``) is prepended.

    Returns:
        The initialized root, activated unless ``activate`` is false.

    Raises:
        gws.ConfigurationError: If initialization reported configuration errors
            (each error is logged first).
    """
    source = config or ''
    if defaults:
        source = '\n'.join([_config_defaults(), source])
    # the current log level always goes first
    source = '\n'.join([f'server.log.level {gws.log.get_level()}', source])

    parsed_config = _to_data(gws.lib.vendor.slon.parse(source, as_object=True))
    registered_specs = mock.register(specs or gws_specs())
    root = gws.config.initialize(registered_specs, parsed_config)

    if root.configErrors:
        for err in root.configErrors:
            gws.log.error(f'CONFIGURATION ERROR: {err}')
        raise gws.ConfigurationError('config failed')

    return gws.config.activate(root) if activate else root

118 

119 

def gws_system_user():
    """Create a new ``SystemUser`` with an empty role list (first constructor argument is None)."""
    user_cls = gws.base.auth.user.SystemUser
    return user_cls(None, roles=[])

122 

123 

def get_db(root):
    """Return what ``root.get`` yields for the test postgres provider uid."""
    provider_uid = 'GWS_TEST_POSTGRES_PROVIDER'
    return root.get(provider_uid)

126 

127 

128## 

129 

130# ref: https://werkzeug.palletsprojects.com/en/3.0.x/test/ 

131 

class TestResponse(werkzeug.test.TestResponse):
    """Werkzeug test response type that additionally declares a ``cookies`` mapping."""

    # cookie key -> cookie object
    cookies: dict[str, werkzeug.test.Cookie]

134 

135 

def _wz_request(root, **kwargs):
    """Run a request against the WSGI application for ``root`` with a werkzeug test client.

    Args:
        root: Root object passed to ``gws.base.web.wsgi_app.make_application``.
        **kwargs: Arguments for ``werkzeug.test.Client.open``. An optional ``cookies``
            entry (a list of ``werkzeug.test.Cookie``) is removed from them and
            set on the client before the request is made.

    Returns:
        The response returned by ``client.open``, with a ``cookies`` dict
        (cookie key -> cookie) attached.
    """
    app = gws.base.web.wsgi_app.make_application(root)
    client = werkzeug.test.Client(app)

    request_cookies = cast(list[werkzeug.test.Cookie], kwargs.pop('cookies', []))
    for cookie in request_cookies:
        client.set_cookie(
            key=cookie.key,
            value=cookie.value,
            max_age=cookie.max_age,
            expires=cookie.expires,
            path=cookie.path,
            domain=cookie.domain,
            secure=cookie.secure,
            httponly=cookie.http_only,
        )

    response = client.open(**kwargs)

    # for some reason, responses do not include cookies, work around this
    # by copying them from the client's (private) cookie store
    jar = client._cookies or {}
    collected = {}
    for cookie in jar.values():
        collected[cookie.key] = cookie
    response.cookies = collected

    return response

157 

158 

class http:
    """Request helpers built on ``_wz_request``."""

    @classmethod
    def get(cls, root, url, **kwargs) -> TestResponse:
        """Perform a GET request.

        All whitespace is removed from ``url`` and the result is normalized
        to have exactly one leading slash and no trailing slash.
        Extra keyword arguments are forwarded to ``_wz_request``.
        """
        compact = re.sub(r'\s+', '', url.strip())
        path = '/' + compact.strip('/')
        return _wz_request(root, method='GET', path=path, **kwargs)

    @classmethod
    def api(cls, root, cmd, request=None, **kwargs) -> TestResponse:
        """POST a JSON request to the server endpoint.

        Args:
            root: Root object.
            cmd: Command name appended to ``gws.c.SERVER_ENDPOINT``; if empty, the bare endpoint is used.
            request: JSON payload; an empty dict is sent when omitted or falsy.
            **kwargs: Forwarded to ``_wz_request``.
        """
        endpoint = gws.c.SERVER_ENDPOINT
        path = endpoint + '/' + cmd if cmd else endpoint
        payload = request or {}
        return _wz_request(root, method='POST', path=path, json=payload, **kwargs)

172 

173 

174## 

175 

class pg:
    """Helpers for working with the test PostgreSQL database through SQLAlchemy.

    One engine and one connection are created on demand and stored on the class.
    Table names and SQL fragments are interpolated verbatim into statements.
    """

    saEngine: Optional[sa.Engine] = None
    saConn: Optional[sa.Connection] = None

    @classmethod
    def connect(cls):
        """Return the shared connection, creating engine and connection on first use.

        Connection parameters are read from the ``service.postgres.*`` entries of ``OPTIONS``.
        """
        if not cls.saEngine:
            url = gws.lib.net.make_url(
                scheme='postgresql',
                username=OPTIONS['service.postgres.user'],
                password=OPTIONS['service.postgres.password'],
                hostname=OPTIONS['service.postgres.host'],
                port=OPTIONS['service.postgres.port'],
                path=OPTIONS['service.postgres.database'],
            )
            cls.saEngine = sa.create_engine(url)

        if not cls.saConn:
            cls.saConn = cls.saEngine.connect()

        return cls.saConn

    @classmethod
    def close(cls):
        """Close the shared connection if there is one; the engine is kept."""
        if cls.saConn:
            cls.saConn.close()
            cls.saConn = None

    @classmethod
    def create(cls, table_name: str, col_defs: dict):
        """Drop ``table_name`` (``IF EXISTS ... CASCADE``) and create it anew.

        Args:
            table_name: Name of the table.
            col_defs: Mapping of column name to column definition SQL.
        """
        conn = cls.connect()
        conn.execute(sa.text(f'DROP TABLE IF EXISTS {table_name} CASCADE'))
        ddl = _comma(f'{k} {v}' for k, v in col_defs.items())
        conn.execute(sa.text(f'CREATE TABLE {table_name} ( {ddl} )'))
        conn.commit()

    @classmethod
    def clear(cls, table_name: str):
        """Truncate ``table_name`` and commit."""
        conn = cls.connect()
        conn.execute(sa.text(f'TRUNCATE TABLE {table_name}'))
        conn.commit()

    @classmethod
    def insert(cls, table_name: str, row_dicts: list[dict]):
        """Replace the content of ``table_name`` with ``row_dicts``.

        The table is truncated first. Column names are taken from the keys
        of the first row; an empty list just leaves the table empty.
        """
        conn = cls.connect()
        conn.execute(sa.text(f'TRUNCATE TABLE {table_name}'))
        if row_dicts:
            names = _comma(k for k in row_dicts[0])
            values = _comma(':' + k for k in row_dicts[0])
            ins = sa.text(f'INSERT INTO {table_name} ( {names} ) VALUES( {values} )')
            conn.execute(ins, row_dicts)
        conn.commit()

    @classmethod
    def rows(cls, sql: str) -> list[tuple]:
        """Execute ``sql`` and return the result rows as a list of tuples."""
        conn = cls.connect()
        return [tuple(r) for r in conn.execute(sa.text(sql))]

    @classmethod
    def content(cls, sql_or_table_name: str) -> list[tuple]:
        """Return the rows of a table or of a SELECT statement.

        Args:
            sql_or_table_name: Either a complete ``SELECT`` statement or a table
                name, in which case ``SELECT * FROM <name>`` is executed.
        """
        # Require SELECT as a whole word (leading whitespace allowed), so that table
        # names that merely start with "select" (e.g. "selections") are not taken for SQL.
        if not re.match(r'\s*select\b', sql_or_table_name, re.IGNORECASE):
            sql_or_table_name = f'SELECT * FROM {sql_or_table_name}'
        return cls.rows(sql_or_table_name)

    @classmethod
    def exec(cls, sql: str, **kwargs):
        """Execute one or more statements and commit.

        ``sql`` is split on ``;`` (a plain string split, so semicolons inside
        literals are not supported); empty fragments are skipped.
        ``kwargs`` are passed as parameters to every statement.
        """
        conn = cls.connect()
        for s in sql.split(';'):
            if s.strip():
                conn.execute(sa.text(s.strip()), kwargs)
        conn.commit()

250 

251 

252## 

253 

class log:
    """Class-level buffer that collects written items."""

    # items written since the last ``reset()`` or ``get()``
    _buf = []

    @classmethod
    def write(cls, s):
        """Append ``s`` to the buffer."""
        cls._buf.append(s)

    @classmethod
    def reset(cls):
        """Discard the buffered items by installing a new empty buffer."""
        cls._buf = []

    @classmethod
    def get(cls):
        """Return the buffered items and start a new empty buffer."""
        collected, cls._buf = cls._buf, []
        return collected

270 

271 

272## 

273 

274 

def feature_from_dict(model, atts) -> gws.Feature:
    """Create a feature for ``model`` from an attribute dict.

    The dict is used both for the feature's record and, afterwards,
    assigned directly as the feature's ``attributes``.
    """
    record = gws.FeatureRecord(attributes=atts)
    feat = gws.base.feature.new(model=model, record=record)
    feat.attributes = atts
    return feat

279 

280 

def feature(model, **atts) -> gws.Feature:
    """Keyword-argument variant of ``feature_from_dict``: each keyword becomes an attribute."""
    attributes = atts
    return feature_from_dict(model, attributes)

283 

284 

def ewkb(wkt: str, srid=3857):
    """Convert a WKT string to EWKB.

    Args:
        wkt: Geometry in WKT notation.
        srid: Identifier of the CRS passed to ``from_wkt`` as ``default_crs``.

    Returns:
        The result of ``to_ewkb()`` on the parsed shape.
    """
    crs = gws.gis.crs.get(srid)
    return gws.base.shape.from_wkt(wkt, default_crs=crs).to_ewkb()

288 

289 

def model_context(**kwargs):
    """Create a ``gws.ModelContext`` from keyword arguments.

    Missing ``op`` defaults to ``gws.ModelOperation.read`` and missing ``user``
    to a new system user; all other arguments are passed through unchanged.
    """
    fallbacks = (
        ('op', gws.ModelOperation.read),
        ('user', gws_system_user()),
    )
    for key, value in fallbacks:
        kwargs.setdefault(key, value)
    return gws.ModelContext(**kwargs)

294 

295 

296## 

297 

298 

class mockserver:
    """Client helpers for the mock server configured via ``service.mockserver.*`` options."""

    @classmethod
    def add(cls, text):
        """POST ``text`` to the ``__add`` path."""
        cls.post('__add', data=text)

    @classmethod
    def set(cls, text):
        """POST ``text`` to the ``__set`` path."""
        cls.post('__set', data=text)

    @classmethod
    def reset(cls):
        """POST an empty body to the ``__del`` path."""
        cls.post('__del')

    @classmethod
    def post(cls, verb, data=''):
        """Send a POST request with ``data`` to the server path ``verb``; the response is discarded."""
        endpoint = cls.url(verb)
        requests.post(endpoint, data=data)

    @classmethod
    def url(cls, path=''):
        """Return the mock server base URL, with ``/path`` appended when ``path`` is given."""
        host = OPTIONS.get('service.mockserver.host')
        port = OPTIONS.get('service.mockserver.port')
        base = f'http://{host}:{port}'
        if not path:
            return base
        return base + '/' + path

324 

325 

326## 

327 

def fxml(s):
    """Compact the whitespace of a markup string.

    Leading and trailing whitespace is stripped, whitespace runs are collapsed
    to a single space, and spaces directly before or after ``<`` and ``>`` are removed.
    """
    compact = re.sub(r'\s+', ' ', s.strip())
    # applied one after another, in this order
    replacements = (
        (' <', '<'),
        ('< ', '<'),
        (' >', '>'),
        ('> ', '>'),
    )
    for needle, bracket in replacements:
        compact = compact.replace(needle, bracket)
    return compact

337 

338 

339## 

340 

def unlink(path):
    """Remove a file, a symbolic link or a whole directory tree.

    Symbolic links (including dangling ones and links to directories) are
    removed themselves and never followed, so nothing outside of ``path``
    is touched. A path that does not exist is silently ignored.

    Args:
        path: File system path to remove.
    """
    # islink must be checked first: isfile/isdir follow links and are False for dangling ones
    if os.path.islink(path) or os.path.isfile(path):
        os.unlink(path)
        return
    if os.path.isdir(path):
        # take a snapshot of the entries before deleting anything
        with os.scandir(path) as entries:
            children = [de.path for de in entries]
        for child in children:
            unlink(child)
        os.rmdir(path)

349 

350 

def ensure_dir(path, clear=False):
    """Make sure the directory ``path`` exists, creating missing parents.

    Args:
        path: Directory path.
        clear: If true, every entry inside the directory is removed with ``unlink``.
    """
    os.makedirs(path, exist_ok=True)
    if not clear:
        return
    for entry in os.scandir(path):
        unlink(entry.path)

356 

357 

def path_in_base_dir(path):
    """Return ``path`` prefixed with the ``BASE_DIR`` option and a slash."""
    return f"{option('BASE_DIR')}/{path}"

361 

362 

@contextlib.contextmanager
def temp_file_in_base_dir(content='', keep=False):
    """Context manager providing a temporary file under ``{BASE_DIR}/tmp``.

    Meant for exec/chmod tests, which cannot use ``tmp_path``.

    Args:
        content: Content written to the file with ``gws.u.write_file``.
        keep: If true, the file is left in place when the context exits.

    Yields:
        Path of the created file; its name comes from ``gws.u.random_string(16)``.
    """
    base = option('BASE_DIR')
    d = f'{base}/tmp'
    ensure_dir(d)

    p = d + '/' + gws.u.random_string(16)
    gws.u.write_file(p, content)
    try:
        yield p
    finally:
        # clean up even if the body of the ``with`` statement raised
        if not keep:
            unlink(p)

377 

378 

@contextlib.contextmanager
def temp_dir_in_base_dir(keep=False):
    """Context manager providing an empty temporary directory under ``{BASE_DIR}/tmp``.

    Args:
        keep: If true, the directory is left in place when the context exits.

    Yields:
        Path of the created directory; its name comes from ``gws.u.random_string(16)``.
    """
    base = option('BASE_DIR')

    d = f'{base}/tmp/' + gws.u.random_string(16)
    ensure_dir(d, clear=True)
    try:
        yield d
    finally:
        # clean up even if the body of the ``with`` statement raised
        if not keep:
            unlink(d)

389 

390 

def _comma(items):
    """Join an iterable of strings with commas (no spaces)."""
    return ','.join(items)