Coverage for gws-app/gws/core/tree_impl.py: 18%
247 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1from . import (
2 const as c,
3 util as u,
4 log,
5)
# Module-level placeholders for core names used by the functions below.
# They are all bound to None here.
# NOTE(review): presumably rebound to the real classes by the package at
# import time -- confirm against the code that imports this module.
Access = None
Error = None
ConfigurationError = None
Data = None
Props = None
Object = None
def object_repr(self):
    """Build a short debug representation of an object.

    The text starts with the object's ``extName`` (or its qualified class
    name when ``extName`` is missing or falsy), followed by ``title`` and
    ``uid`` when those attributes are present and truthy, and ends with the
    hexadecimal ``id`` of the object.

    Returns:
        A string of the form ``<name title='...' uid=... 0x...>``.
    """
    parts = [getattr(self, 'extName', None) or class_name(self)]

    title = getattr(self, 'title', None)
    if title:
        parts.append(f' title={title!r}')

    uid = getattr(self, 'uid', None)
    if uid:
        parts.append(f' uid={uid}')

    return '<' + ''.join(parts) + ' ' + hex(id(self)) + '>'
def node_initialize(self, config):
    """Attach ``config`` to the node and run its configuration hooks.

    Stores the configuration, computes the node's permissions with
    ``configure_permissions`` and then invokes the ``pre_configure`` and
    ``configure`` hooks (in that order) through ``super_invoke``.

    Args:
        config: Configuration object for this node.
    """
    self.config = config
    self.permissions = configure_permissions(self)

    for hook in ('pre_configure', 'configure'):
        super_invoke(self, hook)
def node_create_child(self, classref, config, **kwargs):
    """Create a node of class ``classref`` with this node as its parent.

    The work is delegated to ``self.root.create``; extra keyword arguments
    are passed through unchanged.

    Returns:
        Whatever ``self.root.create`` returns.
    """
    root = self.root
    return root.create(classref, parent=self, config=config, **kwargs)
def node_create_child_if_configured(self, classref, config=None, **kwargs):
    """Create a child node only when a truthy ``config`` is given.

    Returns:
        The result of ``self.root.create`` with this node as parent, or
        ``None`` when ``config`` is falsy.
    """
    if config:
        return self.root.create(classref, parent=self, config=config, **kwargs)
    return None
def node_create_children(self, classref, configs, **kwargs):
    """Create one child per entry of ``configs``.

    Each entry is passed to ``self.create_child`` together with ``classref``
    and ``kwargs``; the results are filtered through ``u.compact``.

    Returns:
        An empty list when ``configs`` is falsy, otherwise the value
        returned by ``u.compact``.
    """
    if not configs:
        return []

    created = (self.create_child(classref, cfg, **kwargs) for cfg in configs)
    return u.compact(created)
def node_cfg(self, key: str, default=None):
    """Look up ``key`` in the node's configuration.

    Args:
        key: Key passed to ``u.get`` together with ``self.config``.
        default: Value returned when the lookup yields ``None``.

    Returns:
        The looked-up value, or ``default`` if that value is ``None``.
    """
    found = u.get(self.config, key)
    if found is None:
        return default
    return found
def node_find_all(self, classref):
    """Return the direct children of this node that match ``classref``.

    Delegates to ``find_all_in`` with ``self.root`` and ``self.children``.
    """
    root = self.root
    candidates = self.children
    return find_all_in(root, candidates, classref)
def node_find_first(self, classref):
    """Return the first direct child matching ``classref``.

    Delegates to ``find_first_in`` with ``self.root`` and ``self.children``.
    """
    root = self.root
    candidates = self.children
    return find_first_in(root, candidates, classref)
def node_find_closest(self, classref):
    """Return the nearest ancestor of this node that matches ``classref``.

    The walk starts at ``self.parent`` and follows ``parent`` links. It stops
    without a result when it reaches a falsy parent or the root itself.

    Returns:
        The first matching ancestor, or ``None`` if there is none.
    """
    node = self.parent
    while node and node is not self.root:
        if is_a(self.root, node, classref):
            return node
        node = node.parent
    return None
