Coverage for gws-app/gws/config/parser.py: 14%

183 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-17 01:37 +0200

1"""Parse and validate the main cfg and project configs""" 

2 

3from typing import Optional 

4 

5import os 

6import yaml 

7 

8import gws 

9import gws.lib.jsonx 

10import gws.lib.osx 

11import gws.lib.datetimex 

12import gws.lib.importer 

13import gws.lib.vendor.jump 

14import gws.lib.vendor.slon 

15import gws.spec.runtime 

16 

17CONFIG_PATH_PATTERN = r'\.(py|json|yaml|cx)$' 

18CONFIG_FUNCTION_NAME = 'config' 

19 

20 

21def parse(specs: gws.SpecRuntime, value, type_name: str, source_path='', read_options=None): 

22 """Parse a dictionary according to the klass spec and return a config (Data) object""" 

23 

24 try: 

25 read_options = read_options or set() 

26 read_options.add(gws.SpecReadOption.verboseErrors) 

27 return specs.read(value, type_name, path=source_path, options=read_options) 

28 except gws.spec.runtime.ReadError as exc: 

29 message, _, details = exc.args 

30 lines = [] 

31 p = details.get('path') 

32 if p: 

33 lines.append(f'PATH: {p!r}') 

34 p = details.get('formatted_value') 

35 if p: 

36 lines.append(f'VALUE: {p}') 

37 p = details.get('formatted_stack') 

38 if p: 

39 lines.extend(p) 

40 raise gws.ConfigurationError(f'parse error: {message}', lines) from exc 

41 

42 

43class ConfigParser: 

44 """Read and parse the main config file""" 

45 

46 def __init__(self, specs: gws.SpecRuntime): 

47 self.specs = specs 

48 self.errors = [] 

49 self.paths = set() 

50 

51 def parse_main(self, config_path=None) -> Optional[gws.Config]: 

52 payload = self.read(config_path) 

53 if not payload: 

54 return None 

55 return self.parse_main_from_dict(payload, config_path) 

56 

57 def parse_main_from_dict(self, dct, config_path) -> Optional[gws.Config]: 

58 prj_dicts = [] 

59 

60 # the timezone must be set before everything else 

61 tz = dct.get('server', {}).get('timeZone', '') 

62 if tz: 

63 gws.lib.datetimex.set_local_time_zone(tz) 

64 gws.log.info(f'local time zone is "{gws.lib.datetimex.time_zone()}"') 

65 

66 for prj_cfg in dct.pop('projects', []): 

67 for prj_dict in _as_flat_list(prj_cfg): 

68 prj_dicts.append([prj_dict, config_path]) 

69 

70 gws.log.info('parsing main configuration...') 

71 try: 

72 app_cfg = parse(self.specs, dct, 'gws.base.application.core.Config', config_path) 

73 except gws.ConfigurationError as exc: 

74 self.errors.append(exc) 

75 return None 

76 

77 app_cfg.projectPaths = app_cfg.projectPaths or [] 

78 app_cfg.projectDirs = app_cfg.projectDirs or [] 

79 

80 prj_paths = list(app_cfg.projectPaths) 

81 for dirname in app_cfg.projectDirs: 

82 prj_paths.extend(gws.lib.osx.find_files(dirname, CONFIG_PATH_PATTERN)) 

83 

84 for prj_path in sorted(set(prj_paths)): 

85 payload = self.read(prj_path) 

86 if not payload: 

87 continue 

88 for prj_dict in _as_flat_list(payload): 

89 prj_dicts.append([prj_dict, prj_path]) 

90 

91 app_cfg.projects = [] 

92 

93 for prj_dict, prj_path in prj_dicts: 

94 uid = prj_dict.get('uid') or prj_dict.get('title') or '?' 

95 gws.log.info(f'parsing project {uid!r}...') 

96 try: 

97 prj_cfg = parse(self.specs, prj_dict, 'gws.ext.config.project', prj_path) 

98 except gws.ConfigurationError as exc: 

99 self.errors.append(exc) 

100 continue 

101 app_cfg.projects.append(prj_cfg) 

102 

103 app_cfg.configPaths = sorted(self.paths) 

104 return app_cfg 

105 

106 def read(self, path): 

107 if not os.path.isfile(path): 

108 self.errors.append(_error(f'file not found: {path!r}')) 

109 return 

110 payload = self.read2(path) 

111 if payload: 

