Coverage for gws-app/gws/core/util.py: 22%
554 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1"""Core utilities
3The most common functions, which are needed everywhere.
5This module is available as ``gws.u`` everywhere.
6"""
8import hashlib
9import json
10import os
11import pickle
12import random
13import re
14import sys
15import threading
16import time
17import urllib.parse
19from typing import cast, Union
20from . import const, log
def is_data_object(x) -> bool:
    """Placeholder predicate for Data objects.

    The body is empty, so this stub returns ``None`` (falsy) for any argument.
    NOTE(review): presumably rebound to the real check elsewhere in the package -- confirm.
    """
def to_data_object(x) -> bool:
    """Placeholder converter to a Data object.

    The body is empty, so this stub returns ``None`` for any argument.
    NOTE(review): presumably rebound to the real converter elsewhere in the package -- confirm.
    """
def exit(code: int = 255):
    """Terminate the application by raising ``SystemExit``.

    Args:
        code: Exit code.
    """

    raise SystemExit(code)
41##
43# @TODO use ABC
def is_list(x):
    """Tell whether ``x`` is a list or a tuple (subclasses included)."""
    return isinstance(x, list) or isinstance(x, tuple)
def is_dict(x):
    """Tell whether ``x`` is a ``dict`` (subclasses included)."""
    dict_like = isinstance(x, dict)
    return dict_like
def is_bytes(x):
    """Tell whether ``x`` is a ``bytes`` or ``bytearray`` instance."""
    # @TODO how to handle bytes-alikes?
    # return hasattr(x, 'decode')
    return isinstance(x, bytes) or isinstance(x, bytearray)
def is_atom(x):
    """Tell whether ``x`` is None or a scalar (int, float, bool, str, bytes)."""
    if x is None:
        return True
    return isinstance(x, (int, float, bool, str, bytes))
def is_empty(x) -> bool:
    """Check if the value is empty (None, empty list/dict/object)."""

    if x is None:
        return True
    # Sized values are judged by their length, other objects by their
    # attribute dict; values supporting neither are never empty.
    for probe in (len, vars):
        try:
            return not probe(x)
        except TypeError:
            continue
    return False
79##
def get(x, key, default=None):
    """Get a nested value/attribute from a structure.

    Args:
        x: A dict, list or Data.
        key: A list or a dot separated string of nested keys.
        default: The default value.

    Returns:
        The value if it exists and the default otherwise.
        A falsy ``x`` always yields the default.
    """

    if not x:
        return default
    path = key.split('.') if isinstance(key, str) else key
    try:
        found = _get(x, path)
    except (KeyError, IndexError, AttributeError, ValueError):
        return default
    return found
def has(x, key) -> bool:
    """True if a nested value/attribute exists in a structure.

    Args:
        x: A dict, list or Data.
        key: A list or a dot separated string of nested keys.

    Returns:
        True if a key exists. A falsy ``x`` always yields False.
    """

    if not x:
        return False
    path = key.split('.') if isinstance(key, str) else key
    try:
        _get(x, path)
    except (KeyError, IndexError, AttributeError, ValueError):
        return False
    return True
def _get(x, keys):
    """Walk ``keys`` through nested dicts, lists, Data objects and plain objects.

    Lookup failures propagate as KeyError, IndexError, AttributeError or
    ValueError (non-numeric list index).
    """
    node = x
    for k in keys:
        if is_dict(node):
            node = node[k]
            continue
        if is_list(node):
            node = node[int(k)]
            continue
        if is_data_object(node):
            # special case: raise a KeyError if the attribute is truly missing in a Data
            # (and not just equals to None)
            node = vars(node)[k]
            continue
        node = getattr(node, k)
    return node
def pop(x, key, default=None):
    """Remove ``key`` from a dict or Data object and return its value.

    Returns ``default`` if the key is missing or ``x`` is neither a dict nor a Data object.
    """
    if is_dict(x):
        target = x
    elif is_data_object(x):
        target = vars(x)
    else:
        return default
    return target.pop(key, default)
def pick(x, *keys):
    """Create a copy of a dict/Data that only contains the given keys.

    Keys not present in ``x`` are skipped. Anything else than a dict or Data yields ``{}``.
    """

    if is_dict(x):
        return {k: x[k] for k in keys if k in x}
    if is_data_object(x):
        attrs = vars(x)
        return type(x)({k: attrs[k] for k in keys if k in attrs})
    return {}