def node_find_ancestors(self, classref):
    """Collect the ancestors of this node, nearest first.

    The walk starts at ``self.parent`` and stops at a falsy parent or at the
    root, which is never included. When ``classref`` is falsy every ancestor
    is collected; otherwise only those for which ``is_a`` is true.

    Returns:
        A list of ancestor nodes, possibly empty.
    """
    found = []
    node = self.parent

    while node and node is not self.root:
        if not classref or is_a(self.root, node, classref):
            found.append(node)
        node = node.parent

    return found
def node_find_descendants(self, classref):
    """Collect all descendants of this node in depth-first, pre-order.

    When ``classref`` is falsy every descendant is collected; otherwise only
    those for which ``is_a`` is true. Non-matching nodes are still traversed.

    Returns:
        A list of descendant nodes, possibly empty.
    """

    def _collect(parent, out):
        for child in parent.children:
            if not classref or is_a(self.root, child, classref):
                out.append(child)
            # descend regardless of whether the child itself matched
            _collect(child, out)
        return out

    return _collect(self, [])
def node_register_middleware(self, name: str, depends_on):
    """Register this node as middleware with the application's manager.

    Calls ``register(self, name, depends_on)`` on
    ``self.root.app.middlewareMgr``.

    Args:
        name: Name passed to the manager.
        depends_on: Dependency information passed to the manager unchanged.
    """
    manager = self.root.app.middlewareMgr
    manager.register(self, name, depends_on)
103##
def root_init(self, specs):
    """Set the initial state of a root object.

    Args:
        specs: Specs object stored as ``self.specs``.
    """
    self.specs = specs
    self.app = None
    self.permissions = {}

    # configuration bookkeeping
    self.configErrors = []
    self.configStack = []

    # node registry; the uid counter starts at 1
    self.nodes = []
    self.uidMap = {}
    self.uidCount = 1
def root_initialize(self, node, config):
    """Initialize ``node`` with ``config``, recording any failure.

    The node is pushed onto ``self.configStack`` for the duration of the
    call. If ``node.initialize`` raises an ``Exception``, it is logged and
    registered via ``register_config_error`` instead of propagating.

    Args:
        node: Node whose ``initialize`` method is called.
        config: Configuration passed to ``node.initialize``.

    Returns:
        ``True`` if initialization succeeded, ``False`` if it raised.
    """
    self.configStack.append(node)

    try:
        node.initialize(config)
        return True
    except Exception as exc:
        log.exception()
        # the node is still on the stack here, so the error context includes it
        register_config_error(self, exc)
        return False
    finally:
        # always rebalance the stack, even if the error handling itself fails
        self.configStack.pop()
def root_post_initialize(self):
    """Run the ``post_configure`` hooks of all registered nodes.

    Nodes are processed in reverse order of ``self.nodes``. Before each hook
    ``self.configStack`` is rebuilt as the chain of ``parent`` links from the
    outermost ancestor down to the node, so that a registered error carries
    that context. Exceptions raised by a hook are logged and registered via
    ``register_config_error``; they do not stop the loop.
    """
    for node in reversed(self.nodes):
        chain = self.configStack = []

        # walk up the parent links, then flip to get outermost-first order
        ancestor = node
        while ancestor:
            chain.append(ancestor)
            ancestor = getattr(ancestor, 'parent', None)
        chain.reverse()

        try:
            super_invoke(node, 'post_configure')
        except Exception as exc:
            log.exception()
            register_config_error(self, exc)
def root_activate(self):
    """Call ``activate()`` on every node in ``self.nodes``, in list order."""
    for registered in self.nodes:
        registered.activate()
