Coverage for gws-app/gws/test/util.py: 48%
235 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Test utilities."""
3import contextlib
4import os
5import re
6import typing
7from typing import Optional
9import pytest
10import requests
11import werkzeug.test
13import gws
14import gws.base.auth
15import gws.base.feature
16import gws.base.shape
17import gws.base.web.wsgi_app
18import gws.config
19import gws.gis.crs
20import gws.lib.cli as cli
21import gws.lib.jsonx
22import gws.lib.net
23import gws.lib.sa as sa
24import gws.lib.vendor.slon
25import gws.spec.runtime
26import gws.test.mock
28##
30mock = gws.test.mock
32fixture = pytest.fixture
33raises = pytest.raises
35monkey_patch = pytest.MonkeyPatch.context
37cast = typing.cast
39exec = gws.lib.cli.exec
41##
43OPTIONS = {}
46def option(name, default=None):
47 return OPTIONS.get(name, default)
50##
53def _config_defaults():
54 return f'''
55 database.providers+ {{
56 uid "GWS_TEST_POSTGRES_PROVIDER"
57 type "postgres"
58 host {OPTIONS['service.postgres.host']!r}
59 port {OPTIONS['service.postgres.port']!r}
60 username {OPTIONS['service.postgres.user']!r}
61 password {OPTIONS['service.postgres.password']!r}
62 database {OPTIONS['service.postgres.database']!r}
63 schemaCacheLifeTime 0
64 }}
65 '''
68def _to_data(x):
69 if isinstance(x, gws.Data):
70 for k, v in vars(x).items():
71 setattr(x, k, _to_data(v))
72 return x
73 if isinstance(x, dict):
74 d = gws.Data()
75 for k, v in x.items():
76 setattr(d, k, _to_data(v))
77 return d
78 if isinstance(x, list):
79 return [_to_data(y) for y in x]
80 if isinstance(x, tuple):
81 return tuple(_to_data(y) for y in x)
82 return x
85_GWS_SPEC_DICT = None
88def gws_specs() -> gws.SpecRuntime:
89 global _GWS_SPEC_DICT
91 if _GWS_SPEC_DICT is None:
92 base = option('BASE_DIR')
93 _GWS_SPEC_DICT = gws.spec.runtime.get_spec(
94 f'{base}/config/MANIFEST.json',
95 read_cache=False,
96 write_cache=False
97 )
99 return gws.spec.runtime.Object(_GWS_SPEC_DICT)
102def gws_root(config: str = '', specs: gws.SpecRuntime = None, activate=True, defaults=True):
103 config = config or ''
104 if defaults:
105 config = _config_defaults() + '\n' + config
106 config = f'server.log.level {gws.log.get_level()}\n' + config
107 parsed_config = _to_data(gws.lib.vendor.slon.parse(config, as_object=True))
108 specs = mock.register(specs or gws_specs())
109 root = gws.config.initialize(specs, parsed_config)
110 if root.configErrors:
111 for err in root.configErrors:
112 gws.log.error(f'CONFIGURATION ERROR: {err}')
113 raise gws.ConfigurationError('config failed')
114 if not activate:
115 return root
116 root = gws.config.activate(root)
117 return root
120def gws_system_user():
121 return gws.base.auth.user.SystemUser(None, roles=[])
124def get_db(root):
125 return root.get('GWS_TEST_POSTGRES_PROVIDER')
128##
130# ref: https://werkzeug.palletsprojects.com/en/3.0.x/test/
132class TestResponse(werkzeug.test.TestResponse):
133 cookies: dict[str, werkzeug.test.Cookie]
136def _wz_request(root, **kwargs):
137 client = werkzeug.test.Client(gws.base.web.wsgi_app.make_application(root))
139 cookies = cast(list[werkzeug.test.Cookie], kwargs.pop('cookies', []))
140 for c in cookies:
141 client.set_cookie(
142 key=c.key,
143 value=c.value,
144 max_age=c.max_age,
145 expires=c.expires,
146 path=c.path,
147 domain=c.domain,
148 secure=c.secure,
149 httponly=c.http_only,
150 )
152 res = client.open(**kwargs)
154 # for some reason, responses do not include cookies, work around this
155 res.cookies = {c.key: c for c in (client._cookies or {}).values()}
156 return res
159class http:
160 @classmethod
161 def get(cls, root, url, **kwargs) -> TestResponse:
162 url = re.sub(r'\s+', '', url.strip())
163 url = '/' + url.strip('/')
164 return _wz_request(root, method='GET', path=url, **kwargs)
166 @classmethod
167 def api(cls, root, cmd, request=None, **kwargs) -> TestResponse:
168 path = gws.c.SERVER_ENDPOINT
169 if cmd:
170 path += '/' + cmd
171 return _wz_request(root, method='POST', path=path, json=request or {}, **kwargs)
174##
176class pg:
177 saEngine: Optional[sa.Engine] = None
178 saConn: Optional[sa.Connection] = None
180 @classmethod
181 def connect(cls):
182 # kwargs.setdefault('pool_pre_ping', True)
183 # kwargs.setdefault('echo', self.root.app.developer_option('db.engine_echo'))
185 if not cls.saEngine:
186 url = gws.lib.net.make_url(
187 scheme='postgresql',
188 username=OPTIONS['service.postgres.user'],
189 password=OPTIONS['service.postgres.password'],
190 hostname=OPTIONS['service.postgres.host'],
191 port=OPTIONS['service.postgres.port'],
192 path=OPTIONS['service.postgres.database'],
193 )
194 cls.saEngine = sa.create_engine(url)
196 if not cls.saConn:
197 cls.saConn = cls.saEngine.connect()
199 return cls.saConn
201 @classmethod
202 def close(cls):
203 if cls.saConn:
204 cls.saConn.close()
205 cls.saConn = None
207 @classmethod
208 def create(cls, table_name: str, col_defs: dict):
209 conn = cls.connect()
210 conn.execute(sa.text(f'DROP TABLE IF EXISTS {table_name} CASCADE'))
211 ddl = _comma(f'{k} {v}' for k, v in col_defs.items())
212 conn.execute(sa.text(f'CREATE TABLE {table_name} ( {ddl} )'))
213 conn.commit()
215 @classmethod
216 def clear(cls, table_name: str):
217 conn = cls.connect()
218 conn.execute(sa.text(f'TRUNCATE TABLE {table_name}'))
219 conn.commit()
221 @classmethod
222 def insert(cls, table_name: str, row_dicts: list[dict]):
223 conn = cls.connect()
224 conn.execute(sa.text(f'TRUNCATE TABLE {table_name}'))
225 if row_dicts:
226 names = _comma(k for k in row_dicts[0])
227 values = _comma(':' + k for k in row_dicts[0])
228 ins = sa.text(f'INSERT INTO {table_name} ( {names} ) VALUES( {values} )')
229 conn.execute(ins, row_dicts)
230 conn.commit()
232 @classmethod
233 def rows(cls, sql: str) -> list[tuple]:
234 conn = cls.connect()
235 return [tuple(r) for r in conn.execute(sa.text(sql))]
237 @classmethod
238 def content(cls, sql_or_table_name: str) -> list[tuple]:
239 if not sql_or_table_name.lower().startswith('select'):
240 sql_or_table_name = f'SELECT * FROM {sql_or_table_name}'
241 return cls.rows(sql_or_table_name)
243 @classmethod
244 def exec(cls, sql: str, **kwargs):
245 conn = cls.connect()
246 for s in sql.split(';'):
247 if s.strip():
248 conn.execute(sa.text(s.strip()), kwargs)
249 conn.commit()
252##
254class log:
255 _buf = []
257 @classmethod
258 def write(cls, s):
259 cls._buf.append(s)
261 @classmethod
262 def reset(cls):
263 cls._buf = []
265 @classmethod
266 def get(cls):
267 r = cls._buf
268 cls._buf = []
269 return r
272##
275def feature_from_dict(model, atts) -> gws.Feature:
276 f = gws.base.feature.new(model=model, record=gws.FeatureRecord(attributes=atts))
277 f.attributes = atts
278 return f
281def feature(model, **atts) -> gws.Feature:
282 return feature_from_dict(model, atts)
285def ewkb(wkt: str, srid=3857):
286 shape = gws.base.shape.from_wkt(wkt, default_crs=gws.gis.crs.get(srid))
287 return shape.to_ewkb()
290def model_context(**kwargs):
291 kwargs.setdefault('op', gws.ModelOperation.read)
292 kwargs.setdefault('user', gws_system_user())
293 return gws.ModelContext(**kwargs)
296##
299class mockserver:
300 @classmethod
301 def add(cls, text):
302 cls.post('__add', data=text)
304 @classmethod
305 def set(cls, text):
306 cls.post('__set', data=text)
308 @classmethod
309 def reset(cls):
310 cls.post('__del')
312 @classmethod
313 def post(cls, verb, data=''):
314 requests.post(cls.url(verb), data=data)
316 @classmethod
317 def url(cls, path=''):
318 h = OPTIONS.get('service.mockserver.host')
319 p = OPTIONS.get('service.mockserver.port')
320 u = f'http://{h}:{p}'
321 if path:
322 u += '/' + path
323 return u
326##
328def fxml(s):
329 s = re.sub(r'\s+', ' ', s.strip())
330 return (
331 s
332 .replace(' <', '<')
333 .replace('< ', '<')
334 .replace(' >', '>')
335 .replace('> ', '>')
336 )
339##
341def unlink(path):
342 if os.path.isfile(path):
343 os.unlink(path)
344 return
345 if os.path.isdir(path):
346 for de in os.scandir(path):
347 unlink(de.path)
348 os.rmdir(path)
351def ensure_dir(path, clear=False):
352 os.makedirs(path, exist_ok=True)
353 if clear:
354 for de in os.scandir(path):
355 unlink(de.path)
358def path_in_base_dir(path):
359 base = option('BASE_DIR')
360 return f'{base}/{path}'
363@contextlib.contextmanager
364def temp_file_in_base_dir(content='', keep=False):
365 # for exec/chmod tests, which cannot use tmp_path
367 base = option('BASE_DIR')
368 d = f'{base}/tmp'
369 ensure_dir(d)
371 p = d + '/' + gws.u.random_string(16)
372 gws.u.write_file(p, content)
373 yield p
375 if not keep:
376 unlink(p)
379@contextlib.contextmanager
380def temp_dir_in_base_dir(keep=False):
381 base = option('BASE_DIR')
383 d = f'{base}/tmp/' + gws.u.random_string(16)
384 ensure_dir(d, clear=True)
385 yield d
387 if not keep:
388 unlink(d)
391_comma = ','.join