def omit(x, *keys):
    """Create a copy of a dict/Data without the given keys.

    Anything else than a dict or Data yields ``{}``.
    """

    if is_dict(x):
        return {k: v for k, v in x.items() if k not in keys}
    if is_data_object(x):
        kept = {k: v for k, v in vars(x).items() if k not in keys}
        return type(x)(kept)
    return {}
def collect(pairs):
    """Group ``(key, value)`` pairs into a dict of lists, skipping pairs whose key is None."""
    groups = {}

    for key, val in pairs:
        if key is None:
            continue
        if key not in groups:
            groups[key] = []
        groups[key].append(val)

    return groups
def first(it):
    """Return the first element of an iterable, or None if it is empty."""
    return next(iter(it), None)
def first_not_none(*args):
    """Return the first argument that is not None, or None if there is no such argument."""
    return next((a for a in args if a is not None), None)
def merge(*args, **kwargs) -> Union[dict, 'Data']:
    """Create a new dict/Data object by merging values from dicts/Datas or kwargs.
    Latter values overwrite former ones unless None.

    Args:
        *args: dicts or Datas.
        **kwargs: Keyword args.

    Returns:
        A new object: a dict if there are no positional arguments or the first one
        is a dict or None, otherwise an instance of the first argument's type.
    """

    merged = {}

    # kwargs are treated as the last source; empty sources are skipped
    for src in (*args, kwargs):
        if not src:
            continue
        for k, v in to_dict(src).items():
            if v is not None:
                merged[k] = v

    head = args[0] if args else None
    if head is None or isinstance(head, dict):
        return merged
    return type(head)(merged)
def deep_merge(x, y, concat_lists=True):
    """Deeply merge dicts/Datas into a nested dict/Data.
    Latter values overwrite former ones unless None.

    Args:
        x: dict or Data.
        y: dict or Data.
        concat_lists: if true, lists are concatenated, otherwise merged element by element
            (the result is then as long as the shorter list). None elements are dropped first.

    Returns:
        A new object (dict or Data, following the type of ``x``).
    """

    def _is_mapping(v):
        return is_dict(v) or is_data_object(v)

    if _is_mapping(x) and _is_mapping(y):
        left, right = to_dict(x), to_dict(y)
        merged = {}
        for k in left.keys() | right.keys():
            merged[k] = deep_merge(left.get(k), right.get(k), concat_lists)
        if is_dict(x):
            return merged
        return type(x)(merged)

    if is_list(x) and is_list(y):
        left, right = compact(x), compact(y)
        if concat_lists:
            return left + right
        return [deep_merge(a, b, concat_lists) for a, b in zip(left, right)]

    # scalars or mismatched types: the latter wins unless it is None
    if y is None:
        return x
    return y
def compact(x):
    """Remove all None values from a collection.

    Dicts and Data objects keep their type, any other iterable becomes a list.
    """

    def _without_none(pairs):
        return {k: v for k, v in pairs if v is not None}

    if is_dict(x):
        return _without_none(x.items())
    if is_data_object(x):
        return type(x)(_without_none(vars(x).items()))
    return [v for v in x if v is not None]
def strip(x):
    """Strip all strings and remove empty values from a collection.

    Dicts and Data objects keep their type, any other iterable becomes a list.
    """

    def _clean(pairs):
        # yield (key, stripped value) for each non-empty value
        for k, v in pairs:
            if isinstance(v, (str, bytes, bytearray)):
                v = v.strip()
            if not is_empty(v):
                yield k, v

    if is_dict(x):
        return dict(_clean(x.items()))
    if is_data_object(x):
        return type(x)(dict(_clean(vars(x).items())))

    return [v for _, v in _clean((None, v) for v in x)]
def uniq(x):
    """Remove duplicate elements from a collection, keeping the first occurrence of each."""

    seen = set()
    out = []

    for item in x:
        try:
            known = item in seen
        except TypeError:
            # unhashable element: fall back to a linear search in the result
            if item not in out:
                out.append(item)
            continue
        if not known:
            seen.add(item)
            out.append(item)

    return out
315##
def to_int(x) -> int:
    """Convert a value to an int or 0 if this fails.

    Only ordinary errors (``Exception`` subclasses) are turned into 0;
    ``SystemExit`` and ``KeyboardInterrupt`` are allowed to propagate.
    """

    try:
        return int(x)
    except Exception:
        return 0