def root_find_all(self, classref):
    """Return all registered nodes that match ``classref``.

    Delegates to ``find_all_in`` over ``self.nodes``.
    """
    candidates = self.nodes
    return find_all_in(self, candidates, classref)
def root_find_first(self, classref):
    """Return the first registered node that matches ``classref``.

    Delegates to ``find_first_in`` over ``self.nodes``.
    """
    candidates = self.nodes
    return find_first_in(self, candidates, classref)
def root_get(self, uid, classref):
    """Look up a registered node by its uid.

    Args:
        uid: Key into ``self.uidMap``; a falsy uid yields ``None``.
        classref: When truthy, the node must additionally satisfy ``is_a``.

    Returns:
        The node, or ``None`` if it is not found or does not match.
    """
    if not uid:
        return None

    node = self.uidMap.get(uid)
    if not node:
        return None

    if classref and not is_a(self, node, classref):
        return None

    return node
def root_object_count(self) -> int:
    """Return the number of nodes currently held in ``self.nodes``."""
    count = len(self.nodes)
    return count
def root_create(self, classref, parent, config, **kwargs):
    """Create and register a node of class ``classref`` under ``parent``.

    ``config`` and ``kwargs`` are combined with ``to_config`` and the result
    is handed to ``create_node``.

    Returns:
        Whatever ``create_node`` returns.
    """
    merged = to_config(config, kwargs)
    return create_node(self, classref, parent, merged)
def root_create_shared(self, classref, config, **kwargs):
    """Create a parentless node, reusing an existing one with the same uid.

    When the merged configuration has no truthy ``uid``, one is derived as
    ``'_s_'`` plus ``u.sha256`` of the class reference's repr and the
    configuration. If a node with that uid is already in ``self.uidMap``,
    it is returned instead of creating a new one.

    Returns:
        The existing node, or whatever ``create_node`` returns.
    """
    merged = to_config(config, kwargs)

    if not merged.uid:
        merged.uid = '_s_' + u.sha256([repr(classref), merged])

    if merged.uid in self.uidMap:
        return self.uidMap[merged.uid]

    return create_node(self, classref, None, merged)
def root_create_temporary(self, classref, config, **kwargs):
    """Create a parentless node that is not registered with the root.

    The node is built by ``create_node`` with ``temp=True``, which skips
    adding it to ``self.nodes`` and ``self.uidMap``.

    Returns:
        Whatever ``create_node`` returns.
    """
    merged = to_config(config, kwargs)
    return create_node(self, classref, None, merged, temp=True)
def root_create_application(self, config, **kwargs):
    """Create, register and initialize the application node.

    The node is allocated as ``'gws.base.application.core.Object'``, gets
    the fixed uid ``'1'`` and the root as its parent. It is registered
    before ``self.initialize`` runs, and the outcome of that call is not
    checked here.

    Returns:
        The application node.
    """
    merged = to_config(config, kwargs)

    app = alloc_node(self, 'gws.base.application.core.Object')
    app.uid = '1'
    app.parent = self
    app.children = []

    # register first, then initialize
    self.nodes.append(app)
    self.uidMap[app.uid] = app
    self.app = app

    self.initialize(app, merged)

    return app
213##
def class_name(node):
    """Return the dotted ``module.ClassName`` of the object's class."""
    cls = node.__class__
    return '.'.join((cls.__module__, cls.__name__))
def alloc_node(self, classref, typ=None):
    """Instantiate the class that ``self.specs`` resolves for ``classref``.

    The new object gets ``root`` set to this root and ``extName`` and
    ``extType`` copied from the class (empty string when absent).

    Args:
        classref: Class reference passed to ``self.specs.get_class``.
        typ: Optional type passed to ``self.specs.get_class``.

    Returns:
        The new, not yet configured node.

    Raises:
        Error: If ``get_class`` returns a falsy value.
    """
    cls = self.specs.get_class(classref, typ)

    if cls:
        node = cls()
        node.root = self
        node.extName = getattr(cls, 'extName', '')
        node.extType = getattr(cls, 'extType', '')
        return node

    raise Error(f'class {classref}:{typ} not found')
