Coverage for gws-app/gws/spec/runtime.py: 18%
136 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Validate values according to specs"""
3from typing import Optional
5import re
6import sys
8import gws
9import gws.lib.jsonx
10import gws.lib.importer
12from . import core, reader
13from .generator import generator
15Error = core.Error
16ReadError = core.ReadError
17GeneratorError = core.GeneratorError
18LoadError = core.LoadError
21def create(manifest_path: str = None, read_cache=False, write_cache=False) -> 'Object':
22 gs = get_spec(manifest_path, read_cache, write_cache)
23 return Object(gs)
26def get_spec(manifest_path: str = None, read_cache=False, write_cache=False) -> dict:
27 cache_path = gws.u.ensure_dir(gws.c.SPEC_DIR) + '/spec_' + gws.u.to_uid(manifest_path or '') + '.json'
29 if read_cache and gws.u.is_file(cache_path):
30 try:
31 gs = gws.lib.jsonx.from_path(cache_path)
32 gws.log.debug(f'spec.create: loaded from {cache_path!r}')
33 return gs
34 except gws.lib.jsonx.Error:
35 gws.log.exception(f'spec.create: load failed')
37 if manifest_path:
38 gws.log.debug(f'spec.create: using manifest {manifest_path!r}...')
40 gws.debug.time_start('SPEC GENERATOR')
41 gs = generator.generate_specs(manifest_path=manifest_path)
42 gws.debug.time_end()
44 if write_cache:
45 try:
46 gws.lib.jsonx.to_path(cache_path, gs, pretty=True)
47 gws.log.debug(f'spec.create: stored to {cache_path!r}')
48 except gws.lib.jsonx.Error:
49 gws.log.exception(f'spec.create: store failed')
51 return gs
54##
57class Object(gws.SpecRuntime):
58 def __init__(self, gs):
59 meta = gs['meta']
60 self.manifest = gws.ApplicationManifest(meta['manifest'])
61 self.manifestPath = meta['manifestPath']
62 self.version = meta['version']
64 self.specs = [core.Type(**s) for s in gs['specs']]
66 self.index = {}
67 for s in self.specs:
68 self.index[s.uid] = s
69 if s.extName:
70 self.index[s.extName] = s
72 self.strings = gs['strings']
73 self.chunks = gs['chunks']
75 self.appBundlePaths = []
76 for chunk in self.chunks:
77 path = chunk['bundleDir'] + '/' + gws.c.JS_BUNDLE
78 if path not in self.appBundlePaths:
79 self.appBundlePaths.append(path)
81 self._descCache = {}
83 def __getstate__(self):
84 self._descCache = {}
85 return vars(self)
87 def get_type(self, key):
88 return self.index.get(key)
90 def read(self, value, type_name, path='', options=None):
91 r = reader.Reader(self, path, options)
92 return r.read(value, type_name)
94 def object_descriptor(self, name):
95 if name in self._descCache:
96 return self._descCache[name]
98 typ = self.get_type(name)
99 if not typ:
100 return
102 self._descCache[name] = gws.ExtObjectDescriptor(
103 extName=typ.extName,
104 extType=typ.extName.split('.').pop(),
105 ident=typ.ident,
106 modName=typ.modName,
107 modPath=typ.modPath,
108 classPtr=None
109 )
111 return self._descCache[name]
113 def register_object(self, classref, ext_type, cls):
114 _, _, ext_name = self.parse_classref(classref)
115 if not ext_name:
116 raise Error(f'invalid class reference {classref!r}')
117 ext_name += '.' + ext_type
118 setattr(cls, 'extName', ext_name)
119 setattr(cls, 'extType', ext_type)
120 self._descCache[ext_name] = gws.ExtObjectDescriptor(
121 extName=ext_name,
122 extType=ext_type,
123 ident=cls.__name__,
124 modName='',
125 modPath='',
126 classPtr=cls
127 )
129 def get_class(self, classref, ext_type=None):
130 cls, real_name, ext_name = self.parse_classref(classref)
132 if cls:
133 return cls
135 if real_name:
136 desc = self.object_descriptor(real_name)
137 elif ext_name:
138 desc = self.object_descriptor(ext_name + '.' + (ext_type or core.DEFAULT_VARIANT_TAG))
139 else:
140 desc = None
142 if not desc:
143 return
145 if not desc.classPtr:
146 try:
147 mod = gws.lib.importer.import_from_path(desc.modPath, gws.c.APP_DIR)
148 except gws.lib.importer.Error as exc:
149 raise LoadError(f'cannot load class {classref!r} from {desc.modPath!r}') from exc
150 desc.classPtr = getattr(mod, desc.ident)
151 setattr(desc.classPtr, 'extName', desc.extName)
152 setattr(desc.classPtr, 'extType', desc.extType)
154 return desc.classPtr
156 def command_descriptor(self, command_category, command_name):
157 name = core.EXT_COMMAND_PREFIX + command_category + '.' + command_name
159 if name in self._descCache:
160 return self._descCache[name]
162 typ = self.get_type(name)
164 if not typ:
165 return
167 return gws.ExtCommandDescriptor(
168 extName=typ.extName,
169 extType=typ.extName.split('.').pop(),
170 tArg=typ.tArg,
171 tOwner=typ.tOwner,
172 owner=self.object_descriptor(typ.tOwner),
173 methodName=typ.ident,
174 )
176 def cli_commands(self, lang='en'):
177 strings = self.strings.get(lang) or self.strings['en']
178 cmds = []
180 for typ in self.specs:
181 if not typ.extName.startswith(core.EXT_COMMAND_CLI_PREFIX):
182 continue
184 # serverStart -> [server, start]
185 m = re.search(r'\.([a-z]+)(\w+)$', typ.extName)
186 cmd1 = m.group(1)
187 cmd2 = m.group(2).lower()
189 entry = gws.Data(
190 cmd1=cmd1,
191 cmd2=cmd2,
192 doc=strings.get(typ.uid) or self.strings['en'].get(typ.uid) or '',
193 )
194 cmds.append(entry)
196 args = []
197 arg_typ = self.get_type(typ.tArg)
198 if arg_typ:
199 for name, prop_type_uid in arg_typ.tProperties.items():
200 prop_typ = self.get_type(prop_type_uid)
201 args.append(gws.Data(
202 name=name,
203 type=prop_typ.tValue,
204 doc=strings.get(prop_type_uid) or self.strings['en'].get(prop_type_uid) or '',
205 defaultValue=prop_typ.defaultValue,
206 hasDefault=prop_typ.hasDefault,
207 ))
208 entry.args = sorted(args, key=lambda a: a.name)
210 return sorted(cmds, key=lambda c: (c.cmd1, c.cmd2))
212 def parse_classref(self, classref: gws.ClassRef) -> tuple[Optional[type], str, str]:
213 ext_name = gws.ext.name_for(classref)
214 if ext_name:
215 return None, '', ext_name
217 if isinstance(classref, str):
218 return None, classref, ''
220 if isinstance(classref, type):
221 return classref, '', ''
223 raise Error(f'invalid class reference {classref!r}')