def to_rounded_int(x) -> int:
    """Round and convert a value to an int or 0 if this fails.

    Floats are rounded with the built-in ``round``, other values are passed to ``int``.
    Only ordinary errors (``Exception`` subclasses) are turned into 0;
    ``SystemExit`` and ``KeyboardInterrupt`` are allowed to propagate.
    """

    try:
        if isinstance(x, float):
            return int(round(x))
        return int(x)
    except Exception:
        return 0
def to_float(x) -> float:
    """Convert a value to a float or 0.0 if this fails.

    Only ordinary errors (``Exception`` subclasses) are turned into 0.0;
    ``SystemExit`` and ``KeyboardInterrupt`` are allowed to propagate.
    """

    try:
        return float(x)
    except Exception:
        return 0.0
def to_str(x, encodings: list[str] = None) -> str:
    """Convert a value to a string.

    Args:
        x: Value. None becomes an empty string.
        encodings: A list of acceptable encodings. If the value is bytes, try each encoding,
            and return the first one which passes without errors. If none does (or no
            encodings are given), decode as utf-8, dropping undecodable bytes.

    Returns:
        A string.
    """

    if isinstance(x, str):
        return x
    if x is None:
        return ''
    if not is_bytes(x):
        return str(x)
    for enc in encodings or ():
        try:
            return x.decode(encoding=enc, errors='strict')
        except UnicodeDecodeError:
            continue
    return x.decode(encoding='utf-8', errors='ignore')
def to_bytes(x, encoding='utf8') -> bytes:
    """Convert a value to bytes by converting it to string and encoding.

    Bytes-like values are copied as is, None becomes ``b''``.
    An empty ``encoding`` falls back to utf8.
    """

    if is_bytes(x):
        return bytes(x)
    if x is None:
        return b''
    text = x if isinstance(x, str) else str(x)
    return text.encode(encoding or 'utf8')
def to_list(x, delimiter: str = ',') -> list:
    """Convert a value to a list.

    Args:
        x: A value. If it's a string (or bytes), split it by the delimiter,
            strip the parts and drop the empty ones.
        delimiter: The delimiter for strings. If empty, the string is not split.

    Returns:
        A list. Lists are returned as is, empty and non-iterable values give ``[]``,
        numbers and booleans are wrapped in a list.
    """

    if isinstance(x, list):
        return x
    if is_empty(x):
        return []
    if is_bytes(x):
        x = to_str(x)
    if isinstance(x, str):
        if not delimiter:
            return [x]
        stripped = (part.strip() for part in x.split(delimiter))
        return [s for s in stripped if s]
    if isinstance(x, (int, float, bool)):
        return [x]
    try:
        return list(x)
    except TypeError:
        return []
def to_dict(x) -> dict:
    """Convert a value to a dict.

    Dicts are returned as is and None becomes ``{}``. For other objects the result of
    their ``_asdict()`` method is used if they have one, otherwise their ``vars()``.

    Raises:
        ValueError: if the conversion fails with a TypeError.
    """

    if is_dict(x):
        return x
    if x is None:
        return {}
    try:
        as_dict = getattr(x, '_asdict', None)
        return as_dict() if as_dict else vars(x)
    except TypeError:
        raise ValueError(f'cannot convert {x!r} to dict')
def to_upper_dict(x) -> dict:
    """Convert a value to a dict (see ``to_dict``) with all keys upper-cased."""
    result = {}
    for key, val in to_dict(x).items():
        result[key.upper()] = val
    return result
def to_lower_dict(x) -> dict:
    """Convert a value to a dict (see ``to_dict``) with all keys lower-cased."""
    result = {}
    for key, val in to_dict(x).items():
        result[key.lower()] = val
    return result
441##
443_UID_DE_TRANS = {
444 ord('ä'): 'ae',
445 ord('ö'): 'oe',
446 ord('ü'): 'ue',
447 ord('ß'): 'ss',
448}
def to_uid(x) -> str:
    """Convert a value to an uid (alphanumeric string).

    The value is lower-cased, German umlauts are transliterated, and every run of
    characters other than ``a-z0-9`` becomes a single underscore. Leading and
    trailing underscores are removed. A falsy value gives an empty string.
    """

    if not x:
        return ''
    text = to_str(x)
    text = text.lower().strip()
    text = text.translate(_UID_DE_TRANS)
    return re.sub(r'[^a-z0-9]+', '_', text).strip('_')
