Coverage for gws-app/gws/lib/vendor/dog/util.py: 0%
148 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-17 01:37 +0200
1import os
2import random
3import re
4import subprocess
5import sys
6import time
class Data:
    """Simple attribute bag populated from keyword arguments.

    Reading an attribute that was never set yields ``None`` instead of
    raising ``AttributeError``.
    """

    def __init__(self, **kwargs):
        """Store every keyword argument as an instance attribute."""
        vars(self).update(kwargs)

    def get(self, k, default=None):
        """Return the attribute named ``k``, or ``default`` if it is not set."""
        return vars(self).get(k, default)

    def __getattr__(self, item):
        """Provide the ``None`` fallback for attributes that are not set.

        Python only calls this hook after normal attribute lookup has failed.
        """
        # Dunder names are protocol probes, not data: ``copy`` checks
        # ``hasattr(obj, '__setstate__')`` and calls the result. Answering
        # None for them makes the probe succeed and the call fail, so they
        # must be reported as missing.
        if item.startswith('__') and item.endswith('__'):
            raise AttributeError(item)
        return None
def to_data(arg):
    """Build a new ``Data`` object from ``arg``.

    A ``Data`` instance is shallow-copied via its attribute dict; anything
    else is unpacked as keyword arguments, so it must be a mapping with
    string keys.
    """
    fields = vars(arg) if isinstance(arg, Data) else arg
    return Data(**fields)
def parse_args(argv):
    """Parse a list of command line tokens into a dict.

    Tokens starting with ``--`` or ``-`` become keys (prefix removed) with
    the value ``True``. A plain token directly after an option replaces that
    ``True`` with the token. Remaining plain tokens are stored under
    consecutive integer keys starting at 0.
    """
    parsed = {}
    pending = None
    position = 0

    for token in argv:
        is_long = token.startswith('--')
        if is_long or token.startswith('-'):
            # Remember the option so the next plain token can become its value.
            pending = token[2:] if is_long else token[1:]
            parsed[pending] = True
            continue

        if pending:
            parsed[pending] = token
            pending = None
            continue

        parsed[position] = token
        position += 1

    return parsed
def read_file(path: str) -> str:
    """Return the whole content of the file at ``path``, decoded as UTF-8."""
    with open(path, mode='rt', encoding='utf8') as stream:
        content = stream.read()
    return content
def read_file_b(path: str) -> bytes:
    """Return the whole content of the file at ``path`` as raw bytes."""
    with open(path, mode='rb') as stream:
        payload = stream.read()
    return payload
def write_file(path: str, s: str):
    """Write the text ``s`` to ``path`` as UTF-8, creating parent directories.

    An existing file at ``path`` is overwritten.
    """
    parent = os.path.dirname(path)
    # A bare file name has an empty dirname, and os.makedirs('') raises
    # FileNotFoundError, so only create directories when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, 'wt', encoding='utf8') as fp:
        fp.write(s)
def write_file_b(path: str, s: bytes):
    """Write the bytes ``s`` to ``path``, creating parent directories.

    An existing file at ``path`` is overwritten.
    """
    parent = os.path.dirname(path)
    # A bare file name has an empty dirname, and os.makedirs('') raises
    # FileNotFoundError, so only create directories when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, 'wb') as fp:
        fp.write(s)
def abspath(path, base):
    """Resolve ``path`` against the directory that contains ``base``.

    An already absolute ``path`` is returned unchanged. Otherwise it is
    joined to ``dirname(base)`` and normalized with ``os.path.abspath``.
    """
    if not os.path.isabs(path):
        base_dir = os.path.dirname(base)
        path = os.path.abspath(os.path.join(base_dir, path))
    return path
def normpath(path):
    """Normalize a slash-separated path, rejecting suspicious ones.

    Empty and ``.`` segments are dropped, ``..`` removes the previous
    segment, and surrounding whitespace is stripped from each segment.

    Returns:
        The normalized path, or an empty string when the path climbs above
        its starting point (or above the root, for absolute paths) or
        contains a segment starting with a dot.
    """
    res = []
    absolute = path.startswith('/')
    if absolute:
        # The leading empty element makes the joined result start with '/'.
        res.append('')
    # Number of elements that '..' must never remove: the root marker, if any.
    floor = 1 if absolute else 0

    for p in path.split('/'):
        p = p.strip()
        if not p or p == '.':
            continue
        if p == '..':
            # Without this floor '/../etc' would pop the root marker and
            # come back as the relative path 'etc'.
            if len(res) <= floor:
                return ''
            res.pop()
            continue
        if p.startswith('.'):
            return ''
        res.append(p)

    return '/'.join(res)
def random_string(length):
    """Return ``length`` random ASCII letters and digits.

    Characters are drawn with ``random.SystemRandom``.
    """
    alphabet = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
    rng = random.SystemRandom()
    picked = [rng.choice(alphabet) for _ in range(length)]
    return ''.join(picked)
# Translation table for str.translate: German umlauts and sharp s mapped to
# their ASCII spellings.
_UID_DE_TRANS = {
    ord(char): ascii_form
    for char, ascii_form in (('ä', 'ae'), ('ö', 'oe'), ('ü', 'ue'), ('ß', 'ss'))
}


def to_uid(x) -> str:
    """Convert a value to a uid: lowercase ASCII letters and digits joined by dashes.

    Falsy values give an empty string. Every run of other characters becomes
    a single dash, and leading and trailing dashes are removed.
    """
    if x:
        text = str(x).lower().strip()
        text = text.translate(_UID_DE_TRANS)
        return re.sub(r'[^a-z0-9]+', '-', text).strip('-')
    return ''
