Coverage for gws-app/gws/config/parser.py: 14%
183 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Parse and validate the main cfg and project configs"""
3from typing import Optional
5import os
6import yaml
8import gws
9import gws.lib.jsonx
10import gws.lib.osx
11import gws.lib.datetimex
12import gws.lib.importer
13import gws.lib.vendor.jump
14import gws.lib.vendor.slon
15import gws.spec.runtime
# Regular expression for file names that are picked up as project configs
# when the configured project directories are scanned.
CONFIG_PATH_PATTERN = r'\.(py|json|yaml|cx)$'
# Name of the callable a Python (.py) config module must expose;
# it is called without arguments and its result is used as the config payload.
CONFIG_FUNCTION_NAME = 'config'
def parse(specs: gws.SpecRuntime, value, type_name: str, source_path='', read_options=None):
    """Parse a dictionary according to the klass spec and return a config (Data) object.

    Args:
        specs: Spec runtime whose ``read`` method performs the validation.
        value: Raw configuration value to parse.
        type_name: Name of the spec type the value is read as.
        source_path: Path the value came from; passed to the reader as ``path``.
        read_options: Optional collection of read options. It is copied, and
            ``gws.SpecReadOption.verboseErrors`` is always added to the copy.

    Returns:
        Whatever ``specs.read`` returns for the given value.

    Raises:
        gws.ConfigurationError: If the reader raises ``ReadError``. The error
            carries the path, formatted value and formatted stack (when the
            reader provides them) as additional lines.
    """

    # Work on a copy: the caller's option set must not be mutated.
    options = set(read_options or ())
    options.add(gws.SpecReadOption.verboseErrors)

    try:
        return specs.read(value, type_name, path=source_path, options=options)
    except gws.spec.runtime.ReadError as exc:
        message, _, details = exc.args
        lines = []

        p = details.get('path')
        if p:
            lines.append(f'PATH: {p!r}')

        p = details.get('formatted_value')
        if p:
            lines.append(f'VALUE: {p}')

        p = details.get('formatted_stack')
        if p:
            lines.extend(p)

        raise gws.ConfigurationError(f'parse error: {message}', lines) from exc
class ConfigParser:
    """Read and parse the main config file.

    Problems are collected in ``errors`` instead of being raised, so that a
    single run can report as many configuration errors as possible.
    ``paths`` collects the files that were successfully read.
    """

    def __init__(self, specs: gws.SpecRuntime):
        self.specs = specs
        # ConfigurationError objects collected while reading and parsing
        self.errors = []
        # paths of all config files (including .cx includes) read successfully
        self.paths = set()

    def parse_main(self, config_path=None) -> Optional[gws.Config]:
        """Read the main config file and parse it together with its projects.

        Returns:
            The application config, or ``None`` if the file could not be read
            or the main config failed to parse (see ``self.errors``).
        """
        payload = self.read(config_path)
        if not payload:
            return None
        return self.parse_main_from_dict(payload, config_path)

    def parse_main_from_dict(self, dct, config_path) -> Optional[gws.Config]:
        """Parse an already loaded main config dict and all project configs.

        Projects come from three places: the inline ``projects`` key (removed
        from ``dct``), the parsed ``projectPaths`` and the files found under
        ``projectDirs``. Projects that fail to parse are skipped and their
        errors recorded.

        Returns:
            The application config with ``projects`` and ``configPaths``
            filled in, or ``None`` if the main config itself is invalid.
        """
        prj_dicts = []

        # the timezone must be set before everything else
        tz = dct.get('server', {}).get('timeZone', '')
        if tz:
            gws.lib.datetimex.set_local_time_zone(tz)
        gws.log.info(f'local time zone is "{gws.lib.datetimex.time_zone()}"')

        # inline projects are parsed separately, so take them out of the main dict
        for prj_cfg in dct.pop('projects', []):
            for prj_dict in _as_flat_list(prj_cfg):
                prj_dicts.append([prj_dict, config_path])

        gws.log.info('parsing main configuration...')
        try:
            app_cfg = parse(self.specs, dct, 'gws.base.application.core.Config', config_path)
        except gws.ConfigurationError as exc:
            self.errors.append(exc)
            return None

        app_cfg.projectPaths = app_cfg.projectPaths or []
        app_cfg.projectDirs = app_cfg.projectDirs or []

        prj_paths = list(app_cfg.projectPaths)
        for dirname in app_cfg.projectDirs:
            prj_paths.extend(gws.lib.osx.find_files(dirname, CONFIG_PATH_PATTERN))

        # de-duplicate and sort so that projects are loaded in a stable order
        for prj_path in sorted(set(prj_paths)):
            payload = self.read(prj_path)
            if not payload:
                continue
            for prj_dict in _as_flat_list(payload):
                prj_dicts.append([prj_dict, prj_path])

        app_cfg.projects = []

        for prj_dict, prj_path in prj_dicts:
            uid = prj_dict.get('uid') or prj_dict.get('title') or '?'
            gws.log.info(f'parsing project {uid!r}...')
            try:
                prj_cfg = parse(self.specs, prj_dict, 'gws.ext.config.project', prj_path)
            except gws.ConfigurationError as exc:
                self.errors.append(exc)
                continue
            app_cfg.projects.append(prj_cfg)

        app_cfg.configPaths = sorted(self.paths)
        return app_cfg

    def read(self, path):
        """Read a config file and store its parsed form as intermediate JSON.

        Returns:
            The payload, or ``None`` if the path is empty, is not a file, or
            could not be loaded (an error is recorded in these cases).
        """
        # `parse_main` defaults to `config_path=None`; `os.path.isfile(None)`
        # raises TypeError, so treat an empty path as "not found" explicitly.
        if not path or not os.path.isfile(path):
            self.errors.append(_error(f'file not found: {path!r}'))
            return None
        payload = self.read2(path)
        if payload:
            _save_intermediate(path, gws.lib.jsonx.to_pretty_string(payload), 'json')
        return payload

    def read2(self, path):
        """Load a config file according to its extension (.py, .json, .yaml, .cx).

        Returns:
            The loaded payload, or ``None`` if loading failed or the format is
            not supported (an error is recorded in these cases).
        """
        if path.endswith('.py'):
            try:
                mod = gws.lib.importer.import_from_path(path)
                fn = getattr(mod, CONFIG_FUNCTION_NAME)
                payload = fn()
            except Exception as exc:
                self.errors.append(_error('python error', cause=exc))
                return None
            # a python config may return Data objects instead of plain dicts
            if not isinstance(payload, dict):
                payload = _as_dict(payload)
            self.paths.add(path)
            return payload

        if path.endswith('.json'):
            try:
                payload = gws.lib.jsonx.from_path(path)
            except Exception as exc:
                self.errors.append(_error('json error', cause=exc))
                return None
            self.paths.add(path)
            return payload

        if path.endswith('.yaml'):
            try:
                with open(path, encoding='utf8') as fp:
                    payload = yaml.safe_load(fp)
            except Exception as exc:
                self.errors.append(_error('yaml error', cause=exc))
                return None
            self.paths.add(path)
            return payload

        if path.endswith('.cx'):
            return self.parse_cx_config(path)

        self.errors.append(_error('unsupported config format', path))
        return None

    def parse_cx_config(self, path):
        """Render a .cx template with jump and parse the result as slon.

        The rendered text is saved as an intermediate ``.slon`` file before it
        is parsed. The template and every file it loads are added to
        ``self.paths`` only if the whole pipeline succeeds.

        Returns:
            The parsed payload, or ``None`` on compile, runtime or syntax
            errors (which are recorded in ``self.errors``).
        """
        paths = {path}
        runtime_errors = []

        def _error_handler(exc, path, line, env):
            # collect runtime errors instead of aborting on the first one
            runtime_errors.append(_syntax_error(path, gws.u.read_file(path), repr(exc), line))
            # NOTE(review): presumably tells jump the error was handled - confirm
            return True

        def _loader(cur_path, p):
            # relative includes are resolved against the including file
            if not os.path.isabs(p):
                d = os.path.dirname(cur_path)
                p = os.path.abspath(os.path.join(d, p))
            paths.add(p)
            return gws.u.read_file(p), p

        try:
            tpl = gws.lib.vendor.jump.compile_path(path, loader=_loader)
        except gws.lib.vendor.jump.CompileError as exc:
            # The excerpt and the line number refer to `exc.path` (which may be
            # an included file), so the reported PATH must name that file too.
            self.errors.append(
                _syntax_error(exc.path, gws.u.read_file(exc.path), exc.message, exc.line, cause=exc))
            return None

        src = gws.lib.vendor.jump.call(tpl, args={'true': True, 'false': False}, error=_error_handler)
        if runtime_errors:
            self.errors.extend(runtime_errors)
            return None

        _save_intermediate(path, src, 'slon')

        try:
            payload = gws.lib.vendor.slon.loads(src, as_object=True)
        except gws.lib.vendor.slon.SlonError as exc:
            # NOTE(review): args[0] / args[2] are used as message / line number - confirm
            self.errors.append(_syntax_error(path, src, exc.args[0], exc.args[2], cause=exc))
            return None

        self.paths.update(paths)
        return payload
def _syntax_error(path, src, message, line, context=10, cause=None):
    """Build a syntax error that quotes the source around the offending line.

    The error lines consist of the path followed by the numbered source lines
    within ``context`` lines of ``line``; the offending line itself is
    prefixed with ``>>>``.
    """
    first, last = line - context, line + context

    excerpt = [
        f'>>> {num}: {text}' if num == line else f'{num}: {text}'
        for num, text in enumerate(src.splitlines(), 1)
        if first <= num <= last
    ]

    return _error(f'syntax error: {message}', f'PATH: {path!r}', *excerpt, cause=cause)
def _save_intermediate(path, txt, ext):
    """Write an intermediate parse result into the config directory.

    The file name is derived from the uid of ``path`` and ends with
    ``.parsed.<ext>``.
    """
    target_dir = gws.c.CONFIG_DIR
    uid = gws.u.to_uid(path)
    gws.u.write_file(f"{target_dir}/{uid}.parsed.{ext}", txt)
212def _as_flat_list(ls):
213 if not isinstance(ls, (list, tuple)):
214 yield ls
215 else:
216 for x in ls:
217 yield from _as_flat_list(x)
220def _as_dict(val):
221 if isinstance(val, list):
222 return [_as_dict(x) for x in val]
223 if isinstance(val, tuple):
224 return tuple(_as_dict(x) for x in val)
225 if isinstance(val, gws.Data):
226 val = vars(val)
227 if isinstance(val, dict):
228 return {k: _as_dict(v) for k, v in val.items()}
229 return val
def _error(message, *args, cause=None):
    """Create (without raising) a ConfigurationError, optionally chained to ``cause``."""
    exc = gws.ConfigurationError(message, *args)
    if not cause:
        return exc
    exc.__cause__ = cause
    return exc