def configure_permissions(self):
    """Build the permissions mapping of a node from its configuration.

    The ``access`` option, when set, provides the ``Access.read`` entry.
    Each attribute of the ``permissions`` option is parsed with
    ``u.parse_acl``; falsy results are skipped. The key ``all`` fills the
    read, write, create and delete entries, the key ``edit`` fills write,
    create and delete, and any other key is used as is.

    Returns:
        A dict of parsed access lists.
    """
    perms = {}

    access = self.cfg('access')
    if access:
        perms[Access.read] = u.parse_acl(access)

    permissions = self.cfg('permissions')
    if not permissions:
        return perms

    for key, raw in vars(permissions).items():
        acl = u.parse_acl(raw)
        if not acl:
            continue

        if key == 'all':
            targets = (Access.read, Access.write, Access.create, Access.delete)
        elif key == 'edit':
            targets = (Access.write, Access.create, Access.delete)
        else:
            targets = (key,)

        for target in targets:
            perms[target] = acl

    return perms
def create_node(self, classref, parent, config, temp=False):
    """Allocate, initialize and (unless temporary) register a node.

    Args:
        classref: Class reference passed to ``alloc_node``.
        parent: Parent node, or a falsy value for a parentless node.
        config: Configuration; its ``type`` and ``uid`` entries are read.
        temp: When true, the node is not added to ``self.nodes`` and
            ``self.uidMap``.

    Returns:
        The new node, or ``None`` if ``self.initialize`` reported failure.
    """
    node = alloc_node(self, classref, config.get('type'))
    node.uid = get_or_generate_uid(self, config)
    node.parent = parent
    node.children = []

    # indent the log line by four dots per level of the configuration stack
    indent = '.' * 4 * len(self.configStack)
    log.debug('configure: ' + indent + f'{node!r} IN {(parent or self)!r}')

    if not self.initialize(node, config):
        log.debug(f'FAILED {node!r}')
        return None

    if not temp:
        self.nodes.append(node)
        self.uidMap[node.uid] = node

    if parent:
        parent.children.append(node)

    return node
def find_all_in(root, where, classref):
    """Return the nodes in ``where`` that match ``classref``.

    ``classref`` is resolved with ``root.specs.parse_classref`` into a
    ``(cls, name, ext_name)`` triple. The first truthy element decides the
    test: ``isinstance`` for a class, equality with ``class_name`` for a
    name, and an ``extName`` prefix match for an extension name.

    Args:
        root: Root object providing ``specs``.
        where: Iterable of candidate nodes.
        classref: Class reference to match against.

    Returns:
        A list of matching nodes. The list is empty when nothing matches or
        when the class reference resolves to nothing usable.
    """
    cls, name, ext_name = root.specs.parse_classref(classref)

    if cls:
        return [node for node in where if isinstance(node, cls)]
    if name:
        return [node for node in where if class_name(node) == name]
    if ext_name:
        return [node for node in where if node.extName.startswith(ext_name)]

    # unresolvable reference: always hand back a list, never None
    return []
def find_first_in(root, where, classref):
    """Return the first node in ``where`` that matches ``classref``.

    Uses ``find_all_in`` and picks its first element.

    Returns:
        The first match, or ``None`` when ``find_all_in`` yields nothing.
    """
    matches = find_all_in(root, where, classref)
    if matches:
        return matches[0]
    return None
def get_or_generate_uid(self, config):
    """Return the uid from ``config`` or generate a new numeric one.

    A truthy ``uid`` entry of ``config`` is returned as is. Otherwise
    ``self.uidCount`` is incremented and its new value is returned as a
    string.
    """
    uid = config.get('uid')
    if uid:
        return uid

    self.uidCount += 1
    return str(self.uidCount)