def to_lines(txt: str, comment: str = None) -> list[str]:
    """Convert a multiline string into a list of strings.

    Strip each line and skip empty lines. If `comment` is given, everything from its
    first occurrence to the end of the line is removed as well.
    """

    lines = []

    for raw in txt.splitlines():
        if comment:
            raw = raw.partition(comment)[0]
        line = raw.strip()
        if line:
            lines.append(line)

    return lines
479##
def parse_acl(acl):
    """Parse an ACL config into an ACL.

    Args:
        acl: an ACL config. Can be given as a string ``allow X, allow Y, deny Z``,
            or as a list of dicts ``{ role X type allow }, { role Y type deny }``,
            or it can already be an ACL ``[1 X], [0 Y]``,
            or it can be None.

    Returns:
        Access list: a list of ``(access bit, role)`` pairs. An ACL that is already
        in this form is returned as is.

    Raises:
        ValueError: if the config is malformed.
    """

    if not acl:
        return []

    err = 'invalid ACL'
    allow_kw, deny_kw = 'allow', 'deny'
    valid_bits = {const.ALLOW, const.DENY}

    def _rule(kind, role):
        # translate an 'allow'/'deny' keyword into an (access bit, role) pair
        if kind == allow_kw:
            return const.ALLOW, role
        if kind == deny_kw:
            return const.DENY, role
        raise ValueError(err)

    if isinstance(acl, str):
        rules = []
        for chunk in acl.strip().split(','):
            words = chunk.strip().split()
            if len(words) != 2:
                raise ValueError(err)
            rules.append(_rule(words[0], words[1]))
        return rules

    if not isinstance(acl, list):
        raise ValueError(err)

    # the first element decides how the whole list is interpreted
    head = acl[0]

    if isinstance(head, (list, tuple)):
        try:
            well_formed = all(len(s) == 2 and s[0] in valid_bits for s in acl)
        except (TypeError, IndexError):
            well_formed = False
        if well_formed:
            return acl
        raise ValueError(err)

    if isinstance(head, dict):
        rules = []
        for item in acl:
            kind = item.get('type', '')
            role = item.get('role', '')
            if not isinstance(role, str):
                raise ValueError(err)
            rules.append(_rule(kind, role))
        return rules

    raise ValueError(err)
545##
# Separator between the parent part and the object part of a joined uid.
UID_DELIMITER: str = '::'
def join_uid(parent_uid, object_uid):
    """Join the last components of two uids with ``UID_DELIMITER``."""
    parent_tail = parent_uid.split(UID_DELIMITER)[-1]
    object_tail = object_uid.split(UID_DELIMITER)[-1]
    return UID_DELIMITER.join((parent_tail, object_tail))
def split_uid(joined_uid: str) -> tuple[str, str]:
    """Split a uid at the first ``UID_DELIMITER`` into the parts before and after it.

    If there is no delimiter, the second part is an empty string.
    """
    parts = joined_uid.partition(UID_DELIMITER)
    return parts[0], parts[2]
561##
def is_file(path):
    """Tell whether ``path`` is an existing regular file (``os.path.isfile``)."""
    exists = os.path.isfile(path)
    return exists
def is_dir(path):
    """Tell whether ``path`` is an existing directory (``os.path.isdir``)."""
    exists = os.path.isdir(path)
    return exists
def read_file(path: str) -> str:
    """Read a utf8-encoded text file and return its content.

    Errors are logged at the debug level and re-raised.
    """
    try:
        with open(path, mode='rt', encoding='utf8') as src:
            text = src.read()
    except Exception as exc:
        log.debug(f'error reading {path=} {exc=}')
        raise
    return text
def read_file_b(path: str) -> bytes:
    """Read a file in binary mode and return its content.

    Errors are logged at the debug level and re-raised.
    """
    try:
        with open(path, mode='rb') as src:
            data = src.read()
    except Exception as exc:
        log.debug(f'error reading {path=} {exc=}')
        raise
    return data
def write_file(path: str, s: str, user: int = None, group: int = None):
    """Write a string to a utf8-encoded text file and set the file owner.

    Args:
        path: File path.
        s: Text to write.
        user: Owner uid, passed to ``_chown``.
        group: Owner gid, passed to ``_chown``.

    Returns:
        The path. Errors are logged at the debug level and re-raised.
    """
    try:
        with open(path, mode='wt', encoding='utf8') as dst:
            dst.write(s)
        _chown(path, user, group)
    except Exception as exc:
        log.debug(f'error writing {path=} {exc=}')
        raise
    return path