112 _save_intermediate(path, gws.lib.jsonx.to_pretty_string(payload), 'json') 

113 return payload 

114 

115 def read2(self, path): 

116 if path.endswith('.py'): 

117 try: 

118 mod = gws.lib.importer.import_from_path(path) 

119 fn = getattr(mod, CONFIG_FUNCTION_NAME) 

120 payload = fn() 

121 except Exception as exc: 

122 self.errors.append(_error('python error', cause=exc)) 

123 return 

124 if not isinstance(payload, dict): 

125 payload = _as_dict(payload) 

126 self.paths.add(path) 

127 return payload 

128 

129 if path.endswith('.json'): 

130 try: 

131 payload = gws.lib.jsonx.from_path(path) 

132 except Exception as exc: 

133 self.errors.append(_error('json error', cause=exc)) 

134 return 

135 self.paths.add(path) 

136 return payload 

137 

138 if path.endswith('.yaml'): 

139 try: 

140 with open(path, encoding='utf8') as fp: 

141 payload = yaml.safe_load(fp) 

142 except Exception as exc: 

143 self.errors.append(_error('yaml error', cause=exc)) 

144 return 

145 self.paths.add(path) 

146 return payload 

147 

148 if path.endswith('.cx'): 

149 return self.parse_cx_config(path) 

150 

151 self.errors.append(_error('unsupported config format', path)) 

152 

153 def parse_cx_config(self, path): 

154 paths = {path} 

155 runtime_errors = [] 

156 

157 def _error_handler(exc, path, line, env): 

158 runtime_errors.append(_syntax_error(path, gws.u.read_file(path), repr(exc), line)) 

159 return True 

160 

161 def _loader(cur_path, p): 

162 if not os.path.isabs(p): 

163 d = os.path.dirname(cur_path) 

164 p = os.path.abspath(os.path.join(d, p)) 

165 paths.add(p) 

166 return gws.u.read_file(p), p 

167 

168 try: 

169 tpl = gws.lib.vendor.jump.compile_path(path, loader=_loader) 

170 except gws.lib.vendor.jump.CompileError as exc: 

171 self.errors.append( 

172 _syntax_error(path, gws.u.read_file(exc.path), exc.message, exc.line, cause=exc)) 

173 return 

174 

175 src = gws.lib.vendor.jump.call(tpl, args={'true': True, 'false': False}, error=_error_handler) 

176 if runtime_errors: 

177 self.errors.extend(runtime_errors) 

178 return 

179 

180 _save_intermediate(path, src, 'slon') 

181 

182 try: 

183 payload = gws.lib.vendor.slon.loads(src, as_object=True) 

184 except gws.lib.vendor.slon.SlonError as exc: 

185 self.errors.append(_syntax_error(path, src, exc.args[0], exc.args[2], cause=exc)) 

186 return 

187 

188 self.paths.update(paths) 

189 return payload 

190 

191 

192def _syntax_error(path, src, message, line, context=10, cause=None): 

193 lines = [f'PATH: {path!r}'] 

194 

195 for n, ln in enumerate(src.splitlines(), 1): 

196 if n < line - context: 

197 continue 

198 if n > line + context: 

199 break 

200 ln = f'{n}: {ln}' 

201 if n == line: 

202 ln = f'>>> {ln}' 

203 lines.append(ln) 

204 

205 return _error(f'syntax error: {message}', *lines, cause=cause) 

206 

207 

208def _save_intermediate(path, txt, ext): 

209 gws.u.write_file(f"{gws.c.CONFIG_DIR}/{gws.u.to_uid(path)}.parsed.{ext}", txt) 

210 

211 

212def _as_flat_list(ls): 

213 if not isinstance(ls, (list, tuple)): 

214 yield ls 

215 else: 

216 for x in ls: 

217 yield from _as_flat_list(x) 

218 

219 

220def _as_dict(val): 

221 if isinstance(val, list): 

222 return [_as_dict(x) for x in val] 

223 if isinstance(val, tuple): 

224 return tuple(_as_dict(x) for x in val) 

225 if isinstance(val, gws.Data): 

226 val = vars(val) 

227 if isinstance(val, dict): 

228 return {k: _as_dict(v) for k, v in val.items()} 

229 return val 

230 

231 

232def _error(message, *args, cause=None): 

233 err = gws.ConfigurationError(message, *args) 

234 if cause: 

235 err.__cause__ = cause 

236 return err