def is_a(root, node, classref):
    """Tell whether ``node`` matches the class reference ``classref``.

    ``classref`` is resolved with ``root.specs.parse_classref`` into a
    ``(cls, name, ext_name)`` triple. The first truthy element decides the
    test: ``isinstance`` for a class, equality with ``class_name`` for a
    name, and an ``extName`` prefix match for an extension name.

    Returns:
        The result of the selected test, or ``False`` if all three elements
        are falsy.
    """
    cls, name, ext_name = root.specs.parse_classref(classref)

    if cls:
        matched = isinstance(node, cls)
    elif name:
        matched = class_name(node) == name
    elif ext_name:
        matched = node.extName.startswith(ext_name)
    else:
        matched = False

    return matched
def props_of(node, user, *context):
    """Return the props of ``node`` as visible to ``user``.

    Args:
        node: Object whose props are requested.
        user: User object; ``user.can_use(node, *context)`` gates access.
        *context: Extra arguments passed to ``user.can_use``.

    Returns:
        ``None`` when the user may not use the node or when no props were
        produced, a ``Data`` instance as is, or a dict wrapped in ``Props``.

    Raises:
        Error: If the computed props are of any other type.
    """
    allowed = user.can_use(node, *context)
    if not allowed:
        return None

    props = make_props2(node, user)

    if props is None or isinstance(props, Data):
        return props
    if isinstance(props, dict):
        return Props(props)

    raise Error('invalid props type')
def make_props2(obj, user):
    """Recursively convert ``obj`` into props for ``user``.

    Atoms (per ``u.is_atom``) are returned unchanged. An ``Object`` is
    replaced by its ``props(user)``, unless the user's read access bit for
    it equals ``c.DENY``. ``Data`` values are turned into their attribute
    dict. Dicts and lists are converted element-wise and passed through
    ``u.compact``.

    Returns:
        The converted value, or ``None`` for denied objects and for values
        of any other kind.
    """
    if u.is_atom(obj):
        return obj

    value = obj

    if isinstance(value, Object):
        if user.acl_bit(Access.read, value) == c.DENY:
            return None
        value = value.props(user)

    if isinstance(value, Data):
        value = vars(value)

    if u.is_dict(value):
        return u.compact({key: make_props2(item, user) for key, item in value.items()})

    if u.is_list(value):
        return u.compact([make_props2(item, user) for item in value])

    return None
def register_config_error(self, exc):
    """Record ``exc`` as a configuration error with its context.

    A ``ConfigurationError`` is built from the repr of ``exc`` plus one
    ``in <repr>`` line per entry of ``self.configStack``, innermost first.
    The original exception is attached as ``__cause__`` and the error is
    appended to ``self.configErrors``.
    """
    context = [f'in {item!r}' for item in reversed(self.configStack)]

    error = ConfigurationError(repr(exc), *context)
    error.__cause__ = exc

    self.configErrors.append(error)
def super_invoke(node, method):
    """Call ``method`` as defined by every class in the node's MRO.

    Since calling ``super().configure`` is mandatory in ``configure``
    methods, this automates it: all classes in the MRO that define
    ``method`` in their own namespace are collected first, then each
    definition is called with ``node``, base classes before subclasses.

    Args:
        node: Object passed as the only argument to each method.
        method: Name of the method to look for.
    """
    owners = []

    for cls in type(node).mro():
        try:
            defines = method in vars(cls)
        except TypeError:
            # vars() refused this class; skip it
            continue
        if defines:
            owners.append(cls)

    # the MRO lists subclasses first; hooks must run from the base upwards
    owners.reverse()
    for cls in owners:
        getattr(cls, method)(node)
def to_config(config, defaults):
    """Combine ``defaults`` and ``config`` into a fresh ``Data`` object.

    Calls ``u.merge`` with a new ``Data()``, then ``defaults``, then
    ``config``, and returns its result.
    """
    base = Data()
    return u.merge(base, defaults, config)