def flatten(ls):
    """Return a flat list with the items of ``ls``, expanding nested lists.

    Only ``list`` instances are expanded, at any depth; tuples and other
    iterables are kept as single items.
    """
    flat = []
    for item in ls:
        flat += flatten(item) if isinstance(item, list) else [item]
    return flat
def run(cmd, pipe=False):
    """Run a command, return a tuple (ok, output).

    Args:
        cmd: The command and its arguments as a sequence of strings. It is
            executed directly, without a shell.
        pipe: When true, stdout and stderr are captured together and
            returned as text. When false, the streams are not redirected
            and the returned output is ``None``.

    Returns:
        ``(True, output)`` when the command exits with status 0. Otherwise
        ``(False, output)``, or ``(False, repr(exception))`` if the command
        could not be run at all. Failures are logged as errors.
    """

    log.debug('run ' + repr(' '.join(cmd)))

    popen_args = {
        'stdout': subprocess.PIPE if pipe else None,
        'stderr': subprocess.STDOUT if pipe else None,
        'stdin': None,
        'shell': False,
    }

    try:
        p = subprocess.Popen(cmd, **popen_args)
        out, _ = p.communicate(None)
        rc = p.returncode
        if isinstance(out, bytes):
            # Undecodable bytes must not turn a finished command into a
            # failure, so they are replaced instead of raising.
            out = out.decode('utf8', errors='replace')
    except Exception as exc:
        log.error(f'run failed: {cmd!r} {exc!r}')
        return False, repr(exc)

    # A negative return code means the child was killed by a signal (POSIX);
    # that is a failure too, so compare against zero rather than "> 0".
    if rc != 0:
        log.error(f'run failed: {cmd!r} {out!r}')
        return False, out

    return True, out
# ANSI escape sequences by color name. The eight foreground colors use the
# consecutive codes 30-37; 'none' emits nothing and 'reset' restores defaults.
color = {'none': ''}
color.update(
    (name, f'\u001b[{30 + offset}m')
    for offset, name in enumerate(
        ('black', 'red', 'green', 'yellow', 'blue', 'magenta', 'cyan', 'white')
    )
)
color['reset'] = '\u001b[0m'

# Color name used for each log level.
log_colors = dict(
    ERROR='red',
    WARNING='yellow',
    INFO='green',
    DEBUG='cyan',
)
class _Logger:
    """Minimal leveled logger that writes to ``sys.stdout``."""

    # Most verbose level that is still printed.
    level = 'INFO'

    def set_level(self, level):
        """Set the most verbose level to print (one of the names used in ``log``)."""
        self.level = level

    def log(self, level, *args):
        """Print ``args`` joined by spaces if ``level`` is enabled.

        Lines are colored only when stdout is a terminal.
        """
        # Ordered from least to most verbose.
        order = ('ERROR', 'WARNING', 'INFO', 'DEBUG')
        if order.index(level) > order.index(self.level):
            return

        prefix = suffix = ''
        if sys.stdout.isatty():
            prefix, suffix = color[log_colors[level]], color['reset']

        text = ' '.join(map(str, args))
        sys.stdout.write(f'{prefix}[dog] {level}: {text}{suffix}\n')
        sys.stdout.flush()

    def error(self, *args):
        """Log ``args`` at ERROR level."""
        self.log('ERROR', *args)

    def warning(self, *args):
        """Log ``args`` at WARNING level."""
        self.log('WARNING', *args)

    def info(self, *args):
        """Log ``args`` at INFO level."""
        self.log('INFO', *args)

    def debug(self, *args):
        """Log ``args`` at DEBUG level."""
        self.log('DEBUG', *args)


# Module-wide logger instance.
log = _Logger()
def bold(c):
    """Return ``c`` with ``;1`` inserted before every ``m``.

    Applied to an escape sequence such as ``'\\u001b[31m'`` this gives
    ``'\\u001b[31;1m'``.
    """
    pieces = c.split('m')
    return ';1m'.join(pieces)
def cprint(clr, msg):
    """Print ``msg`` wrapped in the escape sequence for ``clr`` and a reset.

    ``clr`` must be a key of the module-level ``color`` table.
    """
    start = color[clr]
    end = color['reset']
    print(start + msg + end)
# Stack of (start time, label) pairs for timers that are still running.
_TIME_STACK = []


def time_start(label):
    """Start a timer named ``label`` by pushing it onto the timer stack."""
    started = time.time()
    _TIME_STACK.append((started, label))
def time_end():
    """Stop the most recently started timer and log its duration at DEBUG level.

    Does nothing when no timer is running.
    """
    if not _TIME_STACK:
        return
    finished = time.time()
    started, label = _TIME_STACK.pop()
    log.debug(f'{label} in {finished - started:.2f}s')
def base36(n):
    """Return the integer ``n`` written in base 36 (digits ``0-9a-z``).

    Zero is encoded as ``'0'`` and negative numbers get a leading ``'-'``.
    """
    abc = '0123456789abcdefghijklmnopqrstuvwxyz'
    if n == 0:
        # The digit loop below emits nothing for zero.
        return abc[0]

    # Encode the magnitude only: floor division never brings a negative
    # number to zero (divmod(-1, 36) == (-1, 35)), so the loop would not end.
    sign = '-' if n < 0 else ''
    n = abs(n)

    digits = []
    while n:
        n, i = divmod(n, 36)
        digits.append(abc[i])
    return sign + ''.join(reversed(digits))