def write_file_b(path: str, s: bytes, user: int = None, group: int = None):
    """Write bytes to a file and set the file owner.

    Args:
        path: File path.
        s: Bytes to write.
        user: Owner uid, passed to ``_chown``.
        group: Owner gid, passed to ``_chown``.

    Returns:
        The path. Errors are logged at the debug level and re-raised.
    """
    try:
        with open(path, mode='wb') as dst:
            dst.write(s)
        _chown(path, user, group)
    except Exception as exc:
        log.debug(f'error writing {path=} {exc=}')
        raise
    return path
def dirname(path):
    """Return the directory part of ``path`` (``os.path.dirname``)."""
    parent = os.path.dirname(path)
    return parent
def ensure_dir(dir_path: str, base_dir: str = None, mode: int = 0o755, user: int = None, group: int = None) -> str:
    """Check if a (possibly nested) directory exists and create if it does not.

    Args:
        dir_path: Path to a directory. Must be relative if ``base_dir`` is given
            and absolute otherwise.
        base_dir: Base directory.
        mode: Directory creation mode.
        user: Directory user, passed to ``_chown`` (which falls back to ``const.UID``).
        group: Directory group, passed to ``_chown`` (which falls back to ``const.GID``).

    Returns:
        The path to the directory: ``dir_path`` itself, or ``base_dir`` joined with it.

    Raises:
        ValueError: if ``dir_path`` is absolute with a base dir or relative without one.
    """

    is_abs = os.path.isabs(dir_path)

    if base_dir:
        if is_abs:
            raise ValueError(f'cannot use an absolute path {dir_path!r} with a base dir')
        bpath = cast(bytes, os.path.join(base_dir.encode('utf8'), dir_path.encode('utf8')))
    else:
        if not is_abs:
            raise ValueError(f'cannot use a relative path {dir_path!r} without a base dir')
        bpath = dir_path.encode('utf8')

    # create each missing ancestor in turn, from the outermost one down to the target
    segments = bpath.split(b'/')
    for depth in range(1, len(segments) + 1):
        path = b'/'.join(segments[:depth])
        if path and not os.path.isdir(path):
            os.mkdir(path, mode)

    # only the target directory itself is chown'ed
    _chown(bpath, user, group)
    return bpath.decode('utf8')
def ensure_system_dirs():
    """Create every directory listed in ``const.ALL_DIRS`` that does not exist yet."""
    for system_dir in const.ALL_DIRS:
        ensure_dir(system_dir)
def _chown(path, user, group):
    """Change the owner of ``path``, silently ignoring OS errors (best effort).

    Falsy ``user``/``group`` values are replaced with ``const.UID``/``const.GID``.
    NOTE(review): this means uid/gid 0 cannot be requested explicitly -- confirm this is intended.
    """
    uid = user or const.UID
    gid = group or const.GID
    try:
        os.chown(path, uid, gid)
    except OSError:
        pass
def printtemp(name: str) -> str:
    """Return a transient path name in the print directory.

    The file name consists of the process id, a 64 character random string and ``name``.
    """

    unique_name = '_'.join([str(os.getpid()), random_string(64), name])
    return const.PRINT_DIR + '/' + unique_name
def random_string(size: int) -> str:
    """Generate a random string of length `size`.

    Characters are ASCII letters and digits, drawn with ``random.SystemRandom``.
    """

    alphabet = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
    rng = random.SystemRandom()
    chars = [rng.choice(alphabet) for _ in range(size)]
    return ''.join(chars)
677class _FormatMapDefault:
678 def __init__(self, d, default):
679 self.d = d
680 self.default = default
682 def __getitem__(self, item):
683 val = self.d.get(item)
684 return val if val is not None else self.default
def format_map(fmt: str, x: Union[dict, 'Data'], default: str = '') -> str:
    """Format ``fmt`` with values from ``x``, using ``default`` for missing or None values."""
    mapping = _FormatMapDefault(x, default)
    return fmt.format_map(mapping)
def sha256(x):
    """Return the hex SHA-256 digest of a value.

    Bytes are hashed as is, strings and numbers via their utf8-encoded text.
    Anything else is hashed via its JSON dump with sorted keys, where Data objects
    are dumped as their ``vars()`` and other non-serializable values as ``str()``.
    """

    def _json_fallback(v):
        return vars(v) if is_data_object(v) else str(v)

    if is_bytes(x):
        payload = bytes(x)
    elif isinstance(x, (int, float, bool)):
        payload = str(x).encode('utf8')
    elif isinstance(x, str):
        payload = x.encode('utf8')
    else:
        dumped = json.dumps(x, default=_json_fallback, sort_keys=True, ensure_ascii=True)
        payload = dumped.encode('utf8')

    return hashlib.sha256(payload).hexdigest()
class cached_property:
    """Decorator for a cached property.

    On attribute access the wrapped function is called with the instance and its
    result is stored on the instance under the function's name. Since this class
    defines only ``__get__``, the stored instance attribute is found first on
    later lookups, so the function runs once per instance.
    """

    def __init__(self, fn):
        self._fn = fn
        self.__doc__ = getattr(fn, '__doc__')

    def __get__(self, obj, objtype=None):
        # Access via the class (e.g. by introspection tools): there is no instance
        # to compute or cache a value for, so return the descriptor itself.
        if obj is None:
            return self
        value = self._fn(obj)
        setattr(obj, self._fn.__name__, value)
        return value
726# application lock/globals are global to one application
727# server locks lock the whole server
728# server globals are pickled in /tmp
# The single reentrant lock shared by all application-level critical sections.
_app_lock = threading.RLock()


def app_lock(name=''):
    """Return the application lock.

    ``name`` is not used: every name maps to the same reentrant lock.
    """
    lock = _app_lock
    return lock
# Registry of application globals, keyed by name.
_app_globals: dict = {}


def get_app_global(name, init_fn):
    """Return the application global ``name``, creating it with ``init_fn()`` if missing.

    The existence check is repeated under the application lock before ``init_fn`` is called.
    """
    if name not in _app_globals:
        with app_lock(name):
            if name not in _app_globals:
                _app_globals[name] = init_fn()

    return _app_globals[name]
def set_app_global(name, value):
    """Store ``value`` as the application global ``name`` under the application lock and return it."""
    lock = app_lock(name)
    with lock:
        _app_globals[name] = value
        return _app_globals[name]
def delete_app_global(name):
    """Remove the application global ``name`` under the application lock; a missing name is ignored."""
    lock = app_lock(name)
    with lock:
        _app_globals.pop(name, None)
763##
def serialize_to_path(obj, path):
    """Pickle an object to a file.

    The object is first written to a temporary file next to ``path`` (``path`` plus a
    random suffix), which is then moved over ``path`` with ``os.replace``. If pickling
    or moving fails, the temporary file is removed and the error is re-raised.

    Args:
        obj: Object to pickle.
        path: Target file path.

    Returns:
        The path.
    """
    tmp = path + random_string(64)
    try:
        with open(tmp, 'wb') as fp:
            pickle.dump(obj, fp)
        os.replace(tmp, path)
    except BaseException:
        # do not leave a half-written temp file behind
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise
    return path
def unserialize_from_path(path):
    """Load a pickled object from a file.

    NOTE: unpickling can execute arbitrary code, so ``path`` must only point to
    files written by trusted code.
    """
    with open(path, mode='rb') as src:
        obj = pickle.load(src)
    return obj
778_server_globals = {}
def get_cached_object(name: str, life_time: int, init_fn):
    """Return an object from the file cache, creating and storing it if needed.

    The object is pickled in ``const.OBJECT_CACHE_DIR`` under the uid made from ``name``.
    A cache file is used if it is younger than ``life_time`` seconds and loads to a
    truthy object; otherwise ``init_fn()`` is called under the server lock for this uid
    and its result is stored. Load and store errors are logged and do not prevent
    the object from being returned.

    Args:
        name: Object name.
        life_time: Maximum age of the cache file in seconds.
        init_fn: Function without arguments that creates the object.

    Returns:
        The cached or the newly created object.
    """
    uid = to_uid(name)
    path = const.OBJECT_CACHE_DIR + '/' + uid

    def _get():
        # returns the cached object, or None if it is missing, expired or unreadable
        if not os.path.isfile(path):
            return
        try:
            age = int(time.time() - os.stat(path).st_mtime)
        except OSError:
            return
        if age < life_time:
            try:
                obj = unserialize_from_path(path)
                log.debug(f'get_cached_object {uid!r} {life_time=} {age=} - loaded')
                return obj
            except Exception:
                log.exception(f'get_cached_object {uid!r} LOAD ERROR')

    obj = _get()
    if obj:
        return obj

    with server_lock(uid):
        # another process might have stored the object while we were waiting
        obj = _get()
        if obj:
            return obj

        obj = init_fn()
        try:
            serialize_to_path(obj, path)
            log.debug(f'get_cached_object {uid!r} - stored')
        except Exception:
            log.exception(f'get_cached_object {uid!r} STORE ERROR')

        return obj
def get_server_global(name: str, init_fn):
    """Return a server global, creating and storing it if needed.

    The value is looked up in the in-process registry first, then in the pickle file
    in ``const.GLOBALS_DIR`` under the uid made from ``name``. If both fail,
    ``init_fn()`` is called under the server lock for this uid and its result is
    registered and pickled. Load and store errors are logged and do not prevent
    the value from being returned.

    Args:
        name: Global name.
        init_fn: Function without arguments that creates the value.

    Returns:
        The value of the global.
    """
    uid = to_uid(name)
    path = const.GLOBALS_DIR + '/' + uid

    def _get():
        # True if the value is in the registry, possibly after loading it from the file
        if uid in _server_globals:
            log.debug(f'get_server_global {uid!r} - found')
            return True

        if os.path.isfile(path):
            try:
                _server_globals[uid] = unserialize_from_path(path)
                log.debug(f'get_server_global {uid!r} - loaded')
                return True
            except Exception:
                log.exception(f'get_server_global {uid!r} LOAD ERROR')

        return False

    if _get():
        return _server_globals[uid]

    with server_lock(uid):
        # another process might have stored the value while we were waiting
        if _get():
            return _server_globals[uid]

        _server_globals[uid] = init_fn()

        try:
            serialize_to_path(_server_globals[uid], path)
            log.debug(f'get_server_global {uid!r} - stored')
        except Exception:
            log.exception(f'get_server_global {uid!r} STORE ERROR')

        return _server_globals[uid]
855class _FileLock:
856 _PAUSE = 2
857 _TIMEOUT = 60
859 def __init__(self, uid):
860 self.uid = to_uid(uid)
861 self.path = const.LOCKS_DIR + '/' + self.uid
863 def __enter__(self):
864 self.acquire()
865 log.debug(f'server lock {self.uid!r} ACQUIRED')
867 def __exit__(self, exc_type, exc_val, exc_tb):
868 self.release()
870 def acquire(self):
871 ts = time.time()
873 while True:
874 try:
875 fp = os.open(self.path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
876 os.write(fp, bytes(os.getpid()))
877 os.close(fp)
878 return
879 except FileExistsError:
880 pass
882 t = time.time() - ts
884 if t > self._TIMEOUT:
885 raise ValueError('lock timeout', self.uid)
887 log.debug(f'server lock {self.uid!r} WAITING time={t:.3f}')
888 time.sleep(self._PAUSE)
890 def release(self):
891 try:
892 os.unlink(self.path)
893 log.debug(f'server lock {self.uid!r} RELEASED')
894 except:
895 log.exception(f'server lock {self.uid!r} RELEASE ERROR')
def server_lock(uid):
    """Create a file-based lock for ``uid``, to be used as a context manager."""
    lock = _FileLock(uid)
    return lock
902##
def action_url_path(name: str, **kwargs) -> str:
    """Build the url path for a server action.

    The path is ``const.SERVER_ENDPOINT/name`` followed by ``/key/value`` for each
    keyword argument with a non-empty value; keys and values are url-quoted.
    """
    segments = []

    for key, val in kwargs.items():
        if is_empty(val):
            continue
        segments.append(urllib.parse.quote(key))
        segments.append(urllib.parse.quote(to_str(val)))

    base = const.SERVER_ENDPOINT + '/' + name
    return '/'.join([base, *segments])
918##
def utime() -> float:
    """Unix time as a float number."""
    now = time.time()
    return now
def stime() -> int:
    """Unix time as an integer number of seconds."""
    now = time.time()
    return int(now)
def sleep(n: float):
    """Sleep for n seconds."""
    pause = time.sleep
    pause(n)
def mstime() -> int:
    """Unix time as an integer number of milliseconds."""
    millis = time.time() * 1000
    return int(millis)